Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
347 changes: 60 additions & 287 deletions crates/atomic-core/src/agent.rs

Large diffs are not rendered by default.

192 changes: 192 additions & 0 deletions crates/atomic-core/src/atom_edit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use serde::{Deserialize, Serialize};

/// A single markdown edit operation for an atom.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct AtomEditOperation {
pub operation: String,
#[serde(default)]
pub old_text: Option<String>,
#[serde(default)]
pub new_text: Option<String>,
#[serde(default)]
pub anchor_text: Option<String>,
#[serde(default)]
pub text: Option<String>,
#[serde(default)]
pub content: Option<String>,
}

fn exact_match_range(
content: &str,
needle: &str,
edit_index: usize,
) -> Result<(usize, usize), String> {
if needle.is_empty() {
return Err(format!("Edit {} has empty anchor text", edit_index + 1));
}

let mut matches = content.match_indices(needle);
let Some((start, matched)) = matches.next() else {
return Err(format!(
"Edit {} anchor text was not found exactly once",
edit_index + 1
));
};
if matches.next().is_some() {
return Err(format!(
"Edit {} anchor text matched more than once; use a more specific anchor",
edit_index + 1
));
}

Ok((start, start + matched.len()))
}

pub fn apply_atom_edits(content: &str, edits: &[AtomEditOperation]) -> Result<String, String> {
if edits.is_empty() {
return Err("At least one edit is required".to_string());
}

let mut updated = content.to_string();
for (index, edit) in edits.iter().enumerate() {
match edit.operation.as_str() {
"replace" => {
let old_text = edit
.old_text
.as_deref()
.ok_or_else(|| format!("Edit {} is missing old_text", index + 1))?;
let new_text = edit
.new_text
.as_deref()
.ok_or_else(|| format!("Edit {} is missing new_text", index + 1))?;
let (start, end) = exact_match_range(&updated, old_text, index)?;
updated.replace_range(start..end, new_text);
}
"insert_after" => {
let anchor_text = edit
.anchor_text
.as_deref()
.ok_or_else(|| format!("Edit {} is missing anchor_text", index + 1))?;
let text = edit
.text
.as_deref()
.ok_or_else(|| format!("Edit {} is missing text", index + 1))?;
let (_, end) = exact_match_range(&updated, anchor_text, index)?;
updated.insert_str(end, text);
}
"append" => {
let text = edit
.text
.as_deref()
.ok_or_else(|| format!("Edit {} is missing text", index + 1))?;
updated.push_str(text);
}
"replace_all" => {
let content = edit
.content
.as_deref()
.ok_or_else(|| format!("Edit {} is missing content", index + 1))?;
updated = content.to_string();
}
operation => {
return Err(format!(
"Edit {} has unsupported operation '{}'",
index + 1,
operation
));
}
}
}

Ok(updated)
}

#[cfg(test)]
mod tests {
use super::{apply_atom_edits, AtomEditOperation};

#[test]
fn apply_atom_edits_supports_replace_insert_and_append() {
let edits = vec![
AtomEditOperation {
operation: "replace".to_string(),
old_text: Some("old item".to_string()),
new_text: Some("new item".to_string()),
anchor_text: None,
text: None,
content: None,
},
AtomEditOperation {
operation: "insert_after".to_string(),
old_text: None,
new_text: None,
anchor_text: Some("## Tasks\n".to_string()),
text: Some("\nIntro line\n".to_string()),
content: None,
},
AtomEditOperation {
operation: "append".to_string(),
old_text: None,
new_text: None,
anchor_text: None,
text: Some("\n\nDone.".to_string()),
content: None,
},
];

let updated = apply_atom_edits("# Note\n\n## Tasks\n- old item", &edits).unwrap();

assert_eq!(
updated,
"# Note\n\n## Tasks\n\nIntro line\n- new item\n\nDone."
);
}

#[test]
fn apply_atom_edits_supports_replace_all() {
let edits = vec![AtomEditOperation {
operation: "replace_all".to_string(),
old_text: None,
new_text: None,
anchor_text: None,
text: None,
content: Some("# Replacement\n\nFull body.".to_string()),
}];

let updated = apply_atom_edits("# Original\n\nOld body.", &edits).unwrap();

assert_eq!(updated, "# Replacement\n\nFull body.");
}

#[test]
fn apply_atom_edits_rejects_missing_anchor() {
let edits = vec![AtomEditOperation {
operation: "replace".to_string(),
old_text: Some("missing".to_string()),
new_text: Some("replacement".to_string()),
anchor_text: None,
text: None,
content: None,
}];

let error = apply_atom_edits("content", &edits).unwrap_err();

assert!(error.contains("not found"));
}

#[test]
fn apply_atom_edits_rejects_ambiguous_anchor() {
let edits = vec![AtomEditOperation {
operation: "insert_after".to_string(),
old_text: None,
new_text: None,
anchor_text: Some("same".to_string()),
text: Some("!".to_string()),
content: None,
}];

let error = apply_atom_edits("same and same", &edits).unwrap_err();

assert!(error.contains("matched more than once"));
}
}
46 changes: 46 additions & 0 deletions crates/atomic-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! ```

pub mod agent;
pub mod atom_edit;
pub(crate) mod atom_links;
pub mod briefing;
pub mod canvas_level;
Expand Down Expand Up @@ -59,6 +60,7 @@ pub mod wiki;

// Re-exports for convenience
pub use agent::{CanvasClusterSummary, CanvasContext, ChatEvent, PageContext};
pub use atom_edit::{apply_atom_edits, AtomEditOperation};
pub use briefing::{
Briefing, BriefingCitation, BriefingFrequency, BriefingSchedule, BriefingScheduleStatus,
BriefingWeekday, BriefingWithCitations,
Expand Down Expand Up @@ -936,6 +938,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "create_atom".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
self.process_queued_pipeline_jobs(on_event).await?;
Expand Down Expand Up @@ -1025,6 +1028,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "create_atoms_bulk".to_string(),
replace_existing: false,
})
.collect();
self.storage.enqueue_pipeline_jobs_sync(&jobs).await?;
Expand Down Expand Up @@ -1060,6 +1064,41 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "update_atom".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
self.process_queued_pipeline_jobs(on_event).await?;

Ok(atom_with_tags)
}

/// Update an atom only if its `updated_at` still matches the caller's
/// previously-read value, then trigger re-embedding and re-tagging.
pub async fn update_atom_if_unchanged<F>(
&self,
id: &str,
request: UpdateAtomRequest,
expected_updated_at: &str,
on_event: F,
) -> Result<AtomWithTags, AtomicCoreError>
where
F: Fn(EmbeddingEvent) + Send + Sync + 'static,
{
let now = Utc::now().to_rfc3339();

let atom_with_tags = self
.storage
.update_atom_if_unchanged_impl(id, &request, &now, expected_updated_at)
.await?;
self.canvas_cache.invalidate();

let job = AtomPipelineJobRequest {
atom_id: id.to_string(),
embed_requested: true,
tag_requested: true,
not_before: None,
reason: "update_atom_if_unchanged".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
self.process_queued_pipeline_jobs(on_event).await?;
Expand Down Expand Up @@ -1986,6 +2025,7 @@ impl AtomicCore {
tag_requested: tagging_status == "pending",
not_before: None,
reason: "retry_embedding".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
self.process_queued_pipeline_jobs(on_event).await?;
Expand Down Expand Up @@ -2013,6 +2053,7 @@ impl AtomicCore {
tag_requested: false,
not_before: None,
reason: "spawn_reembed_pending".to_string(),
replace_existing: true,
})
.collect();
self.storage.enqueue_pipeline_jobs_sync(&jobs).await?;
Expand Down Expand Up @@ -2043,6 +2084,7 @@ impl AtomicCore {
tag_requested: false,
not_before: None,
reason: "reembed_all_atoms".to_string(),
replace_existing: true,
})
.collect();
self.storage.enqueue_pipeline_jobs_sync(&jobs).await?;
Expand Down Expand Up @@ -2078,6 +2120,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "retag_all_atoms".to_string(),
replace_existing: false,
})
.collect();
self.storage.enqueue_pipeline_jobs_sync(&jobs).await?;
Expand Down Expand Up @@ -2121,6 +2164,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "retry_tagging".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
embedding::process_queued_pipeline_jobs_with_settings(
Expand Down Expand Up @@ -2724,6 +2768,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "process_atom_pipeline".to_string(),
replace_existing: false,
};
self.storage.enqueue_pipeline_jobs_sync(&[job]).await?;
self.process_queued_pipeline_jobs(on_event).await?;
Expand Down Expand Up @@ -3207,6 +3252,7 @@ impl AtomicCore {
tag_requested: true,
not_before: None,
reason: "import_markdown".to_string(),
replace_existing: false,
})
.collect();
self.canvas_cache.invalidate();
Expand Down
1 change: 1 addition & 0 deletions crates/atomic-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ pub struct AtomPipelineJobRequest {
pub tag_requested: bool,
pub not_before: Option<String>,
pub reason: String,
pub replace_existing: bool,
}

/// Existing chunk content reused by embed-only re-embedding.
Expand Down
Loading
Loading