diff --git a/crates/atomic-core/src/agent.rs b/crates/atomic-core/src/agent.rs index 551d8b8d..e230c30e 100644 --- a/crates/atomic-core/src/agent.rs +++ b/crates/atomic-core/src/agent.rs @@ -4,6 +4,7 @@ //! retrieves atoms, and generates responses with citations. //! Uses a callback-based event system (same pattern as EmbeddingEvent). +use crate::atom_edit::{apply_atom_edits, AtomEditOperation}; use crate::chunking::count_tokens; use crate::embedding::EmbeddingEvent; use crate::models::{ @@ -166,40 +167,9 @@ fn get_tools() -> Vec { "required": ["content"] }), ), - ToolDefinition::new( - "update_atom", - "Replace an existing atom's markdown content. Only use this when the user explicitly asks you to fully rewrite an atom or provided the complete replacement content. Prefer edit_atom for targeted edits, appends, and insertions. Never update atoms just because you found useful information. If the user says to update \"this atom\", call get_current_page_context first to get the atom_id.", - json!({ - "type": "object", - "properties": { - "atom_id": { - "type": "string", - "description": "The ID of the atom to update" - }, - "content": { - "type": "string", - "description": "Full replacement markdown content for the atom" - }, - "source_url": { - "type": "string", - "description": "Optional replacement source URL. Omit to preserve the current source URL." - }, - "published_at": { - "type": "string", - "description": "Optional replacement publication date. Omit to preserve the current publication date." - }, - "tag_ids": { - "type": "array", - "description": "Optional replacement list of existing tag IDs. Omit to preserve current tags; pass [] to clear tags.", - "items": { "type": "string" } - } - }, - "required": ["atom_id", "content"] - }), - ), ToolDefinition::new( "edit_atom", - "Apply targeted edits to an existing atom. Only use this when the user explicitly asks you to modify an atom. Supports replace, insert_after, and append operations. replace and insert_after require exact text that appears exactly once in the current atom; call get_atom first if you need context. append adds text to the end of the atom without needing an anchor.", + "Apply edits to an existing atom. Only use this when the user explicitly asks you to modify an atom. Supports replace, insert_after, append, and replace_all operations. Prefer targeted edits. Use replace_all only when the user explicitly asks for a full rewrite or provides complete replacement content. replace and insert_after require exact text that appears exactly once in the current atom; call get_atom first if you need context.", json!({ "type": "object", "properties": { @@ -215,8 +185,8 @@ fn get_tools() -> Vec { "properties": { "operation": { "type": "string", - "enum": ["replace", "insert_after", "append"], - "description": "replace swaps exact old_text for new_text; insert_after inserts text after exact anchor_text; append adds text to the end of the atom." + "enum": ["replace", "insert_after", "append", "replace_all"], + "description": "replace swaps exact old_text for new_text; insert_after inserts text after exact anchor_text; append adds text to the end of the atom; replace_all replaces the full atom content." }, "old_text": { "type": "string", @@ -233,6 +203,10 @@ fn get_tools() -> Vec { "text": { "type": "string", "description": "Text to insert for insert_after or append. Include leading newlines/spaces exactly as desired." + }, + "content": { + "type": "string", + "description": "Full replacement markdown content. Required for replace_all." } }, "required": ["operation"] @@ -590,15 +564,14 @@ async fn execute_create_atom( cache.invalidate(); } - enqueue_and_process_agent_pipeline( - storage, - &id, - "agent_create_atom", + enqueue_agent_pipeline_in_background( + storage.clone(), + id.clone(), + "agent_create_atom".to_string(), external_settings, - canvas_cache, + canvas_cache.cloned(), on_embedding_event, - ) - .await?; + ); Ok(atom) } @@ -617,6 +590,7 @@ async fn enqueue_and_process_agent_pipeline( tag_requested: true, not_before: None, reason: reason.to_string(), + replace_existing: false, }; storage .enqueue_pipeline_jobs_sync(&[job]) @@ -648,141 +622,36 @@ async fn enqueue_and_process_agent_pipeline( Ok(()) } -async fn execute_update_atom( - storage: &StorageBackend, - tool_args: &serde_json::Value, +fn enqueue_agent_pipeline_in_background( + storage: StorageBackend, + atom_id: String, + reason: String, external_settings: Option>, - canvas_cache: Option<&crate::CanvasCache>, + canvas_cache: Option, on_embedding_event: Arc, -) -> Result, String> { - let atom_id = tool_args["atom_id"].as_str().unwrap_or(""); - let Some(existing) = storage - .get_atom_impl(atom_id) - .await - .map_err(|e| e.to_string())? - else { - return Ok(None); - }; - - let content = tool_args["content"].as_str().unwrap_or("").to_string(); - if content.trim().is_empty() { - return Err("Cannot update an atom to empty content".to_string()); - } - - let source_url = if tool_args.get("source_url").is_some() { - parse_optional_string_arg(tool_args, "source_url") - } else { - existing.atom.source_url - }; - let published_at = if tool_args.get("published_at").is_some() { - parse_optional_string_arg(tool_args, "published_at") - } else { - existing.atom.published_at - }; - let tag_ids = tool_args - .get("tag_ids") - .and_then(|value| value.as_array().map(|_| parse_tag_ids_arg(tool_args))); - - let now = Utc::now().to_rfc3339(); - let request = crate::UpdateAtomRequest { - content: content.clone(), - source_url, - published_at, - tag_ids, - }; - let atom = storage - .update_atom_impl(atom_id, &request, &now) - .await - .map_err(|e| e.to_string())?; - if let Some(cache) = canvas_cache { - cache.invalidate(); - } - - enqueue_and_process_agent_pipeline( - storage, - atom_id, - "agent_update_atom", - external_settings, - canvas_cache, - on_embedding_event, - ) - .await?; - - Ok(Some(atom)) -} - -fn exact_match_range( - content: &str, - needle: &str, - edit_index: usize, -) -> Result<(usize, usize), String> { - if needle.is_empty() { - return Err(format!("Edit {} has empty anchor text", edit_index + 1)); - } - - let mut matches = content.match_indices(needle); - let Some((start, matched)) = matches.next() else { - return Err(format!( - "Edit {} anchor text was not found exactly once", - edit_index + 1 - )); - }; - if matches.next().is_some() { - return Err(format!( - "Edit {} anchor text matched more than once; use a more specific anchor", - edit_index + 1 - )); - } - - Ok((start, start + matched.len())) -} - -fn apply_atom_edits(content: &str, edits: &[serde_json::Value]) -> Result { - if edits.is_empty() { - return Err("At least one edit is required".to_string()); - } - - let mut updated = content.to_string(); - for (index, edit) in edits.iter().enumerate() { - let operation = edit["operation"].as_str().unwrap_or(""); - match operation { - "replace" => { - let old_text = edit["old_text"] - .as_str() - .ok_or_else(|| format!("Edit {} is missing old_text", index + 1))?; - let new_text = edit["new_text"] - .as_str() - .ok_or_else(|| format!("Edit {} is missing new_text", index + 1))?; - let (start, end) = exact_match_range(&updated, old_text, index)?; - updated.replace_range(start..end, new_text); - } - "insert_after" => { - let anchor_text = edit["anchor_text"] - .as_str() - .ok_or_else(|| format!("Edit {} is missing anchor_text", index + 1))?; - let text = edit["text"] - .as_str() - .ok_or_else(|| format!("Edit {} is missing text", index + 1))?; - let (_, end) = exact_match_range(&updated, anchor_text, index)?; - updated.insert_str(end, text); - } - "append" => { - let text = edit["text"] - .as_str() - .ok_or_else(|| format!("Edit {} is missing text", index + 1))?; - updated.push_str(text); - } - _ => { - return Err(format!( - "Edit {} has unsupported operation '{}'", - index + 1, - operation - )); - } +) { + let failure_callback = Arc::clone(&on_embedding_event); + tokio::spawn(async move { + let result = enqueue_and_process_agent_pipeline( + &storage, + &atom_id, + &reason, + external_settings, + canvas_cache.as_ref(), + on_embedding_event, + ) + .await; + + if let Err(error) = result { + tracing::warn!( + atom_id = %atom_id, + reason = %reason, + error = %error, + "Agent mutation pipeline failed" + ); + failure_callback(EmbeddingEvent::EmbeddingFailed { atom_id, error }); } - } - - Ok(updated) + }); } async fn execute_edit_atom( @@ -801,10 +670,14 @@ async fn execute_edit_atom( return Ok(None); }; - let edits = tool_args["edits"] - .as_array() - .ok_or_else(|| "edits must be an array".to_string())?; - let content = apply_atom_edits(&existing.atom.content, edits)?; + let edits: Vec = serde_json::from_value( + tool_args + .get("edits") + .cloned() + .ok_or_else(|| "edits must be an array".to_string())?, + ) + .map_err(|e| format!("edits must be valid edit operations: {}", e))?; + let content = apply_atom_edits(&existing.atom.content, &edits)?; if content == existing.atom.content { return Err("Edits did not change the atom content".to_string()); } @@ -812,35 +685,29 @@ async fn execute_edit_atom( return Err("Cannot update an atom to empty content".to_string()); } - let tag_ids = existing - .tags - .iter() - .map(|tag| tag.id.clone()) - .collect::>(); let request = crate::UpdateAtomRequest { content: content.clone(), source_url: existing.atom.source_url, published_at: existing.atom.published_at, - tag_ids: Some(tag_ids), + tag_ids: None, }; let now = Utc::now().to_rfc3339(); let atom = storage - .update_atom_impl(atom_id, &request, &now) + .update_atom_if_unchanged_impl(atom_id, &request, &now, &existing.atom.updated_at) .await .map_err(|e| e.to_string())?; if let Some(cache) = canvas_cache { cache.invalidate(); } - enqueue_and_process_agent_pipeline( - storage, - atom_id, - "agent_edit_atom", + enqueue_agent_pipeline_in_background( + storage.clone(), + atom_id.to_string(), + "agent_edit_atom".to_string(), external_settings, - canvas_cache, + canvas_cache.cloned(), on_embedding_event, - ) - .await?; + ); Ok(Some(atom)) } @@ -855,8 +722,8 @@ fn get_system_prompt(scope_description: &str) -> String { Guidelines: - Use search_atoms to find relevant information before answering, unless another available tool more directly addresses the user's request -- Only call create_atom, edit_atom, or update_atom when the user explicitly asks you to create or modify an atom -- Prefer edit_atom for targeted changes; use update_atom only for intentional full-content replacement +- Only call create_atom or edit_atom when the user explicitly asks you to create or modify an atom +- Prefer targeted edit_atom operations. Use replace_all only for intentional full-content replacement - When you create a new atom, include [[atom_id]] in the final response so the user can open it - If the initial search doesn't find enough, try different search queries - When you find relevant information, cite it using [N] notation where N is a sequential number @@ -1285,41 +1152,6 @@ async fn run_agent_loop( Err(e) => (format!("Error: {}", e), 0), } } - "update_atom" => { - match execute_update_atom( - &storage, - &tool_args, - external_settings.clone(), - canvas_cache, - Arc::clone(&on_embedding_event), - ) - .await - { - Ok(Some(atom)) => { - on_event(ChatEvent::AtomUpdated { - conversation_id: ctx.conversation_id.clone(), - atom: atom.clone(), - }); - ctx.citations.push(( - atom.atom.id.clone(), - None, - atom.atom.snippet.chars().take(200).collect(), - )); - ( - serde_json::to_string_pretty(&json!({ - "atom_id": atom.atom.id, - "title": atom.atom.title, - "snippet": atom.atom.snippet, - "reference": format!("[[{}]]", atom.atom.id), - })) - .unwrap_or_else(|_| atom.atom.id), - 1, - ) - } - Ok(None) => ("Atom not found".to_string(), 0), - Err(e) => (format!("Error: {}", e), 0), - } - } "edit_atom" => { match execute_edit_atom( &storage, @@ -1607,65 +1439,6 @@ where Ok(result) } -#[cfg(test)] -mod tests { - use super::apply_atom_edits; - use serde_json::json; - - #[test] - fn apply_atom_edits_supports_replace_insert_and_append() { - let edits = vec![ - json!({ - "operation": "replace", - "old_text": "old item", - "new_text": "new item", - }), - json!({ - "operation": "insert_after", - "anchor_text": "## Tasks\n", - "text": "\nIntro line\n", - }), - json!({ - "operation": "append", - "text": "\n\nDone.", - }), - ]; - - let updated = apply_atom_edits("# Note\n\n## Tasks\n- old item", &edits).unwrap(); - - assert_eq!( - updated, - "# Note\n\n## Tasks\n\nIntro line\n- new item\n\nDone." - ); - } - - #[test] - fn apply_atom_edits_rejects_missing_anchor() { - let edits = vec![json!({ - "operation": "replace", - "old_text": "missing", - "new_text": "replacement", - })]; - - let error = apply_atom_edits("content", &edits).unwrap_err(); - - assert!(error.contains("not found")); - } - - #[test] - fn apply_atom_edits_rejects_ambiguous_anchor() { - let edits = vec![json!({ - "operation": "insert_after", - "anchor_text": "same", - "text": "!", - })]; - - let error = apply_atom_edits("same and same", &edits).unwrap_err(); - - assert!(error.contains("matched more than once")); - } -} - /// Like `send_chat_message_with_settings` but with optional UI context for /// page-aware and canvas-aware tools. pub async fn send_chat_message_with_canvas( diff --git a/crates/atomic-core/src/atom_edit.rs b/crates/atomic-core/src/atom_edit.rs new file mode 100644 index 00000000..39124548 --- /dev/null +++ b/crates/atomic-core/src/atom_edit.rs @@ -0,0 +1,192 @@ +use serde::{Deserialize, Serialize}; + +/// A single markdown edit operation for an atom. +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +pub struct AtomEditOperation { + pub operation: String, + #[serde(default)] + pub old_text: Option, + #[serde(default)] + pub new_text: Option, + #[serde(default)] + pub anchor_text: Option, + #[serde(default)] + pub text: Option, + #[serde(default)] + pub content: Option, +} + +fn exact_match_range( + content: &str, + needle: &str, + edit_index: usize, +) -> Result<(usize, usize), String> { + if needle.is_empty() { + return Err(format!("Edit {} has empty anchor text", edit_index + 1)); + } + + let mut matches = content.match_indices(needle); + let Some((start, matched)) = matches.next() else { + return Err(format!( + "Edit {} anchor text was not found exactly once", + edit_index + 1 + )); + }; + if matches.next().is_some() { + return Err(format!( + "Edit {} anchor text matched more than once; use a more specific anchor", + edit_index + 1 + )); + } + + Ok((start, start + matched.len())) +} + +pub fn apply_atom_edits(content: &str, edits: &[AtomEditOperation]) -> Result { + if edits.is_empty() { + return Err("At least one edit is required".to_string()); + } + + let mut updated = content.to_string(); + for (index, edit) in edits.iter().enumerate() { + match edit.operation.as_str() { + "replace" => { + let old_text = edit + .old_text + .as_deref() + .ok_or_else(|| format!("Edit {} is missing old_text", index + 1))?; + let new_text = edit + .new_text + .as_deref() + .ok_or_else(|| format!("Edit {} is missing new_text", index + 1))?; + let (start, end) = exact_match_range(&updated, old_text, index)?; + updated.replace_range(start..end, new_text); + } + "insert_after" => { + let anchor_text = edit + .anchor_text + .as_deref() + .ok_or_else(|| format!("Edit {} is missing anchor_text", index + 1))?; + let text = edit + .text + .as_deref() + .ok_or_else(|| format!("Edit {} is missing text", index + 1))?; + let (_, end) = exact_match_range(&updated, anchor_text, index)?; + updated.insert_str(end, text); + } + "append" => { + let text = edit + .text + .as_deref() + .ok_or_else(|| format!("Edit {} is missing text", index + 1))?; + updated.push_str(text); + } + "replace_all" => { + let content = edit + .content + .as_deref() + .ok_or_else(|| format!("Edit {} is missing content", index + 1))?; + updated = content.to_string(); + } + operation => { + return Err(format!( + "Edit {} has unsupported operation '{}'", + index + 1, + operation + )); + } + } + } + + Ok(updated) +} + +#[cfg(test)] +mod tests { + use super::{apply_atom_edits, AtomEditOperation}; + + #[test] + fn apply_atom_edits_supports_replace_insert_and_append() { + let edits = vec![ + AtomEditOperation { + operation: "replace".to_string(), + old_text: Some("old item".to_string()), + new_text: Some("new item".to_string()), + anchor_text: None, + text: None, + content: None, + }, + AtomEditOperation { + operation: "insert_after".to_string(), + old_text: None, + new_text: None, + anchor_text: Some("## Tasks\n".to_string()), + text: Some("\nIntro line\n".to_string()), + content: None, + }, + AtomEditOperation { + operation: "append".to_string(), + old_text: None, + new_text: None, + anchor_text: None, + text: Some("\n\nDone.".to_string()), + content: None, + }, + ]; + + let updated = apply_atom_edits("# Note\n\n## Tasks\n- old item", &edits).unwrap(); + + assert_eq!( + updated, + "# Note\n\n## Tasks\n\nIntro line\n- new item\n\nDone." + ); + } + + #[test] + fn apply_atom_edits_supports_replace_all() { + let edits = vec![AtomEditOperation { + operation: "replace_all".to_string(), + old_text: None, + new_text: None, + anchor_text: None, + text: None, + content: Some("# Replacement\n\nFull body.".to_string()), + }]; + + let updated = apply_atom_edits("# Original\n\nOld body.", &edits).unwrap(); + + assert_eq!(updated, "# Replacement\n\nFull body."); + } + + #[test] + fn apply_atom_edits_rejects_missing_anchor() { + let edits = vec![AtomEditOperation { + operation: "replace".to_string(), + old_text: Some("missing".to_string()), + new_text: Some("replacement".to_string()), + anchor_text: None, + text: None, + content: None, + }]; + + let error = apply_atom_edits("content", &edits).unwrap_err(); + + assert!(error.contains("not found")); + } + + #[test] + fn apply_atom_edits_rejects_ambiguous_anchor() { + let edits = vec![AtomEditOperation { + operation: "insert_after".to_string(), + old_text: None, + new_text: None, + anchor_text: Some("same".to_string()), + text: Some("!".to_string()), + content: None, + }]; + + let error = apply_atom_edits("same and same", &edits).unwrap_err(); + + assert!(error.contains("matched more than once")); + } +} diff --git a/crates/atomic-core/src/lib.rs b/crates/atomic-core/src/lib.rs index 7a599f33..63f3ef9c 100644 --- a/crates/atomic-core/src/lib.rs +++ b/crates/atomic-core/src/lib.rs @@ -28,6 +28,7 @@ //! ``` pub mod agent; +pub mod atom_edit; pub(crate) mod atom_links; pub mod briefing; pub mod canvas_level; @@ -59,6 +60,7 @@ pub mod wiki; // Re-exports for convenience pub use agent::{CanvasClusterSummary, CanvasContext, ChatEvent, PageContext}; +pub use atom_edit::{apply_atom_edits, AtomEditOperation}; pub use briefing::{ Briefing, BriefingCitation, BriefingFrequency, BriefingSchedule, BriefingScheduleStatus, BriefingWeekday, BriefingWithCitations, @@ -936,6 +938,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "create_atom".to_string(), + replace_existing: false, }; self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; self.process_queued_pipeline_jobs(on_event).await?; @@ -1025,6 +1028,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "create_atoms_bulk".to_string(), + replace_existing: false, }) .collect(); self.storage.enqueue_pipeline_jobs_sync(&jobs).await?; @@ -1060,6 +1064,41 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "update_atom".to_string(), + replace_existing: false, + }; + self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; + self.process_queued_pipeline_jobs(on_event).await?; + + Ok(atom_with_tags) + } + + /// Update an atom only if its `updated_at` still matches the caller's + /// previously-read value, then trigger re-embedding and re-tagging. + pub async fn update_atom_if_unchanged( + &self, + id: &str, + request: UpdateAtomRequest, + expected_updated_at: &str, + on_event: F, + ) -> Result + where + F: Fn(EmbeddingEvent) + Send + Sync + 'static, + { + let now = Utc::now().to_rfc3339(); + + let atom_with_tags = self + .storage + .update_atom_if_unchanged_impl(id, &request, &now, expected_updated_at) + .await?; + self.canvas_cache.invalidate(); + + let job = AtomPipelineJobRequest { + atom_id: id.to_string(), + embed_requested: true, + tag_requested: true, + not_before: None, + reason: "update_atom_if_unchanged".to_string(), + replace_existing: false, }; self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; self.process_queued_pipeline_jobs(on_event).await?; @@ -1986,6 +2025,7 @@ impl AtomicCore { tag_requested: tagging_status == "pending", not_before: None, reason: "retry_embedding".to_string(), + replace_existing: false, }; self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; self.process_queued_pipeline_jobs(on_event).await?; @@ -2013,6 +2053,7 @@ impl AtomicCore { tag_requested: false, not_before: None, reason: "spawn_reembed_pending".to_string(), + replace_existing: true, }) .collect(); self.storage.enqueue_pipeline_jobs_sync(&jobs).await?; @@ -2043,6 +2084,7 @@ impl AtomicCore { tag_requested: false, not_before: None, reason: "reembed_all_atoms".to_string(), + replace_existing: true, }) .collect(); self.storage.enqueue_pipeline_jobs_sync(&jobs).await?; @@ -2078,6 +2120,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "retag_all_atoms".to_string(), + replace_existing: false, }) .collect(); self.storage.enqueue_pipeline_jobs_sync(&jobs).await?; @@ -2121,6 +2164,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "retry_tagging".to_string(), + replace_existing: false, }; self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; embedding::process_queued_pipeline_jobs_with_settings( @@ -2724,6 +2768,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "process_atom_pipeline".to_string(), + replace_existing: false, }; self.storage.enqueue_pipeline_jobs_sync(&[job]).await?; self.process_queued_pipeline_jobs(on_event).await?; @@ -3207,6 +3252,7 @@ impl AtomicCore { tag_requested: true, not_before: None, reason: "import_markdown".to_string(), + replace_existing: false, }) .collect(); self.canvas_cache.invalidate(); diff --git a/crates/atomic-core/src/models.rs b/crates/atomic-core/src/models.rs index 642b5371..3a9fd81c 100644 --- a/crates/atomic-core/src/models.rs +++ b/crates/atomic-core/src/models.rs @@ -862,6 +862,7 @@ pub struct AtomPipelineJobRequest { pub tag_requested: bool, pub not_before: Option, pub reason: String, + pub replace_existing: bool, } /// Existing chunk content reused by embed-only re-embedding. diff --git a/crates/atomic-core/src/storage/mod.rs b/crates/atomic-core/src/storage/mod.rs index 81dfc2de..59e3290d 100644 --- a/crates/atomic-core/src/storage/mod.rs +++ b/crates/atomic-core/src/storage/mod.rs @@ -325,6 +325,8 @@ dispatch! { => sqlite: insert_atoms_bulk_impl, pg_trait: AtomStore, pg_method: insert_atoms_bulk; fn update_atom_impl(&self, id: &str, request: &UpdateAtomRequest, updated_at: &str) -> Result => sqlite: update_atom_impl, pg_trait: AtomStore, pg_method: update_atom; + fn update_atom_if_unchanged_impl(&self, id: &str, request: &UpdateAtomRequest, updated_at: &str, expected_updated_at: &str) -> Result + => sqlite: update_atom_if_unchanged_impl, pg_trait: AtomStore, pg_method: update_atom_if_unchanged; fn update_atom_content_only_impl(&self, id: &str, request: &UpdateAtomRequest, updated_at: &str) -> Result => sqlite: update_atom_content_only_impl, pg_trait: AtomStore, pg_method: update_atom_content_only; fn delete_atom_impl(&self, id: &str) -> Result<(), AtomicCoreError> @@ -592,22 +594,6 @@ dispatch! { fn get_clusters_sync(&self) -> Result, AtomicCoreError> => sqlite: get_clusters_sync, pg_trait: ClusterStore, pg_method: get_clusters; - // ---- DatabaseStore ---- - fn list_databases_sync(&self) -> Result, AtomicCoreError> - => sqlite: list_databases_sync, pg_trait: DatabaseStore, pg_method: list_databases; - fn create_database_sync(&self, name: &str) -> Result - => sqlite: create_database_sync, pg_trait: DatabaseStore, pg_method: create_database; - fn rename_database_sync(&self, id: &str, name: &str) -> Result<(), AtomicCoreError> - => sqlite: rename_database_sync, pg_trait: DatabaseStore, pg_method: rename_database; - fn delete_database_sync(&self, id: &str) -> Result<(), AtomicCoreError> - => sqlite: delete_database_sync, pg_trait: DatabaseStore, pg_method: delete_database; - fn get_default_database_id_sync(&self) -> Result - => sqlite: get_default_database_id_sync, pg_trait: DatabaseStore, pg_method: get_default_database_id; - fn set_default_database_sync(&self, id: &str) -> Result<(), AtomicCoreError> - => sqlite: set_default_database_sync, pg_trait: DatabaseStore, pg_method: set_default_database; - fn purge_database_data_sync(&self, db_id: &str) -> Result<(), AtomicCoreError> - => sqlite: purge_database_data_sync, pg_trait: DatabaseStore, pg_method: purge_database_data; - // ---- SettingsStore ---- fn get_all_settings_sync(&self) -> Result, AtomicCoreError> => sqlite: get_all_settings_sync, pg_trait: SettingsStore, pg_method: get_all_settings; @@ -634,3 +620,93 @@ dispatch! { fn ensure_default_token_sync(&self) -> Result, AtomicCoreError> => sqlite: ensure_default_token_sync, pg_trait: TokenStore, pg_method: ensure_default_token; } + +#[cfg(feature = "postgres")] +impl StorageBackend { + pub(crate) async fn list_databases_sync( + &self, + ) -> Result, AtomicCoreError> { + match self { + StorageBackend::Sqlite(s) => ::list_databases(s).await, + StorageBackend::Postgres(s) => { + ::list_databases(s).await + } + } + } + + pub(crate) async fn create_database_sync( + &self, + name: &str, + ) -> Result { + match self { + StorageBackend::Sqlite(s) => { + ::create_database(s, name).await + } + StorageBackend::Postgres(s) => { + ::create_database(s, name).await + } + } + } + + pub(crate) async fn rename_database_sync( + &self, + id: &str, + name: &str, + ) -> Result<(), AtomicCoreError> { + match self { + StorageBackend::Sqlite(s) => { + ::rename_database(s, id, name).await + } + StorageBackend::Postgres(s) => { + ::rename_database(s, id, name).await + } + } + } + + pub(crate) async fn delete_database_sync(&self, id: &str) -> Result<(), AtomicCoreError> { + match self { + StorageBackend::Sqlite(s) => { + ::delete_database(s, id).await + } + StorageBackend::Postgres(s) => { + ::delete_database(s, id).await + } + } + } + + pub(crate) async fn get_default_database_id_sync(&self) -> Result { + match self { + StorageBackend::Sqlite(s) => { + ::get_default_database_id(s).await + } + StorageBackend::Postgres(s) => { + ::get_default_database_id(s).await + } + } + } + + pub(crate) async fn set_default_database_sync(&self, id: &str) -> Result<(), AtomicCoreError> { + match self { + StorageBackend::Sqlite(s) => { + ::set_default_database(s, id).await + } + StorageBackend::Postgres(s) => { + ::set_default_database(s, id).await + } + } + } + + pub(crate) async fn purge_database_data_sync( + &self, + db_id: &str, + ) -> Result<(), AtomicCoreError> { + match self { + StorageBackend::Sqlite(s) => { + ::purge_database_data(s, db_id).await + } + StorageBackend::Postgres(s) => { + ::purge_database_data(s, db_id).await + } + } + } +} diff --git a/crates/atomic-core/src/storage/postgres/atoms.rs b/crates/atomic-core/src/storage/postgres/atoms.rs index 1f8d843d..13cc50a2 100644 --- a/crates/atomic-core/src/storage/postgres/atoms.rs +++ b/crates/atomic-core/src/storage/postgres/atoms.rs @@ -573,6 +573,129 @@ impl AtomStore for PostgresStorage { Ok(AtomWithTags { atom, tags }) } + async fn update_atom_if_unchanged( + &self, + id: &str, + request: &UpdateAtomRequest, + updated_at: &str, + expected_updated_at: &str, + ) -> StorageResult { + let (title, snippet) = extract_title_and_snippet(&request.content, 300); + let source = request.source_url.as_deref().map(parse_source); + + let mut tx = self + .pool + .begin() + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + let result = sqlx::query( + "UPDATE atoms + SET content = $1, + source_url = $2, + source = $3, + published_at = $4, + updated_at = $5, + embedding_status = 'pending', + tagging_status = 'pending', + embedding_error = NULL, + tagging_error = NULL, + title = $6, + snippet = $7 + WHERE id = $8 AND db_id = $9 AND updated_at = $10", + ) + .bind(&request.content) + .bind(&request.source_url) + .bind(&source) + .bind(&request.published_at) + .bind(updated_at) + .bind(&title) + .bind(&snippet) + .bind(id) + .bind(&self.db_id) + .bind(expected_updated_at) + .execute(&mut *tx) + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + if result.rows_affected() == 0 { + let current: Option = + sqlx::query_scalar("SELECT updated_at FROM atoms WHERE id = $1 AND db_id = $2") + .bind(id) + .bind(&self.db_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + return match current { + Some(_) => Err(AtomicCoreError::Conflict(format!( + "Atom {} changed before edits could be saved; reload the atom and retry", + id + ))), + None => Err(AtomicCoreError::NotFound(format!("Atom {}", id))), + }; + } + + self.replace_atom_links_for_content(&mut tx, id, &request.content, updated_at) + .await?; + + if let Some(ref tag_ids) = request.tag_ids { + sqlx::query("DELETE FROM atom_tags WHERE atom_id = $1 AND db_id = $2") + .bind(id) + .bind(&self.db_id) + .execute(&mut *tx) + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + for tag_id in tag_ids { + sqlx::query("INSERT INTO atom_tags (atom_id, tag_id, db_id, source) VALUES ($1, $2, $3, 'manual')") + .bind(id) + .bind(tag_id) + .bind(&self.db_id) + .execute(&mut *tx) + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + } + } + + tx.commit() + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + let row: ( + String, + String, + String, + String, + Option, + Option, + Option, + String, + String, + String, + String, + Option, + Option, + ) = sqlx::query_as( + "SELECT id, content, title, snippet, source_url, source, published_at, + created_at, updated_at, + COALESCE(embedding_status, 'pending'), + COALESCE(tagging_status, 'pending'), + embedding_error, tagging_error + FROM atoms WHERE id = $1 AND db_id = $2", + ) + .bind(id) + .bind(&self.db_id) + .fetch_one(&self.pool) + .await + .map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?; + + let atom = Self::atom_from_tuple(row); + let tags = self.tags_for_atom(id).await?; + + Ok(AtomWithTags { atom, tags }) + } + async fn update_atom_content_only( &self, id: &str, diff --git a/crates/atomic-core/src/storage/postgres/chunks.rs b/crates/atomic-core/src/storage/postgres/chunks.rs index 5baecee6..6a3e9b95 100644 --- a/crates/atomic-core/src/storage/postgres/chunks.rs +++ b/crates/atomic-core/src/storage/postgres/chunks.rs @@ -1178,8 +1178,14 @@ impl ChunkStore for PostgresStorage { FROM atoms WHERE id = $1 AND db_id = $2 ON CONFLICT(atom_id, db_id) DO UPDATE SET - embed_requested = atom_pipeline_jobs.embed_requested OR EXCLUDED.embed_requested, - tag_requested = atom_pipeline_jobs.tag_requested OR EXCLUDED.tag_requested, + embed_requested = CASE + WHEN $8 THEN EXCLUDED.embed_requested + ELSE atom_pipeline_jobs.embed_requested OR EXCLUDED.embed_requested + END, + tag_requested = CASE + WHEN $8 THEN EXCLUDED.tag_requested + ELSE atom_pipeline_jobs.tag_requested OR EXCLUDED.tag_requested + END, reason = EXCLUDED.reason, not_before = LEAST(atom_pipeline_jobs.not_before, EXCLUDED.not_before), state = 'pending', @@ -1195,13 +1201,11 @@ impl ChunkStore for PostgresStorage { .bind(&job.reason) .bind(not_before) .bind(&now) + .bind(job.replace_existing) .execute(&mut *tx) .await .map_err(|e| { - AtomicCoreError::DatabaseOperation(format!( - "Failed to enqueue pipeline job: {}", - e - )) + AtomicCoreError::DatabaseOperation(format!("Failed to enqueue pipeline job: {}", e)) })?; count += result.rows_affected() as i32; } diff --git a/crates/atomic-core/src/storage/sqlite/atoms.rs b/crates/atomic-core/src/storage/sqlite/atoms.rs index 69166137..c5902fa3 100644 --- a/crates/atomic-core/src/storage/sqlite/atoms.rs +++ b/crates/atomic-core/src/storage/sqlite/atoms.rs @@ -356,7 +356,17 @@ impl SqliteStorage { request: &UpdateAtomRequest, updated_at: &str, ) -> StorageResult { - self.update_atom_inner(id, request, updated_at, true) + self.update_atom_inner(id, request, updated_at, true, None) + } + + pub(crate) fn update_atom_if_unchanged_impl( + &self, + id: &str, + request: &UpdateAtomRequest, + updated_at: &str, + expected_updated_at: &str, + ) -> StorageResult { + self.update_atom_inner(id, request, updated_at, true, Some(expected_updated_at)) } /// Content-only update: saves content/metadata but does NOT reset embedding_status. @@ -367,7 +377,7 @@ impl SqliteStorage { request: &UpdateAtomRequest, updated_at: &str, ) -> StorageResult { - self.update_atom_inner(id, request, updated_at, false) + self.update_atom_inner(id, request, updated_at, false, None) } fn update_atom_inner( @@ -376,6 +386,7 @@ impl SqliteStorage { request: &UpdateAtomRequest, updated_at: &str, reset_embedding_status: bool, + expected_updated_at: Option<&str>, ) -> StorageResult { let (title, snippet) = extract_title_and_snippet(&request.content, 300); let source = request.source_url.as_deref().map(parse_source); @@ -390,6 +401,25 @@ impl SqliteStorage { conn.execute_batch("BEGIN")?; if let Err(e) = (|| -> Result<(), AtomicCoreError> { + if let Some(expected) = expected_updated_at { + let current: Option = conn + .query_row("SELECT updated_at FROM atoms WHERE id = ?1", [id], |row| { + row.get(0) + }) + .optional()?; + + match current { + Some(current) if current == expected => {} + Some(_) => { + return Err(AtomicCoreError::Conflict(format!( + "Atom {} changed before edits could be saved; reload the atom and retry", + id + ))); + } + None => return Err(AtomicCoreError::NotFound(format!("Atom {}", id))), + } + } + let content_changed = if reset_embedding_status { true } else { @@ -403,33 +433,73 @@ impl SqliteStorage { if reset_embedding_status { atoms_fts_delete(&conn, id)?; - conn.execute( - "UPDATE atoms - SET content = ?1, - source_url = ?2, - source = ?3, - published_at = ?4, - updated_at = ?5, - embedding_status = ?6, - tagging_status = ?7, - embedding_error = NULL, - tagging_error = NULL, - title = ?8, - snippet = ?9 - WHERE id = ?10", - ( - &request.content, - &request.source_url, - &source, - &request.published_at, - updated_at, - "pending", - "pending", - &title, - &snippet, - id, - ), - )?; + let changed = if let Some(expected) = expected_updated_at { + conn.execute( + "UPDATE atoms + SET content = ?1, + source_url = ?2, + source = ?3, + published_at = ?4, + updated_at = ?5, + embedding_status = ?6, + tagging_status = ?7, + embedding_error = NULL, + tagging_error = NULL, + title = ?8, + snippet = ?9 + WHERE id = ?10 AND updated_at = ?11", + ( + &request.content, + &request.source_url, + &source, + &request.published_at, + updated_at, + "pending", + "pending", + &title, + &snippet, + id, + expected, + ), + )? + } else { + conn.execute( + "UPDATE atoms + SET content = ?1, + source_url = ?2, + source = ?3, + published_at = ?4, + updated_at = ?5, + embedding_status = ?6, + tagging_status = ?7, + embedding_error = NULL, + tagging_error = NULL, + title = ?8, + snippet = ?9 + WHERE id = ?10", + ( + &request.content, + &request.source_url, + &source, + &request.published_at, + updated_at, + "pending", + "pending", + &title, + &snippet, + id, + ), + )? + }; + if changed == 0 { + return match expected_updated_at { + Some(_) => Err(AtomicCoreError::Conflict(format!( + "Atom {} changed before edits could be saved; reload the atom and retry", + id + ))), + None => Err(AtomicCoreError::NotFound(format!("Atom {}", id))), + }; + } atoms_fts_insert(&conn, id)?; replace_atom_links_for_content(&conn, id, &request.content, updated_at)?; } else { @@ -1403,6 +1473,16 @@ impl AtomStore for SqliteStorage { self.update_atom_impl(id, request, updated_at) } + async fn update_atom_if_unchanged( + &self, + id: &str, + request: &UpdateAtomRequest, + updated_at: &str, + expected_updated_at: &str, + ) -> StorageResult { + self.update_atom_if_unchanged_impl(id, request, updated_at, expected_updated_at) + } + async fn update_atom_content_only( &self, id: &str, diff --git a/crates/atomic-core/src/storage/sqlite/chunks.rs b/crates/atomic-core/src/storage/sqlite/chunks.rs index f2954d19..0f2792ab 100644 --- a/crates/atomic-core/src/storage/sqlite/chunks.rs +++ b/crates/atomic-core/src/storage/sqlite/chunks.rs @@ -839,8 +839,14 @@ impl SqliteStorage { FROM atoms WHERE id = ?1 ON CONFLICT(atom_id) DO UPDATE SET - embed_requested = MAX(atom_pipeline_jobs.embed_requested, excluded.embed_requested), - tag_requested = MAX(atom_pipeline_jobs.tag_requested, excluded.tag_requested), + embed_requested = CASE + WHEN ?7 THEN excluded.embed_requested + ELSE MAX(atom_pipeline_jobs.embed_requested, excluded.embed_requested) + END, + tag_requested = CASE + WHEN ?7 THEN excluded.tag_requested + ELSE MAX(atom_pipeline_jobs.tag_requested, excluded.tag_requested) + END, reason = excluded.reason, not_before = MIN(atom_pipeline_jobs.not_before, excluded.not_before), state = 'pending', @@ -855,6 +861,7 @@ impl SqliteStorage { &job.reason, not_before, &now, + job.replace_existing, ], )?; count += changed as i32; @@ -1449,6 +1456,7 @@ mod tests { tag_requested: true, not_before: None, reason: "initial".to_string(), + replace_existing: false, }; core.storage() .enqueue_pipeline_jobs_sync(&[initial_job]) @@ -1547,6 +1555,7 @@ mod tests { tag_requested: true, not_before: None, reason: "newer".to_string(), + replace_existing: false, }; core.storage() .enqueue_pipeline_jobs_sync(&[newer_job]) @@ -1569,6 +1578,59 @@ mod tests { assert_eq!(row.1, newer_updated_at); } + #[tokio::test] + async fn replacement_pipeline_job_overrides_stale_stage_flags() { + let dir = TempDir::new().expect("create tempdir"); + let core = AtomicCore::open_or_create(dir.path().join("pipeline.db")) + .expect("open sqlite test db"); + let created = core + .create_atom( + CreateAtomRequest { + content: String::new(), + ..Default::default() + }, + |_| {}, + ) + .await + .expect("create atom") + .expect("atom inserted"); + + let original_claim = enqueue_and_claim_pipeline_job(&core, &created.atom.id).await; + assert_eq!(original_claim.len(), 1); + assert!(original_claim[0].tag_requested); + + let replacement_job = crate::models::AtomPipelineJobRequest { + atom_id: created.atom.id.clone(), + embed_requested: true, + tag_requested: false, + not_before: None, + reason: "reembed_all_atoms".to_string(), + replace_existing: true, + }; + core.storage() + .enqueue_pipeline_jobs_sync(&[replacement_job]) + .await + .expect("enqueue replacement job"); + + core.storage() + .clear_pipeline_jobs_sync(&original_claim) + .await + .expect("clear original claim should not delete replacement job"); + + let claimed = core + .storage() + .claim_pipeline_jobs_sync(10, "2099-01-01T00:31:00+00:00", "2099-01-01T00:01:00+00:00") + .await + .expect("claim replacement job"); + + assert_eq!(claimed.len(), 1); + assert!(claimed[0].embed_requested); + assert!( + !claimed[0].tag_requested, + "replacement re-embed job should not preserve stale tagging request" + ); + } + #[tokio::test] async fn status_backfill_requeues_newer_revision_during_active_lease() { let dir = TempDir::new().expect("create tempdir"); diff --git a/crates/atomic-core/src/storage/sqlite/settings.rs b/crates/atomic-core/src/storage/sqlite/settings.rs index f8a98e4c..c3c9c567 100644 --- a/crates/atomic-core/src/storage/sqlite/settings.rs +++ b/crates/atomic-core/src/storage/sqlite/settings.rs @@ -129,78 +129,44 @@ impl SqliteStorage { // ==================== DatabaseStore ==================== -impl SqliteStorage { - pub(crate) fn list_databases_sync(&self) -> StorageResult> { +#[async_trait] +impl DatabaseStore for SqliteStorage { + async fn list_databases(&self) -> StorageResult> { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn create_database_sync( - &self, - _name: &str, - ) -> StorageResult { + async fn create_database(&self, _name: &str) -> StorageResult { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn rename_database_sync(&self, _id: &str, _name: &str) -> StorageResult<()> { + async fn rename_database(&self, _id: &str, _name: &str) -> StorageResult<()> { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn delete_database_sync(&self, _id: &str) -> StorageResult<()> { + async fn delete_database(&self, _id: &str) -> StorageResult<()> { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn get_default_database_id_sync(&self) -> StorageResult { + async fn get_default_database_id(&self) -> StorageResult { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn set_default_database_sync(&self, _id: &str) -> StorageResult<()> { + async fn set_default_database(&self, _id: &str) -> StorageResult<()> { Err(AtomicCoreError::Configuration( "Database management not available on SQLite storage backend".to_string(), )) } - pub(crate) fn purge_database_data_sync(&self, _db_id: &str) -> StorageResult<()> { - // SQLite uses separate .db files — no shared tables to purge. - Ok(()) - } -} - -#[async_trait] -impl DatabaseStore for SqliteStorage { - async fn list_databases(&self) -> StorageResult> { - self.list_databases_sync() - } - - async fn create_database(&self, name: &str) -> StorageResult { - self.create_database_sync(name) - } - - async fn rename_database(&self, id: &str, name: &str) -> StorageResult<()> { - self.rename_database_sync(id, name) - } - - async fn delete_database(&self, id: &str) -> StorageResult<()> { - self.delete_database_sync(id) - } - - async fn get_default_database_id(&self) -> StorageResult { - self.get_default_database_id_sync() - } - - async fn set_default_database(&self, id: &str) -> StorageResult<()> { - self.set_default_database_sync(id) - } - async fn purge_database_data(&self, _db_id: &str) -> StorageResult<()> { // SQLite uses separate .db files per database — no shared tables to purge. Ok(()) diff --git a/crates/atomic-core/src/storage/traits.rs b/crates/atomic-core/src/storage/traits.rs index fddbe49b..ffff021b 100644 --- a/crates/atomic-core/src/storage/traits.rs +++ b/crates/atomic-core/src/storage/traits.rs @@ -55,6 +55,15 @@ pub trait AtomStore: Send + Sync { updated_at: &str, ) -> StorageResult; + /// Update an atom only if it has not changed since the caller read it. + async fn update_atom_if_unchanged( + &self, + id: &str, + request: &UpdateAtomRequest, + updated_at: &str, + expected_updated_at: &str, + ) -> StorageResult; + /// Update atom content/metadata without resetting embedding status. /// Used by auto-save during inline editing. Defaults to regular update_atom. async fn update_atom_content_only( diff --git a/crates/atomic-core/tests/storage_tests.rs b/crates/atomic-core/tests/storage_tests.rs index 2df2eeac..f2d063da 100644 --- a/crates/atomic-core/tests/storage_tests.rs +++ b/crates/atomic-core/tests/storage_tests.rs @@ -15,7 +15,7 @@ use atomic_core::db::Database; use atomic_core::models::*; use atomic_core::storage::traits::*; use atomic_core::storage::SqliteStorage; -use atomic_core::{CreateAtomRequest, ListAtomsParams, UpdateAtomRequest}; +use atomic_core::{AtomicCoreError, CreateAtomRequest, ListAtomsParams, UpdateAtomRequest}; use std::sync::Arc; use tempfile::TempDir; @@ -130,6 +130,48 @@ async fn test_update_atom(storage: &dyn AtomStore) { assert_eq!(fetched.atom.content, "Updated content"); } +async fn test_update_atom_if_unchanged_rejects_stale_write(storage: &dyn AtomStore) { + let id = uuid::Uuid::new_v4().to_string(); + let created_at = "2024-01-01T00:00:00Z"; + storage + .insert_atom( + &id, + &CreateAtomRequest { + content: "Original content".to_string(), + ..Default::default() + }, + created_at, + ) + .await + .unwrap(); + + let update = UpdateAtomRequest { + content: "First edit".to_string(), + source_url: None, + published_at: None, + tag_ids: None, + }; + storage + .update_atom_if_unchanged(&id, &update, "2024-01-02T00:00:00Z", created_at) + .await + .unwrap(); + + let stale_update = UpdateAtomRequest { + content: "Stale edit".to_string(), + source_url: None, + published_at: None, + tag_ids: None, + }; + let error = storage + .update_atom_if_unchanged(&id, &stale_update, "2024-01-03T00:00:00Z", created_at) + .await + .unwrap_err(); + + assert!(matches!(error, AtomicCoreError::Conflict(_))); + let fetched = storage.get_atom(&id).await.unwrap().unwrap(); + assert_eq!(fetched.atom.content, "First edit"); +} + async fn test_atom_links_materialized(storage: &dyn AtomStore) { let target_id = uuid::Uuid::new_v4().to_string(); let source_id = uuid::Uuid::new_v4().to_string(); @@ -521,6 +563,12 @@ async fn sqlite_update_atom() { test_update_atom(&s).await; } +#[tokio::test] +async fn sqlite_update_atom_if_unchanged_rejects_stale_write() { + let (s, _dir) = sqlite_storage().await; + test_update_atom_if_unchanged_rejects_stale_write(&s).await; +} + #[tokio::test] async fn sqlite_atom_links_materialized() { let (s, _dir) = sqlite_storage().await; @@ -632,6 +680,10 @@ mod postgres_tests { pg_test!(pg_get_atom_not_found, test_get_atom_not_found); pg_test!(pg_delete_atom, test_delete_atom); pg_test!(pg_update_atom, test_update_atom); + pg_test!( + pg_update_atom_if_unchanged_rejects_stale_write, + test_update_atom_if_unchanged_rejects_stale_write + ); pg_test!(pg_atom_links_materialized, test_atom_links_materialized); pg_test!( pg_atom_links_replaced_on_update, diff --git a/crates/atomic-core/tests/support/mod.rs b/crates/atomic-core/tests/support/mod.rs index 652a4848..074579cf 100644 --- a/crates/atomic-core/tests/support/mod.rs +++ b/crates/atomic-core/tests/support/mod.rs @@ -345,21 +345,23 @@ pub fn event_collector() -> ( (cb, rx) } -/// Wait until both `EmbeddingComplete` and a terminal tagging event -/// (`TaggingComplete` / `TaggingSkipped` / `TaggingFailed`) have fired for -/// `atom_id`. Returns the captured events so tests can assert on payloads. +/// Wait until both `EmbeddingComplete`, a terminal tagging event +/// (`TaggingComplete` / `TaggingSkipped` / `TaggingFailed`), and the owning +/// queue run's completion have fired. Returns the captured target-atom events +/// so tests can assert on payloads. pub async fn await_pipeline(rx: &mut EventRx, atom_id: &str) -> Vec { use atomic_core::EmbeddingEvent; let mut captured = Vec::new(); let mut embedding_done = false; let mut tagging_done = false; + let mut queue_done = false; // A generous budget — the mock responds instantly, but CI runners can // stall under load. Fails loudly instead of hanging forever. let deadline = tokio::time::Instant::now() + Duration::from_secs(15); - while !(embedding_done && tagging_done) { + while !(embedding_done && tagging_done && queue_done) { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { panic!( @@ -386,8 +388,11 @@ pub async fn await_pipeline(rx: &mut EventRx, atom_id: &str) -> Vec id == atom_id, EmbeddingEvent::BatchProgress { .. } | EmbeddingEvent::PipelineQueueStarted { .. } - | EmbeddingEvent::PipelineQueueProgress { .. } - | EmbeddingEvent::PipelineQueueCompleted { .. } => false, + | EmbeddingEvent::PipelineQueueProgress { .. } => false, + EmbeddingEvent::PipelineQueueCompleted { .. } => { + queue_done = true; + false + } }; if matches_target { diff --git a/crates/atomic-server/src/mcp/server.rs b/crates/atomic-server/src/mcp/server.rs index 90e6e308..5de96f37 100644 --- a/crates/atomic-server/src/mcp/server.rs +++ b/crates/atomic-server/src/mcp/server.rs @@ -3,6 +3,7 @@ use crate::mcp::types::*; use crate::state::ServerEvent; use atomic_core::manager::DatabaseManager; use atomic_core::AtomicCore; +use atomic_core::{apply_atom_edits, AtomEditOperation}; use rmcp::{ handler::server::tool::ToolRouter, handler::server::wrapper::Parameters, @@ -263,9 +264,9 @@ impl AtomicMcpServer { Ok(CallToolResult::success(vec![Content::text(response_text)])) } - /// Update an existing atom's content + /// Update an existing atom with optional full-content, metadata, or tag changes #[tool( - description = "Revise an existing atom. Use this when you find an atom with outdated or incomplete information instead of creating a duplicate. Search first to find the atom to update." + description = "Compatibility full/partial atom update. Omitted fields are preserved. Content is optional so callers can update metadata or tag_ids without rewriting markdown. Prefer edit_atom for content changes unless replacing the whole atom intentionally." )] async fn update_atom( &self, @@ -274,9 +275,8 @@ impl AtomicMcpServer { ) -> Result { let core = self.resolve_core(&context).await?; - // Verify the atom exists first - match core.get_atom(¶ms.atom_id).await { - Ok(Some(_)) => {} + let existing = match core.get_atom(¶ms.atom_id).await { + Ok(Some(atom)) => atom, Ok(None) => { return Ok(CallToolResult::success(vec![Content::text(format!( "Atom not found: {}", @@ -284,19 +284,124 @@ impl AtomicMcpServer { ))])); } Err(e) => return Err(ErrorData::internal_error(e.to_string(), None)), + }; + + if params.content.is_none() + && params.source_url.is_none() + && params.published_at.is_none() + && params.tag_ids.is_none() + { + return Ok(CallToolResult::success(vec![Content::text( + "No update fields provided".to_string(), + )])); } + let content = match params.content { + Some(content) => { + if content.trim().is_empty() { + return Ok(CallToolResult::success(vec![Content::text( + "Cannot update an atom to empty content".to_string(), + )])); + } + content + } + None => existing.atom.content.clone(), + }; + + let source_url = params.source_url.or(existing.atom.source_url.clone()); + let published_at = params.published_at.or(existing.atom.published_at.clone()); + let request = atomic_core::UpdateAtomRequest { - content: params.content, - source_url: params.source_url, - published_at: None, + content, + source_url, + published_at, + tag_ids: params.tag_ids, + }; + + let on_event = embedding_event_callback(self.event_tx.clone()); + + let result = core + .update_atom_if_unchanged( + ¶ms.atom_id, + request, + &existing.atom.updated_at, + on_event, + ) + .await + .map_err(|e| ErrorData::internal_error(e.to_string(), None))?; + + let response = AtomResponse { + atom_id: result.atom.id.clone(), + content_preview: result.atom.content.chars().take(200).collect(), + tags: result.tags.iter().map(|t| t.name.clone()).collect(), + embedding_status: result.atom.embedding_status.clone(), + }; + + let response_text = serde_json::to_string_pretty(&response) + .map_err(|e| ErrorData::internal_error(format!("Serialization error: {}", e), None))?; + + Ok(CallToolResult::success(vec![Content::text(response_text)])) + } + + /// Apply targeted edits to an atom's markdown content + #[tool( + description = "Apply safe edits to an existing atom. Supports replace, insert_after, append, and replace_all. replace and insert_after require exact text that appears exactly once. The whole operation fails without saving if any edit is invalid. Prefer this over update_atom for markdown changes." + )] + async fn edit_atom( + &self, + context: RequestContext, + Parameters(params): Parameters, + ) -> Result { + let core = self.resolve_core(&context).await?; + + let existing = match core.get_atom(¶ms.atom_id).await { + Ok(Some(atom)) => atom, + Ok(None) => { + return Ok(CallToolResult::success(vec![Content::text(format!( + "Atom not found: {}", + params.atom_id + ))])); + } + Err(e) => return Err(ErrorData::internal_error(e.to_string(), None)), + }; + + let edits = params + .edits + .iter() + .map(AtomEditOperation::from) + .collect::>(); + let content = match apply_atom_edits(&existing.atom.content, &edits) { + Ok(content) => content, + Err(error) => return Ok(CallToolResult::success(vec![Content::text(error)])), + }; + + if content == existing.atom.content { + return Ok(CallToolResult::success(vec![Content::text( + "Edits did not change the atom content".to_string(), + )])); + } + if content.trim().is_empty() { + return Ok(CallToolResult::success(vec![Content::text( + "Cannot update an atom to empty content".to_string(), + )])); + } + + let request = atomic_core::UpdateAtomRequest { + content, + source_url: existing.atom.source_url.clone(), + published_at: existing.atom.published_at.clone(), tag_ids: None, }; let on_event = embedding_event_callback(self.event_tx.clone()); let result = core - .update_atom(¶ms.atom_id, request, on_event) + .update_atom_if_unchanged( + ¶ms.atom_id, + request, + &existing.atom.updated_at, + on_event, + ) .await .map_err(|e| ErrorData::internal_error(e.to_string(), None))?; diff --git a/crates/atomic-server/src/mcp/types.rs b/crates/atomic-server/src/mcp/types.rs index ba8fe0e0..3120d72b 100644 --- a/crates/atomic-server/src/mcp/types.rs +++ b/crates/atomic-server/src/mcp/types.rs @@ -51,12 +51,71 @@ pub struct UpdateAtomParams { /// The UUID of the atom to update pub atom_id: String, - /// The new markdown content for the atom - pub content: String, + /// Optional replacement markdown content for the atom. Omit to preserve current content. + #[serde(default)] + pub content: Option, - /// Optional source URL where this content originated + /// Optional replacement source URL. Omit to preserve current source URL. #[serde(default)] pub source_url: Option, + + /// Optional replacement publication date. Omit to preserve current publication date. + #[serde(default)] + pub published_at: Option, + + /// Optional replacement tag IDs. Omit to preserve current tags; pass [] to clear tags. + #[serde(default)] + pub tag_ids: Option>, +} + +/// A single edit operation for edit_atom. +#[derive(Debug, Deserialize, JsonSchema)] +pub struct EditOperation { + /// Operation type: replace, insert_after, append, or replace_all. + pub operation: String, + + /// Exact text to replace. Required for replace and must occur exactly once. + #[serde(default)] + pub old_text: Option, + + /// Replacement text for replace. + #[serde(default)] + pub new_text: Option, + + /// Exact text to insert after. Required for insert_after and must occur exactly once. + #[serde(default)] + pub anchor_text: Option, + + /// Text to insert for insert_after or append. + #[serde(default)] + pub text: Option, + + /// Full replacement markdown content. Required for replace_all. + #[serde(default)] + pub content: Option, +} + +impl From<&EditOperation> for atomic_core::AtomEditOperation { + fn from(value: &EditOperation) -> Self { + Self { + operation: value.operation.clone(), + old_text: value.old_text.clone(), + new_text: value.new_text.clone(), + anchor_text: value.anchor_text.clone(), + text: value.text.clone(), + content: value.content.clone(), + } + } +} + +/// Input parameters for edit_atom tool +#[derive(Debug, Deserialize, JsonSchema)] +pub struct EditAtomParams { + /// The UUID of the atom to edit + pub atom_id: String, + + /// Edits to apply in order. The whole operation fails if any edit is invalid. + pub edits: Vec, } /// Input parameters for ingest_url tool