Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ This is the DNAstack client library and CLI, a Python package that provides both
## Development Commands

### Development Setup
- **IMPORTANT**: Run `eval "$(pyenv init -)" && pyenv activate dnastack-client` before any Python, make, uv, or git commit commands. This activates the correct pyenv virtualenv that has `uv` and other tools available.
- `make setup` - Set up development environment with uv (creates .venv and installs dependencies)
- Use `uv run <command>` for all Python commands — no virtualenv activation needed. `uv` is installed via Homebrew and available on PATH.

### Running the CLI
- Use `uv run dnastack` to run the CLI (no virtual environment activation needed)
Expand Down
17 changes: 15 additions & 2 deletions dnastack/cli/commands/publisher/questions/commands.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from typing import Optional

import click
Expand All @@ -8,6 +9,7 @@
validate_question_parameters,
)
from dnastack.cli.commands.explorer.questions.utils import handle_question_results_output
from dnastack.cli.commands.publisher.questions.telemetry import submit_telemetry
from dnastack.cli.core.command import formatted_command
from dnastack.cli.core.command_spec import (
ArgumentSpec,
Expand All @@ -21,6 +23,7 @@
from dnastack.cli.helpers.iterator_printer import show_iterator
from dnastack.common.logger import get_logger
from dnastack.common.tracing import Span
from dnastack.feature_flags import metrics_enabled

logger = get_logger(__name__)

Expand Down Expand Up @@ -154,6 +157,16 @@ def ask_question(
click.echo(f"Error: {e}", err=True)
raise click.Abort()

results_iter = client.ask_question(collection, question_name, inputs, trace=trace)
results = list(results_iter)
start_time_ns = time.time_ns()
outcome = 'error'
row_count = None
try:
results_iter = client.ask_question(collection, question_name, inputs, trace=trace)
results = list(results_iter)
outcome = 'success'
row_count = len(results)
finally:
if metrics_enabled:
submit_telemetry(client, question_name, collection, start_time_ns, time.time_ns(), outcome, row_count=row_count)

handle_question_results_output(results, output_file, output)
83 changes: 83 additions & 0 deletions dnastack/cli/commands/publisher/questions/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import platform
from typing import Literal

from dnastack.common.logger import get_logger

logger = get_logger(__name__)

try:
import importlib.metadata
_DNASTACK_VERSION = importlib.metadata.version('dnastack-client-library')
except Exception:
_DNASTACK_VERSION = 'unknown'

_SEVERITY_INFO = 9
_SEVERITY_ERROR = 17


def _build_message(question_name: str, outcome: Literal['success', 'error'], duration_ms: float, row_count: int | None) -> str:
if outcome == 'success':
suffix = f" ({row_count} rows)" if row_count is not None else ""
return f"Question '{question_name}' executed successfully in {duration_ms:.1f}ms{suffix}"
else:
return f"Question '{question_name}' failed after {duration_ms:.1f}ms"


def build_otlp_log(
    question_name: str,
    collection: str,
    start_time_ns: int,
    end_time_ns: int,
    outcome: Literal['success', 'error'],
    row_count: int | None = None,
) -> dict:
    """Assemble the OTLP logs JSON payload for a single question execution event.

    Args:
        question_name: Name of the executed question.
        collection: Collection the question was executed against.
        start_time_ns: Start timestamp in nanoseconds.
        end_time_ns: End timestamp in nanoseconds; also emitted (as a string)
            in the record's ``timeUnixNano`` field.
        outcome: ``'success'`` or ``'error'``; drives the severity number and
            the wording of the message body.
        row_count: Number of result rows. When ``None`` the
            ``question.row_count`` attribute is left out entirely.

    Returns:
        A dict with exactly one resource log, one scope log and one log record.
    """
    elapsed_ms = (end_time_ns - start_time_ns) / 1_000_000

    def _attribute(key: str, value_kind: str, value) -> dict:
        # One key/value entry in the shape used throughout the payload.
        return {"key": key, "value": {value_kind: value}}

    record_attributes = [
        _attribute("question.name", "stringValue", question_name),
        _attribute("question.collection", "stringValue", collection),
        _attribute("question.outcome", "stringValue", outcome),
        _attribute("question.duration_ms", "doubleValue", elapsed_ms),
    ]
    if row_count is not None:
        # The row count is emitted as a double, like the duration.
        record_attributes.append(_attribute("question.row_count", "doubleValue", float(row_count)))
    record_attributes.append(_attribute("runtime.python", "stringValue", platform.python_version()))

    if outcome == 'success':
        severity_number = _SEVERITY_INFO
    else:
        severity_number = _SEVERITY_ERROR

    log_record = {
        "timeUnixNano": str(end_time_ns),
        "severityNumber": severity_number,
        "body": {"stringValue": _build_message(question_name, outcome, elapsed_ms, row_count)},
        "attributes": record_attributes,
    }
    resource = {
        "attributes": [
            _attribute("service.name", "stringValue", "dnastack-client"),
            _attribute("service.version", "stringValue", _DNASTACK_VERSION),
        ]
    }
    scope_log = {
        "scope": {"name": "dnastack.publisher.questions"},
        "logRecords": [log_record],
    }
    return {"resourceLogs": [{"resource": resource, "scopeLogs": [scope_log]}]}


def submit_telemetry(
    client,
    question_name: str,
    collection: str,
    start_time_ns: int,
    end_time_ns: int,
    outcome: Literal['success', 'error'],
    row_count: int | None = None,
) -> None:
    """Build the OTLP log payload and hand it to ``client.submit_telemetry``.

    This is best-effort: any ``Exception`` raised while building or sending
    the payload is caught and only reported at debug level, so telemetry can
    never break the calling command.

    Args:
        client: Object exposing a ``submit_telemetry(payload)`` method.
        question_name: Name of the executed question.
        collection: Collection the question was executed against.
        start_time_ns: Start timestamp in nanoseconds.
        end_time_ns: End timestamp in nanoseconds.
        outcome: ``'success'`` or ``'error'``.
        row_count: Number of result rows, or ``None`` when unknown.
    """
    try:
        client.submit_telemetry(
            build_otlp_log(
                question_name,
                collection,
                start_time_ns,
                end_time_ns,
                outcome,
                row_count,
            )
        )
    except Exception as e:
        # Deliberately swallowed: telemetry failures are non-fatal.
        logger.debug(f"Telemetry submission failed (non-fatal): {e}")
9 changes: 9 additions & 0 deletions dnastack/client/collections/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,15 @@ def ask_question(
)
)

def submit_telemetry(self, otlp_payload: dict, trace: Optional[Span] = None) -> None:
    """POST an OTLP logs payload to the ``otlp/v1/logs`` path under this client's URL.

    Best-effort — nothing is caught here, so callers should handle failures gracefully.

    Args:
        otlp_payload: The payload, sent as the JSON request body.
        trace: Optional span forwarded to the session as ``trace_context``.
    """
    with self.create_http_session() as http:
        logs_url = urljoin(self.url, 'otlp/v1/logs')
        http.post(logs_url, json=otlp_payload, trace_context=trace)

def data_connect_endpoint(self,
collection: Union[str, Collection, None] = None,
no_auth: bool = False) -> ServiceEndpoint:
Expand Down
3 changes: 3 additions & 0 deletions dnastack/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ def on_debug_mode_change(hook: Callable[[bool], None]):
# Feature flags declared through this module's `flag` helper (defined outside this view).
# NOTE(review): each flag is presumably driven by the environment variable of the
# same name — confirm against the `flag` implementation.
detailed_error = flag('DNASTACK_DETAILED_ERROR', description='Provide more details on error')
show_distributed_trace_stack_on_error = flag('DNASTACK_DISPLAY_TRACE_ON_ERROR',
description='Display distributed trace on error')

# Opt-in switch for telemetry: the publisher `ask_question` command checks this
# value before calling `submit_telemetry`.
metrics_enabled = flag('DNASTACK_METRICS_ENABLED',
description='Enable telemetry submission after publisher question execution')
Empty file added tests/unit/client/__init__.py
Empty file.
45 changes: 45 additions & 0 deletions tests/unit/client/test_collection_service_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from unittest.mock import MagicMock, patch

from dnastack.client.collections.client import CollectionServiceClient
from dnastack.client.models import ServiceEndpoint


def _make_client(url='http://localhost:8093/'):
    """Create a ``CollectionServiceClient`` without running its ``__init__``.

    The instance gets a mocked ``ServiceEndpoint`` whose ``url`` is *url*.
    """
    instance = CollectionServiceClient.__new__(CollectionServiceClient)
    endpoint_double = MagicMock(spec=ServiceEndpoint)
    instance._endpoint = endpoint_double
    instance._endpoint.url = url
    return instance


class TestSubmitTelemetry:
    """Tests for ``CollectionServiceClient.submit_telemetry``."""

    @staticmethod
    def _session_double():
        # A mock usable as a context manager that yields itself from `with`.
        session = MagicMock()
        session.__enter__ = MagicMock(return_value=session)
        session.__exit__ = MagicMock(return_value=False)
        return session

    def _submit(self, *args, **kwargs):
        # Run submit_telemetry against a patched session and return that session.
        client = _make_client()
        session = self._session_double()
        with patch.object(client, 'create_http_session', return_value=session):
            client.submit_telemetry(*args, **kwargs)
        return session

    def test_posts_to_otlp_logs_endpoint(self):
        session = self._submit({'resourceLogs': []})

        session.post.assert_called_once_with(
            'http://localhost:8093/otlp/v1/logs',
            json={'resourceLogs': []},
            trace_context=None
        )

    def test_passes_trace_context_when_provided(self):
        trace_double = MagicMock()

        session = self._submit({'resourceLogs': []}, trace=trace_double)

        session.post.assert_called_once_with(
            'http://localhost:8093/otlp/v1/logs',
            json={'resourceLogs': []},
            trace_context=trace_double
        )
Empty file.
87 changes: 87 additions & 0 deletions tests/unit/publisher/test_ask_question_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from unittest.mock import MagicMock, patch

import click
from click.testing import CliRunner


def _build_cli():
    """Return a fresh click group named ``cli`` with the questions commands registered."""
    # The commands module is imported inside the function, as in the original helper.
    from dnastack.cli.commands.publisher.questions.commands import init_questions_commands

    @click.group(name='cli')
    def root_group():
        pass

    init_questions_commands(root_group)
    return root_group


def _make_mock_client(raises=False):
mock_client = MagicMock()
mock_client.get_question.return_value = MagicMock(parameters=[])
if raises:
mock_client.ask_question.side_effect = RuntimeError("network failure")
else:
mock_client.ask_question.return_value = iter([{'col': 'val'}])
return mock_client


class TestAskQuestionTelemetry:
    """Tests for the telemetry hook around the publisher ``ask`` command."""

    # Module whose names are patched in every test.
    _COMMANDS = 'dnastack.cli.commands.publisher.questions.commands'

    def _run(self, metrics_enabled, raises):
        # Invoke `ask` with the client, feature flag, output handler and
        # telemetry submitter all patched; return everything tests may inspect.
        client_double = _make_mock_client(raises=raises)
        cli = _build_cli()

        with patch(f'{self._COMMANDS}.get_collection_service_client',
                   return_value=client_double), \
                patch(f'{self._COMMANDS}.metrics_enabled', metrics_enabled), \
                patch(f'{self._COMMANDS}.handle_question_results_output'), \
                patch(f'{self._COMMANDS}.submit_telemetry') as submit_double:
            result = CliRunner().invoke(cli, ['ask', '--question-name', 'q', '--collection', 'c'])

        return client_double, submit_double, result

    def _invoke(self, metrics_enabled=True, raises=False):
        client_double, submit_double, _ = self._run(metrics_enabled, raises)
        return client_double, submit_double

    def test_submits_telemetry_on_success_when_enabled(self):
        _, submit_double = self._invoke(metrics_enabled=True)
        submit_double.assert_called_once()

    def test_does_not_submit_when_disabled(self):
        _, submit_double = self._invoke(metrics_enabled=False)
        submit_double.assert_not_called()

    def test_submits_with_success_outcome_on_success(self):
        _, submit_double = self._invoke(metrics_enabled=True)
        positional, _keywords = submit_double.call_args
        # submit_telemetry(client, question_name, collection, start_ns, end_ns, outcome)
        assert positional[5] == 'success'

    def test_submits_with_error_outcome_on_failure(self):
        _, submit_double = self._invoke(metrics_enabled=True, raises=True)
        submit_double.assert_called_once()
        positional, _keywords = submit_double.call_args
        assert positional[5] == 'error'

    def test_question_name_and_collection_passed_to_telemetry(self):
        _, submit_double = self._invoke(metrics_enabled=True)
        positional, _keywords = submit_double.call_args
        # submit_telemetry(client, question_name, collection, start_ns, end_ns, outcome)
        assert positional[1] == 'q'
        assert positional[2] == 'c'

    def test_original_exception_propagates(self):
        """Telemetry submission in finally must not swallow the original exception."""
        _, _, result = self._run(True, True)

        # CliRunner catches exceptions — verify the exception was raised (exit code != 0)
        assert result.exit_code != 0
102 changes: 102 additions & 0 deletions tests/unit/publisher/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import time
from unittest.mock import MagicMock

from dnastack.cli.commands.publisher.questions.telemetry import build_otlp_log, submit_telemetry


class TestBuildOtlpLog:
    """Checks on the payload returned by ``build_otlp_log``."""

    def test_returns_valid_otlp_logs_structure(self):
        started_ns = time.time_ns()
        finished_ns = started_ns + 1_000_000_000
        payload = build_otlp_log('my-question', 'my-collection', started_ns, finished_ns, 'success')

        # Exactly one entry is expected at each nesting level.
        assert 'resourceLogs' in payload
        assert len(payload['resourceLogs']) == 1
        scope_logs = payload['resourceLogs'][0]['scopeLogs']
        assert len(scope_logs) == 1
        assert len(scope_logs[0]['logRecords']) == 1

    def test_success_outcome_sets_severity_info(self):
        record = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'success'))
        assert record['severityNumber'] == 9

    def test_error_outcome_sets_severity_error(self):
        record = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'error'))
        assert record['severityNumber'] == 17

    def test_attributes_include_question_name_and_collection(self):
        record = _first_log_record(build_otlp_log('my-q', 'my-col', 0, 1, 'success'))
        strings = _string_attrs(record)
        assert strings['question.name'] == 'my-q'
        assert strings['question.collection'] == 'my-col'
        assert strings['question.outcome'] == 'success'

    def test_duration_ms_is_calculated_from_start_and_end(self):
        started_ns = 1_000_000_000
        finished_ns = 2_500_000_000  # 1500 ms later
        record = _first_log_record(build_otlp_log('q', 'c', started_ns, finished_ns, 'success'))
        assert _double_attrs(record)['question.duration_ms'] == 1500.0

    def test_row_count_included_when_provided(self):
        record = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'success', row_count=42))
        doubles = _double_attrs(record)
        assert doubles['question.row_count'] == 42.0

    def test_row_count_omitted_when_not_provided(self):
        record = _first_log_record(build_otlp_log('q', 'c', 0, 1, 'error'))
        present_keys = [attribute['key'] for attribute in record['attributes']]
        assert 'question.row_count' not in present_keys

    def test_resource_attributes_include_service_name(self):
        payload = build_otlp_log('q', 'c', 0, 1, 'success')
        resource = payload['resourceLogs'][0]['resource']
        by_key = {
            attribute['key']: attribute['value']['stringValue']
            for attribute in resource['attributes']
        }
        assert by_key['service.name'] == 'dnastack-client'

    def test_time_unix_nano_is_end_time(self):
        finished_ns = 1_713_369_601_000_000_000
        record = _first_log_record(build_otlp_log('q', 'c', 0, finished_ns, 'success'))
        assert record['timeUnixNano'] == str(finished_ns)

    def test_success_message_includes_question_name_and_duration(self):
        started_ns = 0
        finished_ns = 1_500_000_000  # 1500 ms
        record = _first_log_record(build_otlp_log('my-question', 'c', started_ns, finished_ns, 'success'))
        body_text = record['body']['stringValue']
        assert 'my-question' in body_text
        assert '1500.0ms' in body_text

    def test_success_message_includes_row_count_when_provided(self):
        record = _first_log_record(build_otlp_log('q', 'c', 0, 1_000_000_000, 'success', row_count=7))
        assert '7 rows' in record['body']['stringValue']

    def test_error_message_includes_question_name_and_duration(self):
        record = _first_log_record(build_otlp_log('bad-question', 'c', 0, 500_000_000, 'error'))
        body_text = record['body']['stringValue']
        assert 'bad-question' in body_text
        assert '500.0ms' in body_text


class TestSubmitTelemetry:
    """Tests for the module-level ``submit_telemetry`` helper."""

    def test_calls_client_submit_telemetry(self):
        client_double = MagicMock()

        submit_telemetry(client_double, 'q', 'c', 0, 1, 'success')

        client_double.submit_telemetry.assert_called_once()

    def test_swallows_client_errors_silently(self):
        client_double = MagicMock()
        client_double.submit_telemetry.side_effect = Exception("network error")

        # The test passes as long as this call does not raise.
        submit_telemetry(client_double, 'q', 'c', 0, 1, 'success')


def _first_log_record(otlp_payload: dict) -> dict:
return otlp_payload['resourceLogs'][0]['scopeLogs'][0]['logRecords'][0]


def _string_attrs(log_record: dict) -> dict:
return {a['key']: a['value']['stringValue'] for a in log_record['attributes'] if 'stringValue' in a['value']}


def _double_attrs(log_record: dict) -> dict:
return {a['key']: a['value']['doubleValue'] for a in log_record['attributes'] if 'doubleValue' in a['value']}
Loading
Loading