diff --git a/Cargo.lock b/Cargo.lock index 7e5ab0396..b4b7ae0ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2378,10 +2378,9 @@ version = "0.1.0" dependencies = [ "anyhow", "assert_matches", + "async-trait", "chrono", - "futures 0.3.31", - "indexmap 2.7.0", - "itertools 0.14.0", + "deadpool-postgres", "metrics", "rand 0.8.5", "rusqlite", @@ -2389,6 +2388,7 @@ dependencies = [ "serde_with 3.12.0", "tempfile", "tokio", + "tokio-postgres", "toml", "tracing 0.1.41", "tracing-test", @@ -12767,6 +12767,7 @@ name = "vector-common" version = "0.1.0" dependencies = [ "async-stream", + "async-trait", "bytes 1.9.0", "chrono", "crossbeam-utils", diff --git a/lib/observo/chkpts/Cargo.toml b/lib/observo/chkpts/Cargo.toml index 94d0c4238..434efc771 100644 --- a/lib/observo/chkpts/Cargo.toml +++ b/lib/observo/chkpts/Cargo.toml @@ -19,10 +19,10 @@ serde.workspace = true serde_with.workspace = true anyhow.workspace = true chrono.workspace = true -futures.workspace = true -indexmap.workspace = true -itertools.workspace = true metrics.workspace = true +tokio-postgres = { version = "0.7", default-features = false, features = ["runtime", "with-chrono-0_4"] } +deadpool-postgres = "0.13" +async-trait = { version = "0.1", default-features = false } rusqlite.workspace = true toml.workspace = true tokio.workspace = true diff --git a/lib/observo/private b/lib/observo/private index 3eaa86e56..ef02b8bb1 160000 --- a/lib/observo/private +++ b/lib/observo/private @@ -1 +1 @@ -Subproject commit 3eaa86e563de0b17613209d9eda6ddf1c309d26a +Subproject commit ef02b8bb19110c09ec2762e5539bc13a61cecaf4 diff --git a/lib/observo/scol/Cargo.toml b/lib/observo/scol/Cargo.toml index d2d83e8db..803b55254 100644 --- a/lib/observo/scol/Cargo.toml +++ b/lib/observo/scol/Cargo.toml @@ -16,7 +16,7 @@ path = "src/lib.rs" arc-swap = { version = "1.7", default-features = false } dashmap = { version = "5.5" } lext = { path = "../lext" } -mlua = { version = "0.10.2", default-features = false, features = ["lua54", "send", "vendored", "macros"]} +mlua = { version = "0.10.2", default-features = false, features = ["lua54", "send", "vendored", "macros", "async"]} serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/lib/vector-common/Cargo.toml b/lib/vector-common/Cargo.toml index 3fb7bf50b..fb865ddc9 100644 --- a/lib/vector-common/Cargo.toml +++ b/lib/vector-common/Cargo.toml @@ -36,6 +36,7 @@ sources-utils-net-udp = [] [dependencies] async-stream = "0.3.6" +async-trait.workspace = true bytes = { version = "1.9.0", default-features = false } chrono.workspace = true crossbeam-utils = { version = "0.8.20", default-features = false } diff --git a/lib/vector-common/src/chkpts.rs b/lib/vector-common/src/chkpts.rs index f07ca3b08..8493a84d3 100644 --- a/lib/vector-common/src/chkpts.rs +++ b/lib/vector-common/src/chkpts.rs @@ -1,5 +1,6 @@ use std::{error::Error, fmt::Display}; +use async_trait::async_trait; use crate::id::ComponentKey; use chrono::{DateTime, Utc}; @@ -46,9 +47,10 @@ impl Error for ChkptErr { } } +#[async_trait] pub trait Accessor: Send + dyn_clone::DynClone + Sync { - fn get(&self, id: String) -> Result; - fn set(&self, id: String, value: String, ctx: String) -> Result<(), ChkptErr>; + async fn get(&self, id: String) -> Result; + async fn set(&self, id: String, value: String, ctx: String) -> Result<(), ChkptErr>; } -dyn_clone::clone_trait_object!(Accessor); \ No newline at end of file +dyn_clone::clone_trait_object!(Accessor); diff --git a/lib/vector-core/src/chkpts.rs b/lib/vector-core/src/chkpts.rs index ba549c34e..f8f7bf20f 100644 --- a/lib/vector-core/src/chkpts.rs +++ b/lib/vector-core/src/chkpts.rs @@ -1,5 +1,6 @@ use std::path::PathBuf; +use async_trait::async_trait; use serde_with::serde_as; use vector_config::{configurable_component, NamedComponent}; @@ -22,25 +23,27 @@ pub enum StoreConfig { None } +#[async_trait] pub trait Store : Send + Sync + 'static { fn accessor(&self, key: ComponentKey) -> Box; - fn reload( &mut self, config: StoreConfig, default_data_dir: Option) -> crate::Result<()>; + async fn reload( &mut self, config: StoreConfig, default_data_dir: Option) -> crate::Result<()>; } #[cfg(feature = "observo")] +#[async_trait] impl Store for ObStore { fn accessor(&self, key: ComponentKey) -> Box { Box::new(self.accessor(key)) } - fn reload(&mut self, config: StoreConfig, default_data_dir: Option) -> crate::Result<()> { + async fn reload(&mut self, config: StoreConfig, default_data_dir: Option) -> crate::Result<()> { match config { #[cfg(feature = "observo")] StoreConfig::Observo(cfg) => { - *self = cfg.build(default_data_dir)?; + *self = cfg.build(default_data_dir).await?; }, StoreConfig::None => { - warn!("Checkpoint store config has been dropped but unload is not supported. Restart process to unload (if necessary)."); + tracing::warn!("Checkpoint store config has been dropped but unload is not supported. Restart process to unload (if necessary)."); }, } Ok(()) @@ -49,23 +52,23 @@ impl Store for ObStore { impl StoreConfig { #[allow(unused)] - pub fn build(self, data_dir: Option) -> crate::Result>> { + pub async fn build(self, data_dir: Option) -> crate::Result>> { match self { #[cfg(feature = "observo")] - StoreConfig::Observo(cfg) => Ok(Some(Box::new(cfg.build(data_dir)?))), + StoreConfig::Observo(cfg) => Ok(Some(Box::new(cfg.build(data_dir).await?))), StoreConfig::None => Ok(None), } } - pub fn merge(&self, other: &Self) -> Self { + pub fn merge(&self, other: &Self) -> Result { match (self, other) { #[cfg(feature = "observo")] (StoreConfig::Observo(l), StoreConfig::Observo(r)) => { - StoreConfig::Observo(l.merge(r)) + l.merge(r).map(StoreConfig::Observo).map_err(|e| e.to_string()) }, #[cfg(feature = "observo")] - (l, StoreConfig::None) => l.clone(), - (&StoreConfig::None, r) => r.clone(), + (l, StoreConfig::None) => Ok(l.clone()), + (&StoreConfig::None, r) => Ok(r.clone()), } } } diff --git a/lib/vector-core/src/config/global_options.rs b/lib/vector-core/src/config/global_options.rs index b5522c0dd..73013596a 100644 --- a/lib/vector-core/src/config/global_options.rs +++ b/lib/vector-core/src/config/global_options.rs @@ -225,7 +225,13 @@ impl GlobalOptions { let mut telemetry = self.telemetry.clone(); telemetry.merge(&with.telemetry); - let checkpoint = self.checkpoint.merge(&with.checkpoint); + let checkpoint = match self.checkpoint.merge(&with.checkpoint) { + Ok(c) => c, + Err(e) => { + errors.push(e.to_string()); + self.checkpoint.clone() + } + }; if errors.is_empty() { Ok(Self { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 4713e11d4..1c4a0bc6c 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -112,7 +112,7 @@ impl<'a> Builder<'a> { /// Builds the new pieces of the topology found in `self.diff`. async fn build(mut self) -> Result> { let enrichment_tables = self.load_enrichment_tables().await; - let chkpt_store = self.load_checkpoint_store(); + let chkpt_store = self.load_checkpoint_store().await; let source_tasks = self.build_sources(chkpt_store).await; self.build_transforms(enrichment_tables).await; self.build_sinks(enrichment_tables).await; @@ -151,7 +151,7 @@ impl<'a> Builder<'a> { finalized_outputs } - fn load_checkpoint_store(&mut self) -> Arc>>> { + async fn load_checkpoint_store(&mut self) -> Arc>>> { let cfg = self.config.global.checkpoint.clone(); if let Ok(mut store) = CHECKPT_STORE.lock() { let span = error_span!( @@ -160,11 +160,11 @@ impl<'a> Builder<'a> { component_id = "checkpoint_store", component_type = %cfg.get_component_name()); if let Some(s) = store.as_mut() { - if let Err(e) = s.reload(cfg, self.config.global.data_dir.clone()) { + if let Err(e) = s.reload(cfg, self.config.global.data_dir.clone()).await { self.errors.push((&span, format!("Checkpoint Store: {}", e)).into()); } } else { - match cfg.build(self.config.global.data_dir.clone()) { + match cfg.build(self.config.global.data_dir.clone()).await { Ok(Some(s)) => { *store = Some(s); },