diff --git a/CLAUDE.md b/CLAUDE.md index 073466fb..61afd073 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,8 +13,8 @@ This is the DNAstack client library and CLI, a Python package that provides both ## Development Commands ### Development Setup -- **IMPORTANT**: Run `eval "$(pyenv init -)" && pyenv activate dnastack-client` before any Python, make, uv, or git commit commands. This activates the correct pyenv virtualenv that has `uv` and other tools available. - `make setup` - Set up development environment with uv (creates .venv and installs dependencies) +- Use `uv run ` for all Python commands — no virtualenv activation needed. `uv` is installed via Homebrew and available on PATH. ### Running the CLI - Use `uv run dnastack` to run the CLI (no virtual environment activation needed) diff --git a/dnastack/cli/commands/publisher/questions/commands.py b/dnastack/cli/commands/publisher/questions/commands.py index 7172cba5..d92a5432 100644 --- a/dnastack/cli/commands/publisher/questions/commands.py +++ b/dnastack/cli/commands/publisher/questions/commands.py @@ -1,3 +1,4 @@ +import time from typing import Optional import click @@ -8,6 +9,7 @@ validate_question_parameters, ) from dnastack.cli.commands.explorer.questions.utils import handle_question_results_output +from dnastack.cli.commands.publisher.questions.telemetry import submit_telemetry from dnastack.cli.core.command import formatted_command from dnastack.cli.core.command_spec import ( ArgumentSpec, @@ -21,6 +23,7 @@ from dnastack.cli.helpers.iterator_printer import show_iterator from dnastack.common.logger import get_logger from dnastack.common.tracing import Span +from dnastack.feature_flags import metrics_enabled logger = get_logger(__name__) @@ -154,6 +157,16 @@ def ask_question( click.echo(f"Error: {e}", err=True) raise click.Abort() - results_iter = client.ask_question(collection, question_name, inputs, trace=trace) - results = list(results_iter) + start_time_ns = time.time_ns() + outcome = 'error' + row_count = None + try: + results_iter = client.ask_question(collection, question_name, inputs, trace=trace) + results = list(results_iter) + outcome = 'success' + row_count = len(results) + finally: + if metrics_enabled: + submit_telemetry(client, question_name, collection, start_time_ns, time.time_ns(), outcome, row_count=row_count) + handle_question_results_output(results, output_file, output) diff --git a/dnastack/cli/commands/publisher/questions/telemetry.py b/dnastack/cli/commands/publisher/questions/telemetry.py new file mode 100644 index 00000000..6967fd27 --- /dev/null +++ b/dnastack/cli/commands/publisher/questions/telemetry.py @@ -0,0 +1,83 @@ +import platform +from typing import Literal + +from dnastack.common.logger import get_logger + +logger = get_logger(__name__) + +try: + import importlib.metadata + _DNASTACK_VERSION = importlib.metadata.version('dnastack-client-library') +except Exception: + _DNASTACK_VERSION = 'unknown' + +_SEVERITY_INFO = 9 +_SEVERITY_ERROR = 17 + + +def _build_message(question_name: str, outcome: Literal['success', 'error'], duration_ms: float, row_count: int | None) -> str: + if outcome == 'success': + suffix = f" ({row_count} rows)" if row_count is not None else "" + return f"Question '{question_name}' executed successfully in {duration_ms:.1f}ms{suffix}" + else: + return f"Question '{question_name}' failed after {duration_ms:.1f}ms" + + +def build_otlp_log( + question_name: str, + collection: str, + start_time_ns: int, + end_time_ns: int, + outcome: Literal['success', 'error'], + row_count: int | None = None, +) -> dict: + """Build an OTLP logs JSON payload for a single question execution event.""" + duration_ms = (end_time_ns - start_time_ns) / 1_000_000 + severity = _SEVERITY_INFO if outcome == 'success' else _SEVERITY_ERROR + message = _build_message(question_name, outcome, duration_ms, row_count) + + attributes = [ + {"key": "question.name", "value": {"stringValue": question_name}}, + {"key": "question.collection", "value": {"stringValue": collection}}, + {"key": "question.outcome", "value": {"stringValue": outcome}}, + {"key": "question.duration_ms", "value": {"doubleValue": duration_ms}}, + *([{"key": "question.row_count", "value": {"doubleValue": float(row_count)}}] if row_count is not None else []), + {"key": "runtime.python", "value": {"stringValue": platform.python_version()}}, + ] + + return { + "resourceLogs": [{ + "resource": { + "attributes": [ + {"key": "service.name", "value": {"stringValue": "dnastack-client"}}, + {"key": "service.version", "value": {"stringValue": _DNASTACK_VERSION}}, + ] + }, + "scopeLogs": [{ + "scope": {"name": "dnastack.publisher.questions"}, + "logRecords": [{ + "timeUnixNano": str(end_time_ns), + "severityNumber": severity, + "body": {"stringValue": message}, + "attributes": attributes, + }] + }] + }] + } + + +def submit_telemetry( + client, + question_name: str, + collection: str, + start_time_ns: int, + end_time_ns: int, + outcome: Literal['success', 'error'], + row_count: int | None = None, +) -> None: + """Submit OTLP telemetry to collection-service. Errors are silently swallowed.""" + try: + payload = build_otlp_log(question_name, collection, start_time_ns, end_time_ns, outcome, row_count) + client.submit_telemetry(payload) + except Exception as e: + logger.debug(f"Telemetry submission failed (non-fatal): {e}") diff --git a/dnastack/client/collections/client.py b/dnastack/client/collections/client.py index c4d28810..84e1d572 100644 --- a/dnastack/client/collections/client.py +++ b/dnastack/client/collections/client.py @@ -400,6 +400,15 @@ def ask_question( ) ) + def submit_telemetry(self, otlp_payload: dict, trace: Optional[Span] = None) -> None: + """Submit an OTLP logs payload. Best-effort — callers should handle failures gracefully.""" + with self.create_http_session() as session: + session.post( + urljoin(self.url, 'otlp/v1/logs'), + json=otlp_payload, + trace_context=trace + ) + def data_connect_endpoint(self, collection: Union[str, Collection, None] = None, no_auth: bool = False) -> ServiceEndpoint: diff --git a/dnastack/feature_flags.py b/dnastack/feature_flags.py index 0473b3d9..a50fb844 100644 --- a/dnastack/feature_flags.py +++ b/dnastack/feature_flags.py @@ -34,3 +34,6 @@ def on_debug_mode_change(hook: Callable[[bool], None]): detailed_error = flag('DNASTACK_DETAILED_ERROR', description='Provide more details on error') show_distributed_trace_stack_on_error = flag('DNASTACK_DISPLAY_TRACE_ON_ERROR', description='Display distributed trace on error') + +metrics_enabled = flag('DNASTACK_METRICS_ENABLED', + description='Enable telemetry submission after publisher question execution') diff --git a/tests/unit/client/__init__.py b/tests/unit/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/client/test_collection_service_client.py b/tests/unit/client/test_collection_service_client.py new file mode 100644 index 00000000..76298880 --- /dev/null +++ b/tests/unit/client/test_collection_service_client.py @@ -0,0 +1,45 @@ +from unittest.mock import MagicMock, patch + +from dnastack.client.collections.client import CollectionServiceClient +from dnastack.client.models import ServiceEndpoint + + +def _make_client(url='http://localhost:8093/'): + client = CollectionServiceClient.__new__(CollectionServiceClient) + client._endpoint = MagicMock(spec=ServiceEndpoint) + client._endpoint.url = url + return client + + +class TestSubmitTelemetry: + + def test_posts_to_otlp_logs_endpoint(self): + client = _make_client() + mock_session = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=False) + + with patch.object(client, 'create_http_session', return_value=mock_session): + client.submit_telemetry({'resourceLogs': []}) + + mock_session.post.assert_called_once_with( + 'http://localhost:8093/otlp/v1/logs', + json={'resourceLogs': []}, + trace_context=None + ) + + def test_passes_trace_context_when_provided(self): + client = _make_client() + mock_session = MagicMock() + mock_session.__enter__ = MagicMock(return_value=mock_session) + mock_session.__exit__ = MagicMock(return_value=False) + mock_trace = MagicMock() + + with patch.object(client, 'create_http_session', return_value=mock_session): + client.submit_telemetry({'resourceLogs': []}, trace=mock_trace) + + mock_session.post.assert_called_once_with( + 'http://localhost:8093/otlp/v1/logs', + json={'resourceLogs': []}, + trace_context=mock_trace + ) diff --git a/tests/unit/publisher/__init__.py b/tests/unit/publisher/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/publisher/test_ask_question_telemetry.py b/tests/unit/publisher/test_ask_question_telemetry.py new file mode 100644 index 00000000..a2b1a678 --- /dev/null +++ b/tests/unit/publisher/test_ask_question_telemetry.py @@ -0,0 +1,87 @@ +from unittest.mock import MagicMock, patch + +import click +from click.testing import CliRunner + + +def _build_cli(): + from dnastack.cli.commands.publisher.questions.commands import init_questions_commands + + @click.group() + def cli(): + pass + + init_questions_commands(cli) + return cli + + +def _make_mock_client(raises=False): + mock_client = MagicMock() + mock_client.get_question.return_value = MagicMock(parameters=[]) + if raises: + mock_client.ask_question.side_effect = RuntimeError("network failure") + else: + mock_client.ask_question.return_value = iter([{'col': 'val'}]) + return mock_client + + +class TestAskQuestionTelemetry: + + def _invoke(self, metrics_enabled=True, raises=False): + mock_client = _make_mock_client(raises=raises) + cli = _build_cli() + + with patch('dnastack.cli.commands.publisher.questions.commands.get_collection_service_client', + return_value=mock_client), \ + patch('dnastack.cli.commands.publisher.questions.commands.metrics_enabled', metrics_enabled), \ + patch('dnastack.cli.commands.publisher.questions.commands.handle_question_results_output'), \ + patch('dnastack.cli.commands.publisher.questions.commands.submit_telemetry') as mock_submit: + runner = CliRunner() + runner.invoke(cli, ['ask', '--question-name', 'q', '--collection', 'c']) + + return mock_client, mock_submit + + def test_submits_telemetry_on_success_when_enabled(self): + _, mock_submit = self._invoke(metrics_enabled=True) + mock_submit.assert_called_once() + + def test_does_not_submit_when_disabled(self): + _, mock_submit = self._invoke(metrics_enabled=False) + mock_submit.assert_not_called() + + def test_submits_with_success_outcome_on_success(self): + _, mock_submit = self._invoke(metrics_enabled=True) + args, kwargs = mock_submit.call_args + # submit_telemetry(client, question_name, collection, start_ns, end_ns, outcome) + outcome = args[5] + assert outcome == 'success' + + def test_submits_with_error_outcome_on_failure(self): + _, mock_submit = self._invoke(metrics_enabled=True, raises=True) + mock_submit.assert_called_once() + args, kwargs = mock_submit.call_args + outcome = args[5] + assert outcome == 'error' + + def test_question_name_and_collection_passed_to_telemetry(self): + _, mock_submit = self._invoke(metrics_enabled=True) + args, kwargs = mock_submit.call_args + # submit_telemetry(client, question_name, collection, start_ns, end_ns, outcome) + assert args[1] == 'q' + assert args[2] == 'c' + + def test_original_exception_propagates(self): + """Telemetry submission in finally must not swallow the original exception.""" + mock_client = _make_mock_client(raises=True) + cli = _build_cli() + + with patch('dnastack.cli.commands.publisher.questions.commands.get_collection_service_client', + return_value=mock_client), \ + patch('dnastack.cli.commands.publisher.questions.commands.metrics_enabled', True), \ + patch('dnastack.cli.commands.publisher.questions.commands.submit_telemetry'), \ + patch('dnastack.cli.commands.publisher.questions.commands.handle_question_results_output'): + runner = CliRunner() + result = runner.invoke(cli, ['ask', '--question-name', 'q', '--collection', 'c']) + + # CliRunner catches exceptions — verify the exception was raised (exit code != 0) + assert result.exit_code != 0 diff --git a/tests/unit/publisher/test_telemetry.py b/tests/unit/publisher/test_telemetry.py new file mode 100644 index 00000000..fcc45c79 --- /dev/null +++ b/tests/unit/publisher/test_telemetry.py @@ -0,0 +1,102 @@ +import time +from unittest.mock import MagicMock + +from dnastack.cli.commands.publisher.questions.telemetry import build_otlp_log, submit_telemetry + + +class TestBuildOtlpLog: + + def test_returns_valid_otlp_logs_structure(self): + start_ns = time.time_ns() + end_ns = start_ns + 1_000_000_000 + payload = build_otlp_log('my-question', 'my-collection', start_ns, end_ns, 'success') + + assert 'resourceLogs' in payload + resource_logs = payload['resourceLogs'] + assert len(resource_logs) == 1 + scope_logs = resource_logs[0]['scopeLogs'] + assert len(scope_logs) == 1 + log_records = scope_logs[0]['logRecords'] + assert len(log_records) == 1 + + def test_success_outcome_sets_severity_info(self): + lr = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'success')) + assert lr['severityNumber'] == 9 + + def test_error_outcome_sets_severity_error(self): + lr = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'error')) + assert lr['severityNumber'] == 17 + + def test_attributes_include_question_name_and_collection(self): + lr = _first_log_record(build_otlp_log('my-q', 'my-col', 0, 1, 'success')) + attrs = _string_attrs(lr) + assert attrs['question.name'] == 'my-q' + assert attrs['question.collection'] == 'my-col' + assert attrs['question.outcome'] == 'success' + + def test_duration_ms_is_calculated_from_start_and_end(self): + start_ns = 1_000_000_000 + end_ns = 2_500_000_000 # 1500 ms later + lr = _first_log_record(build_otlp_log('q', 'c', start_ns, end_ns, 'success')) + double_attrs = _double_attrs(lr) + assert double_attrs['question.duration_ms'] == 1500.0 + + def test_row_count_included_when_provided(self): + lr = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'success', row_count=42)) + assert _double_attrs(lr)['question.row_count'] == 42.0 + + def test_row_count_omitted_when_not_provided(self): + lr = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'error')) + assert 'question.row_count' not in [a['key'] for a in lr['attributes']] + + def test_resource_attributes_include_service_name(self): + payload = build_otlp_log('q', 'c', 0, 1, 'success') + resource_attrs = {a['key']: a['value']['stringValue'] + for a in payload['resourceLogs'][0]['resource']['attributes']} + assert resource_attrs['service.name'] == 'dnastack-client' + + def test_time_unix_nano_is_end_time(self): + end_ns = 1_713_369_601_000_000_000 + lr = _first_log_record(build_otlp_log('q', 'c', 0, end_ns, 'success')) + assert lr['timeUnixNano'] == str(end_ns) + + def test_success_message_includes_question_name_and_duration(self): + start_ns = 0 + end_ns = 1_500_000_000 # 1500 ms + lr = _first_log_record(build_otlp_log('my-question', 'c', start_ns, end_ns, 'success')) + assert 'my-question' in lr['body']['stringValue'] + assert '1500.0ms' in lr['body']['stringValue'] + + def test_success_message_includes_row_count_when_provided(self): + lr = _first_log_record(build_otlp_log('q', 'c', 0, 1_000_000_000, 'success', row_count=7)) + assert '7 rows' in lr['body']['stringValue'] + + def test_error_message_includes_question_name_and_duration(self): + lr = _first_log_record(build_otlp_log('bad-question', 'c', 0, 500_000_000, 'error')) + assert 'bad-question' in lr['body']['stringValue'] + assert '500.0ms' in lr['body']['stringValue'] + + +class TestSubmitTelemetry: + + def test_calls_client_submit_telemetry(self): + client = MagicMock() + submit_telemetry(client, 'q', 'c', 0, 1, 'success') + client.submit_telemetry.assert_called_once() + + def test_swallows_client_errors_silently(self): + client = MagicMock() + client.submit_telemetry.side_effect = Exception("network error") + submit_telemetry(client, 'q', 'c', 0, 1, 'success') # must not raise + + +def _first_log_record(otlp_payload: dict) -> dict: + return otlp_payload['resourceLogs'][0]['scopeLogs'][0]['logRecords'][0] + + +def _string_attrs(log_record: dict) -> dict: + return {a['key']: a['value']['stringValue'] for a in log_record['attributes'] if 'stringValue' in a['value']} + + +def _double_attrs(log_record: dict) -> dict: + return {a['key']: a['value']['doubleValue'] for a in log_record['attributes'] if 'doubleValue' in a['value']} diff --git a/tests/unit/test_feature_flags.py b/tests/unit/test_feature_flags.py new file mode 100644 index 00000000..fd865848 --- /dev/null +++ b/tests/unit/test_feature_flags.py @@ -0,0 +1,24 @@ +import importlib +import os + +import pytest + + +@pytest.fixture(autouse=True) +def clean_metrics_env(): + os.environ.pop('DNASTACK_METRICS_ENABLED', None) + yield + os.environ.pop('DNASTACK_METRICS_ENABLED', None) + + +def test_metrics_enabled_defaults_to_false(): + import dnastack.feature_flags as ff + importlib.reload(ff) + assert ff.metrics_enabled is False + + +def test_metrics_enabled_is_true_when_set_to_1(): + os.environ['DNASTACK_METRICS_ENABLED'] = '1' + import dnastack.feature_flags as ff + importlib.reload(ff) + assert ff.metrics_enabled is True