diff --git a/pyproject.toml b/pyproject.toml index aa9d7087..1274573c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ bugtracker = "https://github.com/ad-freiburg/qlever/issues" [project.scripts] "qlever" = "qlever.qlever_main:main" +"qoxigraph" = "qlever.qlever_main:main" [tool.setuptools] package-data = { "qlever" = ["Qleverfiles/*"] } diff --git a/src/qlever/commands/index.py b/src/qlever/commands/index.py index d47f616f..5f0cb4b5 100644 --- a/src/qlever/commands/index.py +++ b/src/qlever/commands/index.py @@ -8,6 +8,7 @@ from qlever.command import QleverCommand from qlever.containerize import Containerize from qlever.log import log +from qlever.memory_monitor import MemoryMonitor from qlever.util import ( binary_exists, get_existing_index_files, @@ -322,7 +323,13 @@ def execute(self, args) -> bool: # Run the index command. try: - run_command(index_cmd, show_output=True) + with MemoryMonitor( + dataset=args.name, + cmdline_regex=args.index_binary, + container=args.index_container, + system=args.system, + ): + run_command(index_cmd, show_output=True) except Exception as e: log.error(f"Building the index failed: {e}") return False diff --git a/src/qlever/memory_monitor.py b/src/qlever/memory_monitor.py new file mode 100644 index 00000000..ea33df2c --- /dev/null +++ b/src/qlever/memory_monitor.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import json +import re +import threading +import time +from datetime import datetime +from pathlib import Path + +import psutil + +from qlever import engine_name +from qlever.containerize import Containerize +from qlever.log import log +from qlever.util import format_size, run_command + + +def parse_container_mem_usage(usage: str) -> int: + """ + Parse a memory usage string from ``docker stats`` or ``podman stats`` + into bytes. Docker reports binary units (GiB, MiB) while Podman + reports decimal units (GB, MB). + """ + usage = usage.strip() + units = { + "TIB": 1024**4, + "TB": 1000**4, + "GIB": 1024**3, + "GB": 1000**3, + "MIB": 1024**2, + "MB": 1000**2, + "KIB": 1024, + "KB": 1000, + "B": 1, + } + for suffix, multiplier in units.items(): + if usage.upper().endswith(suffix): + number = float(usage[: len(usage) - len(suffix)]) + return int(number * multiplier) + return 0 + + +class MemoryMonitor: + """ + Monitor memory usage of an index-building process. Works in both + native mode (via psutil) and container mode (via docker/podman stats). + + Usage as a context manager: + + with MemoryMonitor(dataset="wikidata", cmdline_regex="qlever-index"): + run_command(cmd, show_output=True) + + # For container mode: + with MemoryMonitor(dataset="wikidata", + cmdline_regex="qlever-index", + container="qlever.index.wikidata", + system="docker"): + run_command(cmd, show_output=True) + """ + + def __init__( + self, + dataset: str, + cmdline_regex: str, + container: str | None = None, + system: str | None = None, + interval: float = 1.0, + output_dir: Path = Path.cwd(), + ): + """ + Args: + dataset: Name of the dataset being indexed. + cmdline_regex: Regex matched against child process command + lines to identify the index process (native + mode only). + container: Container name to query for memory stats. + When set together with ``system``, sampling + uses ``docker/podman stats`` instead of + psutil. + system: Container runtime ("docker" or "podman"). + interval: Seconds between samples (default 1.0). + output_dir: Directory for the JSON memory log file. + """ + self.engine = engine_name + self.dataset = dataset + self.cmdline_regex = cmdline_regex + self.container = container + self.system = system + self.interval = interval + self.output_dir = Path(output_dir) + self.peak_rss = 0 + self.samples = [] + self.stop_event = threading.Event() + self.thread = None + self.start_time = 0 + + def sample_native(self) -> int: + """ + Find the index process among our children by matching its + command line, then sum RSS of that process and all its + descendants. + """ + me = psutil.Process() + for child in me.children(recursive=True): + try: + cmdline = " ".join(child.cmdline()) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + if re.search(self.cmdline_regex, cmdline): + rss = child.memory_info().rss + for grandchild in child.children(recursive=True): + try: + rss += grandchild.memory_info().rss + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + return rss + return 0 + + def sample_container(self) -> int: + """ + Query the container runtime for the memory usage of the + index container. + """ + try: + output = run_command( + f"{self.system} stats --no-stream" + f" --format '{{{{.MemUsage}}}}' {self.container}", + return_output=True, + ) + usage = output.strip().split("/")[0].strip() + return parse_container_mem_usage(usage) + except Exception: + return 0 + + def run_loop(self): + """ + Polling loop that runs on a background thread. Selects the + appropriate sampling method (native or container) and collects + (elapsed_seconds, rss_bytes) tuples until the stop event is set. + """ + sample = ( + self.sample_container + if self.system in Containerize.supported_systems() + else self.sample_native + ) + while not self.stop_event.is_set(): + rss = sample() + self.peak_rss = max(self.peak_rss, rss) + elapsed = time.monotonic() - self.start_time + self.samples.append((elapsed, rss)) + self.stop_event.wait(self.interval) + + def save(self): + """ + Write all collected samples and metadata to a JSON file at + ``/..memory-log.json``. + """ + path = ( + self.output_dir + / f"{self.engine.lower()}.{self.dataset.lower()}.memory-log.json" + ) + data = { + "engine": self.engine, + "dataset": self.dataset, + "start_time": datetime.fromtimestamp( + time.time() - (time.monotonic() - self.start_time) + ).isoformat(timespec="seconds"), + "peak_rss_bytes": self.peak_rss, + "peak_rss_human": format_size(self.peak_rss), + "elapsed_s": ( + round(self.samples[-1][0], 1) if self.samples else 0 + ), + "samples": [ + {"elapsed_s": round(t, 1), "rss_bytes": r} + for t, r in self.samples + ], + } + with open(path, "w") as f: + json.dump(data, f, indent=2) + + def __enter__(self): + """Start the background sampling thread.""" + self.start_time = time.monotonic() + self.thread = threading.Thread(target=self.run_loop, daemon=True) + self.thread.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Stop sampling, persist results, and log peak memory usage.""" + self.stop_event.set() + self.thread.join() + self.save() + log.info(f"Peak memory usage: {format_size(self.peak_rss)}") + return False diff --git a/src/qoxigraph/__init__.py b/src/qoxigraph/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/qoxigraph/commands/__init__.py b/src/qoxigraph/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/qoxigraph/commands/benchmark_queries.py b/src/qoxigraph/commands/benchmark_queries.py new file mode 100644 index 00000000..4f285520 --- /dev/null +++ b/src/qoxigraph/commands/benchmark_queries.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +from qlever.commands.benchmark_queries import ( + BenchmarkQueriesCommand as QleverBenchmarkQueriesCommand, +) + + +class BenchmarkQueriesCommand(QleverBenchmarkQueriesCommand): + """ + Run benchmark queries against the Oxigraph SPARQL endpoint. + Overrides the default endpoint to use Oxigraph's /query path. + """ + + def execute(self, args) -> bool: + if not args.sparql_endpoint: + args.sparql_endpoint = f"{args.host_name}:{args.port}/query" + return super().execute(args) diff --git a/src/qoxigraph/commands/get_data.py b/src/qoxigraph/commands/get_data.py new file mode 100644 index 00000000..29bba0e2 --- /dev/null +++ b/src/qoxigraph/commands/get_data.py @@ -0,0 +1 @@ +from qlever.commands.get_data import GetDataCommand # noqa diff --git a/src/qoxigraph/commands/index.py b/src/qoxigraph/commands/index.py new file mode 100644 index 00000000..e3915ec9 --- /dev/null +++ b/src/qoxigraph/commands/index.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import shlex +import time +from pathlib import Path + +import qlever.util as util +from qlever.command import QleverCommand +from qlever.containerize import Containerize +from qlever.log import log +from qlever.memory_monitor import MemoryMonitor + + +def wrap_cmd_in_container(args, cmd: str, ulimit: int | None = None) -> str: + """ + Wrap an indexing command in a container that is automatically removed + after the process exits (`--rm`) Use `use_bash=False` as Oxigraph image + doesn't support bash entrypoint. + """ + run_subcommand = "run --rm" + if ulimit: + run_subcommand += f" --ulimit nofile={ulimit}:{ulimit}" + return Containerize().containerize_command( + cmd=cmd, + container_system=args.system, + run_subcommand=run_subcommand, + image_name=args.image, + container_name=args.index_container, + volumes=[("$(pwd)", "/opt")], + working_directory="/opt", + use_bash=False, + ) + + +class IndexCommand(QleverCommand): + """ + Build an Oxigraph index for an RDF dataset. The indexing workflow is: + 1. Run `oxigraph load` to import input files into a RocksDB store. + 2. Optionally run `oxigraph optimize` to compact storage for read-only use. + + For large datasets (>5 GB), the file descriptor ulimit is raised + automatically because RocksDB opens many .sst files concurrently. + """ + + def __init__(self): + pass + + def description(self) -> str: + return "Build the index for a given RDF dataset" + + def should_have_qleverfile(self) -> bool: + return True + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "data": ["name", "format"], + "index": [ + "input_files", + "ulimit", + "index_binary", + "lenient", + "extra_args", + ], + "server": ["read_only"], + "runtime": ["system", "image", "index_container"], + } + + def additional_arguments(self, subparser): + pass + + def execute(self, args) -> bool: + cmds_to_execute = [] + index_cmd = ( + f"load {'--lenient ' if args.lenient == 'yes' else ''}" + f"--location {args.name}_index/ --file {args.input_files} " + f"{args.extra_args} |& tee {args.name}.index-log.txt" + ) + + ulimit = args.ulimit + # RocksDB opens many .sst files concurrently. For datasets larger + # than 5 GB, raise the file descriptor limit so the process does + # not hit the default OS soft limit. + total_file_size = util.get_total_file_size( + shlex.split(args.input_files) + ) + if not ulimit and total_file_size > 5e9: + ulimit = 500_000 + if args.system in Containerize.supported_systems(): + index_cmd = wrap_cmd_in_container(args, index_cmd, ulimit) + else: + index_cmd = f"{args.index_binary} {index_cmd}" + if ulimit: + index_cmd = f"ulimit -Sn {ulimit} && {index_cmd}" + + cmds_to_execute.append(index_cmd) + + # Compact the RocksDB storage for read-only serving. This reduces + # disk usage and speeds up queries but makes the index immutable. + optimize_cmd = None + if args.read_only == "yes": + optimize_cmd = f"optimize -l {args.name}_index/" + if args.system in Containerize.supported_systems(): + optimize_cmd = wrap_cmd_in_container(args, optimize_cmd) + else: + optimize_cmd = f"{args.index_binary} {optimize_cmd}" + cmds_to_execute.append(optimize_cmd) + + # Show the command line. + self.show("\n".join(cmds_to_execute), only_show=args.show) + if args.show: + return True + + if not util.input_files_exist(args.input_files): + return False + + # When running natively, check if the binary exists and works. + if args.system in Containerize.supported_systems(): + if Containerize().is_running(args.system, args.index_container): + log.info( + f"{args.system} container {args.index_container} is still up, " + "which means that data loading is in progress. Please wait..." + ) + return False + else: + if not util.binary_exists(args.index_binary, "index-binary", args): + return False + + # Abort if a previous index already exists. RocksDB .sst files in + # the index directory indicate an existing store. + if ( + len([p.name for p in Path(f"{args.name}_index").glob("*.sst")]) + != 0 + ): + log.error( + f"Index files (*.sst) found in {args.name}_index directory " + "which shows presence of a previous index" + ) + log.info("") + log.info("Aborting the index operation...") + return False + + # Run the index command and record the elapsed time in the log + # file. Oxigraph's progress output is unreliable (may not print a + # final summary line when loading multiple files), so we measure + # the time externally. + # + # The MemoryMonitor wraps both the load and optimize steps so + # that peak RSS is tracked across the entire indexing workflow. + log_file_name = f"{args.name}.index-log.txt" + with MemoryMonitor( + engine="qoxigraph", + dataset=args.name, + cmdline_regex=args.index_binary, + container=args.index_container, + system=args.system, + ): + try: + load_start = time.time() + util.run_command( + index_cmd, show_output=True, show_stderr=True + ) + load_s = time.time() - load_start + except Exception as e: + log.error(f"Building the index failed: {e}") + return False + + optimize_s = 0.0 + if optimize_cmd: + try: + log.info("") + log.info("Optimizing read-only database storage:") + self.show(optimize_cmd) + optimize_start = time.time() + util.run_command( + optimize_cmd, show_output=True, show_stderr=True + ) + optimize_s = time.time() - optimize_start + except Exception as e: + log.error(f"Optimizing the database storage failed: {e}") + log.info( + f"Please run manually: " + f"{args.index_binary} optimize -l {args.name}_index/" + ) + + with open(log_file_name, "a") as f: + f.write(f"Load time: {load_s:.0f}s\n") + if optimize_cmd: + f.write(f"Optimize time: {optimize_s:.0f}s\n") + f.write( + f"Total elapsed time: {load_s + optimize_s:.0f}s\n" + ) + + return True diff --git a/src/qoxigraph/commands/index_stats.py b/src/qoxigraph/commands/index_stats.py new file mode 100644 index 00000000..5e386bf6 --- /dev/null +++ b/src/qoxigraph/commands/index_stats.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import re + +import qlever.util as util +from qlever.commands.index_stats import ( + IndexStatsCommand as QleverIndexStatsCommand, +) +from qlever.commands.index_stats import ( + get_size_unit, + get_size_unit_factor, + get_time_unit, + get_time_unit_factor, +) +from qlever.log import log + + +class IndexStatsCommand(QleverIndexStatsCommand): + """ + Show index build time and disk space usage for an Oxigraph dataset. + Time is read from the "Total elapsed time" line appended to the + index log by the index command; space is the sum of all .sst files. + """ + + def execute_time( + self, args, log_file_name: str + ) -> dict[str, tuple[float | None, str]]: + """Parse index build times from the index log file.""" + try: + # Read the last few lines of the log file (the times are + # always near the end). + log_text = util.run_command( + f"tail {log_file_name}", return_output=True + ) + except Exception as e: + log.error(f"Problem reading index log file {log_file_name}: {e}") + return {} + + patterns = { + "Load time": re.compile(r"Load time: ([\d,]+)s$"), + "Optimize time": re.compile(r"Optimize time: ([\d,]+)s$"), + "TOTAL time": re.compile(r"Total elapsed time: ([\d,]+)s$"), + } + + raw_seconds = {} + for line in log_text.splitlines(): + for name, pattern in patterns.items(): + match = pattern.search(line) + if match: + try: + raw_seconds[name] = float( + match.group(1).replace(",", "") + ) + except (ValueError, TypeError): + pass + + if not raw_seconds: + return {} + + # Pick a time unit based on the total time. + total_s = raw_seconds.get("TOTAL time") + time_unit = get_time_unit(args.time_unit, total_s) + unit_factor = get_time_unit_factor(time_unit) + + stats = {} + for name in ["Load time", "Optimize time", "TOTAL time"]: + if name in raw_seconds: + stats[name] = (raw_seconds[name] / unit_factor, time_unit) + + # If there was no optimize step, Load and TOTAL are identical + if "Optimize time" not in stats: + stats.pop("Load time", None) + + return stats + + def execute_space(self, args) -> dict[str, tuple[float, str]]: + """ + Return the space used by the index files (*.sst) along with the unit. + """ + index_size = util.get_total_file_size([f"{args.name}_index/*.sst"]) + + size_unit = get_size_unit(args.size_unit, index_size) + unit_factor = get_size_unit_factor(size_unit) + + index_size /= unit_factor + + return {"TOTAL size": (index_size, size_unit)} diff --git a/src/qoxigraph/commands/log.py b/src/qoxigraph/commands/log.py new file mode 100644 index 00000000..401d2148 --- /dev/null +++ b/src/qoxigraph/commands/log.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from qlever import script_name +from qlever.commands.log import LogCommand as QleverLogCommand +from qlever.containerize import Containerize +from qlever.log import log +from qlever.util import run_command + + +class LogCommand(QleverLogCommand): + """ + Show server logs for Oxigraph. For native execution, tails the log + file as usual. For containers, uses `docker/podman logs` as it is + not possible to redirect oxigraph logs to a log file. + """ + + def __init__(self): + pass + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "data": ["name"], + "runtime": [ + "system", + "image", + "server_container", + ], + } + + def execute(self, args) -> bool: + if args.system not in Containerize.supported_systems(): + return super().execute(args) + + # Handle container logging using docker/podman logs command instead of tail + # This is because we don't have .server-log.txt for + # containerized execution + log_cmd = f"{args.system} logs " + + if not args.from_beginning: + log_cmd += f"-n {args.tail_num_lines} " + if not args.no_follow: + log_cmd += "-f " + + log_cmd += args.server_container + + # Show the command line. + self.show(log_cmd, only_show=args.show) + if args.show: + return True + + if not Containerize().is_running(args.system, args.server_container): + log.error(f"No server container {args.server_container} found!\n") + log.info(f"Are you sure you called `{script_name} start`?") + return False + + try: + run_command(log_cmd, show_output=True, show_stderr=True) + except Exception as e: + log.error(f"Cannot display container logs - {e}") + return True diff --git a/src/qoxigraph/commands/query.py b/src/qoxigraph/commands/query.py new file mode 100644 index 00000000..bc3fb35c --- /dev/null +++ b/src/qoxigraph/commands/query.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from qlever.commands.query import QueryCommand as QleverQueryCommand + + +class QueryCommand(QleverQueryCommand): + """ + Send a SPARQL query to the Oxigraph server. Extends the base query + command with Oxigraph's /query endpoint and supported result formats. + This class is used as the base QueryCommand by all the other new engines. + """ + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "query", + type=str, + nargs="?", + default="SELECT * WHERE { ?s ?p ?o } LIMIT 10", + help="SPARQL query to send", + ) + subparser.add_argument( + "--predefined-query", + type=str, + choices=self.predefined_queries.keys(), + help="Use a predefined query", + ) + subparser.add_argument( + "--sparql-endpoint", type=str, help="URL of the SPARQL endpoint" + ) + subparser.add_argument( + "--accept", + type=str, + choices=[ + "text/tab-separated-values", + "text/csv", + "application/sparql-results+json", + "application/sparql-results+xml", + ], + default="text/tab-separated-values", + help="Accept header for the SPARQL query", + ) + subparser.add_argument( + "--get", + action="store_true", + default=False, + help="Use GET request instead of POST", + ) + subparser.add_argument( + "--no-time", + action="store_true", + default=False, + help="Do not print the (end-to-end) time taken", + ) + + def execute(self, args) -> bool: + # Oxigraph's SPARQL endpoint is at /query. + if not args.sparql_endpoint: + args.sparql_endpoint = f"{args.host_name}:{args.port}/query" + # These QLever-specific options are not supported by Oxigraph. + args.pin_to_cache = None + args.access_token = None + return super().execute(args) diff --git a/src/qoxigraph/commands/setup_config.py b/src/qoxigraph/commands/setup_config.py new file mode 100644 index 00000000..ed286269 --- /dev/null +++ b/src/qoxigraph/commands/setup_config.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from configparser import RawConfigParser +from pathlib import Path + +from qlever.commands.setup_config import ( + SetupConfigCommand as QleverSetupConfigCommand, +) +from qlever.log import log +from qlever.qleverfile import Qleverfile + + +class SetupConfigCommand(QleverSetupConfigCommand): + """ + Create a Qleverfile for Oxigraph from a dataset template from `src/qlever/Qleverfiles`. + Filters the template to keep only the relevant sections and adds Oxigraph-specific + defaults (read-only mode, query timeout). + This class is used as the base SetupConfigCommand by all the other new engines. + """ + + IMAGE = "ghcr.io/oxigraph/oxigraph" + + # Sections and keys to retain when filtering a Qleverfile template. + FILTER_CRITERIA = { + "data": [], + "index": ["INPUT_FILES"], + "server": ["PORT"], + "runtime": ["SYSTEM", "IMAGE"], + "ui": ["UI_CONFIG"], + } + + @staticmethod + def construct_engine_specific_params(args) -> dict[str, dict[str, str]]: + """Return Oxigraph-specific defaults to inject into the Qleverfile.""" + return {"server": {"READ_ONLY": "yes", "TIMEOUT": "60s"}} + + @staticmethod + def add_engine_specific_option_values( + qleverfile_parser: RawConfigParser, + engine_specific_params: dict[str, dict[str, str]], + ) -> None: + """Merge engine-specific parameters into the Qleverfile parser.""" + for section, option_dict in engine_specific_params.items(): + if qleverfile_parser.has_section(section): + for option, value in option_dict.items(): + qleverfile_parser.set(section, option, value) + + def execute(self, args) -> bool: + # Construct the command line and show it. + template_path = ( + self.qleverfiles_path / f"Qleverfile.{args.config_name}" + ) + setup_config_show = ( + f"Qleverfile for {args.config_name} will be created using " + f"Qleverfile.{args.config_name} file in {template_path}" + ) + self.show(setup_config_show, only_show=args.show) + if args.show: + return True + + # If there is already a Qleverfile in the current directory, exit. + if self.check_qleverfile_exists(): + return False + + qleverfile_path = Path("Qleverfile") + + try: + qleverfile_parser = Qleverfile.filter( + template_path, self.FILTER_CRITERIA + ) + qleverfile_parser.set("runtime", "IMAGE", self.IMAGE) + params = self.construct_engine_specific_params(args) + self.add_engine_specific_option_values(qleverfile_parser, params) + for section, arg_name in self.override_args: + if arg_value := getattr(args, arg_name, None): + qleverfile_parser.set( + section, arg_name.upper(), str(arg_value) + ) + with qleverfile_path.open("w") as f: + qleverfile_parser.write(f) + + log.info( + f'Created Qleverfile for config "{args.config_name}"' + f" in current directory" + ) + return True + except Exception as e: + log.error( + f'Could not copy "{qleverfile_path}" to current directory: {e}' + ) + return False diff --git a/src/qoxigraph/commands/start.py b/src/qoxigraph/commands/start.py new file mode 100644 index 00000000..7fbfd81f --- /dev/null +++ b/src/qoxigraph/commands/start.py @@ -0,0 +1,234 @@ +from __future__ import annotations + +import subprocess +import time +from pathlib import Path + +from qlever import script_name +from qlever.command import QleverCommand +from qlever.containerize import Containerize +from qlever.log import log +from qlever.util import ( + binary_exists, + is_server_alive, + run_command, + tail_log_file, +) +from qoxigraph.commands.stop import StopCommand + + +def timeout_supported(args, serve_ps: str) -> bool: + """Check whether the oxigraph server binary supports query timeouts.""" + help_cmd = f"{serve_ps} --help" + if args.system in Containerize.supported_systems(): + help_cmd = f"{args.system} run --rm {args.image} {help_cmd}" + else: + help_cmd = f"{args.server_binary} {help_cmd}" + try: + help_output = run_command(help_cmd, return_output=True) + return "timeout-s" in help_output + except Exception as e: + log.warning( + "Could not determine if query timeouts are supported by this version " + f"of Oxigraph! Falling back to no timeouts. Error: {e}", + ) + return False + + +def wrap_cmd_in_container(args, cmd: str) -> str: + """Wrap the server start command in a container with restart policy.""" + run_subcommand = "run --restart=unless-stopped" + if not args.run_in_foreground: + run_subcommand += " -d" + return Containerize().containerize_command( + cmd=cmd, + container_system=args.system, + run_subcommand=run_subcommand, + image_name=args.image, + container_name=args.server_container, + volumes=[("$(pwd)", "/opt")], + ports=[(args.port, args.port)], + working_directory="/opt", + use_bash=False, + ) + + +class StartCommand(QleverCommand): + """ + Start the Oxigraph SPARQL server for an already-indexed dataset. + Supports both native and containerized execution, with an option + to run in the foreground. Uses `serve-read-only` or `serve` + depending on the read_only setting. + """ + + def __init__(self): + pass + + def description(self) -> str: + return ( + "Start the server for Oxigraph (requires that you have built an " + "index before)" + ) + + def should_have_qleverfile(self) -> bool: + return True + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "data": ["name"], + "server": [ + "host_name", + "port", + "read_only", + "server_binary", + "timeout", + "extra_args", + ], + "runtime": ["system", "image", "server_container"], + } + + def additional_arguments(self, subparser): + subparser.add_argument( + "--run-in-foreground", + action="store_true", + default=False, + help=( + "Run the start command in the foreground " + "(default: run in the background)" + ), + ) + + def execute(self, args) -> bool: + # Inside a container, bind to 0.0.0.0 so the port mapping is + # reachable from the host; natively, bind to the configured host. + bind = ( + f"0.0.0.0:{args.port}" + if args.system in Containerize.supported_systems() + else f"{args.host_name}:{args.port}" + ) + process = "serve-read-only" if args.read_only == "yes" else "serve" + timeout_str = "" + if timeout_supported(args, process): + try: + timeout_s = int(args.timeout[:-1]) + except ValueError as e: + log.warning( + f"Invalid timeout value {args.timeout}. Error: {e}" + ) + log.info("Setting timeout to 60s!") + timeout_s = 60 + timeout_str = f"--timeout-s {timeout_s}" + else: + log.info( + f"Ignoring the set timeout value of {args.timeout} as your " + "version of Oxigraph doesn't currently support query timeouts!" + ) + + start_cmd = ( + f"{process} --location {args.name}_index/ {args.extra_args} " + f"{timeout_str} --bind={bind}" + ) + + if args.system in Containerize.supported_systems(): + start_cmd = wrap_cmd_in_container(args, start_cmd) + else: + start_cmd = f"{args.server_binary} {start_cmd} > {args.name}.server-log.txt 2>&1" + if not args.run_in_foreground: + start_cmd = f"nohup {start_cmd} &" + + # Show the command line. + self.show(start_cmd, only_show=args.show) + if args.show: + return True + + endpoint_url = f"http://{args.host_name}:{args.port}/query" + + # When running natively, check if the binary exists and works. + if args.system not in Containerize.supported_systems(): + if not binary_exists(args.server_binary, "server-binary", args): + return False + + # Check if index files (*.sst) present in index directory + if ( + len([p.name for p in Path(f"{args.name}_index/").glob("*.sst")]) + == 0 + ): + log.error(f"No Oxigraph index files for {args.name} found!\n") + log.info( + f"Did you call `{script_name} index`? If you did, check " + "if .sst index files are present in index directory." + ) + return False + + # Check if server already alive at endpoint url from a previous run + if is_server_alive(url=endpoint_url): + log.error(f"Oxigraph server already running on {endpoint_url}\n") + log.info( + f"To kill the existing server, use `{script_name} stop`" + ) + return False + + # Remove old log file so that tail starts clean. + log_file = Path(f"{args.name}.server-log.txt") + log_file.unlink(missing_ok=True) + + try: + process = run_command( + start_cmd, + use_popen=args.run_in_foreground, + ) + except Exception as e: + log.error(f"Starting the Oxigraph server failed ({e})") + return False + + # Tail the server log until the server is ready (note that the `exec` + # is important to make sure that the tail process is killed and not + # just the bash process). + if args.run_in_foreground: + log.info( + "Follow the server logs as long as the server is" + " running (Ctrl-C stops the server)" + ) + else: + log.info( + "Follow the server logs until the server is ready" + " (Ctrl-C stops following the log, but NOT the server)" + ) + log.info("") + # For containers, use `docker/podman logs -f` as Oxigraph doesn't + # support redirecting logs to a log file. A short delay ensures + # the container is up before attaching. + if args.system in Containerize.supported_systems(): + time.sleep(2) + log_cmd = f"exec {args.system} logs -f {args.server_container}" + log_proc = subprocess.Popen(log_cmd, shell=True) + else: + log_proc = tail_log_file(log_file) + if log_proc is None: + return False + while not is_server_alive(endpoint_url): + time.sleep(1) + + log.info( + f"Oxigraph server webapp for {args.name} will be available at " + f"http://{args.host_name}:{args.port} and the sparql endpoint for " + f"queries is {endpoint_url} when the server is ready" + ) + + # Kill the log process + if not args.run_in_foreground: + log_proc.terminate() + + # With `--run-in-foreground`, wait until the server is stopped. + # On Ctrl-C, terminate the process and clean up the container. + if args.run_in_foreground: + try: + process.wait() + except KeyboardInterrupt: + process.terminate() + if args.system in Containerize.supported_systems(): + args.cmdline_regex = StopCommand.DEFAULT_REGEX + StopCommand().execute(args) + log_proc.terminate() + + return True diff --git a/src/qoxigraph/commands/status.py b/src/qoxigraph/commands/status.py new file mode 100644 index 00000000..d73548dc --- /dev/null +++ b/src/qoxigraph/commands/status.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from qlever.commands.status import StatusCommand as QleverStatusCommand + + +class StatusCommand(QleverStatusCommand): + """Show Oxigraph server processes running on this machine.""" + + DEFAULT_REGEX = "oxigraph\\s+serve" + + def description(self) -> str: + return "Show Oxigraph processes running on this machine" + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "--cmdline-regex", + default=self.DEFAULT_REGEX, + help=( + "Show only processes where the command line matches this regex" + ), + ) diff --git a/src/qoxigraph/commands/stop.py b/src/qoxigraph/commands/stop.py new file mode 100644 index 00000000..47308284 --- /dev/null +++ b/src/qoxigraph/commands/stop.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from qlever.command import QleverCommand +from qlever.commands import stop as qlever_stop +from qlever.containerize import Containerize +from qlever.log import log +from qlever.util import stop_process_with_regex +from qoxigraph.commands.status import StatusCommand + + +class StopCommand(QleverCommand): + """ + Stop the Oxigraph server for a given dataset. For native execution, + finds and kills processes matching the dataset-name regex. For + containers, stops and removes the server container. + """ + + # Override this with StatusCommand from child class for execute + # method to work as intended + STATUS_COMMAND = StatusCommand() + # %%NAME%% is replaced at runtime with the dataset name from the Qleverfile + DEFAULT_REGEX = "oxigraph\\s+serve.*%%NAME%%_index" + + def __init__(self): + pass + + def description(self) -> str: + return "Stop Oxigraph server for a given dataset" + + def should_have_qleverfile(self) -> bool: + return True + + def relevant_qleverfile_arguments(self) -> dict[str, list[str]]: + return { + "data": ["name"], + "runtime": ["system", "server_container"], + } + + def additional_arguments(self, subparser) -> None: + subparser.add_argument( + "--cmdline-regex", + default=self.DEFAULT_REGEX, + help="Show only processes where the command " + "line matches this regex", + ) + + def execute(self, args) -> bool: + # Substitute the dataset name into the regex template so we only + # match the server running for this dataset. + cmdline_regex = args.cmdline_regex + if "%%NAME%%" in args.cmdline_regex and hasattr(args, "name"): + cmdline_regex = args.cmdline_regex.replace( + "%%NAME%%", str(args.name) + ) + description = ( + f"Checking for container with name {args.server_container}" + if args.system in Containerize.supported_systems() + else f'Checking for processes matching "{cmdline_regex}"' + ) + + self.show(description, only_show=args.show) + if args.show: + return True + + if args.system not in Containerize.supported_systems(): + stop_process_results = stop_process_with_regex(cmdline_regex) + if stop_process_results is None: + return False + if len(stop_process_results) > 0: + return all(stop_process_results) + + # If no matching process found, show a message and the output of the + # status command. + log.error("No matching process found") + args.cmdline_regex = self.STATUS_COMMAND.DEFAULT_REGEX + log.info("") + StatusCommand().execute(args) + return True + + # First check if container is running and if yes, stop and remove it + return qlever_stop.stop_container(args.server_container) diff --git a/src/qoxigraph/qleverfile.py b/src/qoxigraph/qleverfile.py new file mode 100644 index 00000000..467b77fa --- /dev/null +++ b/src/qoxigraph/qleverfile.py @@ -0,0 +1,77 @@ +from __future__ import annotations + + +def qleverfile_args(all_args: dict[str, dict[str, tuple]]) -> None: + """Define additional oxigraph specific Qleverfile parameters""" + + def arg(*args, **kwargs): + return (args, kwargs) + + index_args = all_args["index"] + server_args = all_args["server"] + + index_args["index_binary"] = arg( + "--index-binary", + type=str, + default="oxigraph", + help=( + "The binary for building the index (default: oxigraph) " + "(this requires that you have oxigraph-cli installed " + "on your machine)" + ), + ) + index_args["lenient"] = arg( + "--lenient", + type=str, + choices=["yes", "no"], + default="no", + help="Attempt to keep loading even if the data file is invalid", + ) + index_args["extra_args"] = arg( + "--extra-args", + type=str, + default="", + help=( + "Additional arguments to pass directly to the oxigraph load process. " + "This allows advanced users to specify options not exposed in " + "Qleverfile. The string is appended verbatim to the command." + ), + ) + + server_args["server_binary"] = arg( + "--server-binary", + type=str, + default="oxigraph", + help=( + "The binary for starting the server (default: oxigraph) " + "(this requires that you have oxigraph-cli installed " + "on your machine)" + ), + ) + server_args["read_only"] = arg( + "--read-only", + type=str, + choices=["yes", "no"], + default="yes", + help=( + "The HTTP server will not permit mutation operations in " + "read-only mode" + ), + ) + server_args["timeout"] = arg( + "--timeout", + type=str, + default="60s", + help="The maximal time in seconds a query is allowed to run", + ) + server_args["extra_args"] = arg( + "--extra-args", + type=str, + default="", + help=( + "Additional arguments to pass directly to the oxigraph " + "serve/serve-read-only. This allows advanced users to specify " + "options not exposed in Qleverfile. The string is appended " + "verbatim to the command." + ), + )