From 10c3be6be9d8782d5d039f00838ec8bf02433512 Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Tue, 10 Mar 2026 13:43:10 -0400 Subject: [PATCH 1/2] feat: Add Vector backend --- example_configs/vector_small_example.yaml | 91 +++++++++++++++++++++++ xapi_db_load/backends/vector.py | 80 ++++++++++++++++++++ xapi_db_load/runner.py | 5 ++ 3 files changed, 176 insertions(+) create mode 100644 example_configs/vector_small_example.yaml create mode 100644 xapi_db_load/backends/vector.py diff --git a/example_configs/vector_small_example.yaml b/example_configs/vector_small_example.yaml new file mode 100644 index 0000000..25dfaf9 --- /dev/null +++ b/example_configs/vector_small_example.yaml @@ -0,0 +1,91 @@ +# Use the vector backend to emit log statements that Vector will use to load the data +# to ClickHouse with insert statements + +# Vector Backend configuration +# ################################ +backend: vector +db_host: localhost +db_port: 8123 +db_name: xapi +db_event_sink_name: event_sink +db_username: ch_admin +db_password: ... +s3_key: ... +s3_secret: ... + +# Run options +log_dir: logs +num_xapi_batches: 3 +batch_size: 100 + +# This number is used for each QueueBackend that use workers, so the number of threads if +# multiplicative. Generally this performs best less than 10, as more threads will cost more +# in context switching than they save. +num_workers: 4 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. +course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# This replicates users updating their profiles several times, creating +# more rows +num_actor_profile_changes: 5 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. 
+num_course_sizes: + small: 1 + medium: 1 + large: 1 + huge: 1 + +# How many times each course will be "published", this creates a more realistic +# distribution of course blocks where each course can be published dozens or +# hundreds of times while it is being developed. +num_course_publishes: 100 + +# Course size configurations, how many of each type of object are created for +# each course of this size. "actors" must be less than or equal to "num_actors". +# For a course of this size to be created it needs to exist both here and in +# "num_course_sizes". +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 + medium: + actors: 7 + problems: 40 + videos: 20 + chapters: 4 + sequences: 20 + verticals: 30 + forum_posts: 40 + large: + actors: 10 + problems: 80 + videos: 30 + chapters: 5 + sequences: 40 + verticals: 80 + forum_posts: 200 + huge: + actors: 10 + problems: 160 + videos: 40 + chapters: 10 + sequences: 50 + verticals: 100 + forum_posts: 1000 diff --git a/xapi_db_load/backends/vector.py b/xapi_db_load/backends/vector.py new file mode 100644 index 0000000..09baa42 --- /dev/null +++ b/xapi_db_load/backends/vector.py @@ -0,0 +1,80 @@ +""" +A backend that simply logs the statements to a xapi_tracking logger. + +Vector just reads the log statements, so all we need to do is emit them. +All other tasks use the raw Clickhouse inserts. 
+""" + +import logging +import sys +from logging import Logger, getLogger +from typing import List + +from xapi_db_load.backends.base_async_backend import ( + BaseBackendTasks, +) +from xapi_db_load.backends.clickhouse import ( + InsertBlocks, + InsertCourses, + InsertExternalIDs, + InsertInitialEnrollments, + InsertObjectTags, + InsertProfiles, + InsertTags, + InsertTaxonomies, + InsertXAPIEvents, +) +from xapi_db_load.generate_load_async import EventGenerator + + +class AsyncVectorTasks(BaseBackendTasks): + def __repr__(self) -> str: + return f"AsyncVectorTasks: {self.config['lrs_url']} -> {self.config['db_host']}" + + def get_test_data_tasks(self): + """ + Return the tasks to be run. + """ + return [ + self.event_generator, + InsertInitialEnrollments(self.config, self.logger, self.event_generator), + InsertCourses(self.config, self.logger, self.event_generator), + InsertBlocks(self.config, self.logger, self.event_generator), + InsertObjectTags(self.config, self.logger, self.event_generator), + InsertTaxonomies(self.config, self.logger, self.event_generator), + InsertTags(self.config, self.logger, self.event_generator), + InsertExternalIDs(self.config, self.logger, self.event_generator), + InsertProfiles(self.config, self.logger, self.event_generator), + # This is the only change from the ClickHouse backend + InsertXAPIEventsVector(self.config, self.logger, self.event_generator), + ] + + +class InsertXAPIEventsVector(InsertXAPIEvents): + """ + Wraps the ClickHouse direct backend so that the rest of the metadata can be sent while using + Ralph to do the xAPI the insertion. 
+ """ + + def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator): + super().__init__(config, logger, event_generator) + + stream_handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("[%(name)s] %(message)s") + stream_handler.setFormatter(formatter) + self.xapi_logger = getLogger("xapi_tracking") + self.xapi_logger.setLevel(logging.INFO) + self.xapi_logger.addHandler(stream_handler) + + def _format_row(self, row: dict): + """ + This overrides the ClickHouse backend's method to format the row for Ralph. + """ + return row["event"] + + async def _do_insert(self, out_data: List): + """ + POST a batch of rows to Ralph instead of inserting directly to ClickHouse. + """ + for event_json in out_data: + self.xapi_logger.info(event_json) diff --git a/xapi_db_load/runner.py b/xapi_db_load/runner.py index 933275a..0d102d6 100644 --- a/xapi_db_load/runner.py +++ b/xapi_db_load/runner.py @@ -7,6 +7,7 @@ from xapi_db_load.backends.clickhouse import AsyncClickHouseTasks from xapi_db_load.backends.csv import AsyncCSVTasks from xapi_db_load.backends.ralph import AsyncRalphTasks +from xapi_db_load.backends.vector import AsyncVectorTasks from xapi_db_load.generate_load_async import EventGenerator @@ -42,6 +43,10 @@ def set_backend(self, backend): self.backend = AsyncRalphTasks( self.config, self.logger, self.event_generator ) + elif backend == "vector": + self.backend = AsyncVectorTasks( + self.config, self.logger, self.event_generator + ) else: raise ValueError("Invalid backend") From fca2cab85cb16c3aae07739fe04eef99a29acc0f Mon Sep 17 00:00:00 2001 From: Tycho Hob Date: Tue, 10 Mar 2026 15:29:16 -0400 Subject: [PATCH 2/2] fix: Fix Vector formatter, add tests --- xapi_db_load/backends/vector.py | 4 +- .../tests/fixtures/small_vector_config.yaml | 54 ++++++++++++++++ xapi_db_load/tests/test_backends.py | 62 +++++++++++++++++++ 3 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 
xapi_db_load/tests/fixtures/small_vector_config.yaml diff --git a/xapi_db_load/backends/vector.py b/xapi_db_load/backends/vector.py index 09baa42..2139d72 100644 --- a/xapi_db_load/backends/vector.py +++ b/xapi_db_load/backends/vector.py @@ -60,7 +60,9 @@ def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator super().__init__(config, logger, event_generator) stream_handler = logging.StreamHandler(sys.stdout) - formatter = logging.Formatter("[%(name)s] %(message)s") + # This formatter is different from what the LMS uses, but is the smallest possible + # format that passes Vector's regex + formatter = logging.Formatter(" [{name}] [] {message}", style="{") stream_handler.setFormatter(formatter) self.xapi_logger = getLogger("xapi_tracking") self.xapi_logger.setLevel(logging.INFO) diff --git a/xapi_db_load/tests/fixtures/small_vector_config.yaml b/xapi_db_load/tests/fixtures/small_vector_config.yaml new file mode 100644 index 0000000..0bc4df6 --- /dev/null +++ b/xapi_db_load/tests/fixtures/small_vector_config.yaml @@ -0,0 +1,54 @@ +# Test configuration for Vector / ClickHouse +# ######################################### +backend: vector +db_host: localhost +db_port: 8123 +db_name: xapi +db_username: ch_admin +db_password: foo + +# Run options +log_dir: logs +num_xapi_batches: 3 +batch_size: 100 + +# This number is used for each QueueBackend that use workers, so the number of threads is +# multiplicative. Generally this performs best less than 10, as more threads will cost more +# in context switching than they save. +num_workers: 4 + +# Overall start and end date for the entire run +start_date: 2014-01-01 +end_date: 2023-11-27 + +# All courses will be this long, and be fit into the start / end dates +# This must be less than end_date - start_date days. 
+course_length_days: 120 + +# The size of the test +num_organizations: 3 +num_actors: 10 + +# This replicates users updating their profiles several times, creating +# more rows +num_actor_profile_changes: 5 + +# How many of each size course to create. The sum of these is the total number +# of courses created for the test. +num_course_sizes: + small: 1 + +# How many times each course will be "published", this creates a more realistic +# distribution of course blocks where each course can be published dozens or +# hundreds of times while it is being developed. +num_course_publishes: 10 + +course_size_makeup: + small: + actors: 5 + problems: 20 + videos: 10 + chapters: 3 + sequences: 10 + verticals: 20 + forum_posts: 20 diff --git a/xapi_db_load/tests/test_backends.py b/xapi_db_load/tests/test_backends.py index e61d60f..abbee14 100644 --- a/xapi_db_load/tests/test_backends.py +++ b/xapi_db_load/tests/test_backends.py @@ -3,7 +3,9 @@ """ import gzip +import json import os +import re from contextlib import contextmanager from unittest.mock import AsyncMock, MagicMock, patch @@ -120,6 +122,66 @@ def test_clickhouse_backend(_, tmp_path): assert "Run duration was" in result.output +@patch( + "xapi_db_load.backends.base_async_backend.clickhouse_connect", + new_callable=AsyncMock, +) +@patch( + "xapi_db_load.backends.vector.getLogger", + new_callable=MagicMock, +) +def test_vector_backend(mock_get_logger, _, tmp_path): + """ + Run a test through the Vector backend, currently this just checks that the + output indicates success. 
+ """ + test_path = "xapi_db_load/tests/fixtures/small_vector_config.yaml" + + runner = CliRunner() + + with override_config(test_path, tmp_path): + result = runner.invoke( + load_db, + f"--config_file {test_path}", + catch_exceptions=False, + ) + + # This test should create 300 xAPI log statemetns + assert mock_get_logger.return_value.info.call_count == 300 + + last_logged_statement = mock_get_logger.return_value.info.call_args.args[0] + + # We check to make sure Vector's regex will parse what we're sending. We want it to match both + # the LMS and our local logger formatter. + # This is how things are generally formatted in the LMS + test_str_1 = f"2026-02-24 20:26:13,006 INFO 42 [xapi_tracking] [user None] [ip 172.19.0.1] logger.py:41 - {last_logged_statement}" + + # This returns our message formatted with the abbreviated version we use for size and speed purposes + formatter = mock_get_logger.return_value.addHandler.call_args.args[0].formatter + test_str_2 = formatter._fmt.format( + name="xapi_tracking", message=last_logged_statement + ) + + # This is a direct copy and paste from Aspects' Vector common-post.toml + msg_regex = r"^.* \[xapi_tracking\] [^{}]* (?P\{.*\})$" + + # Quick test to make sure that what's being stored is at least parseable + for s in (test_str_1, test_str_2): + try: + statement = re.match(msg_regex, s).groups()[0] + json.loads(statement) + except Exception as e: + print(e) + print("Exception! Regex testing: ") + print(s) + raise + + assert "Insert xAPI Events complete." in result.output + assert "Insert Initial Enrollments complete." in result.output + assert "ALL TASKS DONE!" in result.output + assert "Run duration was" in result.output + + @patch("xapi_db_load.backends.ralph.requests", new_callable=AsyncMock) @patch( "xapi_db_load.backends.base_async_backend.clickhouse_connect",