diff --git a/Cargo.lock b/Cargo.lock index 5f6661df3..0311ba1e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" +[[package]] +name = "actson" +version = "2.1.0" +source = "git+https://github.com/jsbali/actson-rs.git#6d1631138d3c0b9cbc528fd72eb93d5310213e99" +dependencies = [ + "btoi", + "num-traits", + "thiserror 2.0.17", +] + [[package]] name = "addr2line" version = "0.21.0" @@ -2086,6 +2096,15 @@ dependencies = [ "serde", ] +[[package]] +name = "btoi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b5ab9db53bcda568284df0fd39f6eac24ad6f7ba7ff1168b9e76eba6576b976" +dependencies = [ + "num-traits", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -2172,7 +2191,7 @@ dependencies = [ "cached_proc_macro_types", "hashbrown 0.14.5", "once_cell", - "thiserror 2.0.3", + "thiserror 2.0.17", "web-time", ] @@ -2544,6 +2563,7 @@ dependencies = [ name = "codecs" version = "0.1.0" dependencies = [ + "actson", "apache-avro", "base64 0.13.1", "byteorder", @@ -5192,9 +5212,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes 1.9.0", "futures-channel", @@ -7225,7 +7245,7 @@ name = "oauth2" version = "5.0.0" source = "git+https://github.com/janmejay/oauth2-rs.git#57fb92849941d52c820502bf058cc87d4bf11f7a" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "chrono", "getrandom 0.2.15", "http 1.1.0", @@ -7580,7 +7600,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "thiserror 2.0.3", + "thiserror 2.0.17", "tokio", "url", ] @@ -8025,7 +8045,7 @@ dependencies = [ "serde_urlencoded", "smallvec", "sync_wrapper 1.0.1", - "thiserror 2.0.3", + "thiserror 2.0.17", "tokio", "tokio-rustls 0.25.0", "tokio-util", @@ -9761,6 +9781,7 @@ dependencies = [ "http 0.2.9", "http 1.1.0", "hyper 0.14.28", + "hyper-util", "itertools 0.14.0", "oauth2 5.0.0", "oxide-auth", @@ -9777,7 +9798,7 @@ dependencies = [ "serde_urlencoded", "serde_with 3.12.0", "tempfile", - "thiserror 2.0.3", + "thiserror 2.0.17", "tokio", "tokio-util", "toml", @@ -9814,6 +9835,7 @@ dependencies = [ "assert_matches", "async-channel 2.5.0", "async-recursion", + "async-trait", "base64 0.13.1", "bstr 1.11.0", "bytes 1.9.0", @@ -9836,6 +9858,7 @@ dependencies = [ "oxide-auth", "oxide-auth-poem", "pathsearch", + "poem", "rand 0.8.5", "regex", "rstest", @@ -9851,11 +9874,13 @@ dependencies = [ "sha2", "smallvec", "snafu 0.7.5", + "stream-cancel", "tempfile", "test-case", "textwrap", "tokio", "tokio-stream", + "tokio-util", "toml", "tracing 0.1.41", "tracing-test", @@ -11040,11 +11065,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.3" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl 2.0.3", + "thiserror-impl 2.0.17", ] [[package]] @@ -11060,9 +11085,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.3" +version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2 1.0.92", "quote 1.0.37", @@ -12923,7 +12948,7 @@ dependencies = [ "strip-ansi-escapes", "syslog_loose", "termcolor", - "thiserror 2.0.3", + "thiserror 2.0.17", "tokio", "tracing 0.1.41", "uaparser", diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 0b17209ed..c4f7c4d06 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -49,6 +49,7 @@ obvrl = { path = "../observo/obvrl", optional = true } futures.workspace = true indexmap.workspace = true bytesize.workspace = true +actson = { git = "https://github.com/jsbali/actson-rs.git" } [dev-dependencies] futures.workspace = true diff --git a/lib/codecs/src/decoding/format/avro.rs b/lib/codecs/src/decoding/format/avro.rs index 7a4976317..a385fddbe 100644 --- a/lib/codecs/src/decoding/format/avro.rs +++ b/lib/codecs/src/decoding/format/avro.rs @@ -87,7 +87,7 @@ impl From<&AvroDeserializerOptions> for AvroSerializerOptions { } /// Apache Avro serializer options. #[configurable_component] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct AvroDeserializerOptions { /// The Avro schema definition. /// Please note that the following [`apache_avro::types::Value`] variants are currently *not* supported: diff --git a/lib/codecs/src/decoding/format/gelf.rs b/lib/codecs/src/decoding/format/gelf.rs index c37924072..09ec54c3e 100644 --- a/lib/codecs/src/decoding/format/gelf.rs +++ b/lib/codecs/src/decoding/format/gelf.rs @@ -32,7 +32,7 @@ use crate::{gelf_fields::*, VALID_FIELD_REGEX}; /// Config used to build a `GelfDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct GelfDeserializerConfig { /// GELF-specific decoding options. #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] diff --git a/lib/codecs/src/decoding/format/influxdb.rs b/lib/codecs/src/decoding/format/influxdb.rs index a4cb5cbfe..2b059e216 100644 --- a/lib/codecs/src/decoding/format/influxdb.rs +++ b/lib/codecs/src/decoding/format/influxdb.rs @@ -19,7 +19,7 @@ use super::Deserializer; /// Config used to build a `InfluxdbDeserializer`. /// - [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/): #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct InfluxdbDeserializerConfig { /// Influxdb-specific decoding options. #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] diff --git a/lib/codecs/src/decoding/format/json.rs b/lib/codecs/src/decoding/format/json.rs index a017cba3c..37bf00075 100644 --- a/lib/codecs/src/decoding/format/json.rs +++ b/lib/codecs/src/decoding/format/json.rs @@ -14,7 +14,7 @@ use super::{default_lossy, Deserializer}; /// Config used to build a `JsonDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct JsonDeserializerConfig { /// JSON-specific decoding options. #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] diff --git a/lib/codecs/src/decoding/format/json_paths.rs b/lib/codecs/src/decoding/format/json_paths.rs new file mode 100644 index 000000000..887f17a7e --- /dev/null +++ b/lib/codecs/src/decoding/format/json_paths.rs @@ -0,0 +1,1182 @@ +use actson::feeder::PushJsonFeeder; +use actson::options::JsonParserOptionsBuilder; +use actson::{JsonEvent, JsonParser}; +use bytes::Bytes; +use derivative::Derivative; +use lookup::OwnedValuePath; +use smallvec::{smallvec, SmallVec}; +use std::collections::{BTreeMap, HashMap}; +use vector_config::configurable_component; +use vector_core::{ + config::{DataType, LogNamespace}, + event::{Event, LogEvent}, + schema, +}; +use vrl::value::{Kind, Value}; + +use super::{default_lossy, Deserializer}; + +/// Operations that can be performed on JSON paths +#[configurable_component] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum PathOperation { + /// Emit the value as-is when encountered + Identity, + /// Emit each array element as a separate event + Explode, + /// Emit the value as bytes + Bytes, +} + + +/// Configuration for path-based operations +#[configurable_component] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PathOperationConfig { + /// Map of JSON paths (as strings) to operations to perform on them + pub paths: BTreeMap, +} + +impl PathOperationConfig { + /// Create a new configuration with at least one path + pub fn new(first_path: OwnedValuePath, first_op: PathOperation) -> Self { + let mut paths = BTreeMap::new(); + paths.insert(first_path.to_string(), first_op); + Self { paths } + } + + /// Add a path to the configuration (builder pattern) + pub fn with_path(mut self, path: OwnedValuePath, operation: PathOperation) -> Self { + self.paths.insert(path.to_string(), operation); + self + } + + /// Create from HashMap (fallible, for compatibility) + pub fn from_map(paths: HashMap) -> Result { + if paths.is_empty() { + return Err("At least one path must be configured".to_string()); + } + Ok(Self { + paths: paths.into_iter().map(|(k, v)| (k.to_string(), v)).collect(), + }) + } + + /// Get the paths as a BTreeMap + fn as_map(&self) -> BTreeMap { + self.paths.clone() + } +} + +/// Config used to build a `JsonPathDeserializer`. +#[configurable_component] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JsonPathDeserializerConfig { + /// Path-based operation configuration + pub config: PathOperationConfig, + + /// JSON-specific decoding options + #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] + pub options: JsonPathDeserializerOptions, +} + +impl JsonPathDeserializerConfig { + /// Creates a new `JsonPathDeserializerConfig`. + pub fn new(config: PathOperationConfig, options: JsonPathDeserializerOptions) -> Self { + Self { config, options } + } + + /// Build the `JsonPathDeserializer` from this configuration. + pub fn build(&self) -> JsonPathDeserializer { + JsonPathDeserializer::new(self.config.clone(), self.options.clone()) + } + + /// Return the type of event built by this deserializer. + pub fn output_type(&self) -> DataType { + DataType::Log + } + + /// The schema produced by the deserializer. + pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition { + match log_namespace { + LogNamespace::Legacy => { + schema::Definition::empty_legacy_namespace().unknown_fields(Kind::json()) + } + LogNamespace::Vector => { + schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace]) + } + } + } +} + +fn default_feeder_capacity() -> usize { + 8192 +} + +/// JSON-specific decoding options for path-based decoder. +#[configurable_component] +#[derive(Debug, Clone, PartialEq, Eq, Derivative)] +#[derivative(Default)] +pub struct JsonPathDeserializerOptions { + /// Determines whether or not to replace invalid UTF-8 sequences instead of failing. + #[serde( + default = "default_lossy", + skip_serializing_if = "vector_core::serde::is_default" + )] + #[derivative(Default(value = "default_lossy()"))] + pub lossy: bool, + + /// Initial capacity for the JSON feeder buffer. + #[serde( + default = "default_feeder_capacity", + skip_serializing_if = "vector_core::serde::is_default" + )] + #[derivative(Default(value = "default_feeder_capacity()"))] + pub feeder_capacity: usize, +} + +/// Deserializer that builds `Event`s from JSON using path-based operations. +/// +/// This deserializer maintains parser state across calls to support streaming, +/// allowing JSON to be split across multiple `parse()` calls. +#[derive(Clone)] +pub struct JsonPathDeserializer { + path_map: BTreeMap, + lossy: bool, + /// Streaming parser state (wrapped in Arc for interior mutability) + parser_state: std::sync::Arc>, +} + +/// State maintained across parse() calls for streaming +struct StreamingParserState { + parser: JsonParser, + state: ParserState, +} + +impl JsonPathDeserializer { + /// Creates a new `JsonPathDeserializer`. + pub fn new(config: PathOperationConfig, options: JsonPathDeserializerOptions) -> Self { + let feeder = PushJsonFeeder::with_capacity(options.feeder_capacity); + let parser_options = JsonParserOptionsBuilder::default() + .with_streaming(true) + .build(); + let parser = JsonParser::new_with_options(feeder, parser_options); + + Self { + path_map: config.as_map(), + lossy: options.lossy, + parser_state: std::sync::Arc::new(std::sync::Mutex::new(StreamingParserState { + parser, + state: ParserState::new(), + })), + } + } +} + +/// Internal state for tracking during parsing +#[derive(Debug)] +struct ParserState { + /// Current path segments + path: Vec, + /// Stack of values being built + value_stack: Vec, + /// Events that have been emitted + events: Vec<(String, Value)>, + /// Track the path of the array that should be exploded + explode_path: Option, +} + +#[derive(Debug)] +enum ValueBuilder { + Object(BTreeMap), + Array(Vec), +} + +impl ParserState { + fn new() -> Self { + Self { + path: Vec::with_capacity(8), + value_stack: Vec::with_capacity(8), + events: Vec::with_capacity(16), + explode_path: None, + } + } + + /// Build current path string from segments + fn current_path(&self) -> String { + if self.path.is_empty() { + String::new() + } else { + self.path.join(".") + } + } + + fn push_path(&mut self, segment: String) { + self.path.push(segment); + } + + fn pop_path(&mut self) { + self.path.pop(); + } +} + +impl Deserializer for JsonPathDeserializer { + fn parse( + &self, + bytes: Bytes, + log_namespace: LogNamespace, + ) -> vector_common::Result> { + if bytes.is_empty() { + return Ok(smallvec![]); + } + + let bytes_slice = if self.lossy { + match std::str::from_utf8(&bytes) { + Ok(_) => bytes.to_vec(), + Err(_) => String::from_utf8_lossy(&bytes).into_owned().into_bytes(), + } + } else { + bytes.to_vec() + }; + + let mut streaming_state = self + .parser_state + .lock() + .unwrap_or_else(|poison| poison.into_inner()); + + let mut byte_offset = 0usize; + let total_len = bytes_slice.len(); + while byte_offset < total_len { + if streaming_state.parser.feeder.is_full() { + self.drain_parser_events(&mut streaming_state)?; + if streaming_state.parser.feeder.is_full() { + return Err("JSON feeder is full and cannot accept more bytes".into()); + } + } + + let pushed = streaming_state.parser.feeder.push_bytes(&bytes_slice[byte_offset..]); + if pushed == 0 { + return Err("JSON feeder could not accept bytes (0 bytes pushed)".into()); + } + byte_offset += pushed; + } + + self.drain_parser_events(&mut streaming_state)?; + + let mut result = SmallVec::new(); + let events_to_emit = std::mem::take(&mut streaming_state.state.events); + + for (expr, data) in events_to_emit { + let mut map = vrl::value::ObjectMap::new(); + map.insert("expr".into(), Value::from(expr)); + map.insert("data".into(), data); + + let log = LogEvent::from_map(map, vector_core::event::EventMetadata::default()); + let _ = log_namespace; + + result.push(Event::Log(log)); + } + + Ok(result) + } +} + +impl JsonPathDeserializer { + /// Process events from the parser until NeedMoreInput or None + fn drain_parser_events(&self, streaming_state: &mut StreamingParserState) -> Result<(), String> { + loop { + match streaming_state.parser.next_event().map_err(|e| format!("JSON parsing error: {:?}", e))? { + Some(JsonEvent::NeedMoreInput) => break, + Some(event) => { + self.process_event_with_streaming_state(streaming_state, event)?; + } + None => break, + } + } + Ok(()) + } + + fn process_event_with_streaming_state( + &self, + streaming_state: &mut StreamingParserState, + event: JsonEvent, + ) -> Result<(), String> { + self.process_event(&mut streaming_state.state, event, &streaming_state.parser) + } + + fn process_event( + &self, + state: &mut ParserState, + event: JsonEvent, + parser: &JsonParser, + ) -> Result<(), String> { + match event { + JsonEvent::StartObject => { + state.value_stack.push(ValueBuilder::Object(BTreeMap::new())); + } + JsonEvent::EndObject => { + if let Some(ValueBuilder::Object(map)) = state.value_stack.pop() { + let value = Value::Object(map.into_iter().map(|(k, v)| (k.into(), v)).collect()); + self.handle_value_and_pop_if_in_object(state, value)?; + } + } + JsonEvent::StartArray => { + let current_path = state.current_path(); + let should_explode = self + .path_map + .get(¤t_path) + .map(|op| *op == PathOperation::Explode) + .unwrap_or(false); + + if should_explode && state.explode_path.is_none() { + state.explode_path = Some(current_path.clone()); + } + + state.value_stack.push(ValueBuilder::Array(Vec::new())); + } + JsonEvent::EndArray => { + let should_pop_path = !state.path.is_empty(); + let current_path_owned = state.current_path().to_owned(); + let is_exploded = state.explode_path.as_ref().map(|s| s.as_str()) == Some(current_path_owned.as_str()); + + if let Some(ValueBuilder::Array(arr)) = state.value_stack.pop() { + if is_exploded { + state.explode_path = None; + } else { + let value = Value::Array(arr); + self.handle_value(state, value)?; + } + } + + if should_pop_path { + state.pop_path(); + } + } + JsonEvent::FieldName => { + let field_name = parser + .current_str() + .map_err(|e| format!("Error reading field name: {:?}", e))? + .to_string(); + state.push_path(field_name); + } + JsonEvent::ValueString => { + let s = parser + .current_str() + .map_err(|e| format!("Error reading string: {:?}", e))? + .to_string(); + let value = Value::from(s); + self.handle_value_and_pop_if_in_object(state, value)?; + } + JsonEvent::ValueInt => { + let i: i64 = parser + .current_int() + .map_err(|e| format!("Error reading int: {:?}", e))?; + let value = Value::from(i); + self.handle_value_and_pop_if_in_object(state, value)?; + } + JsonEvent::ValueFloat => { + let f: f64 = parser + .current_float() + .map_err(|e| format!("Error reading float: {:?}", e))?; + let value = Value::from(f); + self.handle_value_and_pop_if_in_object(state, value)?; + } + JsonEvent::ValueTrue => { + self.handle_value_and_pop_if_in_object(state, Value::from(true))?; + } + JsonEvent::ValueFalse => { + self.handle_value_and_pop_if_in_object(state, Value::from(false))?; + } + JsonEvent::ValueNull => { + self.handle_value_and_pop_if_in_object(state, Value::Null)?; + } + JsonEvent::NeedMoreInput => {} + } + Ok(()) + } + + fn handle_value(&self, state: &mut ParserState, value: Value) -> Result<(), String> { + let current_path = state.current_path(); + let operation = self.path_map.get(¤t_path).copied(); + + let inside_exploded_array = state.explode_path.as_ref().map(|s| s.as_str()) == Some(current_path.as_str()) + && matches!(state.value_stack.last(), Some(ValueBuilder::Array(_))); + + if inside_exploded_array { + state.events.push((current_path.clone(), value.clone())); + } else if let Some(operation) = operation { + match operation { + PathOperation::Identity => { + state.events.push((current_path.clone(), value.clone())); + } + PathOperation::Bytes => { + let bytes_value = match &value { + Value::Bytes(b) => Value::Bytes(b.clone()), + v => { + let s = v.to_string(); + Value::Bytes(s.into()) + } + }; + state.events.push((current_path.clone(), bytes_value)); + } + PathOperation::Explode => { + // Don't emit the array itself, elements are emitted above + } + } + } + + if !inside_exploded_array { + if let Some(parent) = state.value_stack.last_mut() { + match parent { + ValueBuilder::Object(map) => { + if let Some(key) = state.path.last() { + map.insert(key.clone(), value); + } + } + ValueBuilder::Array(arr) => { + arr.push(value); + } + } + } + } + + Ok(()) + } + + fn handle_value_and_pop_if_in_object(&self, state: &mut ParserState, value: Value) -> Result<(), String> { + self.handle_value(state, value)?; + if matches!(state.value_stack.last(), Some(ValueBuilder::Object(_))) { + state.pop_path(); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use lookup::owned_value_path; + + #[test] + fn test_identity_operation() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"meta": {"source": "foo"}}"#); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 1); + let log = events[0].as_log(); + assert_eq!(log["expr"], "meta".into()); + } + + #[test] + fn test_explode_operation() { + let config = PathOperationConfig::new( + owned_value_path!("results", "records"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = + Bytes::from(r#"{"results": {"records": [{"log": "bar"}, {"log": "baz"}]}}"#); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 2); + assert_eq!(events[0].as_log()["expr"], "results.records".into()); + assert_eq!(events[1].as_log()["expr"], "results.records".into()); + } + + #[test] + fn test_bytes_operation() { + let config = PathOperationConfig::new( + owned_value_path!("tail"), + PathOperation::Bytes, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"tail": "foo bar baz"}"#); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 1); + let log = events[0].as_log(); + assert_eq!(log["expr"], "tail".into()); + } + + #[test] + fn test_multiple_operations() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode) + .with_path(owned_value_path!("tail"), PathOperation::Bytes); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from( + r#"{"meta": {"source": "foo"}, "results": {"records": [{"log": "bar"}, {"log": "baz"}]}, "tail": "foo bar baz"}"#, + ); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 4); + + let exprs: Vec = events + .iter() + .map(|e| e.as_log()["expr"].clone()) + .collect(); + + assert!(exprs.contains(&"meta".into())); + assert!(exprs.iter().filter(|e| *e == &Value::from("results.records")).count() == 2); + assert!(exprs.contains(&"tail".into())); + } + + #[test] + fn test_order_preservation() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode) + .with_path(owned_value_path!("tail"), PathOperation::Bytes); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from( + r#"{"meta": {"source": "foo"}, "results": {"records": [{"log": "bar"}, {"log": "baz"}]}, "tail": "foo bar baz"}"#, + ); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + let exprs: Vec = events + .iter() + .map(|e| e.as_log()["expr"].clone()) + .collect(); + + assert_eq!(events.len(), 4); + assert_eq!(exprs[0], "meta".into()); + assert_eq!(exprs[1], "results.records".into()); + assert_eq!(exprs[2], "results.records".into()); + assert_eq!(exprs[3], "tail".into()); + } + + #[test] + fn test_multiple_concatenated_json() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from( + r#"{"meta": {"source": "first"}, "results": {"records": [{"log": "a"}]}}{"meta": {"source": "second"}, "results": {"records": [{"log": "b"}, {"log": "c"}]}}"#, + ); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 5); + + let exprs: Vec = events + .iter() + .map(|e| e.as_log()["expr"].clone()) + .collect(); + + assert_eq!(exprs[0], "meta".into()); + assert_eq!(exprs[1], "results.records".into()); + assert_eq!(exprs[2], "meta".into()); + assert_eq!(exprs[3], "results.records".into()); + assert_eq!(exprs[4], "results.records".into()); + + let first_meta = &events[0].as_log()["data"]; + let second_meta = &events[2].as_log()["data"]; + assert_ne!(first_meta, second_meta); + } + + #[test] + fn test_newline_delimited_json() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from( + r#"{"meta": {"source": "line1"}, "results": {"records": [{"log": "x"}]}} +{"meta": {"source": "line2"}, "results": {"records": [{"log": "y"}]}} +{"meta": {"source": "line3"}, "results": {"records": [{"log": "z"}]}}"#, + ); + let events = deserializer + .parse(input, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events.len(), 6); + + let exprs: Vec = events + .iter() + .map(|e| e.as_log()["expr"].clone()) + .collect(); + + for i in 0..3 { + assert_eq!(exprs[i * 2], "meta".into()); + assert_eq!(exprs[i * 2 + 1], "results.records".into()); + } + } + + #[test] + fn test_streaming_maintains_state_across_calls() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let chunk1 = Bytes::from(r#"{"meta": {"source": "first"}, "results": {"records": [{"log": {"a":"b"}}]}}"#); + let result1 = deserializer.parse(chunk1, LogNamespace::Vector).unwrap(); + assert_eq!(result1.len(), 2); + + let chunk2 = Bytes::from(r#"{"meta": {"source": "second"}, "results": {"records": [{"log": "b"}]}}"#); + let result2 = deserializer.parse(chunk2, LogNamespace::Vector).unwrap(); + assert_eq!(result2.len(), 2); + + assert_eq!(result1[0].as_log()["expr"], "meta".into()); + assert_eq!(result2[0].as_log()["expr"], "meta".into()); + } + + #[test] + fn test_multiple_parse_calls_with_complete_objects() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let frame1 = Bytes::from( + r#"{"meta": {"source": "first"}, "results": {"records": [{"log": "a"}]}}"#, + ); + let events1 = deserializer + .parse(frame1, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events1.len(), 2); + assert_eq!(events1[0].as_log()["expr"], "meta".into()); + assert_eq!(events1[1].as_log()["expr"], "results.records".into()); + + let frame2 = Bytes::from( + r#"{"meta": {"source": "second"}, "results": {"records": [{"log": "b"}, {"log": "c"}]}}"#, + ); + let events2 = deserializer + .parse(frame2, LogNamespace::Vector) + .unwrap(); + + assert_eq!(events2.len(), 3); + assert_eq!(events2[0].as_log()["expr"], "meta".into()); + assert_eq!(events2[1].as_log()["expr"], "results.records".into()); + assert_eq!(events2[2].as_log()["expr"], "results.records".into()); + + let first_meta = &events1[0].as_log()["data"]; + let second_meta = &events2[0].as_log()["data"]; + assert_ne!(first_meta, second_meta); + } + + #[test] + fn test_split_json_object_across_parse_calls() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("results", "records"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let chunk1 = Bytes::from(r#"{"meta": {"source": "foo"}, "results": {"records": [{"log": "ba"#); + let result1 = deserializer.parse(chunk1, LogNamespace::Vector).unwrap(); + + assert_eq!(result1.len(), 1); + assert_eq!(result1[0].as_log()["expr"], "meta".into()); + + let chunk2 = Bytes::from(r#"r"}]}}"#); + let result2 = deserializer.parse(chunk2, LogNamespace::Vector).unwrap(); + + assert_eq!(result2.len(), 1); + assert_eq!(result2[0].as_log()["expr"], "results.records".into()); + + let log_value = &result2[0].as_log()["data"]; + let obj = log_value.as_object().expect("data should be an object"); + assert_eq!(obj.get("log"), Some(&Value::from("bar"))); + } + + #[test] + fn test_invalid_path_validation() { + assert!(OwnedValuePath::try_from("".to_string()).is_err()); + assert!(OwnedValuePath::try_from("[0]".to_string()).is_ok()); + + assert!(OwnedValuePath::try_from(".".to_string()).is_ok()); + assert!(OwnedValuePath::try_from("meta".to_string()).is_ok()); + assert!(OwnedValuePath::try_from("results.records".to_string()).is_ok()); + } + + #[test] + fn test_empty_config_rejected() { + let empty_map: HashMap = HashMap::new(); + let result = PathOperationConfig::from_map(empty_map); + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "At least one path must be configured"); + } + + #[test] + fn test_duplicate_paths_handled() { + let config = PathOperationConfig::new( + owned_value_path!("meta"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("meta"), PathOperation::Bytes); + + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + let input = Bytes::from(r#"{"meta": "test"}"#); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 1); + assert!(matches!(events[0].as_log()["data"], Value::Bytes(_))); + } + + #[test] + fn test_multiple_arrays_at_same_depth() { + let config = PathOperationConfig::new( + owned_value_path!("array1"), + PathOperation::Explode, + ) + .with_path(owned_value_path!("array2"), PathOperation::Explode); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"array1": [1, 2], "array2": [3, 4, 5]}"#); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 5); + + let array1_events: Vec<_> = events + .iter() + .filter(|e| e.as_log()["expr"] == "array1".into()) + .collect(); + let array2_events: Vec<_> = events + .iter() + .filter(|e| e.as_log()["expr"] == "array2".into()) + .collect(); + + assert_eq!(array1_events.len(), 2); + assert_eq!(array2_events.len(), 3); + } + + #[test] + fn test_explode_with_deeply_nested_objects() { + let config = PathOperationConfig::new( + owned_value_path!("events"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from( + r#"{"events": [{"id": 1, "user": {"name": "Alice", "profile": {"age": 30}}}]}"#, + ); + + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 1); + assert_eq!(events[0].as_log()["expr"], "events".into()); + + let data = &events[0].as_log()["data"]; + let obj = data.as_object().expect("data should be an object"); + assert_eq!(obj.get("id"), Some(&Value::Integer(1))); + } + + #[test] + fn test_explode_primitive_array() { + let config = PathOperationConfig::new( + owned_value_path!("numbers"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"numbers": [1, 2, 3]}"#); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 3); + + for event in &events { + assert_eq!(event.as_log()["expr"], "numbers".into()); + } + + assert_eq!(events[0].as_log()["data"], Value::Integer(1)); + assert_eq!(events[1].as_log()["data"], Value::Integer(2)); + assert_eq!(events[2].as_log()["data"], Value::Integer(3)); + } + + #[test] + fn test_explode_string_array() { + let config = PathOperationConfig::new( + owned_value_path!("items"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"items": ["a", "b", "c"]}"#); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 3); + + assert_eq!(events[0].as_log()["data"], Value::from("a")); + assert_eq!(events[1].as_log()["data"], Value::from("b")); + assert_eq!(events[2].as_log()["data"], Value::from("c")); + } + + #[test] + fn test_explode_mixed_primitive_array() { + let config = PathOperationConfig::new( + owned_value_path!("mixed"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"mixed": [1, "two", true, null]}"#); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 4); + + assert_eq!(events[0].as_log()["data"], Value::Integer(1)); + assert_eq!(events[1].as_log()["data"], Value::from("two")); + assert_eq!(events[2].as_log()["data"], Value::from(true)); + assert_eq!(events[3].as_log()["data"], Value::Null); + } + + #[test] + fn test_large_complex_nested_json_with_explode() { + let config = PathOperationConfig::new( + owned_value_path!("departments"), + PathOperation::Identity, + ) + .with_path(owned_value_path!("departments", "teams"), PathOperation::Explode) + .with_path(owned_value_path!("metrics"), PathOperation::Identity); + + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let large_json = r#"{ + "company": "Acme Corporation", + "year": 2024, + "metrics": { + "revenue": 1000000, + "employees": 250, + "growth_rate": 0.15 + }, + "departments": { + "name": "Engineering", + "budget": 5000000, + "teams": [ + { + "team_id": 1, + "team_name": "Backend Team", + "location": "San Francisco", + "members": [ + { + "id": 101, + "name": "Alice Johnson", + "role": "Senior Engineer", + "skills": ["Rust", "Python", "Docker"], + "projects": [ + {"name": "API Gateway", "status": "active", "priority": "high"}, + {"name": "Data Pipeline", "status": "completed", "priority": "medium"} + ] + }, + { + "id": 102, + "name": "Bob Smith", + "role": "Tech Lead", + "skills": ["Go", "Kubernetes", "PostgreSQL"], + "projects": [ + {"name": "Microservices", "status": "active", "priority": "critical"}, + {"name": "Monitoring", "status": "active", "priority": "high"} + ] + }, + { + "id": 103, + "name": "Carol White", + "role": "Engineer", + "skills": ["JavaScript", "React", "Node.js"], + "projects": [ + {"name": "Admin Dashboard", "status": "planning", "priority": "medium"} + ] + } + ], + "quarterly_goals": { + "q1": "Improve latency by 50%", + "q2": "Launch new API version", + "q3": "Scale to 10M requests/day", + "q4": "Reduce costs by 30%" + } + }, + { + "team_id": 2, + "team_name": "Frontend Team", + "location": "New York", + "members": [ + { + "id": 201, + "name": "David Brown", + "role": "Senior Engineer", + "skills": ["TypeScript", "React", "GraphQL"], + "projects": [ + {"name": "User Portal", "status": "active", "priority": "critical"}, + {"name": "Mobile App", "status": "active", "priority": "high"} + ] + }, + { + "id": 202, + "name": "Emma Davis", + "role": "UI/UX Engineer", + "skills": ["CSS", "Figma", "Accessibility"], + "projects": [ + {"name": "Design System", "status": "completed", "priority": "high"} + ] + } + ], + "quarterly_goals": { + "q1": "Redesign homepage", + "q2": "Mobile-first approach", + "q3": "A11y compliance", + "q4": "Performance optimization" + } + }, + { + "team_id": 3, + "team_name": "Data Science Team", + "location": "Austin", + "members": [ + { + "id": 301, + "name": "Frank Miller", + "role": "Data Scientist", + "skills": ["Python", "TensorFlow", "SQL"], + "projects": [ + {"name": "Recommendation Engine", "status": "active", "priority": "critical"}, + {"name": "Anomaly Detection", "status": "research", "priority": "medium"} + ] + }, + { + "id": 302, + "name": "Grace Lee", + "role": "ML Engineer", + "skills": ["PyTorch", "Kubernetes", "MLOps"], + "projects": [ + {"name": "Model Training Pipeline", "status": "active", "priority": "high"} + ] + }, + { + "id": 303, + "name": "Henry Wilson", + "role": "Data Analyst", + "skills": ["R", "Tableau", "Statistics"], + "projects": [ + {"name": "Business Intelligence", "status": "active", "priority": "medium"} + ] + } + ], + "quarterly_goals": { + "q1": "Deploy ML model v2", + "q2": "Real-time predictions", + "q3": "Expand datasets by 5x", + "q4": "Automated retraining" + } + } + ] + } +}"#; + + assert!(large_json.len() > 1024, "JSON should be larger than 1KB, got {} bytes", large_json.len()); + + let input = Bytes::from(large_json); + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + assert_eq!(events.len(), 5, "Expected 5 events: 2 identity + 3 exploded teams"); + + let metrics_events: Vec<_> = events + .iter() + .filter(|e| e.as_log()["expr"] == "metrics".into()) + .collect(); + assert_eq!(metrics_events.len(), 1); + let metrics_obj = metrics_events[0].as_log()["data"] + .as_object() + .expect("metrics data should be an object"); + assert_eq!(metrics_obj.get("revenue"), Some(&Value::Integer(1000000))); + assert_eq!(metrics_obj.get("employees"), Some(&Value::Integer(250))); + + let dept_events: Vec<_> = events + .iter() + .filter(|e| e.as_log()["expr"] == "departments".into()) + .collect(); + assert_eq!(dept_events.len(), 1); + let dept_obj = dept_events[0].as_log()["data"] + .as_object() + .expect("departments data should be an object"); + assert_eq!(dept_obj.get("name"), Some(&Value::from("Engineering"))); + assert_eq!(dept_obj.get("budget"), Some(&Value::Integer(5000000))); + + let team_events: Vec<_> = events + .iter() + .filter(|e| e.as_log()["expr"] == "departments.teams".into()) + .collect(); + assert_eq!(team_events.len(), 3, "Expected 3 exploded team events"); + + let team = team_events[0].as_log()["data"] + .as_object() + .expect("team data should be an object"); + assert_eq!(team.get("team_id"), Some(&Value::Integer(1))); + assert_eq!(team.get("team_name"), Some(&Value::from("Backend Team"))); + assert_eq!(team.get("location"), Some(&Value::from("San Francisco"))); + + let members = team.get("members") + .and_then(|v| v.as_array()) + .expect("members should be an array"); + assert_eq!(members.len(), 3, "Backend team should have 3 members"); + + let member = members[0].as_object().expect("member should be an object"); + assert_eq!(member.get("id"), Some(&Value::Integer(101))); + assert_eq!(member.get("name"), Some(&Value::from("Alice Johnson"))); + assert_eq!(member.get("role"), Some(&Value::from("Senior Engineer"))); + + let skills = member.get("skills") + .and_then(|v| v.as_array()) + .expect("skills should be an array"); + assert_eq!(skills.len(), 3); + assert_eq!(skills[0], Value::from("Rust")); + + let projects = member.get("projects") + .and_then(|v| v.as_array()) + .expect("projects should be an array"); + assert_eq!(projects.len(), 2); + let project = projects[0].as_object().expect("project should be an object"); + assert_eq!(project.get("name"), Some(&Value::from("API Gateway"))); + assert_eq!(project.get("status"), Some(&Value::from("active"))); + + let goals = team.get("quarterly_goals") + .and_then(|v| v.as_object()) + .expect("quarterly_goals should be an object"); + assert_eq!(goals.get("q1"), Some(&Value::from("Improve latency by 50%"))); + + let team2 = team_events[1].as_log()["data"] + .as_object() + .expect("team data should be an object"); + assert_eq!(team2.get("team_id"), Some(&Value::Integer(2))); + assert_eq!(team2.get("team_name"), Some(&Value::from("Frontend Team"))); + + let members2 = team2.get("members") + .and_then(|v| v.as_array()) + .expect("members should be an array"); + assert_eq!(members2.len(), 2, "Frontend team should have 2 members"); + + let team3 = team_events[2].as_log()["data"] + .as_object() + .expect("team data should be an object"); + assert_eq!(team3.get("team_id"), Some(&Value::Integer(3))); + assert_eq!(team3.get("team_name"), Some(&Value::from("Data Science Team"))); + + let members3 = team3.get("members") + .and_then(|v| v.as_array()) + .expect("members should be an array"); + assert_eq!(members3.len(), 3, "Data Science team should have 3 members"); + + } + + #[test] + fn test_bad_json() { + let config = PathOperationConfig::new( + owned_value_path!("data"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{invalid json}"#); + + let result = deserializer.parse(input, LogNamespace::Vector); + assert!(result.is_err(), "Should fail on malformed JSON"); + } + + #[test] + fn test_partial_json_then_bogus() { + let config = PathOperationConfig::new( + owned_value_path!("items"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + // First parse with partial but valid JSON + let input1 = Bytes::from(r#"{"items": [1"#); + let result1 = deserializer.parse(input1, LogNamespace::Vector); + assert!(result1.is_ok(), "Partial JSON should be accepted in streaming mode"); + + // Now send completely invalid JSON that cannot be parsed + let input2 = Bytes::from(r#"xxx invalid xxx"#); + let result2 = deserializer.parse(input2, LogNamespace::Vector); + assert!(result2.is_err(), "Should fail on bogus continuation"); + } + + #[test] + fn test_explode_on_non_array() { + let config = PathOperationConfig::new( + owned_value_path!("user"), + PathOperation::Explode, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"user": {"id": 1, "name": "Alice"}}"#); + + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + assert_eq!(events.len(), 0, "Explode on non-array should produce no events"); + } + + #[test] + fn test_identity_for_array() { + let config = PathOperationConfig::new( + owned_value_path!("items"), + PathOperation::Identity, + ); + let deserializer = JsonPathDeserializer::new(config, JsonPathDeserializerOptions::default()); + + let input = Bytes::from(r#"{"items": [1, 2, 3, 4, 5]}"#); + + let events = deserializer.parse(input, LogNamespace::Vector).unwrap(); + + // Find the event that contains the complete array + let array_event = events + .iter() + .find(|e| { + e.as_log()["expr"] == Value::from("items") + && matches!(e.as_log()["data"], Value::Array(_)) + }) + .expect("Should have an event with the full array"); + + assert_eq!(array_event.as_log()["expr"], Value::from("items")); + + let arr = array_event.as_log()["data"] + .as_array() + .expect("data should be an array"); + + assert_eq!(arr.len(), 5); + assert_eq!(arr[0], Value::Integer(1)); + assert_eq!(arr[1], Value::Integer(2)); + assert_eq!(arr[2], Value::Integer(3)); + assert_eq!(arr[3], Value::Integer(4)); + assert_eq!(arr[4], Value::Integer(5)); + } +} diff --git a/lib/codecs/src/decoding/format/mod.rs b/lib/codecs/src/decoding/format/mod.rs index 9e2dee7de..345a07687 100644 --- a/lib/codecs/src/decoding/format/mod.rs +++ b/lib/codecs/src/decoding/format/mod.rs @@ -8,6 +8,7 @@ mod bytes; mod gelf; mod influxdb; mod json; +mod json_paths; mod native; mod native_json; mod protobuf; @@ -21,6 +22,10 @@ use dyn_clone::DynClone; pub use gelf::{GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions}; pub use influxdb::{InfluxdbDeserializer, InfluxdbDeserializerConfig}; pub use json::{JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions}; +pub use json_paths::{ + JsonPathDeserializer, JsonPathDeserializerConfig, JsonPathDeserializerOptions, + PathOperation, PathOperationConfig, +}; pub use native::{NativeDeserializer, NativeDeserializerConfig}; pub use native_json::{ NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, diff --git a/lib/codecs/src/decoding/format/native_json.rs b/lib/codecs/src/decoding/format/native_json.rs index eac15b305..62d9190da 100644 --- a/lib/codecs/src/decoding/format/native_json.rs +++ b/lib/codecs/src/decoding/format/native_json.rs @@ -11,7 +11,7 @@ use vector_core::config::LogNamespace; /// Config used to build a `NativeJsonDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct NativeJsonDeserializerConfig { /// Vector's native JSON-specific decoding options. #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 42f866589..cd9269728 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -18,7 +18,7 @@ use super::Deserializer; /// Config used to build a `ProtobufDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct ProtobufDeserializerConfig { /// Protobuf-specific decoding options. #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 6a5d7e135..409ae4fe1 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -18,7 +18,7 @@ use super::{default_lossy, Deserializer}; /// Config used to build a `SyslogDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct SyslogDeserializerConfig { #[serde(skip)] source: Option<&'static str>, diff --git a/lib/codecs/src/decoding/format/vrl.rs b/lib/codecs/src/decoding/format/vrl.rs index 191ce9bf1..62318cba3 100644 --- a/lib/codecs/src/decoding/format/vrl.rs +++ b/lib/codecs/src/decoding/format/vrl.rs @@ -14,7 +14,7 @@ use vrl::value::Kind; /// Config used to build a `VrlDeserializer`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct VrlDeserializerConfig { /// VRL-specific decoding options. pub vrl: VrlDeserializerOptions, diff --git a/lib/codecs/src/decoding/framing/character_delimited.rs b/lib/codecs/src/decoding/framing/character_delimited.rs index bea46b49d..60ecc601c 100644 --- a/lib/codecs/src/decoding/framing/character_delimited.rs +++ b/lib/codecs/src/decoding/framing/character_delimited.rs @@ -8,7 +8,7 @@ use super::BoxedFramingError; /// Config used to build a `CharacterDelimitedDecoder`. #[configurable_component] -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct CharacterDelimitedDecoderConfig { /// Options for the character delimited decoder. pub character_delimited: CharacterDelimitedDecoderOptions, diff --git a/lib/codecs/src/decoding/framing/chunked_gelf.rs b/lib/codecs/src/decoding/framing/chunked_gelf.rs index b0160cd87..f8fcc8da4 100644 --- a/lib/codecs/src/decoding/framing/chunked_gelf.rs +++ b/lib/codecs/src/decoding/framing/chunked_gelf.rs @@ -26,7 +26,7 @@ const fn default_timeout_secs() -> f64 { /// Config used to build a `ChunkedGelfDecoder`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct ChunkedGelfDecoderConfig { /// Options for the chunked GELF decoder. #[serde(default)] @@ -80,6 +80,19 @@ pub struct ChunkedGelfDecoderOptions { pub decompression: ChunkedGelfDecompressionConfig, } +impl PartialEq for ChunkedGelfDecoderOptions { + fn eq(&self, other: &Self) -> bool { + !self.timeout_secs.is_nan() + && !other.timeout_secs.is_nan() + && (self.timeout_secs - other.timeout_secs).abs() < 1e-8 + && self.pending_messages_limit == other.pending_messages_limit + && self.max_length == other.max_length + && self.decompression == other.decompression + } +} + +impl Eq for ChunkedGelfDecoderOptions {} + /// Decompression options for ChunkedGelfDecoder. #[configurable_component] #[derive(Clone, Copy, Debug, PartialEq, Eq, Derivative)] diff --git a/lib/codecs/src/decoding/framing/length_delimited.rs b/lib/codecs/src/decoding/framing/length_delimited.rs index b72442e1c..718ddd7a3 100644 --- a/lib/codecs/src/decoding/framing/length_delimited.rs +++ b/lib/codecs/src/decoding/framing/length_delimited.rs @@ -9,7 +9,7 @@ use super::BoxedFramingError; /// Config used to build a `LengthDelimitedDecoder`. #[configurable_component] -#[derive(Debug, Clone, Derivative)] +#[derive(Debug, Clone, Derivative, PartialEq, Eq)] #[derivative(Default)] pub struct LengthDelimitedDecoderConfig { /// Options for the length delimited decoder. diff --git a/lib/codecs/src/decoding/framing/octet_counting.rs b/lib/codecs/src/decoding/framing/octet_counting.rs index b5248b26b..f10570d44 100644 --- a/lib/codecs/src/decoding/framing/octet_counting.rs +++ b/lib/codecs/src/decoding/framing/octet_counting.rs @@ -10,7 +10,7 @@ use super::BoxedFramingError; /// Config used to build a `OctetCountingDecoder`. #[configurable_component] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct OctetCountingDecoderConfig { #[serde(default, skip_serializing_if = "vector_core::serde::is_default")] /// Options for the octet counting decoder. diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index 45da11e8c..57f524c8d 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -12,6 +12,7 @@ pub use format::{ BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer, GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer, InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions, + JsonPathDeserializer, JsonPathDeserializerConfig, JsonPathDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions, @@ -22,9 +23,9 @@ pub use framing::{ BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder, CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder, ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder, - LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, - NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig, - OctetCountingDecoderOptions, NetflowDecoderOptions, NetflowDecoder, NetflowDecoderConfig + LengthDelimitedDecoderConfig, NetflowDecoder, NetflowDecoderConfig, NetflowDecoderOptions, + NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig, NewlineDelimitedDecoderOptions, + OctetCountingDecoder, OctetCountingDecoderConfig, OctetCountingDecoderOptions, }; use smallvec::SmallVec; use std::fmt::Debug; @@ -80,7 +81,7 @@ impl StreamDecodingError for Error { /// a frame that must be prefixed, or delimited, in a way that marks where an event begins and /// ends within the byte stream. #[configurable_component] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] #[serde(tag = "method", rename_all = "snake_case")] #[configurable(metadata(docs::enum_tag_description = "The framing method."))] pub enum FramingConfig { @@ -164,12 +165,12 @@ impl FramingConfig { FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()), FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()), FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()), - FramingConfig::Netflow { - netflow_decoder - } => Framer::Netflow (NetflowDecoderConfig { - netflow_decoder_options: netflow_decoder.clone(), - }.build(), - ), + FramingConfig::Netflow { netflow_decoder } => Framer::Netflow( + NetflowDecoderConfig { + netflow_decoder_options: netflow_decoder.clone(), + } + .build(), + ), FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()), FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()), FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()), @@ -231,7 +232,7 @@ impl tokio_util::codec::Decoder for Framer { /// Deserializer configuration. #[configurable_component] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] #[serde(tag = "codec", rename_all = "snake_case")] #[configurable(description = "Configures how events are decoded from raw bytes.")] #[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))] @@ -306,6 +307,12 @@ pub enum DeserializerConfig { avro: AvroDeserializerOptions, }, + /// Decodes JSON with JSONPath-based extraction and array explosion. + /// + /// Allows selective extraction of fields from JSON documents using JSONPath expressions, + /// with support for exploding arrays into individual events. + JsonPaths(JsonPathDeserializerConfig), + /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program. /// /// [vrl]: https://vector.dev/docs/reference/vrl @@ -355,6 +362,12 @@ impl From for DeserializerConfig { } } +impl From for DeserializerConfig { + fn from(config: JsonPathDeserializerConfig) -> Self { + Self::JsonPaths(config) + } +} + impl DeserializerConfig { /// Build the `Deserializer` from this configuration. pub fn build(&self) -> vector_common::Result { @@ -376,6 +389,7 @@ impl DeserializerConfig { DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())), DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())), DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())), + DeserializerConfig::JsonPaths(config) => Ok(Deserializer::JsonPaths(config.build())), DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)), } } @@ -387,6 +401,7 @@ impl DeserializerConfig { DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()), DeserializerConfig::Bytes | DeserializerConfig::Json(_) + | DeserializerConfig::JsonPaths(_) | DeserializerConfig::Influxdb(_) | DeserializerConfig::NativeJson(_) => { FramingConfig::NewlineDelimited(Default::default()) @@ -418,6 +433,7 @@ impl DeserializerConfig { .output_type(), DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(), DeserializerConfig::Json(config) => config.output_type(), + DeserializerConfig::JsonPaths(config) => config.output_type(), DeserializerConfig::Protobuf(config) => config.output_type(), #[cfg(feature = "syslog")] DeserializerConfig::Syslog(config) => config.output_type(), @@ -438,6 +454,7 @@ impl DeserializerConfig { .schema_definition(log_namespace), DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace), DeserializerConfig::Json(config) => config.schema_definition(log_namespace), + DeserializerConfig::JsonPaths(config) => config.schema_definition(log_namespace), DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace), #[cfg(feature = "syslog")] DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace), @@ -474,6 +491,7 @@ impl DeserializerConfig { (DeserializerConfig::Protobuf(_), _) => "application/octet-stream", ( DeserializerConfig::Json(_) + | DeserializerConfig::JsonPaths(_) | DeserializerConfig::NativeJson(_) | DeserializerConfig::Bytes | DeserializerConfig::Gelf(_) @@ -496,6 +514,8 @@ pub enum Deserializer { Bytes(BytesDeserializer), /// Uses a `JsonDeserializer` for deserialization. Json(JsonDeserializer), + /// Uses a `JsonPathDeserializer` for deserialization. + JsonPaths(JsonPathDeserializer), /// Uses a `ProtobufDeserializer` for deserialization. Protobuf(ProtobufDeserializer), #[cfg(feature = "syslog")] @@ -525,6 +545,7 @@ impl format::Deserializer for Deserializer { Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace), + Deserializer::JsonPaths(deserializer) => deserializer.parse(bytes, log_namespace), Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace), #[cfg(feature = "syslog")] Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace), diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 3f473c726..a6da6cdb0 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -13,8 +13,8 @@ pub use format::{ JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, ParquetSerializer, ParquetSerializerConfig, ParquetSerializerOptions, - ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, - RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, + ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, + RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; pub use framing::{ BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder, @@ -259,11 +259,7 @@ pub enum SerializerConfig { /// Encodes events in [Apache Parquet format][parquet]. /// /// [parquet]: https://parquet.apache.org/ - Parquet { - /// Apache Parquet-specific encoder options. - parquet: ParquetSerializerOptions, - }, - + Parquet(ParquetSerializerOptions), /// Plain text encoding. /// /// This encoding uses the `message` field of a log event. For metrics, it uses an @@ -363,7 +359,7 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), - SerializerConfig::Parquet { .. } => { + SerializerConfig::Parquet(..) => { Err("Parquet serializer is not for single event encoding.".into()) } } @@ -375,12 +371,13 @@ impl SerializerConfig { &self, ) -> Result, Box> { match self { - SerializerConfig::Parquet { parquet } => Ok(Some(BatchSerializer::Parquet( + SerializerConfig::Parquet(parquet) => Ok(Some(BatchSerializer::Parquet( ParquetSerializerConfig::new( parquet.schema.clone(), parquet.record_complete_event.clone(), - parquet.ignore_type_mismatch_for_optional.clone() - ).build()?, + parquet.ignore_type_mismatch_for_optional.clone(), + ) + .build()?, ))), SerializerConfig::Avro { .. } | SerializerConfig::Cef(..) @@ -422,7 +419,7 @@ impl SerializerConfig { | SerializerConfig::NativeJson | SerializerConfig::RawMessage | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited, - SerializerConfig::Parquet { .. } => FramingConfig::Bytes, + SerializerConfig::Parquet(..) => FramingConfig::Bytes, SerializerConfig::Gelf => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) } @@ -445,13 +442,12 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), - SerializerConfig::Parquet { parquet } => { - ParquetSerializerConfig::new( - parquet.schema.clone(), - parquet.record_complete_event.clone(), - parquet.ignore_type_mismatch_for_optional.clone() - ).input_type() - } + SerializerConfig::Parquet(parquet) => ParquetSerializerConfig::new( + parquet.schema.clone(), + parquet.record_complete_event.clone(), + parquet.ignore_type_mismatch_for_optional.clone(), + ) + .input_type(), } } @@ -471,13 +467,12 @@ impl SerializerConfig { SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), - SerializerConfig::Parquet { parquet } => { - ParquetSerializerConfig::new( - parquet.schema.clone(), - parquet.record_complete_event.clone(), - parquet.ignore_type_mismatch_for_optional.clone() - ).schema_requirement() - } + SerializerConfig::Parquet(parquet) => ParquetSerializerConfig::new( + parquet.schema.clone(), + parquet.record_complete_event.clone(), + parquet.ignore_type_mismatch_for_optional.clone(), + ) + .schema_requirement(), } } } diff --git a/lib/observo/private b/lib/observo/private index 47b8fd565..ba75115b5 160000 --- a/lib/observo/private +++ b/lib/observo/private @@ -1 +1 @@ -Subproject commit 47b8fd565985cec57336a7b9c13b21a88b4203f7 +Subproject commit ba75115b5ec6e8d72ebb85c92a100b6d3079bc2f diff --git a/lib/observo/sauth/Cargo.toml b/lib/observo/sauth/Cargo.toml index eae0c3c47..f30a6caaa 100644 --- a/lib/observo/sauth/Cargo.toml +++ b/lib/observo/sauth/Cargo.toml @@ -36,11 +36,14 @@ toml.workspace = true hyper.workspace = true rand.workspace = true url = { workspace = true, optional = true } +# this is a hack to enable tracing in hyper-util (debugging can be difficult without this) +hyper-util = { version = "0.1.10", optional = true, features = ["full"] } [features] default = [] test-support = ["dep:thiserror", "dep:poem", "dep:poem-derive", "dep:oxide-auth", - "dep:oxide-auth-poem", "dep:tempfile", "dep:rcgen", "dep:serde_urlencoded", "dep:url"] + "dep:oxide-auth-poem", "dep:tempfile", "dep:rcgen", "dep:serde_urlencoded", "dep:url", + "dep:hyper-util"] [dev-dependencies] sauth = {features = ["test-support"], path = "."} diff --git a/lib/observo/scol/Cargo.toml b/lib/observo/scol/Cargo.toml index a9a62aef7..f746c9d32 100644 --- a/lib/observo/scol/Cargo.toml +++ b/lib/observo/scol/Cargo.toml @@ -57,6 +57,8 @@ sha2 ={ version = "0.10.9" } hmac = { version = "0.12.1" } data-encoding = { version = "2.9.0" } urlencoding = { version = "2.1.3", default-features = false } +tokio-util.workspace = true +async-trait.workspace = true # Optional dependencies for testing (below) textwrap = { version = "0.16", optional = true } @@ -66,6 +68,9 @@ oxide-auth-poem = { version = "0.3.0", path = "../vendor/oxide-auth/oxide-auth-p tempfile = { version = "3.15.0", optional = true } assert_matches = { version = "1.5", optional = true} pathsearch = {version = "0.2.0", optional = true} +stream-cancel = { version = "0.8.2", default-features = false, optional = true} +poem = { version = "3.1.7", default-features = true, features = ["rustls"], optional = true} +vrl = {workspace = true, optional = true} [dev-dependencies] assert_matches = { version = "1.5" } @@ -73,13 +78,15 @@ vrl.workspace = true regex = "1" serde_urlencoded = "0.7" tempfile = "3.15.0" -tracing-test = { version = "0.2.5" } +tracing-test = { version = "0.2.5", features = [ "no-env-filter" ] } test-case = { version = "3.3" } tokio = { version = "1.43.0", features = ["test-util"] } scol = { path = ".", features = ["test-scenarios"] } chkpts = { path = "../chkpts" } - [features] -test-scenarios = ["dep:textwrap", "dep:rstest", "dep:oxide-auth", "dep:oxide-auth-poem", "dep:tempfile", "dep:assert_matches", "sauth/test-support", "dep:pathsearch"] +test-scenarios = ["vector-lib/test", "dep:textwrap", "dep:rstest", "dep:oxide-auth", + "dep:oxide-auth-poem", "dep:tempfile", "dep:assert_matches", "sauth/test-support", + "dep:pathsearch", "tokio/macros", "tokio/rt-multi-thread", "dep:stream-cancel", "dep:poem", + "dep:vrl"] default = [] diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index dc991c993..30279f2e0 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -152,6 +152,7 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S // `message` field... but it's close enough for now. DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()), DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()), + DeserializerConfig::JsonPaths { .. } => SerializerConfig::Json(JsonSerializerConfig::default()), DeserializerConfig::Protobuf(config) => { SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig { protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions { @@ -229,7 +230,7 @@ fn serializer_config_to_deserializer( }) } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, - SerializerConfig::Parquet { .. } => todo!(), + SerializerConfig::Parquet(..) => todo!(), }; deserializer_config.build()