Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions example_configs/vector_small_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Use the vector backend to emit log statements that Vector will use to load the data
# to ClickHouse with insert statements

# Vector Backend configuration
# ################################
backend: vector
db_host: localhost
db_port: 8123
db_name: xapi
db_event_sink_name: event_sink
db_username: ch_admin
db_password: ...
s3_key: ...
s3_secret: ...

# Run options
log_dir: logs
num_xapi_batches: 3
batch_size: 100

# This number is used for each QueueBackend that uses workers, so the number of
# threads is multiplicative. Generally this performs best with fewer than 10 workers,
# as more threads will cost more in context switching than they save.
num_workers: 4

# Overall start and end date for the entire run
start_date: 2014-01-01
end_date: 2023-11-27

# All courses will be this long, and be fit into the start / end dates
# This must be less than end_date - start_date days.
course_length_days: 120

# The size of the test
num_organizations: 3
num_actors: 10

# This replicates users updating their profiles several times, creating
# more rows
num_actor_profile_changes: 5

# How many of each size course to create. The sum of these is the total number
# of courses created for the test.
num_course_sizes:
small: 1
medium: 1
large: 1
huge: 1

# How many times each course will be "published", this creates a more realistic
# distribution of course blocks where each course can be published dozens or
# hundreds of times while it is being developed.
num_course_publishes: 100

# Course size configurations, how many of each type of object are created for
# each course of this size. "actors" must be less than or equal to "num_actors".
# For a course of this size to be created it needs to exist both here and in
# "num_course_sizes".
course_size_makeup:
small:
actors: 5
problems: 20
videos: 10
chapters: 3
sequences: 10
verticals: 20
forum_posts: 20
medium:
actors: 7
problems: 40
videos: 20
chapters: 4
sequences: 20
verticals: 30
forum_posts: 40
large:
actors: 10
problems: 80
videos: 30
chapters: 5
sequences: 40
verticals: 80
forum_posts: 200
huge:
actors: 10
problems: 160
videos: 40
chapters: 10
sequences: 50
verticals: 100
forum_posts: 1000
82 changes: 82 additions & 0 deletions xapi_db_load/backends/vector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""
A backend that simply logs the statements to a xapi_tracking logger.

Vector just reads the log statements, so all we need to do is emit them.
All other tasks use the raw Clickhouse inserts.
"""

import logging
import sys
from logging import Logger, getLogger
from typing import List

from xapi_db_load.backends.base_async_backend import (
BaseBackendTasks,
)
from xapi_db_load.backends.clickhouse import (
InsertBlocks,
InsertCourses,
InsertExternalIDs,
InsertInitialEnrollments,
InsertObjectTags,
InsertProfiles,
InsertTags,
InsertTaxonomies,
InsertXAPIEvents,
)
from xapi_db_load.generate_load_async import EventGenerator


class AsyncVectorTasks(BaseBackendTasks):
    """
    Task list for the Vector backend.

    Identical to the ClickHouse backend except that xAPI events are emitted
    as log statements (which Vector picks up and loads into ClickHouse)
    instead of being inserted directly.
    """

    def __repr__(self) -> str:
        # The Vector backend has no LRS: events are written to stdout and
        # shipped to ClickHouse by Vector. The previous implementation read
        # config["lrs_url"], which the vector example/test configs do not
        # define, so repr() raised a KeyError.
        return f"AsyncVectorTasks: stdout logs -> {self.config['db_host']}"

    def get_test_data_tasks(self):
        """
        Return the list of tasks to be run, in order.
        """
        return [
            self.event_generator,
            InsertInitialEnrollments(self.config, self.logger, self.event_generator),
            InsertCourses(self.config, self.logger, self.event_generator),
            InsertBlocks(self.config, self.logger, self.event_generator),
            InsertObjectTags(self.config, self.logger, self.event_generator),
            InsertTaxonomies(self.config, self.logger, self.event_generator),
            InsertTags(self.config, self.logger, self.event_generator),
            InsertExternalIDs(self.config, self.logger, self.event_generator),
            InsertProfiles(self.config, self.logger, self.event_generator),
            # This is the only change from the ClickHouse backend
            InsertXAPIEventsVector(self.config, self.logger, self.event_generator),
        ]


class InsertXAPIEventsVector(InsertXAPIEvents):
    """
    Variant of the direct-ClickHouse xAPI insert task that emits each event
    as a log statement instead of inserting it, so that Vector can read the
    log and perform the ClickHouse load itself.
    """

    def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator):
        super().__init__(config, logger, event_generator)

        # Smallest possible format that still passes Vector's parsing regex;
        # the LMS uses a longer format, but both parse the same downstream.
        log_format = logging.Formatter(" [{name}] [] {message}", style="{")
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(log_format)

        # NOTE(review): each construction attaches another handler to the
        # shared "xapi_tracking" logger, so repeated instantiation in one
        # process would duplicate output — confirm single-use is guaranteed.
        self.xapi_logger = getLogger("xapi_tracking")
        self.xapi_logger.setLevel(logging.INFO)
        self.xapi_logger.addHandler(handler)

    def _format_row(self, row: dict):
        """
        Override the ClickHouse row formatting to emit only the raw event JSON.
        """
        return row["event"]

    async def _do_insert(self, out_data: List):
        """
        Log each event in the batch instead of inserting it into ClickHouse.
        """
        emit = self.xapi_logger.info
        for event_json in out_data:
            emit(event_json)
5 changes: 5 additions & 0 deletions xapi_db_load/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from xapi_db_load.backends.clickhouse import AsyncClickHouseTasks
from xapi_db_load.backends.csv import AsyncCSVTasks
from xapi_db_load.backends.ralph import AsyncRalphTasks
from xapi_db_load.backends.vector import AsyncVectorTasks
from xapi_db_load.generate_load_async import EventGenerator


Expand Down Expand Up @@ -42,6 +43,10 @@ def set_backend(self, backend):
self.backend = AsyncRalphTasks(
self.config, self.logger, self.event_generator
)
elif backend == "vector":
self.backend = AsyncVectorTasks(
self.config, self.logger, self.event_generator
)
else:
raise ValueError("Invalid backend")

Expand Down
54 changes: 54 additions & 0 deletions xapi_db_load/tests/fixtures/small_vector_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Test configuration for Vector / ClickHouse
# ##########################################
backend: vector
db_host: localhost
db_port: 8123
db_name: xapi
db_username: ch_admin
db_password: foo

# Run options
log_dir: logs
num_xapi_batches: 3
batch_size: 100

# This number is used for each QueueBackend that uses workers, so the number of
# threads is multiplicative. Generally this performs best with fewer than 10 workers,
# as more threads will cost more in context switching than they save.
num_workers: 4

# Overall start and end date for the entire run
start_date: 2014-01-01
end_date: 2023-11-27

# All courses will be this long, and be fit into the start / end dates
# This must be less than end_date - start_date days.
course_length_days: 120

# The size of the test
num_organizations: 3
num_actors: 10

# This replicates users updating their profiles several times, creating
# more rows
num_actor_profile_changes: 5

# How many of each size course to create. The sum of these is the total number
# of courses created for the test.
num_course_sizes:
small: 1

# How many times each course will be "published", this creates a more realistic
# distribution of course blocks where each course can be published dozens or
# hundreds of times while it is being developed.
num_course_publishes: 10

course_size_makeup:
small:
actors: 5
problems: 20
videos: 10
chapters: 3
sequences: 10
verticals: 20
forum_posts: 20
62 changes: 62 additions & 0 deletions xapi_db_load/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
"""

import gzip
import json
import os
import re
from contextlib import contextmanager
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -120,6 +122,66 @@ def test_clickhouse_backend(_, tmp_path):
assert "Run duration was" in result.output


@patch(
    "xapi_db_load.backends.base_async_backend.clickhouse_connect",
    new_callable=AsyncMock,
)
@patch(
    "xapi_db_load.backends.vector.getLogger",
    new_callable=MagicMock,
)
def test_vector_backend(mock_get_logger, _, tmp_path):
    """
    Run a test through the Vector backend, currently this just checks that the
    output indicates success.
    """
    test_path = "xapi_db_load/tests/fixtures/small_vector_config.yaml"

    runner = CliRunner()

    with override_config(test_path, tmp_path):
        result = runner.invoke(
            load_db,
            f"--config_file {test_path}",
            catch_exceptions=False,
        )

        # This test should create 300 xAPI log statements
        mock_logger = mock_get_logger.return_value
        assert mock_logger.info.call_count == 300

        last_logged_statement = mock_logger.info.call_args.args[0]

        # Make sure Vector's regex will parse what we're sending. It must match
        # both the LMS format and our local abbreviated logger format.
        # This is how things are generally formatted in the LMS:
        test_str_1 = f"2026-02-24 20:26:13,006 INFO 42 [xapi_tracking] [user None] [ip 172.19.0.1] logger.py:41 - {last_logged_statement}"

        # Our message rendered with the abbreviated format we use for size and
        # speed purposes:
        formatter = mock_logger.addHandler.call_args.args[0].formatter
        test_str_2 = formatter._fmt.format(
            name="xapi_tracking", message=last_logged_statement
        )

        # This is a direct copy and paste from Aspects' Vector common-post.toml
        msg_regex = r"^.* \[xapi_tracking\] [^{}]* (?P<tracking_message>\{.*\})$"

        # Quick check that what's being stored is at least parseable JSON
        for candidate in (test_str_1, test_str_2):
            try:
                extracted = re.match(msg_regex, candidate).group("tracking_message")
                json.loads(extracted)
            except Exception as err:
                print(err)
                print("Exception! Regex testing: ")
                print(candidate)
                raise

        assert "Insert xAPI Events complete." in result.output
        assert "Insert Initial Enrollments complete." in result.output
        assert "ALL TASKS DONE!" in result.output
        assert "Run duration was" in result.output


@patch("xapi_db_load.backends.ralph.requests", new_callable=AsyncMock)
@patch(
"xapi_db_load.backends.base_async_backend.clickhouse_connect",
Expand Down