Decompose src/agent/dispatcher.rs into cohesive submodules (#122)#136
Conversation
|
Note Reviews pausedUse the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughSummarise the refactor: split thread‑ops into submodules; adjust undo‑manager and session lock ordering and add consistency checks; hydrate threads using a scoped DB message API; centralise boot‑screen construction; extend auth‑result parsing; add scoped DB APIs and tests; expand tool‑calling architecture docs and boot‑screen snapshot tests. Changes
Possibly related issues
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 6 | ❌ 3❌ Failed checks (3 warnings)
✅ Passed checks (6 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. 📋 Issue PlannerBuilt with CodeRabbit's Coding Plans for faster development and fewer bugs. View plan used: ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
Reviewer's GuideRefactors the agent dispatcher and thread operations into cohesive submodules (dispatcher delegate, tool execution, LLM hooks, thread hydration/control/turn execution/persistence), introduces small helper structs for turn and tool-call persistence, and updates tool execution APIs and tests accordingly while keeping behavior largely the same. Sequence diagram for execute_tool_calls 3-phase tool pipelinesequenceDiagram
participant Delegate as ChatDelegate
participant ToolExec as tool_exec_module
participant Session as Session
participant Thread as Thread
participant Turn as Turn
participant Channels as ChannelManager
participant Tools as ToolRegistry
participant Safety as SafetyLayer
participant JobCtx as JobContext
participant Agent as Agent
participant reason_ctx as reason_ctx
Note over Delegate,ToolExec: Entry: NativeLoopDelegate.execute_tool_calls
Delegate->>ToolExec: execute_tool_calls(delegate, tool_calls, content, reason_ctx)
ToolExec->>reason_ctx: messages.push(assistant_with_tool_calls)
ToolExec->>Channels: send_status(Thinking("Executing N tool(s)..."))
ToolExec->>ToolExec: record_redacted_tool_calls(delegate, tool_calls)
ToolExec->>Session: lock()
Session->>Thread: get_mut(thread_id)
Thread->>Turn: last_turn_mut().record_tool_call(redacted_args)
Session-->>ToolExec: unlock()
Note over ToolExec: Phase 1: Preflight
ToolExec->>ToolExec: group_tool_calls(delegate, tool_calls)
ToolExec->>Tools: get(tc.name)
ToolExec->>Safety: redact_params(tc.arguments, sensitive)
ToolExec->>Agent: hooks().run(HookEvent::ToolCall)
alt hook rejects
ToolExec->>ToolExec: preflight.push(Rejected(msg))
else needs approval
ToolExec->>ToolExec: approval_needed = Some(ApprovalCandidate)
ToolExec-->>Delegate: return NeedApproval
else runnable
ToolExec->>ToolExec: preflight.push(Runnable), runnable.push
end
Note over ToolExec: Phase 2: Execution
ToolExec->>ToolExec: run_phase2(delegate, preflight.len, runnable)
alt small batch
ToolExec->>ToolExec: run_tool_batch_inline
loop each runnable tc
ToolExec->>Delegate: execute_one_tool(tc)
Delegate->>Channels: send_status(ToolStarted)
Delegate->>Agent: execute_chat_tool(name, args, job_ctx)
Agent->>Tools: tools()
Tools-->>Agent: Tool
Agent-->>Delegate: Result<String,Error>
Delegate->>Channels: send_status(ToolCompleted)
Delegate-->>ToolExec: result
end
else large batch
ToolExec->>ToolExec: run_tool_batch_parallel
par each runnable tc
ToolExec->>Channels: send_status(ToolStarted)
ToolExec->>Tools: execute_chat_tool_standalone(ToolCallSpec)
Tools->>Safety: execute_tool_with_safety
Safety-->>Tools: Result<String,Error>
Tools-->>ToolExec: result
ToolExec->>Channels: send_status(ToolCompleted)
end
ToolExec->>ToolExec: fill missing exec_results with ToolError
end
Note over ToolExec: Phase 3: Post-flight
ToolExec->>ToolExec: run_postflight(delegate, preflight, exec_results)
loop for each preflight entry
alt PreflightOutcome::Rejected
ToolExec->>Session: lock()
Session->>Thread: get_mut(thread_id)
Thread->>Turn: last_turn_mut().record_tool_error(msg)
Session-->>ToolExec: unlock()
ToolExec->>reason_ctx: messages.push(tool_result error)
else PreflightOutcome::Runnable
ToolExec->>ToolExec: process_runnable_tool(delegate, tc, result)
alt result is Err
ToolExec->>ToolExec: fold_into_context(error, is_tool_error=true)
else result is Ok
ToolExec->>ToolExec: maybe_emit_image_sentinel
ToolExec->>Safety: sanitize_tool_output or is_valid_json
ToolExec->>Channels: send_status(ToolResult preview)
ToolExec->>ToolExec: check_auth_required + parse_auth_result
alt awaiting token
ToolExec->>Session: lock() and enter_auth_mode
ToolExec->>Channels: send_status(AuthRequired)
ToolExec->>ToolExec: deferred_auth = Some(instructions)
end
ToolExec->>JobCtx: tool_output_stash.insert(tc.id, output)
ToolExec->>ToolExec: fold_into_context(result_content, is_tool_error)
ToolExec->>Session: lock()
Session->>Thread: last_turn_mut().record_tool_result or record_tool_error
Session-->>ToolExec: unlock()
ToolExec->>reason_ctx: messages.push(tool_result)
end
end
end
alt deferred_auth is Some
ToolExec-->>Delegate: LoopOutcome::Response(instructions)
else approval_needed is Some
ToolExec->>ToolExec: build_pending_approval(delegate, candidate, tool_calls, reason_ctx)
ToolExec-->>Delegate: LoopOutcome::NeedApproval(PendingApproval)
else
ToolExec-->>Delegate: None
end
Sequence diagram for approval dispatch via dispatch_approval helpersequenceDiagram
participant Channels as ChannelManager
participant Agent as Agent
participant Dispatch as dispatch_submission
participant Scope as TurnScope
Channels->>Agent: handle_submission(ctx, Submission::ExecApproval{request_id,approved,always})
Agent->>Dispatch: dispatch_submission(ctx, submission)
Dispatch->>Agent: dispatch_approval(&ctx, ApprovalParams{Some(request_id),approved,always})
Agent->>Scope: TurnScope::new(ctx.session.clone, ctx.thread_id, &ctx.message)
Agent->>Agent: process_approval(scope, params)
Agent-->>Dispatch: SubmissionResult
Note over Dispatch,Agent: ApprovalResponse path (no explicit request_id)
Channels->>Agent: handle_submission(ctx, Submission::ApprovalResponse{approved,always})
Agent->>Dispatch: dispatch_submission(ctx, submission)
Dispatch->>Agent: dispatch_approval(&ctx, ApprovalParams{None,approved,always})
Agent->>Scope: TurnScope::new(ctx.session.clone, ctx.thread_id, &ctx.message)
Agent->>Agent: process_approval(scope, params)
Agent-->>Dispatch: SubmissionResult
Class diagram for dispatcher delegate and tool execution refactorclassDiagram
class Agent {
+config
+tools() ToolRegistry*
+safety() SafetyLayer*
+channels: ChannelManager*
+llm()
+cost_guard()
+execute_chat_tool(name, params, job_ctx) Result~String,Error~
}
class ChatDelegate {
+agent: Agent*
+session: Arc_Mutex_Session_
+thread_id: Uuid
+message: IncomingMessage*
+job_ctx: JobContext
+active_skills: Vec_LoadedSkill_
+cached_prompt: String
+cached_prompt_no_tools: String
+nudge_at: usize
+force_text_at: usize
+user_tz: chrono_tz_Tz
}
class ToolBatch {
+preflight: Vec_ToolCall_PreflightOutcome_
+runnable: Vec_usize_ToolCall_
}
class PreflightOutcome {
<<enum>>
Rejected(error_msg: String)
Runnable
}
class ApprovalCandidate {
+idx: usize
+tool_call: ToolCall
+tool: Tool*
}
class ParsedAuthData {
+auth_url: Option_String_
+setup_url: Option_String_
}
class ToolCallSpec {
+name: &str
+params: &serde_json_Value
}
class ToolRegistry {
+get(name) Option_Tool_
+tool_definitions() Vec_ToolDefinition_
}
class SafetyLayer {
+sanitize_tool_output(name, output) SanitizedOutput
+wrap_for_llm(name, content, was_modified) String
}
class JobContext {
+tool_output_stash: RwLock_HashMap_String,String_
}
class StatusUpdate {
<<enum>>
}
class LoopOutcome {
<<enum>>
Response(String)
NeedApproval(PendingApproval)
}
class PendingApproval {
+request_id: Uuid
+tool_name: String
+parameters: serde_json_Value
+display_parameters: serde_json_Value
+description: String
+tool_call_id: String
+context_messages: Vec_ChatMessage_
+deferred_tool_calls: Vec_ToolCall_
+user_timezone: Option_String_
}
class ReasoningContext {
+messages: Vec_ChatMessage_
+available_tools: Vec_ToolDefinition_
+system_prompt: Option_String_
+force_text: bool
}
class Reasoning {
+respond_with_tools(ctx) Result_RespondOutput,LlmError_
}
class NativeLoopDelegate {
<<trait>>
+check_signals()
+before_llm_call()
+call_llm()
+handle_text_response()
+execute_tool_calls()
}
Agent --> ToolRegistry
Agent --> SafetyLayer
ChatDelegate --> Agent
ChatDelegate --> ReasoningContext
ChatDelegate ..|> NativeLoopDelegate
ToolBatch --> PreflightOutcome
ApprovalCandidate --> ToolCall
ApprovalCandidate --> "1" PendingApproval
ParsedAuthData --> StatusUpdate
ToolCallSpec --> ToolRegistry
ToolCallSpec --> SafetyLayer
ReasoningContext --> LoopOutcome
ChatDelegate --> JobContext
%% Key free functions in tool_exec.rs
class tool_exec_module {
<<module>>
+execute_tool_calls(delegate, tool_calls, content, reason_ctx) Result_Option_LoopOutcome_,Error_
+group_tool_calls(delegate, tool_calls) Result_ToolBatch,Option_ApprovalCandidate__,Error_
+execute_chat_tool_standalone(tools, safety, spec, job_ctx) Result_String,Error_
+process_runnable_tool(delegate, tc, result, reason_ctx) Option_String_
+check_auth_required(tool_name, result) Option_String,String_
+parse_auth_result(result) ParsedAuthData
}
tool_exec_module --> ChatDelegate
tool_exec_module --> ToolBatch
tool_exec_module --> ApprovalCandidate
tool_exec_module --> ParsedAuthData
tool_exec_module --> ToolCallSpec
tool_exec_module --> LoopOutcome
tool_exec_module --> PendingApproval
tool_exec_module --> ReasoningContext
tool_exec_module --> ToolRegistry
tool_exec_module --> SafetyLayer
tool_exec_module --> JobContext
Class diagram for thread operations refactor (hydration, control, turn execution, persistence)classDiagram
class Agent {
+hydrate_and_resolve_session_thread(message) (Session,Uuid)
+maybe_hydrate_thread(message, external_thread_id)
+process_user_input(message, req) Result_SubmissionResult,Error_
+persist_user_message(thread_id, user_id, user_input)
+persist_assistant_response(thread_id, user_id, response)
+persist_tool_calls(ctx, tool_calls)
+process_undo(session, thread_id)
+process_redo(session, thread_id)
+process_interrupt(session, thread_id)
+process_compact(session, thread_id)
+process_clear(session, thread_id)
+process_new_thread(message)
+process_switch_thread(message, target_thread_id)
+process_resume(session, thread_id, checkpoint_id)
}
class Session {
+id: Uuid
+threads: HashMap_Uuid,Thread_
+active_thread: Option_Uuid_
+last_active_at: DateTime
+switch_thread(target_thread_id) bool
+is_tool_auto_approved(name) bool
}
class Thread {
+id: Uuid
+state: ThreadState
+turns: Vec_Turn_
+messages() Vec_ChatMessage_
+turn_number() usize
+start_turn(content) &mut Turn
+complete_turn(response)
+fail_turn(error)
+await_approval(pending)
+restore_from_messages(messages)
+enter_auth_mode(extension_name)
}
class ThreadState {
<<enum>>
Idle
Processing
AwaitingApproval
Interrupted
Completed
}
class Turn {
+turn_number: usize
+tool_calls: Vec_TurnToolCall_
+image_content_parts: Vec_ImagePart_
+record_tool_call(name, args)
+record_tool_result(result)
+record_tool_error(error)
}
class TurnToolCall {
+name: String
+result: Option_serde_json_Value_
+error: Option_String_
}
class UserTurnRequest {
+session: Arc_Mutex_Session_
+thread_id: Uuid
+content: String
}
class TurnPersistContext {
+thread_id: Uuid
+user_id: &str
+turn_number: usize
}
class SubmissionResult {
<<enum>>
+response(String)
+NeedApproval(request_id,tool_name,description,parameters)
+Interrupted
+error(msg)
+ok_with_message(msg)
}
class IncomingMessage {
+id: String
+user_id: String
+channel: String
+thread_id: Option_String_
+content: String
+attachments: Vec_Attachment_
+metadata: serde_json_Value
}
class UndoManager {
+checkpoint(turn_number, messages, description)
+undo(current_turn, current_messages) Option_Checkpoint_
+redo(current_turn, current_messages) Option_Checkpoint_
+restore(checkpoint_id) Option_Checkpoint_
+can_undo() bool
+can_redo() bool
+undo_count() usize
+clear()
}
class Checkpoint {
+turn_number: usize
+messages: Vec_ChatMessage_
+description: String
}
class EnsureConversationParams {
+channel: String
+id: Uuid
+thread_id: Option_Uuid_
}
class Store {
+ensure_conversation(params) Result
+add_conversation_message(thread_id, role, content) Result
+list_conversation_messages(thread_id) Result_Vec_DbMessage_
}
class gateway_conversation_params_fn {
<<function>>
+gateway_conversation_params(thread_id, user_id) EnsureConversationParams
}
Agent --> Session : hydrate_and_resolve_session_thread
Agent --> Thread : maybe_hydrate_thread
Agent --> UserTurnRequest : process_user_input
Agent --> TurnPersistContext : persist_tool_calls
Agent --> UndoManager : process_undo
Agent --> UndoManager : process_redo
Agent --> ThreadState : process_interrupt
Agent --> ContextCompactor : process_compact
Session --> Thread
Thread --> ThreadState
Thread --> Turn
Turn --> TurnToolCall
UserTurnRequest --> Session
TurnPersistContext --> Thread
Agent --> Store : persist_user_message
Agent --> Store : persist_assistant_response
Agent --> Store : persist_tool_calls
gateway_conversation_params_fn --> EnsureConversationParams
Agent --> gateway_conversation_params_fn
UndoManager --> Checkpoint
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
8cf7e5c to
d8cef37
Compare
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/dispatcher/delegate.rs Comment on file //! Chat delegate implementation for the agentic loop.
❌ New issue: Low Cohesion |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/dispatcher/delegate.rs Comment on file async fn execute_tool_calls(
&self,
tool_calls: Vec<crate::llm::ToolCall>,
content: Option<String>,
reason_ctx: &mut ReasoningContext,
) -> Result<Option<LoopOutcome>, Error> {
// Add the assistant message with tool_calls to context.
// OpenAI protocol requires this before tool-result messages.
reason_ctx
.messages
.push(ChatMessage::assistant_with_tool_calls(
content,
tool_calls.clone(),
));
// Execute tools and add results to context
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::Thinking(format!("Executing {} tool(s)...", tool_calls.len())),
&self.message.metadata,
)
.await;
// Record tool calls in the thread with sensitive params redacted.
{
let mut redacted_args: Vec<serde_json::Value> = Vec::with_capacity(tool_calls.len());
for tc in &tool_calls {
let safe = if let Some(tool) = self.agent.tools().get(&tc.name).await {
redact_params(&tc.arguments, tool.sensitive_params())
} else {
tc.arguments.clone()
};
redacted_args.push(safe);
}
let mut sess = self.session.lock().await;
if let Some(thread) = sess.threads.get_mut(&self.thread_id)
&& let Some(turn) = thread.last_turn_mut()
{
for (tc, safe_args) in tool_calls.iter().zip(redacted_args) {
turn.record_tool_call(&tc.name, safe_args);
}
}
}
// === Phase 1: Preflight (sequential) ===
// Walk tool_calls checking approval and hooks. Classify
// each tool as Rejected (by hook) or Runnable. Stop at the
// first tool that needs approval.
enum PreflightOutcome {
Rejected(String),
Runnable,
}
let mut preflight: Vec<(crate::llm::ToolCall, PreflightOutcome)> = Vec::new();
let mut runnable: Vec<(usize, crate::llm::ToolCall)> = Vec::new();
let mut approval_needed: Option<(
usize,
crate::llm::ToolCall,
Arc<dyn crate::tools::Tool>,
)> = None;
for (idx, original_tc) in tool_calls.iter().enumerate() {
let mut tc = original_tc.clone();
let tool_opt = self.agent.tools().get(&tc.name).await;
let sensitive = tool_opt
.as_ref()
.map(|t| t.sensitive_params())
.unwrap_or(&[]);
// Hook: BeforeToolCall
let hook_params = redact_params(&tc.arguments, sensitive);
let event = crate::hooks::HookEvent::ToolCall {
tool_name: tc.name.clone(),
parameters: hook_params,
user_id: self.message.user_id.clone(),
context: "chat".to_string(),
};
match self.agent.hooks().run(&event).await {
Err(crate::hooks::HookError::Rejected { reason }) => {
preflight.push((
tc,
PreflightOutcome::Rejected(format!(
"Tool call rejected by hook: {}",
reason
)),
));
continue;
}
Err(err) => {
preflight.push((
tc,
PreflightOutcome::Rejected(format!(
"Tool call blocked by hook policy: {}",
err
)),
));
continue;
}
Ok(crate::hooks::HookOutcome::Continue {
modified: Some(new_params),
}) => match serde_json::from_str::<serde_json::Value>(&new_params) {
Ok(mut parsed) => {
if let Some(obj) = parsed.as_object_mut() {
for key in sensitive {
if let Some(orig_val) = original_tc.arguments.get(*key) {
obj.insert((*key).to_string(), orig_val.clone());
}
}
}
tc.arguments = parsed;
}
Err(e) => {
tracing::warn!(
tool = %tc.name,
"Hook returned non-JSON modification for ToolCall, ignoring: {}",
e
);
}
},
_ => {}
}
// Check if tool requires approval
if !self.agent.config.auto_approve_tools
&& let Some(tool) = tool_opt
{
use crate::tools::ApprovalRequirement;
let needs_approval = match tool.requires_approval(&tc.arguments) {
ApprovalRequirement::Never => false,
ApprovalRequirement::UnlessAutoApproved => {
let sess = self.session.lock().await;
!sess.is_tool_auto_approved(&tc.name)
}
ApprovalRequirement::Always => true,
};
if needs_approval {
approval_needed = Some((idx, tc, tool));
break;
}
}
let preflight_idx = preflight.len();
preflight.push((tc.clone(), PreflightOutcome::Runnable));
runnable.push((preflight_idx, tc));
}
// === Phase 2: Parallel execution ===
let mut exec_results: Vec<Option<Result<String, Error>>> =
(0..preflight.len()).map(|_| None).collect();
if runnable.len() <= 1 {
for (pf_idx, tc) in &runnable {
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::ToolStarted {
name: tc.name.clone(),
},
&self.message.metadata,
)
.await;
let result = self
.agent
.execute_chat_tool(&tc.name, &tc.arguments, &self.job_ctx)
.await;
let disp_tool = self.agent.tools().get(&tc.name).await;
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::tool_completed(
tc.name.clone(),
&result,
&tc.arguments,
disp_tool.as_deref(),
),
&self.message.metadata,
)
.await;
exec_results[*pf_idx] = Some(result);
}
} else {
let mut join_set = JoinSet::new();
for (pf_idx, tc) in &runnable {
let pf_idx = *pf_idx;
let tools = self.agent.tools().clone();
let safety = self.agent.safety().clone();
let channels = self.agent.channels.clone();
let job_ctx = self.job_ctx.clone();
let tc = tc.clone();
let channel = self.message.channel.clone();
let metadata = self.message.metadata.clone();
join_set.spawn(async move {
let _ = channels
.send_status(
&channel,
StatusUpdate::ToolStarted {
name: tc.name.clone(),
},
&metadata,
)
.await;
let result = execute_chat_tool_standalone(
&tools,
&safety,
&tc.name,
&tc.arguments,
&job_ctx,
)
.await;
let par_tool = tools.get(&tc.name).await;
let _ = channels
.send_status(
&channel,
StatusUpdate::tool_completed(
tc.name.clone(),
&result,
&tc.arguments,
par_tool.as_deref(),
),
&metadata,
)
.await;
(pf_idx, result)
});
}
while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok((pf_idx, result)) => {
exec_results[pf_idx] = Some(result);
}
Err(e) => {
if e.is_panic() {
tracing::error!("Chat tool execution task panicked: {}", e);
} else {
tracing::error!("Chat tool execution task cancelled: {}", e);
}
}
}
}
// Fill panicked slots with error results
for (pf_idx, tc) in runnable.iter() {
if exec_results[*pf_idx].is_none() {
tracing::error!(
tool = %tc.name,
"Filling failed task slot with error"
);
exec_results[*pf_idx] = Some(Err(crate::error::ToolError::ExecutionFailed {
name: tc.name.clone(),
reason: "Task failed during execution".to_string(),
}
.into()));
}
}
}
// === Phase 3: Post-flight (sequential, in original order) ===
let mut deferred_auth: Option<String> = None;
for (pf_idx, (tc, outcome)) in preflight.into_iter().enumerate() {
match outcome {
PreflightOutcome::Rejected(error_msg) => {
{
let mut sess = self.session.lock().await;
if let Some(thread) = sess.threads.get_mut(&self.thread_id)
&& let Some(turn) = thread.last_turn_mut()
{
turn.record_tool_error(error_msg.clone());
}
}
reason_ctx
.messages
.push(ChatMessage::tool_result(&tc.id, &tc.name, error_msg));
}
PreflightOutcome::Runnable => {
let tool_result = exec_results[pf_idx].take().unwrap_or_else(|| {
Err(crate::error::ToolError::ExecutionFailed {
name: tc.name.clone(),
reason: "No result available".to_string(),
}
.into())
});
// Detect image generation sentinel
let is_image_sentinel = if let Ok(ref output) = tool_result
&& matches!(tc.name.as_str(), "image_generate" | "image_edit")
{
if let Ok(sentinel) = serde_json::from_str::<serde_json::Value>(output)
&& sentinel.get("type").and_then(|v| v.as_str())
== Some("image_generated")
{
let data_url = sentinel
.get("data")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let path = sentinel
.get("path")
.and_then(|v| v.as_str())
.map(String::from);
if data_url.is_empty() {
tracing::warn!(
"Image generation sentinel has empty data URL, skipping broadcast"
);
} else {
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::ImageGenerated { data_url, path },
&self.message.metadata,
)
.await;
}
true
} else {
false
}
} else {
false
};
// Send ToolResult preview
if !is_image_sentinel
&& let Ok(ref output) = tool_result
&& !output.is_empty()
{
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::ToolResult {
name: tc.name.clone(),
preview: output.clone(),
},
&self.message.metadata,
)
.await;
}
// Check for auth awaiting
if deferred_auth.is_none()
&& let Some((ext_name, instructions)) =
check_auth_required(&tc.name, &tool_result)
{
let auth_data = parse_auth_result(&tool_result);
{
let mut sess = self.session.lock().await;
if let Some(thread) = sess.threads.get_mut(&self.thread_id) {
thread.enter_auth_mode(ext_name.clone());
}
}
let _ = self
.agent
.channels
.send_status(
&self.message.channel,
StatusUpdate::AuthRequired {
extension_name: ext_name,
instructions: Some(instructions.clone()),
auth_url: auth_data.auth_url,
setup_url: auth_data.setup_url,
},
&self.message.metadata,
)
.await;
deferred_auth = Some(instructions);
}
// Stash full output so subsequent tools can reference it
if let Ok(ref output) = tool_result {
self.job_ctx
.tool_output_stash
.write()
.await
.insert(tc.id.clone(), output.clone());
}
// Sanitize and add tool result to context
let is_tool_error = tool_result.is_err();
let result_content = match tool_result {
Ok(output) => {
let sanitized =
self.agent.safety().sanitize_tool_output(&tc.name, &output);
self.agent.safety().wrap_for_llm(
&tc.name,
&sanitized.content,
sanitized.was_modified,
)
}
Err(e) => format!("Tool '{}' failed: {}", tc.name, e),
};
// Record sanitized result in thread
{
let mut sess = self.session.lock().await;
if let Some(thread) = sess.threads.get_mut(&self.thread_id)
&& let Some(turn) = thread.last_turn_mut()
{
if is_tool_error {
turn.record_tool_error(result_content.clone());
} else {
turn.record_tool_result(serde_json::json!(result_content));
}
}
}
reason_ctx.messages.push(ChatMessage::tool_result(
&tc.id,
&tc.name,
result_content,
));
}
}
}
// Return auth response after all results are recorded
if let Some(instructions) = deferred_auth {
return Ok(Some(LoopOutcome::Response(instructions)));
}
// Handle approval if a tool needed it
if let Some((approval_idx, tc, tool)) = approval_needed {
let display_params = redact_params(&tc.arguments, tool.sensitive_params());
let pending = crate::agent::session::PendingApproval {
request_id: Uuid::new_v4(),
tool_name: tc.name.clone(),
parameters: tc.arguments.clone(),
display_parameters: display_params,
description: tool.description().to_string(),
tool_call_id: tc.id.clone(),
context_messages: reason_ctx.messages.clone(),
deferred_tool_calls: tool_calls[approval_idx + 1..].to_vec(),
user_timezone: Some(self.user_tz.name().to_string()),
};
return Ok(Some(LoopOutcome::NeedApproval(Box::new(pending))));
}
Ok(None)
}❌ New issue: Complex Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/thread_ops/persistence.rs Comment on file pub(super) async fn persist_tool_calls(
&self,
thread_id: Uuid,
user_id: &str,
turn_number: usize,
tool_calls: &[crate::agent::session::TurnToolCall],
) {
if tool_calls.is_empty() {
return;
}
let store = match self.store() {
Some(s) => Arc::clone(s),
None => return,
};
let summaries: Vec<serde_json::Value> = tool_calls
.iter()
.enumerate()
.map(|(i, tc)| {
let mut obj = serde_json::json!({
"name": tc.name,
"call_id": format!("turn{}_{}", turn_number, i),
});
if let Some(ref result) = tc.result {
let preview = match result {
serde_json::Value::String(s) => truncate_preview(s, 500),
other => truncate_preview(&other.to_string(), 500),
};
obj["result_preview"] = serde_json::Value::String(preview);
// Store full result (truncated to ~1000 chars) for LLM context rebuild
let full_result = match result {
serde_json::Value::String(s) => truncate_preview(s, 1000),
other => truncate_preview(&other.to_string(), 1000),
};
obj["result"] = serde_json::Value::String(full_result);
}
if let Some(ref error) = tc.error {
obj["error"] = serde_json::Value::String(error.clone());
}
obj
})
.collect();
let content = match serde_json::to_string(&summaries) {
Ok(c) => c,
Err(e) => {
tracing::warn!("Failed to serialize tool calls: {}", e);
return;
}
};
if let Err(e) = store
.ensure_conversation(gateway_conversation_params(thread_id, user_id))
.await
{
tracing::warn!("Failed to ensure conversation {}: {}", thread_id, e);
return;
}
if let Err(e) = store
.add_conversation_message(thread_id, "tool_calls", &content)
.await
{
tracing::warn!("Failed to persist tool calls: {}", e);
}
}❌ New issue: Complex Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/thread_ops/persistence.rs Comment on file pub(super) async fn persist_tool_calls(
&self,
thread_id: Uuid,
user_id: &str,
turn_number: usize,
tool_calls: &[crate::agent::session::TurnToolCall],
) {
if tool_calls.is_empty() {
return;
}
let store = match self.store() {
Some(s) => Arc::clone(s),
None => return,
};
let summaries: Vec<serde_json::Value> = tool_calls
.iter()
.enumerate()
.map(|(i, tc)| {
let mut obj = serde_json::json!({
"name": tc.name,
"call_id": format!("turn{}_{}", turn_number, i),
});
if let Some(ref result) = tc.result {
let preview = match result {
serde_json::Value::String(s) => truncate_preview(s, 500),
other => truncate_preview(&other.to_string(), 500),
};
obj["result_preview"] = serde_json::Value::String(preview);
// Store full result (truncated to ~1000 chars) for LLM context rebuild
let full_result = match result {
serde_json::Value::String(s) => truncate_preview(s, 1000),
other => truncate_preview(&other.to_string(), 1000),
};
obj["result"] = serde_json::Value::String(full_result);
}
if let Some(ref error) = tc.error {
obj["error"] = serde_json::Value::String(error.clone());
}
obj
})
.collect();
let content = match serde_json::to_string(&summaries) {
Ok(c) => c,
Err(e) => {
tracing::warn!("Failed to serialize tool calls: {}", e);
return;
}
};
if let Err(e) = store
.ensure_conversation(gateway_conversation_params(thread_id, user_id))
.await
{
tracing::warn!("Failed to ensure conversation {}: {}", thread_id, e);
return;
}
if let Err(e) = store
.add_conversation_message(thread_id, "tool_calls", &content)
.await
{
tracing::warn!("Failed to persist tool calls: {}", e);
}
}❌ New issue: Excess Number of Function Arguments |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/thread_ops/turn_execution.rs Comment on file pub(super) async fn process_user_input(
&self,
message: &IncomingMessage,
session: Arc<Mutex<Session>>,
thread_id: Uuid,
content: &str,
) -> Result<SubmissionResult, Error> {
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
content_len = content.len(),
"Processing user input"
);
// First check thread state without holding lock during I/O
let thread_state = {
let sess = session.lock().await;
let thread = sess
.threads
.get(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
thread.state
};
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
thread_state = ?thread_state,
"Checked thread state"
);
// Check thread state
match thread_state {
ThreadState::Processing => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread is processing, rejecting new input"
);
return Ok(SubmissionResult::error(
"Turn in progress. Use /interrupt to cancel.",
));
}
ThreadState::AwaitingApproval => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread awaiting approval, rejecting new input"
);
return Ok(SubmissionResult::error(
"Waiting for approval. Use /interrupt to cancel.",
));
}
ThreadState::Completed => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread completed, rejecting new input"
);
return Ok(SubmissionResult::error(
"Thread completed. Use /thread new.",
));
}
ThreadState::Idle | ThreadState::Interrupted => {
// Can proceed
}
}
// Safety validation for user input
let validation = self.safety().validate_input(content);
if !validation.is_valid {
let details = validation
.errors
.iter()
.map(|e| format!("{}: {}", e.field, e.message))
.collect::<Vec<_>>()
.join("; ");
return Ok(SubmissionResult::error(format!(
"Input rejected by safety validation: {}",
details
)));
}
let violations = self.safety().check_policy(content);
if violations
.iter()
.any(|rule| rule.action == crate::safety::PolicyAction::Block)
{
return Ok(SubmissionResult::error("Input rejected by safety policy."));
}
// Scan inbound messages for secrets (API keys, tokens).
// Catching them here prevents the LLM from echoing them back, which
// would trigger the outbound leak detector and create error loops.
if let Some(warning) = self.safety().scan_inbound_for_secrets(content) {
tracing::warn!(
user = %message.user_id,
channel = %message.channel,
"Inbound message blocked: contains leaked secret"
);
return Ok(SubmissionResult::error(warning));
}
// Handle explicit commands (starting with /) directly
// Everything else goes through the normal agentic loop with tools
let temp_message = IncomingMessage {
content: content.to_string(),
..message.clone()
};
if let Some(intent) = self.router.route_command(&temp_message) {
// Explicit command like /status, /job, /list - handle directly
return self.handle_job_or_command(intent, message).await;
}
// Natural language goes through the agentic loop
// Job tools (create_job, list_jobs, etc.) are in the tool registry
// Auto-compact if needed BEFORE adding new turn
{
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let messages = thread.messages();
if let Some(strategy) = self.context_monitor.suggest_compaction(&messages) {
let pct = self.context_monitor.usage_percent(&messages);
tracing::info!("Context at {:.1}% capacity, auto-compacting", pct);
// Notify the user that compaction is happening
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status(format!(
"Context at {:.0}% capacity, compacting...",
pct
)),
&message.metadata,
)
.await;
let compactor = ContextCompactor::new(self.llm().clone());
if let Err(e) = compactor
.compact(thread, strategy, self.workspace().map(|w| w.as_ref()))
.await
{
tracing::warn!("Auto-compaction failed: {}", e);
}
}
}
// Create checkpoint before turn
let undo_mgr = self.session_manager.get_undo_manager(thread_id).await;
{
let sess = session.lock().await;
let thread = sess
.threads
.get(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let mut mgr = undo_mgr.lock().await;
mgr.checkpoint(
thread.turn_number(),
thread.messages(),
format!("Before turn {}", thread.turn_number()),
);
}
// Augment content with attachment context (transcripts, metadata, images)
let augmented =
crate::agent::attachments::augment_with_attachments(content, &message.attachments);
let (effective_content, image_parts) = match &augmented {
Some(result) => (result.text.as_str(), result.image_parts.clone()),
None => (content, Vec::new()),
};
// Start the turn and get messages
let turn_messages = {
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let turn = thread.start_turn(effective_content);
turn.image_content_parts = image_parts;
thread.messages()
};
// Persist user message to DB immediately so it survives crashes
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"Persisting user message to DB"
);
self.persist_user_message(thread_id, &message.user_id, effective_content)
.await;
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"User message persisted, starting agentic loop"
);
// Send thinking status
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Thinking("Processing...".into()),
&message.metadata,
)
.await;
// Run the agentic tool execution loop
let result = self
.run_agentic_loop(message, session.clone(), thread_id, turn_messages)
.await;
// Re-acquire lock and check if interrupted
let interrupted = {
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
thread.state == ThreadState::Interrupted
};
if interrupted {
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Interrupted".into()),
&message.metadata,
)
.await;
return Ok(SubmissionResult::Interrupted);
}
// Re-acquire lock for processing result
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
// Complete, fail, or request approval
match result {
Ok(AgenticLoopResult::Response(response)) => {
// Drop the session lock before running the response transform hook
drop(sess);
// Hook: TransformResponse — allow hooks to modify or reject the final response
let response = {
let event = crate::hooks::HookEvent::ResponseTransform {
user_id: message.user_id.clone(),
thread_id: thread_id.to_string(),
response: response.clone(),
};
match self.hooks().run(&event).await {
Err(crate::hooks::HookError::Rejected { reason }) => {
format!("[Response filtered: {}]", reason)
}
Ok(crate::hooks::HookOutcome::Reject { reason }) => {
format!("[Response filtered: {}]", reason)
}
Err(err) => {
tracing::warn!("TransformResponse hook failed open: {}", err);
response
}
Ok(crate::hooks::HookOutcome::Continue {
modified: Some(new_response),
}) => new_response,
_ => response, // fail-open: use original
}
};
// Re-acquire lock to complete turn and snapshot data
let completion = {
let mut sess = session.lock().await;
let thread = sess.threads.get_mut(&thread_id).ok_or_else(|| {
Error::from(crate::error::JobError::NotFound { id: thread_id })
})?;
if thread.state == ThreadState::Interrupted {
None
} else {
thread.complete_turn(&response);
Some(
thread
.turns
.last()
.map(|t| (t.turn_number, t.tool_calls.clone()))
.unwrap_or_default(),
)
}
};
let Some((turn_number, tool_calls)) = completion else {
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Interrupted".into()),
&message.metadata,
)
.await;
return Ok(SubmissionResult::Interrupted);
};
// Lock is dropped here at end of block
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Done".into()),
&message.metadata,
)
.await;
// Persist tool calls then assistant response (user message already persisted at turn start)
self.persist_tool_calls(thread_id, &message.user_id, turn_number, &tool_calls)
.await;
self.persist_assistant_response(thread_id, &message.user_id, &response)
.await;
Ok(SubmissionResult::response(response))
}
Ok(AgenticLoopResult::NeedApproval { pending }) => {
// Store pending approval in thread and update state
let request_id = pending.request_id;
let tool_name = pending.tool_name.clone();
let description = pending.description.clone();
let parameters = pending.display_parameters.clone();
thread.await_approval(pending);
// Drop the session lock before async operations
drop(sess);
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Awaiting approval".into()),
&message.metadata,
)
.await;
Ok(SubmissionResult::NeedApproval {
request_id,
tool_name,
description,
parameters,
})
}
Err(e) => {
thread.fail_turn(e.to_string());
// User message already persisted at turn start; nothing else to save
Ok(SubmissionResult::error(e.to_string()))
}
}
}❌ New issue: Complex Method |
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. src/agent/thread_ops/turn_execution.rs Comment on file pub(super) async fn process_user_input(
&self,
message: &IncomingMessage,
session: Arc<Mutex<Session>>,
thread_id: Uuid,
content: &str,
) -> Result<SubmissionResult, Error> {
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
content_len = content.len(),
"Processing user input"
);
// First check thread state without holding lock during I/O
let thread_state = {
let sess = session.lock().await;
let thread = sess
.threads
.get(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
thread.state
};
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
thread_state = ?thread_state,
"Checked thread state"
);
// Check thread state
match thread_state {
ThreadState::Processing => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread is processing, rejecting new input"
);
return Ok(SubmissionResult::error(
"Turn in progress. Use /interrupt to cancel.",
));
}
ThreadState::AwaitingApproval => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread awaiting approval, rejecting new input"
);
return Ok(SubmissionResult::error(
"Waiting for approval. Use /interrupt to cancel.",
));
}
ThreadState::Completed => {
tracing::warn!(
message_id = %message.id,
thread_id = %thread_id,
"Thread completed, rejecting new input"
);
return Ok(SubmissionResult::error(
"Thread completed. Use /thread new.",
));
}
ThreadState::Idle | ThreadState::Interrupted => {
// Can proceed
}
}
// Safety validation for user input
let validation = self.safety().validate_input(content);
if !validation.is_valid {
let details = validation
.errors
.iter()
.map(|e| format!("{}: {}", e.field, e.message))
.collect::<Vec<_>>()
.join("; ");
return Ok(SubmissionResult::error(format!(
"Input rejected by safety validation: {}",
details
)));
}
let violations = self.safety().check_policy(content);
if violations
.iter()
.any(|rule| rule.action == crate::safety::PolicyAction::Block)
{
return Ok(SubmissionResult::error("Input rejected by safety policy."));
}
// Scan inbound messages for secrets (API keys, tokens).
// Catching them here prevents the LLM from echoing them back, which
// would trigger the outbound leak detector and create error loops.
if let Some(warning) = self.safety().scan_inbound_for_secrets(content) {
tracing::warn!(
user = %message.user_id,
channel = %message.channel,
"Inbound message blocked: contains leaked secret"
);
return Ok(SubmissionResult::error(warning));
}
// Handle explicit commands (starting with /) directly
// Everything else goes through the normal agentic loop with tools
let temp_message = IncomingMessage {
content: content.to_string(),
..message.clone()
};
if let Some(intent) = self.router.route_command(&temp_message) {
// Explicit command like /status, /job, /list - handle directly
return self.handle_job_or_command(intent, message).await;
}
// Natural language goes through the agentic loop
// Job tools (create_job, list_jobs, etc.) are in the tool registry
// Auto-compact if needed BEFORE adding new turn
{
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let messages = thread.messages();
if let Some(strategy) = self.context_monitor.suggest_compaction(&messages) {
let pct = self.context_monitor.usage_percent(&messages);
tracing::info!("Context at {:.1}% capacity, auto-compacting", pct);
// Notify the user that compaction is happening
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status(format!(
"Context at {:.0}% capacity, compacting...",
pct
)),
&message.metadata,
)
.await;
let compactor = ContextCompactor::new(self.llm().clone());
if let Err(e) = compactor
.compact(thread, strategy, self.workspace().map(|w| w.as_ref()))
.await
{
tracing::warn!("Auto-compaction failed: {}", e);
}
}
}
// Create checkpoint before turn
let undo_mgr = self.session_manager.get_undo_manager(thread_id).await;
{
let sess = session.lock().await;
let thread = sess
.threads
.get(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let mut mgr = undo_mgr.lock().await;
mgr.checkpoint(
thread.turn_number(),
thread.messages(),
format!("Before turn {}", thread.turn_number()),
);
}
// Augment content with attachment context (transcripts, metadata, images)
let augmented =
crate::agent::attachments::augment_with_attachments(content, &message.attachments);
let (effective_content, image_parts) = match &augmented {
Some(result) => (result.text.as_str(), result.image_parts.clone()),
None => (content, Vec::new()),
};
// Start the turn and get messages
let turn_messages = {
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
let turn = thread.start_turn(effective_content);
turn.image_content_parts = image_parts;
thread.messages()
};
// Persist user message to DB immediately so it survives crashes
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"Persisting user message to DB"
);
self.persist_user_message(thread_id, &message.user_id, effective_content)
.await;
tracing::debug!(
message_id = %message.id,
thread_id = %thread_id,
"User message persisted, starting agentic loop"
);
// Send thinking status
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Thinking("Processing...".into()),
&message.metadata,
)
.await;
// Run the agentic tool execution loop
let result = self
.run_agentic_loop(message, session.clone(), thread_id, turn_messages)
.await;
// Re-acquire lock and check if interrupted
let interrupted = {
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
thread.state == ThreadState::Interrupted
};
if interrupted {
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Interrupted".into()),
&message.metadata,
)
.await;
return Ok(SubmissionResult::Interrupted);
}
// Re-acquire lock for processing result
let mut sess = session.lock().await;
let thread = sess
.threads
.get_mut(&thread_id)
.ok_or_else(|| Error::from(crate::error::JobError::NotFound { id: thread_id }))?;
// Complete, fail, or request approval
match result {
Ok(AgenticLoopResult::Response(response)) => {
// Drop the session lock before running the response transform hook
drop(sess);
// Hook: TransformResponse — allow hooks to modify or reject the final response
let response = {
let event = crate::hooks::HookEvent::ResponseTransform {
user_id: message.user_id.clone(),
thread_id: thread_id.to_string(),
response: response.clone(),
};
match self.hooks().run(&event).await {
Err(crate::hooks::HookError::Rejected { reason }) => {
format!("[Response filtered: {}]", reason)
}
Ok(crate::hooks::HookOutcome::Reject { reason }) => {
format!("[Response filtered: {}]", reason)
}
Err(err) => {
tracing::warn!("TransformResponse hook failed open: {}", err);
response
}
Ok(crate::hooks::HookOutcome::Continue {
modified: Some(new_response),
}) => new_response,
_ => response, // fail-open: use original
}
};
// Re-acquire lock to complete turn and snapshot data
let completion = {
let mut sess = session.lock().await;
let thread = sess.threads.get_mut(&thread_id).ok_or_else(|| {
Error::from(crate::error::JobError::NotFound { id: thread_id })
})?;
if thread.state == ThreadState::Interrupted {
None
} else {
thread.complete_turn(&response);
Some(
thread
.turns
.last()
.map(|t| (t.turn_number, t.tool_calls.clone()))
.unwrap_or_default(),
)
}
};
let Some((turn_number, tool_calls)) = completion else {
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Interrupted".into()),
&message.metadata,
)
.await;
return Ok(SubmissionResult::Interrupted);
};
// Lock is dropped here at end of block
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Done".into()),
&message.metadata,
)
.await;
// Persist tool calls then assistant response (user message already persisted at turn start)
self.persist_tool_calls(thread_id, &message.user_id, turn_number, &tool_calls)
.await;
self.persist_assistant_response(thread_id, &message.user_id, &response)
.await;
Ok(SubmissionResult::response(response))
}
Ok(AgenticLoopResult::NeedApproval { pending }) => {
// Store pending approval in thread and update state
let request_id = pending.request_id;
let tool_name = pending.tool_name.clone();
let description = pending.description.clone();
let parameters = pending.display_parameters.clone();
thread.await_approval(pending);
// Drop the session lock before async operations
drop(sess);
let _ = self
.channels
.send_status(
&message.channel,
StatusUpdate::Status("Awaiting approval".into()),
&message.metadata,
)
.await;
Ok(SubmissionResult::NeedApproval {
request_id,
tool_name,
description,
parameters,
})
}
Err(e) => {
thread.fail_turn(e.to_string());
// User message already persisted at turn start; nothing else to save
Ok(SubmissionResult::error(e.to_string()))
}
}
}❌ New issue: Excess Number of Function Arguments |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
…136 Split src/agent/dispatcher.rs (2618 lines) into 4 focused submodules: - preflight.rs: PreflightOutcome, ToolBatch, group_tool_calls, handle_rejected_tool - execution.rs: Tool execution (inline and parallel), execute_chat_tool_standalone - postflight.rs: Post-execution processing, auth handling, context folding - delegate.rs: ChatDelegate struct, NativeLoopDelegate impl src/agent/dispatcher/mod.rs now contains: - Module declarations and shared imports - Shared constants (PREVIEW_MAX_CHARS) - Shared utility functions (is_valid_json, truncate_for_preview, select_active_skills) - AgenticLoopResult enum - impl Agent block with run_agentic_loop and execute_chat_tool - Free functions: compact_messages_for_retry, strip_internal_tool_call_text - Original tests preserved Re-exports added for cross-module usage: - execution::execute_chat_tool_standalone - postflight::{check_auth_required, parse_auth_result} This addresses CodeScene's Low Cohesion warning by separating 5 distinct responsibilities into focused submodules before introducing delegate.rs. Pure structural refactor with no logic changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Parse successful tool output as JSON before recording it in the\nactive turn so JSON-producing tools keep object and array structure\nin thread history.\n\nWhen the stored content is the wrapped tool-output envelope, extract the\ninner payload first and fall back to the wrapped string only when the\ncontent is not valid JSON. Add regression tests for both JSON and\nplain-text tool results.
Route the active chat delegate tool-call path through the split tool_exec pipeline so hook-mutated arguments are committed to reasoning context and thread history before execution continues. Trim the now-unreachable legacy delegate helpers, keep approval-test coverage intact, and add a regression test proving the assistant tool-call message records the post-hook arguments rather than the original payload.
Add concrete unit coverage for the live pure helper modules touched by PR #136 on this branch. Cover message compaction and tool-call marker stripping in llm_hooks, and cover auth-barrier parsing in tool_exec postflight. The current thread-op modules already had behavioural coverage, so this change focuses on the remaining inline helper gaps rather than duplicating newer tests.
Persist the turn's executed tool calls before returning auth instructions from the approval flow. This keeps DB-backed thread reconstruction aligned with the in-memory turn state by recording the tool-call summary alongside the assistant auth message for intercepted turns.
Reuse the chat delegate's BeforeToolCall hook path during deferred post-approval execution so hook-driven parameter rewrites and rejections match the main loop. This keeps deferred tool execution from running stale arguments and adds regression coverage for both hook mutation and hook rejection in the approval preflight path.
Replace the duplicated Docker status snapshot tests with one rstest that preserves the existing snapshot names. This keeps the assertions and snapshots unchanged while reducing repetition in the boot screen test module.
Document the ownership checks performed by the scoped LibSQL conversation-message helper. This makes the security invariant explicit for future maintainers: the helper must verify the (user_id, channel) owner tuple before reading message rows and returns NotFound when the tuple does not match.
Extend the boot screen DB override rstest with the enabled-DB branch so the constructor is verified for both the override and passthrough cases. This keeps the existing helper-based setup while checking that the config backend string is preserved when --no-db is not set.
Make the shared libsql test backend fixture fallible instead of panicking, then add regressions for stale compaction snapshots, happy-path turn execution, and leaked-secret safety rejection. The new tests match the current orchestration semantics: successful turns record a pre-turn checkpoint, call the LLM, and return the thread to idle after finalisation.
Reduce the main run_before_tool_call_hook helper to a two-argument shape by introducing a module-private parameter object. Keep the existing hook behaviour and user-facing rejection text unchanged, while updating the deferred approval path to use a dedicated wrapper that preserves its current runtime semantics.
Extract helper functions from deferred approval preflight so the per-call hook, sensitive-field restoration, approval classification, and rejection side effects are easier to follow. Keep the existing approval and hook behaviour intact while removing stale delegate-side helper code that is no longer used by the deferred execution path.
17a1768 to
44d6a9d
Compare
|
@coderabbitai Please suggest a fix for this issue and supply a prompt for an AI coding agent to enable it to apply the fix. Include the file and symbol names indicated in the issue at the head of your response. Excess Number of Function Argumentssrc/agent/thread_ops/approval.rs: run_before_tool_call_hook_for_deferred What lead to degradation?run_before_tool_call_hook_for_deferred has 5 arguments, max arguments = 4 Why does this problem occur?Functions with many arguments indicate either a) low cohesion where the function has too many responsibilities, or b) a missing abstraction that encapsulates those arguments. How to fix it?Start by investigating the responsibilities of the function. Make sure it doesn't do too many things, in which case it should be split into smaller and more cohesive functions. Consider the refactoring INTRODUCE PARAMETER OBJECT to encapsulate arguments that refer to the same logical concept. |
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai Have the following failed checks now been resolved? If further work is required, please provide an AI agent prompt for the remaining work to be done to address these failures. ❌ Failed checks (3 warnings)
|
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai open a GitHub issue for these tests. |
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
This comment was marked as resolved.
|
@coderabbitai Have the following now been resolved? src/agent/thread_ops/approval.rs (2)
🤖 Prompt for AI Agents
🤖 Prompt for AI Agents |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: Length of output: 2973
Optional follow‑ups (tests, if you want them):
If you want me to draft those tests, say “plan it” and I’ll prepare a concrete change plan with snippets. |
Change Type
Linked Issue
Closes #122
Summary by Sourcery
Decompose the chat dispatcher and thread operations into cohesive submodules for tool execution, LLM hooks, turn lifecycle, persistence, hydration, and control, while preserving existing behaviour and updating documentation.
Enhancements:
Documentation:
Tests:
Validation
cargo fmtappliedcargo clippypasses with no new warningsSecurity Considerations
No new attack surface introduced. This is a pure structural refactor; all safety, hook, and auth logic is moved without modification.
Database Impact
None. Persistence helpers are moved to
thread_ops/persistence.rs; no schema or query changes.Blast Radius
Contained within
src/agent/dispatcher/andsrc/agent/thread_ops/. All public API surfaces (function signatures visible outside these modules) are unchanged. Internalpub(crate)andpub(super)items are reorganised.Review Track
Standard — reviewers should focus on module boundary correctness and that no logic has been silently altered during extraction.
Rollback Plan
Revert to the pre-PR commit on
main. The monolithicdispatcher.rsandthread_ops.rsare fully intact there. No database migrations or configuration changes are involved.