diff --git a/README.md b/README.md index 2d67a75..362aed7 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ **Key Features:** - šŸŽÆ **Conversation Tracking**: Automatic multi-turn conversation tracking with `conversation_context` +- šŸ¤– **Agent Tracking**: First-class agent identity with `agent_context` (OTel `gen_ai.agent.*` semantic conventions) - šŸ”„ **Workflow Management**: Track complex multi-step AI workflows with `workflow_context` - šŸŽØ **Zero-Touch Instrumentation**: `@observe()` decorator for automatic tracking - šŸ“Š **Context Propagation**: Thread-safe attribute tracking across nested operations @@ -25,6 +26,7 @@ ### Core Tracking - šŸŽÆ **Conversation Tracking**: Multi-turn conversations with `gen_ai.conversation.id` and turn numbers +- šŸ¤– **Agent Identity**: Track agents with `gen_ai.agent.id`, `gen_ai.agent.name`, `gen_ai.agent.version` (OTel semantic conventions) - šŸ”„ **Workflow Management**: Track multi-step AI operations across LLM calls, tools, and retrievals - šŸ“Š **Auto-Context Propagation**: Thread-safe context managers that automatically tag all nested operations - šŸŽØ **Decorator Pattern**: `@observe()` for zero-touch instrumentation with full input/output/latency tracking @@ -143,6 +145,31 @@ with conversation_context(conversation_id="support_123"): result = lookup_and_respond() ``` +### Track Agents + +Track agent identity using [OTel GenAI semantic conventions](https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/) (`gen_ai.agent.*`): + +```python +from last9_genai import agent_context + +# Track agent identity — all child spans get gen_ai.agent.* attributes +with agent_context(agent_id="support_bot_v2", agent_name="Support Bot", agent_version="2.0"): + response = client.chat.completions.create( + model="gpt-4o", + messages=[{"role": "user", "content": "Help me with my order"}] + ) + # Span automatically has gen_ai.agent.id, gen_ai.agent.name, gen_ai.agent.version + +# Nest with conversations for full context +with conversation_context(conversation_id="session_123", user_id="user_456"): + with agent_context(agent_id="router_agent", agent_name="Router"): + route = classify_intent(query) + + with agent_context(agent_id="support_agent", agent_name="Support"): + response = handle_support(query) + # Each agent's spans are tagged separately, both share the conversation +``` + ### Decorator Pattern (Zero-Touch) Use `@observe()` for automatic tracking of everything: @@ -479,6 +506,11 @@ workflow.llm_calls = 3 # Conversation gen_ai.conversation.id = "session_123" gen_ai.conversation.turn_number = 2 + +# Agent (OTel GenAI semantic conventions) +gen_ai.agent.id = "support_bot_v2" +gen_ai.agent.name = "Support Bot" +gen_ai.agent.version = "2.0" ``` ## Model Pricing @@ -569,6 +601,7 @@ See [`examples/`](./examples/) directory: **Advanced:** - [`conversation_tracking.py`](./examples/conversation_tracking.py) - Multi-turn conversations +- [`agent_tracking.py`](./examples/agent_tracking.py) - Agent identity tracking with OTel semantic conventions ## Contributing diff --git a/examples/agent_tracking.py b/examples/agent_tracking.py new file mode 100644 index 0000000..a55bef0 --- /dev/null +++ b/examples/agent_tracking.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Agent identity tracking example + +Demonstrates tracking agent identity using OTel GenAI semantic conventions +(gen_ai.agent.id, gen_ai.agent.name, gen_ai.agent.version). + +This is useful for multi-agent systems where you need to attribute spans +to specific agents and correlate their interactions. +""" + +import sys +import os + +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +import time +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from last9_genai import ( + Last9SpanProcessor, + ModelPricing, + agent_context, + conversation_context, + workflow_context, +) + + +def setup_tracing(): + """Set up OpenTelemetry tracing with Last9 auto-enrichment""" + provider = TracerProvider() + trace.set_tracer_provider(provider) + + console_exporter = ConsoleSpanExporter() + provider.add_span_processor(BatchSpanProcessor(console_exporter)) + + custom_pricing = { + "gpt-4o": ModelPricing(input=2.50, output=10.0), + "gpt-4o-mini": ModelPricing(input=0.15, output=0.60), + } + l9_processor = Last9SpanProcessor(custom_pricing=custom_pricing) + provider.add_span_processor(l9_processor) + + return trace.get_tracer(__name__) + + +def simulate_llm_call(tracer, model: str, prompt: str) -> dict: + """Simulate an LLM API call""" + with tracer.start_span("gen_ai.chat.completions") as span: + time.sleep(0.05) + span.set_attribute("gen_ai.request.model", model) + span.set_attribute("gen_ai.operation.name", "chat") + span.set_attribute("gen_ai.usage.input_tokens", len(prompt.split()) * 2) + span.set_attribute("gen_ai.usage.output_tokens", 50) + return {"response": f"Response to: {prompt[:40]}..."} + + +def single_agent_example(): + """Basic agent context example""" + tracer = setup_tracing() + + print("\n--- Example 1: Single agent tracking ---\n") + + with agent_context(agent_id="support_bot_v2", agent_name="Support Bot", agent_version="2.0"): + result = simulate_llm_call(tracer, "gpt-4o", "Help me with my order") + print(f" Response: {result['response']}") + + print("\n Span attributes:") + print(" gen_ai.agent.id = 'support_bot_v2'") + print(" gen_ai.agent.name = 'Support Bot'") + print(" gen_ai.agent.version = '2.0'") + + +def multi_agent_routing_example(): + """Multi-agent system with routing""" + tracer = setup_tracing() + + print("\n--- Example 2: Multi-agent routing ---\n") + + with conversation_context(conversation_id="session_abc", user_id="user_42"): + # Router agent classifies intent + with agent_context(agent_id="router_v1", agent_name="Router Agent"): + intent = simulate_llm_call(tracer, "gpt-4o-mini", "Classify: refund my order") + print(f" Router: {intent['response']}") + + # Specialist agent handles the request + with agent_context( + agent_id="refund_agent_v3", agent_name="Refund Agent", agent_version="3.1" + ): + response = simulate_llm_call(tracer, "gpt-4o", "Process refund for order #12345") + print(f" Refund Agent: {response['response']}") + + print("\n Router spans: gen_ai.agent.id='router_v1', conversation_id='session_abc'") + print(" Refund spans: gen_ai.agent.id='refund_agent_v3', conversation_id='session_abc'") + + +def agent_with_workflow_example(): + """Agent context nested with workflow context""" + tracer = setup_tracing() + + print("\n--- Example 3: Agent + workflow nesting ---\n") + + with conversation_context(conversation_id="session_xyz"): + with agent_context(agent_id="rag_agent", agent_name="RAG Agent", agent_version="1.0"): + with workflow_context(workflow_id="retrieval_pipeline", workflow_type="rag"): + simulate_llm_call(tracer, "gpt-4o-mini", "Expand query: best restaurants") + simulate_llm_call(tracer, "gpt-4o", "Synthesize answer from documents") + print(" RAG pipeline completed") + + print("\n All spans have:") + print(" gen_ai.conversation.id = 'session_xyz'") + print(" gen_ai.agent.id = 'rag_agent'") + print(" workflow.id = 'retrieval_pipeline'") + + +if __name__ == "__main__": + print("Last9 GenAI - Agent Identity Tracking (OTel Semantic Conventions)") + print("=" * 70) + + try: + single_agent_example() + multi_agent_routing_example() + agent_with_workflow_example() + + trace.get_tracer_provider().force_flush(timeout_millis=5000) + + print("\n" + "=" * 70) + print("All agent tracking examples completed!") + print("\nAttributes follow OTel GenAI semantic conventions:") + print(" gen_ai.agent.id - Unique agent identifier") + print(" gen_ai.agent.name - Human-readable name") + print(" gen_ai.agent.version - Agent version") + print("\nSee: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/") + + except Exception as e: + print(f"Error: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/context_tracking.py b/examples/context_tracking.py index 91651ef..3f8ecb8 100644 --- a/examples/context_tracking.py +++ b/examples/context_tracking.py @@ -23,6 +23,7 @@ ModelPricing, conversation_context, workflow_context, + agent_context, propagate_attributes, ) @@ -202,11 +203,35 @@ def simulated_chat_endpoint(session_id: str, user_id: str, message: str): print(" - Zero manual attribute setting!") +def agent_context_example(): + """Agent identity tracking with OTel semantic conventions""" + tracer = setup_tracing() + + print("\nšŸ”„ Example 5: Agent identity tracking\n") + + with conversation_context(conversation_id="multi_agent_session", user_id="user_agent"): + # Router agent + with agent_context(agent_id="router_v1", agent_name="Router Agent"): + simulate_llm_call(tracer, "gpt-3.5-turbo", "Classify user intent") + print(" āœ… Router agent classified intent") + + # Specialist agent + with agent_context(agent_id="support_v2", agent_name="Support Agent", agent_version="2.0"): + simulate_llm_call(tracer, "gpt-4o", "Handle support request") + print(" āœ… Support agent handled request") + + print("\n Agent spans automatically have:") + print(" - gen_ai.agent.id (unique per agent)") + print(" - gen_ai.agent.name (human-readable)") + print(" - gen_ai.agent.version (when provided)") + print(" - gen_ai.conversation.id (from parent context)") + + def multi_turn_conversation_example(): """Example with turn numbers""" tracer = setup_tracing() - print("\nšŸ”„ Example 5: Multi-turn conversation with turn tracking\n") + print("\nšŸ”„ Example 6: Multi-turn conversation with turn tracking\n") conversation_id = "multi_turn_session" messages = ["Hello!", "What's the weather?", "Thank you!"] @@ -236,6 +261,7 @@ def multi_turn_conversation_example(): nested_workflow_example() propagate_attributes_example() fastapi_pattern_example() + agent_context_example() multi_turn_conversation_example() # Force export of spans diff --git a/last9_genai/__init__.py b/last9_genai/__init__.py index 4dc1de1..4bb9e7a 100644 --- a/last9_genai/__init__.py +++ b/last9_genai/__init__.py @@ -59,6 +59,7 @@ propagate_attributes, conversation_context, workflow_context, + agent_context, get_current_context, clear_context, ) @@ -93,6 +94,7 @@ "propagate_attributes", "conversation_context", "workflow_context", + "agent_context", "get_current_context", "clear_context", # Span processor diff --git a/last9_genai/context.py b/last9_genai/context.py index 2007f20..025518f 100644 --- a/last9_genai/context.py +++ b/last9_genai/context.py @@ -15,6 +15,9 @@ _user_id: ContextVar[Optional[str]] = ContextVar("user_id", default=None) _workflow_id: ContextVar[Optional[str]] = ContextVar("workflow_id", default=None) _workflow_type: ContextVar[Optional[str]] = ContextVar("workflow_type", default=None) +_agent_id: ContextVar[Optional[str]] = ContextVar("agent_id", default=None) +_agent_name: ContextVar[Optional[str]] = ContextVar("agent_name", default=None) +_agent_version: ContextVar[Optional[str]] = ContextVar("agent_version", default=None) _custom_attributes: ContextVar[Dict[str, Any]] = ContextVar("custom_attributes", default={}) @@ -77,6 +80,18 @@ def get_current_context() -> Dict[str, Any]: if workflow_type is not None: context["workflow_type"] = workflow_type + agent_id = _agent_id.get() + if agent_id is not None: + context["agent_id"] = agent_id + + agent_name = _agent_name.get() + if agent_name is not None: + context["agent_name"] = agent_name + + agent_version = _agent_version.get() + if agent_version is not None: + context["agent_version"] = agent_version + turn_number = _conversation_turn.get() if turn_number is not None: context["turn_number"] = turn_number @@ -94,6 +109,9 @@ def clear_context() -> None: _user_id.set(None) _workflow_id.set(None) _workflow_type.set(None) + _agent_id.set(None) + _agent_name.set(None) + _agent_version.set(None) _conversation_turn.set(None) _custom_attributes.set({}) @@ -211,3 +229,62 @@ def workflow_context( _workflow_type.set(prev_wf_type) _user_id.set(prev_user_id) _custom_attributes.set(prev_custom) + + +@contextmanager +def agent_context( + agent_id: str, + agent_name: Optional[str] = None, + agent_version: Optional[str] = None, + **custom_attrs, +): + """ + Context manager for agent tracking using OTel GenAI semantic conventions. + + All spans created within this context will automatically have + gen_ai.agent.id, gen_ai.agent.name, and gen_ai.agent.version attributes. + + Args: + agent_id: Unique agent identifier (gen_ai.agent.id) + agent_name: Human-readable agent name (gen_ai.agent.name) + agent_version: Agent version (gen_ai.agent.version) + **custom_attrs: Additional custom attributes + + Example: + ```python + with agent_context(agent_id="agent_123", agent_name="Support Bot", agent_version="2.0"): + # All spans automatically tagged with gen_ai.agent.id + response = client.chat.completions.create(...) + ``` + + # Can be nested with conversation and workflow contexts: + ```python + with conversation_context(conversation_id="session_123"): + with agent_context(agent_id="agent_xyz", agent_name="Router"): + # Both conversation AND agent tracked + result = route_request(query) + ``` + """ + prev_agent_id = _agent_id.get() + prev_agent_name = _agent_name.get() + prev_agent_version = _agent_version.get() + prev_custom = _custom_attributes.get() + + try: + _agent_id.set(agent_id) + if agent_name is not None: + _agent_name.set(agent_name) + if agent_version is not None: + _agent_version.set(agent_version) + if custom_attrs: + merged = prev_custom.copy() if prev_custom else {} + merged.update(custom_attrs) + _custom_attributes.set(merged) + + yield + + finally: + _agent_id.set(prev_agent_id) + _agent_name.set(prev_agent_name) + _agent_version.set(prev_agent_version) + _custom_attributes.set(prev_custom) diff --git a/last9_genai/core.py b/last9_genai/core.py index 3d31cc1..4abb5a0 100644 --- a/last9_genai/core.py +++ b/last9_genai/core.py @@ -88,6 +88,12 @@ class GenAIAttributes: PROMPT_HASH = "gen_ai.prompt.hash" PROMPT_TEMPLATE_ID = "gen_ai.prompt.template_id" + # Agent attributes (OTel GenAI semantic conventions - experimental) + AGENT_ID = "gen_ai.agent.id" + AGENT_NAME = "gen_ai.agent.name" + AGENT_DESCRIPTION = "gen_ai.agent.description" + AGENT_VERSION = "gen_ai.agent.version" + # Tool attributes TOOL_NAME = "gen_ai.tool.name" TOOL_TYPE = "gen_ai.tool.type" @@ -145,6 +151,8 @@ class Operations: EMBEDDINGS = "embeddings" TEXT_COMPLETION = "text.completion" TOOL_CALL = "tool.call" + CREATE_AGENT = "create_agent" + INVOKE_AGENT = "invoke_agent" class Providers: diff --git a/last9_genai/processor.py b/last9_genai/processor.py index 120c670..d4991fb 100644 --- a/last9_genai/processor.py +++ b/last9_genai/processor.py @@ -144,6 +144,16 @@ def _add_context_attributes_on_start(self, span: "Span") -> None: if "workflow_type" in context: span.set_attribute("workflow.type", context["workflow_type"]) + # Add agent attributes (OTel GenAI semantic conventions) + if "agent_id" in context: + span.set_attribute(GenAIAttributes.AGENT_ID, context["agent_id"]) + + if "agent_name" in context: + span.set_attribute(GenAIAttributes.AGENT_NAME, context["agent_name"]) + + if "agent_version" in context: + span.set_attribute(GenAIAttributes.AGENT_VERSION, context["agent_version"]) + # Add any custom attributes for key, value in context.items(): if key not in [ @@ -152,6 +162,9 @@ def _add_context_attributes_on_start(self, span: "Span") -> None: "user_id", "workflow_id", "workflow_type", + "agent_id", + "agent_name", + "agent_version", ]: span.set_attribute(f"custom.{key}", str(value)) diff --git a/tests/test_context.py b/tests/test_context.py index 7a38955..e4b2232 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -8,6 +8,7 @@ from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from last9_genai import ( + agent_context, conversation_context, workflow_context, propagate_attributes, @@ -249,6 +250,176 @@ def test_deeply_nested_contexts(self, tracer_setup): assert spans[0].attributes["workflow.id"] == "wf_level_1" +class TestAgentContext: + """Test agent_context() context manager""" + + def test_agent_context_basic(self, tracer_setup): + """Test basic agent_context with just agent_id""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="agent_123"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "agent_123" + + def test_agent_context_with_all_fields(self, tracer_setup): + """Test agent_context with id, name, and version""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="bot_v2", agent_name="Support Bot", agent_version="2.0"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "bot_v2" + assert spans[0].attributes[GenAIAttributes.AGENT_NAME] == "Support Bot" + assert spans[0].attributes[GenAIAttributes.AGENT_VERSION] == "2.0" + + def test_agent_context_propagates_to_nested_spans(self, tracer_setup): + """Test that agent context propagates to all nested spans""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="nested_agent", agent_name="Nested"): + with tracer.start_as_current_span("parent"): + with tracer.start_as_current_span("child"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "nested_agent" + assert spans[1].attributes[GenAIAttributes.AGENT_ID] == "nested_agent" + + def test_agent_context_cleanup(self, tracer_setup): + """Test that agent context is cleaned up after exit""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="temp_agent"): + context = get_current_context() + assert context["agent_id"] == "temp_agent" + + context = get_current_context() + assert "agent_id" not in context or context.get("agent_id") != "temp_agent" + + def test_agent_context_override(self, tracer_setup): + """Test that inner agent context overrides outer""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="outer_agent", agent_name="Outer"): + with agent_context(agent_id="inner_agent", agent_name="Inner"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "inner_agent" + assert spans[0].attributes[GenAIAttributes.AGENT_NAME] == "Inner" + + def test_multi_agent_sequential(self, tracer_setup): + """Test sequential agent contexts (multi-agent routing)""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="router", agent_name="Router"): + with tracer.start_as_current_span("route"): + pass + + with agent_context(agent_id="handler", agent_name="Handler"): + with tracer.start_as_current_span("handle"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "router" + assert spans[1].attributes[GenAIAttributes.AGENT_ID] == "handler" + + def test_agent_with_conversation_context(self, tracer_setup): + """Test agent nested inside conversation""" + tracer, memory_exporter = tracer_setup + + with conversation_context(conversation_id="conv_abc", user_id="user_1"): + with agent_context(agent_id="agent_xyz", agent_name="Bot"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.CONVERSATION_ID] == "conv_abc" + assert spans[0].attributes["user.id"] == "user_1" + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "agent_xyz" + assert spans[0].attributes[GenAIAttributes.AGENT_NAME] == "Bot" + + def test_agent_with_workflow_context(self, tracer_setup): + """Test agent nested with workflow""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="rag_agent", agent_name="RAG"): + with workflow_context(workflow_id="retrieval", workflow_type="rag"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "rag_agent" + assert spans[0].attributes["workflow.id"] == "retrieval" + assert spans[0].attributes["workflow.type"] == "rag" + + def test_agent_conversation_workflow_triple_nesting(self, tracer_setup): + """Test all three contexts nested together""" + tracer, memory_exporter = tracer_setup + + with conversation_context(conversation_id="session_1"): + with agent_context(agent_id="agent_1", agent_name="Agent", agent_version="1.0"): + with workflow_context(workflow_id="wf_1"): + with tracer.start_as_current_span("test_span"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].attributes[GenAIAttributes.CONVERSATION_ID] == "session_1" + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "agent_1" + assert spans[0].attributes[GenAIAttributes.AGENT_NAME] == "Agent" + assert spans[0].attributes[GenAIAttributes.AGENT_VERSION] == "1.0" + assert spans[0].attributes["workflow.id"] == "wf_1" + + def test_multi_agent_in_conversation(self, tracer_setup): + """Test multiple agents within same conversation (handoff pattern)""" + tracer, memory_exporter = tracer_setup + + with conversation_context(conversation_id="session_handoff"): + with agent_context(agent_id="router_v1", agent_name="Router"): + with tracer.start_as_current_span("classify"): + pass + + with agent_context(agent_id="support_v2", agent_name="Support"): + with tracer.start_as_current_span("respond"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 2 + + # Both have same conversation, different agents + assert spans[0].attributes[GenAIAttributes.CONVERSATION_ID] == "session_handoff" + assert spans[0].attributes[GenAIAttributes.AGENT_ID] == "router_v1" + assert spans[0].attributes[GenAIAttributes.AGENT_NAME] == "Router" + + assert spans[1].attributes[GenAIAttributes.CONVERSATION_ID] == "session_handoff" + assert spans[1].attributes[GenAIAttributes.AGENT_ID] == "support_v2" + assert spans[1].attributes[GenAIAttributes.AGENT_NAME] == "Support" + + def test_agent_context_no_span(self, tracer_setup): + """Test agent_context works even without spans""" + tracer, memory_exporter = tracer_setup + + with agent_context(agent_id="no_span_agent"): + pass + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 0 + + class TestPropagateAttributes: """Test propagate_attributes() context manager"""