Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 95 additions & 6 deletions crates/atomic-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1562,13 +1562,51 @@ impl AtomicCore {

let (strategy, ctx) = self.build_wiki_strategy_context(tag_id, tag_name).await?;

let draft = match wiki::strategy_propose(&strategy, &ctx, &existing)
let draft = match wiki::strategy_propose_outcome(&strategy, &ctx, &existing)
.await
.map_err(|e| AtomicCoreError::Wiki(e))?
{
Some(d) => d,
None => {
tracing::info!(tag_id, "[wiki] No update warranted; no proposal created");
wiki::WikiProposalOutcome::Draft(d) => d,
wiki::WikiProposalOutcome::NoChange => {
// The LLM evaluated update chunks and decided nothing needs to
// change. Advance the baseline so the same atoms are not
// re-evaluated on every subsequent "Generate Update" click.
if let Err(e) = self.storage.advance_wiki_baseline_sync(tag_id, None).await {
tracing::warn!(tag_id, error = %e, "[wiki] Failed to advance article baseline on no-change");
} else {
tracing::info!(
tag_id,
"[wiki] No update warranted; article baseline advanced"
);
}
return Ok(None);
}
wiki::WikiProposalOutcome::NoUpdateChunks => {
// No chunks were selected. This can mean there are truly no new
// atoms, but it can also mean older atoms were newly associated
// with this tag hierarchy. Only advance if the current tag count
// has not increased beyond the article's recorded baseline.
match self
.storage
.advance_wiki_baseline_sync(tag_id, Some(existing.article.atom_count))
.await
{
Ok(true) => {
tracing::info!(
tag_id,
"[wiki] No update chunks selected; article baseline advanced"
);
}
Ok(false) => {
tracing::info!(
tag_id,
"[wiki] No update chunks selected; article baseline left unchanged because atom count increased"
);
}
Err(e) => {
tracing::warn!(tag_id, error = %e, "[wiki] Failed to advance article baseline after empty update selection");
}
}
return Ok(None);
}
};
Expand Down Expand Up @@ -1640,13 +1678,12 @@ impl AtomicCore {
));
}

let now = chrono::Utc::now().to_rfc3339();
let article = WikiArticle {
id: existing.article.id.clone(),
tag_id: tag_id.to_string(),
content: proposal.content.clone(),
created_at: existing.article.created_at.clone(),
updated_at: now,
updated_at: proposal.created_at.clone(),
atom_count: existing.article.atom_count + proposal.new_atom_count,
};

Expand Down Expand Up @@ -4815,6 +4852,58 @@ mod tests {
assert_eq!(remaining_fts, 0);
}

#[tokio::test]
async fn test_guarded_wiki_baseline_advance_keeps_older_retagged_atoms_pending() {
let (db, _temp) = create_empty_test_db();
let tag = db.create_tag("Retagged", None).await.unwrap();
let article_updated_at = "2026-01-02T00:00:00+00:00";

{
let sqlite = db.storage.as_sqlite().unwrap();
let conn = sqlite.db.conn.lock().unwrap();
for atom_id in ["atom1", "atom2"] {
conn.execute(
"INSERT INTO atoms (id, content, created_at, updated_at)
VALUES (?1, ?2, ?3, ?3)",
rusqlite::params![atom_id, "older atom content", "2026-01-01T00:00:00+00:00"],
)
.unwrap();
conn.execute(
"INSERT INTO atom_tags (atom_id, tag_id) VALUES (?1, ?2)",
rusqlite::params![atom_id, &tag.id],
)
.unwrap();
}
conn.execute(
"INSERT INTO wiki_articles (id, tag_id, content, created_at, updated_at, atom_count)
VALUES (?1, ?2, ?3, ?4, ?4, 1)",
rusqlite::params![
"wiki1",
&tag.id,
"Existing article",
article_updated_at
],
)
.unwrap();
}

let advanced = db
.storage
.advance_wiki_baseline_sync(&tag.id, Some(1))
.await
.unwrap();
assert!(
!advanced,
"baseline must not advance when current atom count increased"
);

let status = db.get_wiki_status(&tag.id).await.unwrap();
assert_eq!(status.article_atom_count, 1);
assert_eq!(status.current_atom_count, 2);
assert_eq!(status.new_atoms_available, 1);
assert_eq!(status.updated_at.as_deref(), Some(article_updated_at));
}

#[tokio::test]
async fn test_global_search_ignores_stale_wiki_fts_rows() {
let (db, _temp) = create_test_db().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
"type": "string"
},
"content": {
"description": "New markdown content for the operation. For NoChange: empty string.",
"description": "New markdown content for the operation. Only NoChange may use empty content. AppendToSection, ReplaceSection, and InsertSection must provide non-empty markdown content with citations.",
"type": "string"
},
"heading": {
Expand Down
2 changes: 2 additions & 0 deletions crates/atomic-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ dispatch! {
=> sqlite: get_wiki_proposal_sync, pg_trait: WikiStore, pg_method: get_wiki_proposal;
fn delete_wiki_proposal_sync(&self, tag_id: &str) -> Result<(), AtomicCoreError>
=> sqlite: delete_wiki_proposal_sync, pg_trait: WikiStore, pg_method: delete_wiki_proposal;
fn advance_wiki_baseline_sync(&self, tag_id: &str, max_current_count: Option<i32>) -> Result<bool, AtomicCoreError>
=> sqlite: advance_wiki_baseline_sync, pg_trait: WikiStore, pg_method: advance_wiki_baseline;

// ---- BriefingStore ----
fn list_new_atoms_since_sync(&self, since: &str, limit: i32) -> Result<Vec<AtomWithTags>, AtomicCoreError>
Expand Down
111 changes: 99 additions & 12 deletions crates/atomic-core/src/storage/postgres/wiki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,11 +533,18 @@ impl WikiStore for PostgresStorage {
last_update: &str,
max_source_tokens: usize,
) -> StorageResult<Option<(Vec<ChunkWithContext>, i32)>> {
// Get atoms added after the last update
// Get atoms added after the last update, spanning the full tag hierarchy.
let new_atom_ids: Vec<String> = sqlx::query_scalar(
"SELECT DISTINCT a.id FROM atoms a
"WITH RECURSIVE descendant_tags(id) AS (
SELECT $1
UNION ALL
SELECT t.id FROM tags t
INNER JOIN descendant_tags dt ON t.parent_id = dt.id
)
SELECT DISTINCT a.id FROM atoms a
INNER JOIN atom_tags at ON a.id = at.atom_id
WHERE at.tag_id = $1 AND a.created_at > $2 AND a.db_id = $3 AND at.db_id = $3",
WHERE at.tag_id IN (SELECT id FROM descendant_tags)
AND a.created_at > $2 AND a.db_id = $3 AND at.db_id = $3",
)
.bind(tag_id)
.bind(last_update)
Expand All @@ -560,7 +567,7 @@ impl WikiStore for PostgresStorage {
.await
.map_err(|e| AtomicCoreError::Wiki(e.to_string()))?;

let new_chunks = if let Some(ref centroid_vec) = centroid {
let mut new_chunks = if let Some(ref centroid_vec) = centroid {
let rows: Vec<(String, i32, String, f64)> = sqlx::query_as(
"SELECT ac.atom_id, ac.chunk_index, ac.content,
1 - (e.embedding <=> $1::vector) as similarity
Expand Down Expand Up @@ -622,17 +629,58 @@ impl WikiStore for PostgresStorage {
chunks
};

if new_chunks.is_empty() && centroid.is_some() {
let rows: Vec<(String, i32, String)> = sqlx::query_as(
"SELECT atom_id, chunk_index, content FROM atom_chunks
WHERE atom_id = ANY($1) AND db_id = $2 ORDER BY atom_id, chunk_index",
)
.bind(&new_atom_ids)
.bind(&self.db_id)
.fetch_all(&self.pool)
.await
.map_err(|e| AtomicCoreError::Wiki(e.to_string()))?;

let mut chunks = Vec::new();
let mut total_tokens = 0;
for (atom_id, chunk_index, content) in rows {
let tokens = count_tokens(&content);
if total_tokens + tokens > max_source_tokens && !chunks.is_empty() {
break;
}
total_tokens += tokens;
chunks.push(ChunkWithContext {
atom_id,
chunk_index,
content,
similarity_score: 1.0,
});
}
new_chunks = chunks;
}

if new_chunks.is_empty() {
return Ok(None);
return Err(AtomicCoreError::Wiki(
"New atoms are not ready for wiki update yet; chunking or embedding is still pending"
.to_string(),
));
}

let atom_count: Option<i64> =
sqlx::query_scalar("SELECT COUNT(*) FROM atom_tags WHERE tag_id = $1 AND db_id = $2")
.bind(tag_id)
.bind(&self.db_id)
.fetch_one(&self.pool)
.await
.map_err(|e| AtomicCoreError::Wiki(e.to_string()))?;
// Count uses the same descendant CTE as get_article_status.
let atom_count: Option<i64> = sqlx::query_scalar(
"WITH RECURSIVE descendant_tags(id) AS (
SELECT $1
UNION ALL
SELECT t.id FROM tags t
INNER JOIN descendant_tags dt ON t.parent_id = dt.id
)
SELECT COUNT(DISTINCT atom_id) FROM atom_tags
WHERE tag_id IN (SELECT id FROM descendant_tags) AND db_id = $2",
)
.bind(tag_id)
.bind(&self.db_id)
.fetch_one(&self.pool)
.await
.map_err(|e| AtomicCoreError::Wiki(e.to_string()))?;

Ok(Some((new_chunks, atom_count.unwrap_or(0) as i32)))
}
Expand Down Expand Up @@ -802,6 +850,45 @@ impl WikiStore for PostgresStorage {
.map_err(|e| AtomicCoreError::DatabaseOperation(e.to_string()))?;
Ok(())
}

async fn advance_wiki_baseline(
&self,
tag_id: &str,
max_current_count: Option<i32>,
) -> StorageResult<bool> {
let now = chrono::Utc::now().to_rfc3339();
let advanced = sqlx::query_scalar::<_, bool>(
"WITH RECURSIVE descendant_tags(id) AS (
SELECT $1::text
UNION ALL
SELECT t.id FROM tags t
INNER JOIN descendant_tags dt ON t.parent_id = dt.id
WHERE t.db_id = $2
),
current_total(atom_count) AS (
SELECT COUNT(DISTINCT atom_id)::int FROM atom_tags
WHERE tag_id IN (SELECT id FROM descendant_tags) AND db_id = $2
),
updated AS (
UPDATE wiki_articles
SET atom_count = current_total.atom_count, updated_at = $3
FROM current_total
WHERE wiki_articles.tag_id = $1
AND wiki_articles.db_id = $2
AND ($4::int IS NULL OR current_total.atom_count <= $4)
RETURNING 1
)
SELECT EXISTS(SELECT 1 FROM updated)",
)
.bind(tag_id)
.bind(&self.db_id)
.bind(&now)
.bind(max_current_count)
.fetch_one(&self.pool)
.await
.map_err(|e| AtomicCoreError::Wiki(e.to_string()))?;
Ok(advanced)
}
}

// Private helper methods
Expand Down
Loading
Loading