Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions lib/observo/chkpts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ serde.workspace = true
serde_with.workspace = true
anyhow.workspace = true
chrono.workspace = true
futures.workspace = true
indexmap.workspace = true
itertools.workspace = true
metrics.workspace = true
tokio-postgres = { version = "0.7", default-features = false, features = ["runtime", "with-chrono-0_4"] }
deadpool-postgres = "0.13"
async-trait = { version = "0.1", default-features = false }
rusqlite.workspace = true
toml.workspace = true
tokio.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion lib/observo/private
2 changes: 1 addition & 1 deletion lib/observo/scol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ path = "src/lib.rs"
arc-swap = { version = "1.7", default-features = false }
dashmap = { version = "5.5" }
lext = { path = "../lext" }
mlua = { version = "0.10.2", default-features = false, features = ["lua54", "send", "vendored", "macros"]}
mlua = { version = "0.10.2", default-features = false, features = ["lua54", "send", "vendored", "macros", "async"]}
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
Expand Down
1 change: 1 addition & 0 deletions lib/vector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sources-utils-net-udp = []

[dependencies]
async-stream = "0.3.6"
async-trait.workspace = true
bytes = { version = "1.9.0", default-features = false }
chrono.workspace = true
crossbeam-utils = { version = "0.8.20", default-features = false }
Expand Down
8 changes: 5 additions & 3 deletions lib/vector-common/src/chkpts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{error::Error, fmt::Display};

use async_trait::async_trait;
use crate::id::ComponentKey;
use chrono::{DateTime, Utc};

Expand Down Expand Up @@ -46,9 +47,10 @@ impl Error for ChkptErr {
}
}

#[async_trait]
pub trait Accessor: Send + dyn_clone::DynClone + Sync {
fn get(&self, id: String) -> Result<Value, ChkptErr>;
fn set(&self, id: String, value: String, ctx: String) -> Result<(), ChkptErr>;
async fn get(&self, id: String) -> Result<Value, ChkptErr>;
async fn set(&self, id: String, value: String, ctx: String) -> Result<(), ChkptErr>;
}

dyn_clone::clone_trait_object!(Accessor);
dyn_clone::clone_trait_object!(Accessor);
23 changes: 13 additions & 10 deletions lib/vector-core/src/chkpts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use async_trait::async_trait;
use serde_with::serde_as;
use vector_config::{configurable_component, NamedComponent};

Expand All @@ -22,25 +23,27 @@ pub enum StoreConfig {
None
}

#[async_trait]
pub trait Store : Send + Sync + 'static {
fn accessor(&self, key: ComponentKey) -> Box<dyn Accessor>;
fn reload( &mut self, config: StoreConfig, default_data_dir: Option<PathBuf>) -> crate::Result<()>;
async fn reload( &mut self, config: StoreConfig, default_data_dir: Option<PathBuf>) -> crate::Result<()>;
}

#[cfg(feature = "observo")]
#[async_trait]
impl Store for ObStore {
fn accessor(&self, key: ComponentKey) -> Box<dyn Accessor> {
Box::new(self.accessor(key))
}

fn reload(&mut self, config: StoreConfig, default_data_dir: Option<PathBuf>) -> crate::Result<()> {
async fn reload(&mut self, config: StoreConfig, default_data_dir: Option<PathBuf>) -> crate::Result<()> {
match config {
#[cfg(feature = "observo")]
StoreConfig::Observo(cfg) => {
*self = cfg.build(default_data_dir)?;
*self = cfg.build(default_data_dir).await?;
},
StoreConfig::None => {
warn!("Checkpoint store config has been dropped but unload is not supported. Restart process to unload (if necessary).");
tracing::warn!("Checkpoint store config has been dropped but unload is not supported. Restart process to unload (if necessary).");
},
}
Ok(())
Expand All @@ -49,23 +52,23 @@ impl Store for ObStore {

impl StoreConfig {
#[allow(unused)]
pub fn build(self, data_dir: Option<PathBuf>) -> crate::Result<Option<Box<dyn Store + Send + Sync>>> {
pub async fn build(self, data_dir: Option<PathBuf>) -> crate::Result<Option<Box<dyn Store + Send + Sync>>> {
match self {
#[cfg(feature = "observo")]
StoreConfig::Observo(cfg) => Ok(Some(Box::new(cfg.build(data_dir)?))),
StoreConfig::Observo(cfg) => Ok(Some(Box::new(cfg.build(data_dir).await?))),
StoreConfig::None => Ok(None),
}
}

pub fn merge(&self, other: &Self) -> Self {
pub fn merge(&self, other: &Self) -> Result<Self, String> {
match (self, other) {
#[cfg(feature = "observo")]
(StoreConfig::Observo(l), StoreConfig::Observo(r)) => {
StoreConfig::Observo(l.merge(r))
l.merge(r).map(StoreConfig::Observo).map_err(|e| e.to_string())
},
#[cfg(feature = "observo")]
(l, StoreConfig::None) => l.clone(),
(&StoreConfig::None, r) => r.clone(),
(l, StoreConfig::None) => Ok(l.clone()),
(&StoreConfig::None, r) => Ok(r.clone()),
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,13 @@ impl GlobalOptions {
let mut telemetry = self.telemetry.clone();
telemetry.merge(&with.telemetry);

let checkpoint = self.checkpoint.merge(&with.checkpoint);
let checkpoint = match self.checkpoint.merge(&with.checkpoint) {
Ok(c) => c,
Err(e) => {
errors.push(e.to_string());
self.checkpoint.clone()
}
};

if errors.is_empty() {
Ok(Self {
Expand Down
8 changes: 4 additions & 4 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<'a> Builder<'a> {
/// Builds the new pieces of the topology found in `self.diff`.
async fn build(mut self) -> Result<TopologyPieces, Vec<ScopedError>> {
let enrichment_tables = self.load_enrichment_tables().await;
let chkpt_store = self.load_checkpoint_store();
let chkpt_store = self.load_checkpoint_store().await;
let source_tasks = self.build_sources(chkpt_store).await;
self.build_transforms(enrichment_tables).await;
self.build_sinks(enrichment_tables).await;
Expand Down Expand Up @@ -151,7 +151,7 @@ impl<'a> Builder<'a> {
finalized_outputs
}

fn load_checkpoint_store(&mut self) -> Arc<Mutex<Option<Box<dyn CheckpointStore>>>> {
async fn load_checkpoint_store(&mut self) -> Arc<Mutex<Option<Box<dyn CheckpointStore>>>> {
let cfg = self.config.global.checkpoint.clone();
if let Ok(mut store) = CHECKPT_STORE.lock() {
let span = error_span!(
Expand All @@ -160,11 +160,11 @@ impl<'a> Builder<'a> {
component_id = "checkpoint_store",
component_type = %cfg.get_component_name());
if let Some(s) = store.as_mut() {
if let Err(e) = s.reload(cfg, self.config.global.data_dir.clone()) {
if let Err(e) = s.reload(cfg, self.config.global.data_dir.clone()).await {
self.errors.push((&span, format!("Checkpoint Store: {}", e)).into());
}
} else {
match cfg.build(self.config.global.data_dir.clone()) {
match cfg.build(self.config.global.data_dir.clone()).await {
Ok(Some(s)) => {
*store = Some(s);
},
Expand Down