refactor: Address PR feedback
bmtcril committed Dec 19, 2023
1 parent a70b0fb commit 42a3945
Showing 6 changed files with 81 additions and 151 deletions.
113 changes: 49 additions & 64 deletions xapi_db_load/backends/clickhouse_lake.py
@@ -15,30 +15,18 @@ class XAPILakeClickhouse:

client = None

def __init__(
self,
db_host="localhost",
db_port=18123,
db_username="default",
db_password=None,
db_name=None,
db_event_sink_name=None,
s3_key=None,
s3_secret=None,
):
self.host = db_host
self.port = db_port
self.username = db_username
self.database = db_name
self.event_sink_database = db_event_sink_name
self.db_password = db_password
self.s3_key = s3_key
self.s3_secret = s3_secret

self.event_raw_table_name = "xapi_events_all"
self.event_table_name = "xapi_events_all_parsed"
self.event_table_name_mv = "xapi_events_all_parsed_mv"
self.get_org_function_name = "get_org_from_course_url"
def __init__(self, config):
self.host = config.get("db_host", "localhost")
self.port = config.get("db_port", "18123")
self.username = config.get("db_username", "default")
self.database = config.get("db_name", "xapi")
self.event_sink_database = config.get("db_event_sink_name", "event_sink")
self.db_password = config.get("db_password")
self.s3_key = config.get("s3_key")
self.s3_secret = config.get("s3_secret")

self.event_raw_table_name = config.get("event_raw_table_name", "xapi_events_all")
self.event_table_name = config.get("event_table_name", "xapi_events_all_parsed")
self.set_client()

def set_client(self):
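
The constructor now takes a single config mapping in place of nine keyword arguments, with defaults supplied by the config.get() calls above. A minimal sketch of instantiating the backend under the new signature — the keys mirror those calls, and all values shown are hypothetical:

from xapi_db_load.backends.clickhouse_lake import XAPILakeClickhouse

# Keys mirror the config.get() calls in __init__; values are examples only.
config = {
    "db_host": "localhost",
    "db_port": "18123",
    "db_username": "default",
    "db_password": "secret",
    "db_name": "xapi",
    "db_event_sink_name": "event_sink",
}

lake = XAPILakeClickhouse(config)  # omitted keys fall back to the defaults above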
@@ -193,46 +181,43 @@ def insert_event_sink_actor_data(self, actors):
for actor in actors:
dump_id = str(uuid.uuid4())
dump_time = datetime.utcnow()
try:
id_row = f"""(
'{actor.id}',
'xapi',
'{actor.username}',
'{actor.user_id}',
'{dump_id}',
'{dump_time}'
)"""
out_external_id.append(id_row)

# This first column is usually the MySQL row pk, we just
# use it for now to have a unique id.
profile_row = f"""(
'{actor.user_id}',
'{actor.user_id}',
'{actor.name}',
'{actor.meta}',
'{actor.courseware}',
'{actor.language}',
'{actor.location}',
'{actor.year_of_birth}',
'{actor.gender}',
'{actor.level_of_education}',
'{actor.mailing_address}',
'{actor.city}',
'{actor.country}',
'{actor.state}',
'{actor.goals}',
'{actor.bio}',
'{actor.profile_image_uploaded_at}',
'{actor.phone_number}',
'{dump_id}',
'{dump_time}'
)"""

out_profile.append(profile_row)
except Exception:
print(actor)
raise
id_row = f"""(
'{actor.id}',
'xapi',
'{actor.username}',
'{actor.user_id}',
'{dump_id}',
'{dump_time}'
)"""
out_external_id.append(id_row)

# This first column is usually the MySQL row pk, we just
# use it for now to have a unique id.
profile_row = f"""(
'{actor.user_id}',
'{actor.user_id}',
'{actor.name}',
'{actor.meta}',
'{actor.courseware}',
'{actor.language}',
'{actor.location}',
'{actor.year_of_birth}',
'{actor.gender}',
'{actor.level_of_education}',
'{actor.mailing_address}',
'{actor.city}',
'{actor.country}',
'{actor.state}',
'{actor.goals}',
'{actor.bio}',
'{actor.profile_image_uploaded_at}',
'{actor.phone_number}',
'{dump_id}',
'{dump_time}'
)"""

out_profile.append(profile_row)

# Now do the actual inserts...
vals = ",".join(out_external_id)
@@ -342,7 +327,7 @@ def do_queries(self, event_generator):
)

self._run_query_and_print(
"Count of enrollments for this actpr",
"Count of enrollments for this actor",
f"""
select count(*)
from {self.event_table_name}
5 changes: 3 additions & 2 deletions xapi_db_load/backends/csv.py
@@ -16,8 +16,9 @@ class XAPILakeCSV:
CSV fake data lake implementation.
"""

def __init__(self, output_destination):
# This isn't really a database, so just faking out all of this.
def __init__(self, config):
output_destination = config['csv_output_destination']

self.xapi_csv_handle, self.xapi_csv_writer = self._get_csv_handle("xapi", output_destination)
self.course_csv_handle, self.course_csv_writer = self._get_csv_handle("courses", output_destination)
self.blocks_csv_handle, self.blocks_csv_writer = self._get_csv_handle("blocks", output_destination)
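
One behavioral note: csv_output_destination is now read with a direct key lookup, so a missing key surfaces as a KeyError rather than the click.UsageError the old dispatch code raised. A minimal sketch of the new call, with a hypothetical output path:

from xapi_db_load.backends.csv import XAPILakeCSV

config = {"csv_output_destination": "/tmp/xapi_csv"}  # hypothetical path
lake = XAPILakeCSV(config)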
21 changes: 5 additions & 16 deletions xapi_db_load/backends/ralph_lrs.py
@@ -30,22 +30,11 @@ class XAPILRSRalphClickhouse(XAPILakeClickhouse):
Wraps the XAPILakeClickhouse backend so that queries can be run against it while using Ralph to do the insertion.
"""

def __init__(
self,
db_host,
lrs_url,
lrs_username,
lrs_password,
db_port=18123,
db_username="default",
db_password=None,
db_name="xapi",
db_event_sink_name="event_sink",
):
super().__init__(db_host, db_port, db_username, db_password, db_name, db_event_sink_name)
self.lrs_url = lrs_url
self.lrs_username = lrs_username
self.lrs_password = lrs_password
def __init__(self, config):
super().__init__(config)
self.lrs_url = config["lrs_url"]
self.lrs_username = config["lrs_username"]
self.lrs_password = config["lrs_password"]

def batch_insert(self, events):
"""
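
Because the Ralph backend now hands the whole config dict to its ClickHouse parent, a single mapping configures both layers; the lrs_* keys use direct indexing, so they are required. A sketch with placeholder values:

from xapi_db_load.backends.ralph_lrs import XAPILRSRalphClickhouse

config = {
    "db_host": "localhost",
    "lrs_url": "http://localhost:8100",  # hypothetical endpoint
    "lrs_username": "ralph",             # placeholder credentials
    "lrs_password": "secret",
}

lake = XAPILRSRalphClickhouse(config)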
27 changes: 13 additions & 14 deletions xapi_db_load/course_configs.py
@@ -247,9 +247,9 @@ def _serialize_block(self, block_type, block_id, cnt):
"org": self.org,
"course_key": self.course_id,
"location": block_id.split("/xblock/")[-1],
"display_name": f"{block_type} {cnt}",
# This is a catchall field, we don't currently use it
"xblock_data_json": "{}",
"display_name": f"{block_type.title()} {cnt}",
# This gets appended with location data below
"xblock_data_json": {"block_type": block_type},
"order": cnt,
"edited_on": self.end_date
}
@@ -261,8 +261,8 @@ def _serialize_course_block(self):
"course_key": self.course_id,
"location": f"block-v1:{location_course_id}+type@course+block@course",
"display_name": f"Course {self.course_uuid[:5]}",
# This is a catchall field, we don't currently use it
"xblock_data_json": "{}",
# This gets appended with location data below
"xblock_data_json": {"block_type": "course"},
"order": 1,
"edited_on": self.end_date
}
@@ -291,16 +291,16 @@ def serialize_block_data_for_event_sink(self):

# Get all of our blocks in order
for v in self.video_ids:
blocks.append(self._serialize_block("Video", v, cnt))
blocks.append(self._serialize_block("video", v, cnt))
cnt += 1
for p in self.problem_ids:
blocks.append(self._serialize_block("Problem", p, cnt))
blocks.append(self._serialize_block("problem", p, cnt))
cnt += 1

course_structure = [self._serialize_course_block()]

for c in self.chapter_ids:
course_structure.append(self._serialize_block("Chapter", c, cnt))
course_structure.append(self._serialize_block("chapter", c, cnt))
cnt += 1

for s in self.sequential_ids:
@@ -309,7 +309,7 @@ def serialize_block_data_for_event_sink(self):
# Start at 2 here to make sure it's after the course and first
# chapter block
random.randint(2, len(course_structure)),
self._serialize_block("Sequential", s, cnt)
self._serialize_block("sequential", s, cnt)
)
cnt += 1

@@ -319,7 +319,7 @@ def serialize_block_data_for_event_sink(self):
# Start at 3 here to make sure it's after the course and first
# chapter block and first sequential block
random.randint(2, len(course_structure)),
self._serialize_block("Vertical", v, cnt)
self._serialize_block("vertical", v, cnt)
)
cnt += 1

@@ -349,13 +349,12 @@ def serialize_block_data_for_event_sink(self):
elif block["display_name"].startswith("Vertical"):
unit_idx += 1

# In event-sink-clickhouse block_type is also included, but I'm
# omitting for now since we don't currently use it and this is
# already way too expensive an operation.
block["xblock_data_json"] = json.dumps({
block["xblock_data_json"].update({
"section": section_idx,
"subsection": subsection_idx,
"unit": unit_idx,
})

block["xblock_data_json"] = json.dumps(block["xblock_data_json"])

return course_structure
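
With this change xblock_data_json starts as a dict seeded with block_type, has the section/subsection/unit indexes merged in during the walk above, and is only passed through json.dumps at the end. For a problem block the serialized value would look roughly like this (index values are illustrative):

{"block_type": "problem", "section": 2, "subsection": 1, "unit": 3}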
3 changes: 1 addition & 2 deletions xapi_db_load/generate_load.py
@@ -127,8 +127,7 @@ def setup_actors(self):
Random samplings of these will be passed into courses.
"""
for i in range(self.config["num_actors"]):
self.actors.append(Actor(i))
self.actors = [Actor(i) for i in range(self.config["num_actors"])]

def get_batch_events(self):
"""
63 changes: 10 additions & 53 deletions xapi_db_load/utils.py
@@ -6,73 +6,30 @@
import os
from datetime import datetime

import click

from xapi_db_load.backends import clickhouse_lake as clickhouse
from xapi_db_load.backends import csv
from xapi_db_load.backends import ralph_lrs as ralph

timing = logging.getLogger("timing")


def get_backend_from_config(config):
class ConfigurationError(Exception):
"""
Return an instantiated backend from the given config dict.
Exception raised by backends when a configuration file is invalid.
"""
return get_backend(
config["backend"],
config.get("db_host"),
config.get("db_port"),
config.get("db_username"),
config.get("db_password"),
config.get("db_name"),
config.get("db_event_sink_name"),
config.get("s3_key"),
config.get("s3_secret"),
config.get("lrs_url"),
config.get("lrs_username"),
config.get("lrs_password"),
config.get("csv_output_destination"),
)


def get_backend(
backend, db_host, db_port, db_username, db_password, db_name, db_event_sink_name,
s3_key=None, s3_secret=None, lrs_url=None, lrs_username=None, lrs_password=None,
csv_output_destination=None
):


def get_backend_from_config(config):
"""
Return an instantiated backend from the given arguments.
Return an instantiated backend from the given config dict.
"""
backend = config["backend"]
if backend == "clickhouse":
lake = clickhouse.XAPILakeClickhouse(
db_host=db_host,
db_port=db_port,
db_username=db_username,
db_password=db_password,
db_name=db_name,
db_event_sink_name=db_event_sink_name,
s3_key=s3_key,
s3_secret=s3_secret
)
lake = clickhouse.XAPILakeClickhouse(config)
elif backend == "ralph_clickhouse":
lake = ralph.XAPILRSRalphClickhouse(
db_host=db_host,
db_port=db_port,
db_username=db_username,
db_password=db_password,
db_name=db_name,
db_event_sink_name=db_event_sink_name,
lrs_url=lrs_url,
lrs_username=lrs_username,
lrs_password=lrs_password,
)
lake = ralph.XAPILRSRalphClickhouse(config)
elif backend == "csv_file":
if not csv_output_destination:
raise click.UsageError(
"--csv_output_destination must be provided for this backend."
)
lake = csv.XAPILakeCSV(output_destination=csv_output_destination)
lake = csv.XAPILakeCSV(config)
else:
raise NotImplementedError(f"Unknown backend {backend}.")

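
Backend selection now flows entirely through the config dict, and an unrecognized backend name still raises NotImplementedError. A short usage sketch, with a hypothetical config:

from xapi_db_load.utils import get_backend_from_config

config = {
    "backend": "csv_file",
    "csv_output_destination": "/tmp/xapi_csv",  # hypothetical path
}

lake = get_backend_from_config(config)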
