
feat: Python SDK version 0.2 #1

Open
dewitt4 wants to merge 2 commits into main from sdk-python-0-2

Conversation


@dewitt4 dewitt4 commented Feb 10, 2026

Updating SDK to support multi-agentic workflows

@dewitt4 dewitt4 requested a review from Copilot February 10, 2026 18:39
@dewitt4 dewitt4 self-assigned this Feb 10, 2026
@dewitt4 dewitt4 added the enhancement label Feb 10, 2026

Copilot AI left a comment


Pull request overview

This PR updates the WhiteBoxAI Python SDK from version 0.1.0 to 0.2.0, adding support for multi-agent workflows with new integrations for CrewAI and LangChain agents, Git context detection for model versioning, and comprehensive MkDocs documentation.

Changes:

  • Added CrewAI and LangChain multi-agent monitoring integrations with callback handlers and workflow tracking
  • Added Git integration utilities for automatic repository metadata detection
  • Added comprehensive MkDocs documentation setup with Material theme
  • Updated the httpx dependency and added pandas and tenacity as core dependencies
  • Added optional dependencies for git and crewai extras

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 26 comments.

Summary per file:

  • src/whiteboxai/integrations/langchain_agents.py: New LangChain multi-agent callback handlers and monitors for tracking agent workflows
  • src/whiteboxai/integrations/crewai_monitor.py: New CrewAI integration for monitoring crew workflows, agents, and tasks
  • src/whiteboxai/git_utils.py: New Git context detection utilities with GitPython and subprocess fallback
  • src/whiteboxai/integrations/__init__.py: Added exports for the new CrewAI and LangChain agent integrations
  • src/whiteboxai/__init__.py: Fixed imports to use whiteboxai (not explainai); added Git utils exports
  • src/whiteboxai/version.py: Version bump to 0.2.0
  • pyproject.toml: Updated dependencies and version; added git and crewai extras
  • mkdocs.yml: New documentation configuration with Material theme
  • docs/index.md: New documentation homepage with quickstart examples
  • CHANGELOG.md: Added 0.2.0 release notes
  • .gitignore: Added CHANGELOG.md and MIGRATION_SUMMARY.md to the ignore list


Comment on lines +1 to +609
"""
LangChain Multi-Agent Integration for WhiteBoxAI

Enhanced callback handler for monitoring multi-agent LangChain workflows including:
- LangGraph multi-agent patterns
- Agent supervisors and coordinators
- Tool usage and agent handoffs
- Agent-to-agent communication
"""

from typing import Any, Dict, List, Optional, Union
from datetime import datetime
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import AgentAction, AgentFinish, LLMResult
from langchain.schema.document import Document
from langchain.schema.output import ChatGeneration, Generation

try:
from whiteboxai import WhiteBoxAI
except ImportError:
WhiteBoxAI = None


class MultiAgentCallbackHandler(BaseCallbackHandler):
"""Enhanced callback handler for multi-agent LangChain workflows.

This handler tracks:
- Agent executions and decisions
- Tool calls and results
- Agent-to-agent handoffs
- LLM calls per agent
- Workflow-level metrics

Example:
```python
from langchain.agents import AgentExecutor, create_react_agent
from whiteboxai.integrations import MultiAgentCallbackHandler

# Initialize WhiteBoxAI client
client = WhiteBoxAI(api_key="your_key")

# Create workflow
workflow_id = client.agent_workflows.create(
name="Research Workflow",
framework="langchain"
).id

# Start workflow
client.agent_workflows.start(workflow_id)

# Create callback
callback = MultiAgentCallbackHandler(
client=client,
workflow_id=workflow_id,
agent_name="researcher"
)

# Use with agent
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
callbacks=[callback]
)
result = agent_executor.run("Research AI safety")

# Complete workflow
client.agent_workflows.complete(
workflow_id,
outputs={"result": result}
)
```
"""

def __init__(
self,
client: "WhiteBoxAI",
workflow_id: str,
agent_name: str = "main",
agent_role: Optional[str] = None,
track_tokens: bool = True,
track_costs: bool = True,
):
"""Initialize the callback handler.

Args:
client: WhiteBoxAI client instance
workflow_id: ID of the workflow to track
agent_name: Name of the current agent
agent_role: Role/description of the agent
track_tokens: Whether to track token usage
track_costs: Whether to estimate costs
"""
if WhiteBoxAI is None:
raise ImportError(
"whiteboxai package not installed. "
"Install with: pip install whiteboxai"
)

self.client = client
self.workflow_id = workflow_id
self.agent_name = agent_name
self.agent_role = agent_role or agent_name
self.track_tokens = track_tokens
self.track_costs = track_costs

# Tracking state
self.current_execution_id: Optional[str] = None
self.execution_start_time: Optional[datetime] = None
self.llm_call_count = 0
self.tool_call_count = 0
self.total_tokens = 0
self.total_cost = 0.0
self.execution_inputs: Optional[Dict[str, Any]] = None

def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Run when chain starts."""
# Start agent execution
self.execution_start_time = datetime.utcnow()
self.execution_inputs = inputs
self.llm_call_count = 0
self.tool_call_count = 0
self.total_tokens = 0
self.total_cost = 0.0

def on_chain_end(
self,
outputs: Dict[str, Any],
**kwargs: Any
) -> None:
"""Run when chain ends successfully."""
if self.execution_start_time:
duration_ms = int(
(datetime.utcnow() - self.execution_start_time).total_seconds() * 1000
)

# Log agent execution
try:
response = self.client.agent_workflows.create_execution(
workflow_id=self.workflow_id,
agent_name=self.agent_name,
status="completed",
inputs=self.execution_inputs,
outputs=outputs,
duration_ms=duration_ms,
llm_call_count=self.llm_call_count,
tool_call_count=self.tool_call_count,
tokens_used=self.total_tokens if self.track_tokens else None,
cost=self.total_cost if self.track_costs else None,
)
self.current_execution_id = response.get("id")
except Exception as e:
print(f"Warning: Failed to log execution: {e}")

# Reset state
self.execution_start_time = None

def on_chain_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Run when chain errors."""
if self.execution_start_time:
duration_ms = int(
(datetime.utcnow() - self.execution_start_time).total_seconds() * 1000
)

# Log failed execution
try:
self.client.agent_workflows.create_execution(
workflow_id=self.workflow_id,
agent_name=self.agent_name,
status="failed",
inputs=self.execution_inputs,
outputs={"error": str(error)},
duration_ms=duration_ms,
llm_call_count=self.llm_call_count,
tool_call_count=self.tool_call_count,
)
except Exception as e:
print(f"Warning: Failed to log error: {e}")

self.execution_start_time = None

def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs: Any
) -> None:
"""Run when LLM starts."""
self.llm_call_count += 1

def on_llm_end(
self,
response: LLMResult,
**kwargs: Any
) -> None:
"""Run when LLM ends."""
# Track tokens if available
if self.track_tokens and hasattr(response, "llm_output"):
llm_output = response.llm_output or {}
token_usage = llm_output.get("token_usage", {})

total = token_usage.get("total_tokens", 0)
self.total_tokens += total

# Estimate cost if tracking
if self.track_costs and total > 0:
# Rough estimate: $0.002 per 1K tokens (GPT-3.5 pricing)
self.total_cost += (total / 1000) * 0.002

def on_agent_action(
self,
action: AgentAction,
**kwargs: Any
) -> None:
"""Run when agent takes an action (tool call)."""
self.tool_call_count += 1

# Log tool call as interaction
try:
self.client.agent_workflows.create_interaction(
workflow_id=self.workflow_id,
from_agent=self.agent_name,
to_agent="tool",
interaction_type="tool_call",
message=f"Tool: {action.tool}, Input: {action.tool_input}",
meta_data={
"tool": action.tool,
"tool_input": action.tool_input,
"log": action.log,
}
)
except Exception as e:
print(f"Warning: Failed to log tool call: {e}")

def on_agent_finish(
self,
finish: AgentFinish,
**kwargs: Any
) -> None:
"""Run when agent finishes execution."""
# This is called when the agent completes its reasoning
pass

def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs: Any
) -> None:
"""Run when tool starts."""
pass

def on_tool_end(
self,
output: str,
**kwargs: Any
) -> None:
"""Run when tool ends."""
# Log tool result as interaction
try:
self.client.agent_workflows.create_interaction(
workflow_id=self.workflow_id,
from_agent="tool",
to_agent=self.agent_name,
interaction_type="response",
message=f"Tool result: {output[:500]}", # Truncate long outputs
meta_data={"output": output}
)
except Exception as e:
print(f"Warning: Failed to log tool result: {e}")

def on_tool_error(
self,
error: Union[Exception, KeyboardInterrupt],
**kwargs: Any
) -> None:
"""Run when tool errors."""
try:
self.client.agent_workflows.create_interaction(
workflow_id=self.workflow_id,
from_agent="tool",
to_agent=self.agent_name,
interaction_type="response",
message=f"Tool error: {str(error)}",
meta_data={"error": str(error), "error_type": type(error).__name__}
)
except Exception as e:
print(f"Warning: Failed to log tool error: {e}")

def on_text(
self,
text: str,
**kwargs: Any
) -> None:
"""Run on arbitrary text."""
pass


class LangGraphMultiAgentMonitor:
"""Monitor for LangGraph multi-agent workflows.

Provides higher-level monitoring for LangGraph patterns like:
- Agent supervisors
- Agent networks
- Sequential/parallel agent execution

Example:
```python
from langgraph.graph import StateGraph
from whiteboxai.integrations import LangGraphMultiAgentMonitor

# Create monitor
monitor = LangGraphMultiAgentMonitor(
client=client,
workflow_name="Multi-Agent Research"
)

# Start monitoring
workflow_id = monitor.start_monitoring()

# Register agents
monitor.register_agent("supervisor", role="Coordinates other agents")
monitor.register_agent("researcher", role="Gathers information")
monitor.register_agent("writer", role="Writes content")

# Execute graph with callbacks
graph = StateGraph(...)
result = graph.invoke(
inputs,
config={"callbacks": [monitor.get_callbacks("supervisor")]}
)

# Complete monitoring
monitor.complete_monitoring(outputs={"result": result})
```
"""

def __init__(
self,
client: "WhiteBoxAI",
workflow_name: str,
meta_data: Optional[Dict[str, Any]] = None
):
"""Initialize the LangGraph monitor.

Args:
client: WhiteBoxAI client instance
workflow_name: Name for the workflow
meta_data: Additional meta_data to attach
"""
if WhiteBoxAI is None:
raise ImportError(
"whiteboxai package not installed. "
"Install with: pip install whiteboxai"
)

self.client = client
self.workflow_name = workflow_name
self.workflow_meta_data = meta_data or {}
self.workflow_id: Optional[str] = None
self.callbacks: Dict[str, MultiAgentCallbackHandler] = {}
self.start_time: Optional[datetime] = None

def start_monitoring(self, inputs: Optional[Dict[str, Any]] = None) -> str:
"""Start workflow monitoring.

Args:
inputs: Initial workflow inputs

Returns:
workflow_id: ID of the created workflow
"""
self.start_time = datetime.utcnow()

# Create workflow
response = self.client.agent_workflows.create(
name=self.workflow_name,
framework="langchain",
inputs=inputs,
meta_data=self.workflow_meta_data
)
self.workflow_id = response.get("id")

# Start workflow
self.client.agent_workflows.start(self.workflow_id)

return self.workflow_id

def register_agent(
self,
agent_name: str,
role: Optional[str] = None,
model_name: Optional[str] = None,
tools: Optional[List[str]] = None,
**kwargs
) -> None:
"""Register an agent in the workflow.

Args:
agent_name: Name of the agent
role: Agent's role/goal
model_name: LLM model used
tools: List of tool names
**kwargs: Additional agent configuration
"""
if not self.workflow_id:
raise ValueError("Must call start_monitoring() first")

self.client.agent_workflows.register_agent(
workflow_id=self.workflow_id,
name=agent_name,
role=role or agent_name,
model_name=model_name,
tools=tools,
**kwargs
)

def get_callbacks(
self,
agent_name: str,
agent_role: Optional[str] = None
) -> List[BaseCallbackHandler]:
"""Get callbacks for a specific agent.

Args:
agent_name: Name of the agent
agent_role: Optional role description

Returns:
List of callback handlers
"""
if not self.workflow_id:
raise ValueError("Must call start_monitoring() first")

if agent_name not in self.callbacks:
self.callbacks[agent_name] = MultiAgentCallbackHandler(
client=self.client,
workflow_id=self.workflow_id,
agent_name=agent_name,
agent_role=agent_role
)

return [self.callbacks[agent_name]]

def log_handoff(
self,
from_agent: str,
to_agent: str,
message: str,
meta_data: Optional[Dict[str, Any]] = None
) -> None:
"""Log an agent-to-agent handoff.

Args:
from_agent: Agent passing control
to_agent: Agent receiving control
message: Handoff message/context
meta_data: Additional meta_data
"""
if not self.workflow_id:
raise ValueError("Must call start_monitoring() first")

self.client.agent_workflows.create_interaction(
workflow_id=self.workflow_id,
from_agent=from_agent,
to_agent=to_agent,
interaction_type="handoff",
message=message,
meta_data=meta_data
)

def complete_monitoring(
self,
outputs: Optional[Dict[str, Any]] = None,
status: str = "completed"
) -> Dict[str, Any]:
"""Complete workflow monitoring.

Args:
outputs: Final workflow outputs
status: Workflow status (completed/failed)

Returns:
Summary with analytics
"""
if not self.workflow_id:
raise ValueError("Must call start_monitoring() first")

# Complete workflow
self.client.agent_workflows.complete(
workflow_id=self.workflow_id,
outputs=outputs,
status=status
)

# Get analytics
try:
analytics = self.client.agent_workflows.get_analytics(self.workflow_id)
return {
"workflow_id": self.workflow_id,
"status": status,
"outputs": outputs,
"analytics": analytics
}
except Exception as e:
print(f"Warning: Failed to retrieve analytics: {e}")
return {
"workflow_id": self.workflow_id,
"status": status,
"outputs": outputs
}


def monitor_langchain_agent(
client: "WhiteBoxAI",
agent_executor: Any,
workflow_name: str,
agent_name: str = "main",
inputs: Optional[Dict[str, Any]] = None,
**run_kwargs
) -> Dict[str, Any]:
"""Helper function to monitor a single LangChain agent execution.

Args:
client: WhiteBoxAI client
agent_executor: LangChain AgentExecutor instance
workflow_name: Name for the workflow
agent_name: Name of the agent
inputs: Inputs to the agent
**run_kwargs: Additional arguments to pass to agent.run()

Returns:
Dict with result and workflow_id

Example:
```python
from langchain.agents import AgentExecutor, create_react_agent
from whiteboxai.integrations import monitor_langchain_agent

result_dict = monitor_langchain_agent(
client=client,
agent_executor=agent_executor,
workflow_name="Research Task",
agent_name="researcher",
inputs={"input": "Research AI safety"}
)

print(f"Result: {result_dict['result']}")
print(f"Workflow ID: {result_dict['workflow_id']}")
```
"""
# Create workflow
response = client.agent_workflows.create(
name=workflow_name,
framework="langchain",
inputs=inputs
)
workflow_id = response.get("id")

# Start workflow
client.agent_workflows.start(workflow_id)

# Create callback
callback = MultiAgentCallbackHandler(
client=client,
workflow_id=workflow_id,
agent_name=agent_name
)

try:
# Run agent with callback
result = agent_executor.run(
callbacks=[callback],
**run_kwargs
)

# Complete workflow
client.agent_workflows.complete(
workflow_id,
outputs={"result": result}
)

return {
"result": result,
"workflow_id": workflow_id,
"status": "completed"
}
except Exception as e:
# Log failure
client.agent_workflows.complete(
workflow_id,
outputs={"error": str(e)},
status="failed"
)

return {
"result": None,
"workflow_id": workflow_id,
"status": "failed",
"error": str(e)
}

Copilot AI Feb 10, 2026


The PR introduces three major new modules (langchain_agents.py, crewai_monitor.py, and git_utils.py) totaling over 1,400 lines of code, but no tests are included for any of them. The existing codebase has unit tests for client.py and monitor.py, and integration tests for sklearn.py, establishing a pattern of test coverage. These new modules should have corresponding test files to maintain code quality and prevent regressions.
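As a starting point, a unit test for the new callback handler could stub the WhiteBoxAI client and assert that a completed chain is reported. A minimal pytest-style sketch, assuming whiteboxai and langchain are importable in the test environment (the test name and mock shape are illustrative, not part of this PR):

```python
from unittest.mock import MagicMock

from whiteboxai.integrations.langchain_agents import MultiAgentCallbackHandler


def test_on_chain_end_logs_completed_execution():
    # Stub only the client surface the handler touches
    client = MagicMock()
    client.agent_workflows.create_execution.return_value = {"id": "exec-1"}

    handler = MultiAgentCallbackHandler(
        client=client, workflow_id="wf-1", agent_name="researcher"
    )
    handler.on_chain_start(serialized={}, inputs={"input": "question"})
    handler.on_chain_end(outputs={"output": "answer"})

    # Exactly one completed execution should be reported to the workflow API
    client.agent_workflows.create_execution.assert_called_once()
    kwargs = client.agent_workflows.create_execution.call_args.kwargs
    assert kwargs["status"] == "completed"
    assert handler.current_execution_id == "exec-1"
```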

Comment on lines +215 to +216
# Rough estimate: $0.002 per 1K tokens (GPT-3.5 pricing)
self.total_cost += (total / 1000) * 0.002
Copy link

Copilot AI Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hard-coded cost estimate of $0.002 per 1K tokens is outdated and only applies to GPT-3.5 Turbo. Different models have vastly different pricing (e.g., GPT-4 is ~$0.03 per 1K tokens for input, Claude has different pricing, etc.). This will provide inaccurate cost tracking for most models. Consider either removing the cost estimation, making it configurable per model, or clearly documenting this limitation in the docstring that costs are rough GPT-3.5 estimates only.
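One way to keep the feature while addressing this is a per-model rate table with an explicit, documented fallback. A sketch (the table, rates, and helper name are hypothetical, and the rates below are examples rather than current prices):

```python
from typing import Optional

# Hypothetical per-model pricing table; values are illustrative, not current prices.
COST_PER_1K_TOKENS = {
    "gpt-3.5-turbo": 0.002,
    "gpt-4": 0.03,
}
DEFAULT_COST_PER_1K = 0.002  # documented fallback when the model is unknown


def estimate_cost(model_name: Optional[str], total_tokens: int) -> float:
    """Rough cost estimate; only as accurate as the table is current."""
    rate = COST_PER_1K_TOKENS.get(model_name or "", DEFAULT_COST_PER_1K)
    return (total_tokens / 1000) * rate
```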

"goal": getattr(crew_agent, "goal", None),
"backstory": getattr(crew_agent, "backstory", None),
"tools": [tool.__class__.__name__ for tool in getattr(crew_agent, "tools", [])],
"llm_provider": getattr(getattr(crew_agent, "llm", None), "model_name", "unknown").split("/")[0] if hasattr(crew_agent, "llm") else None,

Copilot AI Feb 10, 2026


The code attempts to extract llm_provider from model_name by splitting on "/" and taking the first part (line 186). This is fragile and will fail for model names that don't contain "/". For example, if model_name is just "gpt-4" without a provider prefix, split("/")[0] will return "gpt-4", not the provider. Consider using a more robust method to extract the provider, or handle the case where there's no "/" in the model name.
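A more defensive version could return a provider only when a prefix is actually present. A sketch (the helper name is hypothetical):

```python
from typing import Optional


def extract_provider(model_name: Optional[str]) -> Optional[str]:
    """Return the provider prefix of "provider/model" names, else None."""
    if not model_name or "/" not in model_name:
        # Bare names like "gpt-4" carry no provider prefix
        return None
    return model_name.split("/", 1)[0]
```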

Comment on lines +579 to +583
# Run agent with callback
result = agent_executor.run(
    callbacks=[callback],
    **run_kwargs
)

Copilot AI Feb 10, 2026


The inputs parameter is documented but never actually passed to agent_executor.run(). The function passes **run_kwargs which doesn't include inputs. If users expect the inputs parameter to be used for agent execution, this will not work as intended. Either remove the inputs parameter from the signature if it's only for metadata logging, or merge it into run_kwargs before passing to the agent executor.

Suggested change

# Run agent with callback
result = agent_executor.run(
    callbacks=[callback],
    **run_kwargs
)

# Run agent with callback, passing inputs if provided
if inputs is not None:
    result = agent_executor.run(
        inputs,
        callbacks=[callback],
        **run_kwargs
    )
else:
    result = agent_executor.run(
        callbacks=[callback],
        **run_kwargs
    )

Comment on lines +234 to +477

Copilot AI Feb 10, 2026


Inconsistent naming convention: this file uses meta_data (with underscore) while crewai_monitor.py uses metadata (no underscore) for the same concept. The codebase should use a consistent naming convention. Python style guides typically prefer metadata (no underscore) as it's a single concept, not two separate words. Consider standardizing on metadata throughout the codebase.
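If the SDK standardizes on metadata, a small transition shim could keep existing meta_data call sites working while steering users to the new spelling. A sketch (the helper is hypothetical, not part of this PR):

```python
import warnings
from typing import Any, Dict, Optional


def normalize_metadata(
    metadata: Optional[Dict[str, Any]] = None,
    meta_data: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """Prefer `metadata`; accept legacy `meta_data` with a deprecation warning."""
    if meta_data is not None:
        warnings.warn(
            "'meta_data' is deprecated; use 'metadata' instead",
            DeprecationWarning,
            stacklevel=2,
        )
    return metadata if metadata is not None else meta_data
```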

Comment on lines +233 to +234
except subprocess.CalledProcessError:
    pass

Copilot AI Feb 10, 2026


'except' clause does nothing but pass and there is no explanatory comment.

Suggested change

except subprocess.CalledProcessError:
    pass

except subprocess.CalledProcessError as exc:
    logger.debug(
        "Failed to retrieve git commit message with 'git log -1 --pretty=%%B': %s",
        exc,
    )

cwd=cwd,
)
commit_author = result.stdout.strip()
except subprocess.CalledProcessError:

Copilot AI Feb 10, 2026


'except' clause does nothing but pass and there is no explanatory comment.

branch_output = result.stdout.strip()
if branch_output != "HEAD": # Not in detached HEAD
branch = branch_output
except subprocess.CalledProcessError:

Copilot AI Feb 10, 2026


'except' clause does nothing but pass and there is no explanatory comment.

cwd=cwd,
)
tag = result.stdout.strip()
except subprocess.CalledProcessError:

Copilot AI Feb 10, 2026


'except' clause does nothing but pass and there is no explanatory comment.

cwd=cwd,
)
is_dirty = bool(result.stdout.strip())
except subprocess.CalledProcessError:

Copilot AI Feb 10, 2026


'except' clause does nothing but pass and there is no explanatory comment.

