diff --git a/Cargo.lock b/Cargo.lock index 4ae43c3fc7..685735814e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12979,6 +12979,8 @@ dependencies = [ "async-stream", "futures 0.3.31", "futures-util", + "metrics", + "metrics-util 0.18.0", "pin-project", "proptest", "rand 0.8.5", diff --git a/lib/vector-stream/Cargo.toml b/lib/vector-stream/Cargo.toml index 45b1e77aaa..682cfe75bc 100644 --- a/lib/vector-stream/Cargo.toml +++ b/lib/vector-stream/Cargo.toml @@ -14,11 +14,14 @@ tokio = { version = "1.43.0", default-features = false, features = ["net"] } tokio-util = { version = "0.7.0", default-features = false, features = ["time"] } tower = { version = "0.4", default-features = false, features = ["util"] } tracing = { version = "0.1.34", default-features = false } +metrics.workspace = true twox-hash = { version = "2.1.0", default-features = false, features = ["xxhash64"] } vector-common = { path = "../vector-common" } vector-core = { path = "../vector-core" } [dev-dependencies] +metrics-util = "0.18" proptest = "1.5" rand = "0.8.5" rand_distr = "0.4.3" +tokio = { version = "1.43.0", features = ["macros", "rt", "time", "test-util"] } diff --git a/lib/vector-stream/src/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs index b0ba86b85b..b494894e85 100644 --- a/lib/vector-stream/src/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -8,6 +8,7 @@ use std::{ }; use futures::stream::{Fuse, Stream, StreamExt}; +use metrics::{gauge, Gauge}; use pin_project::pin_project; use tokio_util::time::{delay_queue::Key, DelayQueue}; use twox_hash::XxHash64; @@ -201,6 +202,10 @@ where #[pin] /// The stream this `Batcher` wraps stream: Fuse, + /// Gauge tracking the number of open batches + m_open: Gauge, + /// Gauge tracking the number of closed batches waiting to be emitted + m_closed: Gauge, } impl PartitionedBatcher, C, F, B> @@ -221,6 +226,8 @@ where timer: ExpirationQueue::new(timeout), partitioner, stream: stream.fuse(), + m_open: gauge!("open_batches"), + m_closed: gauge!("closed_batches"), } } } @@ -243,6 +250,8 @@ where timer, partitioner, stream: stream.fuse(), + m_open: gauge!("open_batches"), + m_closed: gauge!("closed_batches"), } } } @@ -267,6 +276,7 @@ where let mut this = self.project(); loop { if !this.closed_batches.is_empty() { + this.m_closed.decrement(1.0); return Poll::Ready(this.closed_batches.pop()); } match this.stream.as_mut().poll_next(cx) { @@ -280,6 +290,8 @@ where .remove(&item_key) .expect("batch should exist if it is set to expire"); this.closed_batches.push((item_key, batch.take_batch())); + this.m_open.decrement(1.0); + this.m_closed.increment(1.0); } }, Poll::Ready(None) => { @@ -289,12 +301,15 @@ where // continue looping so the caller can drain them all before // we finish. if !this.batches.is_empty() { + let batch_count = this.batches.len() as f64; this.timer.clear(); this.closed_batches.extend( this.batches .drain() .map(|(key, mut batch)| (key, batch.take_batch())), ); + this.m_open.decrement(batch_count); + this.m_closed.increment(batch_count); continue; } return Poll::Ready(None); @@ -309,6 +324,7 @@ where let batch = (this.state)(); this.batches.insert(item_key.clone(), batch); this.timer.insert(item_key.clone()); + this.m_open.increment(1.0); this.batches .get_mut(&item_key) .expect("batch has just been inserted so should exist") @@ -321,6 +337,7 @@ where // next iteration. this.closed_batches .push((item_key.clone(), batch.take_batch())); + this.m_closed.increment(1.0); // The batch for this partition key was set to // expire, but now it's overflowed and must be @@ -337,6 +354,8 @@ where .push((item_key.clone(), batch.take_batch())); this.batches.remove(&item_key); this.timer.remove(&item_key); + this.m_open.decrement(1.0); + this.m_closed.increment(1.0); } } } @@ -696,4 +715,81 @@ mod test { f(&mut cx) } + + #[tokio::test] + async fn gauge_values_track_batch_counts() { + use metrics_util::debugging::DebuggingRecorder; + + // Install debugging recorder to capture metrics + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + // Use with_local_recorder to avoid global state issues in tests + metrics::with_local_recorder(&recorder, || { + let noop_waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&noop_waker); + + // Create items that will form 3 partitions: keys 1, 2, 3 + // Each partition will get 2 items, triggering batch full at item_limit=2 + let items = vec![1u64, 2, 3, 1, 2, 3]; + let stream = stream::iter(items); + + let partitioner = TestPartitioner { + key_space: NonZeroU8::new(10).unwrap(), + }; + let settings = BatcherSettings::new( + Duration::from_secs(1), + NonZeroUsize::new(100).unwrap(), + NonZeroUsize::new(2).unwrap(), // 2 items per batch triggers full + ); + + let mut batcher = PartitionedBatcher::new(stream, partitioner, move || { + settings.as_byte_size_config() + }); + let mut batcher = Pin::new(&mut batcher); + + // Process items and verify gauge interactions + // Items 1, 2, 3 each create a new open batch (3 open, 0 closed) + // Items 1, 2, 3 (second occurrence) fill batches, moving them to closed + // Then polling returns each closed batch + + let mut batches_received = 0; + loop { + match batcher.as_mut().poll_next(&mut cx) { + Poll::Pending => break, + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + batches_received += 1; + } + } + } + + // We should have received 3 batches + assert_eq!(batches_received, 3); + + // Verify gauges were recorded by checking snapshot + let snapshot = snapshotter.snapshot(); + let gauges = snapshot.into_hashmap(); + + // Verify gauge metrics were recorded + let mut found_open = false; + let mut found_closed = false; + + for (composite_key, _) in gauges.iter() { + let name = composite_key.key().name(); + if name == "open_batches" { + found_open = true; + } + if name == "closed_batches" { + found_closed = true; + } + } + + assert!(found_open, "open_batches gauge should have been recorded"); + assert!( + found_closed, + "closed_batches gauge should have been recorded" + ); + }); + } } diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs index 4ff5838cdc..f0cabd12bb 100644 --- a/src/internal_events/splunk_hec.rs +++ b/src/internal_events/splunk_hec.rs @@ -198,6 +198,27 @@ mod sink { ); } } + + pub struct SplunkBatchHeaderValueInvalid<'a> { + pub header_name: &'a str, + } + + impl InternalEvent for SplunkBatchHeaderValueInvalid<'_> { + fn emit(self) { + warn!( + message = "Batch header value contains invalid characters. Skipping header.", + header_name = %self.header_name, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "invalid_batch_header_value", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } + } } #[cfg(feature = "sources-splunk_hec")] diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index fbde5622ab..065f6e0458 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -26,7 +26,7 @@ use crate::{ template::Template, tls::TlsConfig, }; -use crate::sinks::splunk_hec::logs::config::TimestampConfiguration; +use crate::sinks::splunk_hec::logs::config::{BatchHeaders, TimestampConfiguration}; pub(super) const HOST: &str = "https://cloud.humio.com"; @@ -218,7 +218,8 @@ impl HumioLogsConfig { format: TimestampFormat::Native, timestamp_nanos_key: self.timestamp_nanos_key.clone(), preserve_timestamp_key: false, - }) + }), + batch_headers: BatchHeaders::default(), } } } diff --git a/src/sinks/splunk_hec/common/acknowledgements.rs b/src/sinks/splunk_hec/common/acknowledgements.rs index 20dedfc0c2..26fc036554 100644 --- a/src/sinks/splunk_hec/common/acknowledgements.rs +++ b/src/sinks/splunk_hec/common/acknowledgements.rs @@ -225,6 +225,7 @@ impl HecAckClient { None, MetadataFields::default(), false, + vec![], ) .map_err(|_| HecAckApiError::ClientBuildRequest)?; diff --git a/src/sinks/splunk_hec/common/request.rs b/src/sinks/splunk_hec/common/request.rs index f644107e46..2abf64d2a6 100644 --- a/src/sinks/splunk_hec/common/request.rs +++ b/src/sinks/splunk_hec/common/request.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use bytes::Bytes; +use http::{HeaderName, HeaderValue}; use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata}; use vector_lib::{ event::{EventFinalizers, Finalizable}, @@ -19,6 +20,7 @@ pub struct HecRequest { pub source: Option, pub sourcetype: Option, pub host: Option, + pub headers: Vec<(HeaderName, HeaderValue)>, } impl ByteSizeOf for HecRequest { diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index b44bd1f56e..dc22cb8848 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -223,6 +223,7 @@ impl HttpRequestBuilder { passthrough_token: Option>, metadata_fields: MetadataFields, auto_extract_timestamp: bool, + dyn_headers: Vec<(HeaderName, HeaderValue)>, ) -> Result, crate::Error> { let uri = match self.endpoint_target { EndpointTarget::Raw => { @@ -267,6 +268,11 @@ impl HttpRequestBuilder { crate::Error::from("Failed to access headers in http::Request builder") })?; + for (header, value) in dyn_headers { + headers.insert(header, value); + } + + // Static headers from request config take precedence over dynamic headers for (header, value) in self.headers.iter() { headers.insert(header, value.clone()); } @@ -376,6 +382,7 @@ mod tests { source: None, sourcetype: None, host: None, + headers: vec![], } } diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index ba729b859a..99cbcd75cd 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -77,6 +77,7 @@ pub fn build_http_batch_service( host: req.host, }, auto_extract_timestamp, + req.headers, ) }); future @@ -296,6 +297,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap(); @@ -340,6 +342,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap(); @@ -387,6 +390,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap_err(); assert_eq!(err.to_string(), "URI parse error: invalid format") @@ -442,6 +446,107 @@ mod tests { ); } } + + #[tokio::test] + async fn test_build_request_with_batch_headers() { + use http::HeaderName; + + let endpoint = "http://localhost:8888"; + let token = "token"; + let compression = Compression::None; + let events = Bytes::from("events"); + let http_request_builder = HttpRequestBuilder::new( + String::from(endpoint), + EndpointTarget::default(), + String::from(token), + compression, + IndexMap::default() + ); + + // Create batch headers + let batch_headers = vec![ + (HeaderName::from_static("x-tenant"), HeaderValue::from_static("acme")), + (HeaderName::from_static("x-region"), HeaderValue::from_static("us-east")), + ]; + + let request = http_request_builder + .build_request( + events.clone(), + "/services/collector/event", + None, + MetadataFields::default(), + false, + batch_headers, + ) + .unwrap(); + + // Verify batch headers are present + assert_eq!( + request.headers().get("X-Tenant"), + Some(&HeaderValue::from_static("acme")) + ); + assert_eq!( + request.headers().get("X-Region"), + Some(&HeaderValue::from_static("us-east")) + ); + + // Standard headers should still be present + assert_eq!( + request.headers().get("Content-Type"), + Some(&HeaderValue::from_static("application/json")) + ); + assert_eq!( + request.headers().get("Authorization"), + Some(&HeaderValue::from_static("Splunk token")) + ); + } + + #[tokio::test] + async fn test_build_request_static_headers_override_batch_headers() { + use http::HeaderName; + + let endpoint = "http://localhost:8888"; + let token = "token"; + let compression = Compression::None; + let events = Bytes::from("events"); + + // Create static headers that will override batch headers + let mut static_headers = IndexMap::new(); + static_headers.insert( + HeaderName::from_static("x-override"), + HeaderValue::from_static("static-value"), + ); + + let http_request_builder = HttpRequestBuilder::new( + String::from(endpoint), + EndpointTarget::default(), + String::from(token), + compression, + static_headers, + ); + + // Batch header with same name should be overridden + let batch_headers = vec![ + (HeaderName::from_static("x-override"), HeaderValue::from_static("dynamic-value")), + ]; + + let request = http_request_builder + .build_request( + events.clone(), + "/services/collector/event", + None, + MetadataFields::default(), + false, + batch_headers, + ) + .unwrap(); + + // Static header should override the batch header + assert_eq!( + request.headers().get("X-Override"), + Some(&HeaderValue::from_static("static-value")) + ); + } } #[cfg(all(test, feature = "splunk-integration-tests"))] diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 57ad1d6494..e5d96d3807 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -1,9 +1,10 @@ use std::sync::Arc; +use http::HeaderName; use vector_lib::{ codecs::TextSerializerConfig, config::TimestampFormat, lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath}, sensitive_string::SensitiveString }; - +use serde_with::serde_as; use crate::{ http::HttpClient, sinks::{ @@ -20,6 +21,49 @@ use crate::{ use crate::sinks::util::http::{validate_headers, RequestConfig}; use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; +/// A batch http-header to be sent to Splunk HEC. +#[serde_as] +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct BatchHeader { + /// The name of the header. + #[configurable(metadata(docs::examples = "X-Priority"))] + pub name: String, + + /// The value of the header. + #[configurable(metadata(docs::examples = "pri"))] + pub value: ConfigValuePath, +} + +/// A batch header to be sent to Splunk HEC, represented as a key-value pair. +#[serde_as] +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct BatchHeaders(Vec); + +impl From> for BatchHeaders { + fn from(headers: Vec) -> Self { + BatchHeaders(headers) + } +} + +impl BatchHeaders { + /// Validates header names and converts to a vector of (HeaderName, ConfigValuePath) pairs. + /// Returns an error if any header name is invalid. + pub fn into_validated(self) -> Result, crate::Error> { + self.0 + .into_iter() + .map(|header| { + HeaderName::try_from(header.name.as_str()) + .map(|name| (name, header.value)) + .map_err(|e| format!("Invalid batch header name '{}': {}", header.name, e).into()) + }) + .collect() + } +} + /// Configuration for the `splunk_hec_logs` sink. #[configurable_component(sink( "splunk_hec_logs", @@ -105,6 +149,12 @@ pub struct HecLogsSinkConfig { #[serde(default)] pub batch: BatchConfig, + /// Headers to be included in each batch request to Splunk HEC. + /// Headers in request-config take precedence over these batch headers if there are any conflicts. + #[configurable(derived)] + #[serde(default)] + pub batch_headers: BatchHeaders, + #[configurable(derived)] #[serde(default)] pub request: RequestConfig, @@ -202,6 +252,7 @@ impl GenerateConfig for HecLogsSinkConfig { encoding: TextSerializerConfig::default().into(), compression: Compression::default(), batch: BatchConfig::default(), + batch_headers: BatchHeaders::default(), request: RequestConfig::default(), tls: None, acknowledgements: Default::default(), @@ -266,6 +317,7 @@ impl HecLogsSinkConfig { let request_settings = self.request.tower.into_settings(); let headers = validate_headers(&self.request.headers)?; + let batch_headers = self.batch_headers.clone().into_validated()?; let http_request_builder = Arc::new(HttpRequestBuilder::new( self.endpoint.clone(), @@ -309,6 +361,7 @@ impl HecLogsSinkConfig { endpoint_target: self.endpoint_target, auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(), timestamp_configuration: self.timestamp_configuration.clone(), + batch_headers, shutdown: cx.shutdown.clone(), }; @@ -331,6 +384,82 @@ mod tests { crate::test_util::test_generate_config::(); } + #[test] + fn test_config_serde() { + let config_toml = r#" + default_token = "my-token" + endpoint = "https://hec.example.com:8088" + host_key = "hostname" + indexed_fields = ["field1", "field2"] + index = "{{ index_field }}" + sourcetype = "{{ sourcetype_field }}" + source = "/var/log/app.log" + compression = "gzip" + endpoint_target = "raw" + auto_extract_timestamp = true + path = "/custom/path" + + [encoding] + codec = "json" + except_fields = ["secret_field"] + + [batch] + max_bytes = 1048576 + max_events = 100 + timeout_secs = 5 + + [[batch_headers]] + name = "X-Tenant" + value = "tenant" + + [[batch_headers]] + name = "X-Priority" + value = "priority" + + [request] + timeout_secs = 30 + retry_attempts = 3 + + [request.headers] + X-Custom = "custom-value" + + [acknowledgements] + indexer_acknowledgements_enabled = true + max_pending_acks = 1000 + + [timestamp_configuration] + timestamp_key = "ts" + format = "native" + preserve_timestamp_key = true + "#; + + let config: HecLogsSinkConfig = toml::from_str(config_toml) + .expect("Failed to parse config"); + + assert_eq!(config.default_token.inner(), "my-token"); + assert_eq!(config.endpoint, "https://hec.example.com:8088"); + assert_eq!(config.endpoint_target, EndpointTarget::Raw); + assert_eq!(config.auto_extract_timestamp, Some(true)); + assert_eq!(config.path, Some("/custom/path".to_string())); + assert_eq!(config.indexed_fields.len(), 2); + assert!(config.index.is_some()); + assert!(config.sourcetype.is_some()); + assert_eq!(config.source, Some(crate::template::Template::try_from("/var/log/app.log").unwrap())); + assert!(config.host_key.is_some()); + + let batch_headers = config.batch_headers.into_validated().expect("batch_headers should be valid"); + assert_eq!(batch_headers.len(), 2); + assert_eq!(batch_headers[0].0.as_str(), "x-tenant"); + assert_eq!(batch_headers[1].0.as_str(), "x-priority"); + + assert!(config.acknowledgements.indexer_acknowledgements_enabled); + assert_eq!(config.acknowledgements.max_pending_acks.get(), 1000); + + let ts_config = config.timestamp_configuration.expect("timestamp_configuration should be set"); + assert!(ts_config.timestamp_key.is_some()); + assert!(ts_config.preserve_timestamp_key); + } + impl ValidatableComponent for HecLogsSinkConfig { fn validation_configuration() -> ValidationConfiguration { let endpoint = "http://127.0.0.1:9001".to_string(); @@ -372,6 +501,7 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + batch_headers: BatchHeaders::default(), path: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, diff --git a/src/sinks/splunk_hec/logs/request_builder.rs b/src/sinks/splunk_hec/logs/request_builder.rs index 91d37d7b5e..67f594fad6 100644 --- a/src/sinks/splunk_hec/logs/request_builder.rs +++ b/src/sinks/splunk_hec/logs/request_builder.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use bytes::Bytes; +use http::{HeaderName, HeaderValue}; use vector_lib::event::{EventFinalizers, Finalizable}; use vector_lib::request_metadata::RequestMetadata; @@ -29,6 +30,7 @@ pub struct HecRequestMetadata { sourcetype: Option, index: Option, host: Option, + headers: Vec<(HeaderName, Option)>, } impl RequestBuilder<(Option, Vec)> for HecLogsRequestBuilder { @@ -65,6 +67,10 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()), index: partition.as_mut().and_then(|p| p.index.take()), host: partition.as_mut().and_then(|p| p.host.take()), + headers: partition + .as_mut() + .map(|p| std::mem::take(&mut p.headers)) + .unwrap_or_default(), }, builder, events, @@ -77,6 +83,12 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe metadata: RequestMetadata, payload: EncodeResult, ) -> Self::Request { + let headers = hec_metadata + .headers + .into_iter() + .filter_map(|(k, v)| v.map(|v| (k, v))) + .collect(); + HecRequest { body: payload.into_payload(), finalizers: hec_metadata.finalizers, @@ -85,6 +97,7 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe sourcetype: hec_metadata.sourcetype, index: hec_metadata.index, host: hec_metadata.host, + headers, metadata, } } diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index e470d64f8b..39d176a011 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -1,5 +1,7 @@ use std::{fmt::{self, Display}, sync::Arc}; +use http::{HeaderName, HeaderValue}; + use super::request_builder::HecLogsRequestBuilder; use crate::{ internal_events::SplunkEventTimestampInvalidType, @@ -17,7 +19,7 @@ use futures::future::Either; use stream_cancel::Tripwire; use vector_lib::{ config::{log_schema, LogNamespace, TimestampFormat, TimestampResolutionError}, - lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix}, + lookup::{event_path, lookup_v2::{ConfigValuePath, OptionalTargetPath}, OwnedValuePath, PathPrefix}, schema::meaning, }; use vrl::path::OwnedTargetPath; @@ -38,6 +40,7 @@ pub struct HecLogsSink { pub endpoint_target: EndpointTarget, pub auto_extract_timestamp: bool, pub timestamp_configuration: Option, + pub batch_headers: Vec<(HeaderName, ConfigValuePath)>, pub shutdown: Tripwire, } @@ -72,6 +75,7 @@ where }; let batch_settings = self.batch_settings; + let batch_headers = self.batch_headers.clone(); let run = input .map(move |event| process_log(event, &data)) .batched_partitioned( @@ -83,9 +87,10 @@ where self.source.clone(), self.index.clone(), self.host_key.clone(), + batch_headers, ) } else { - EventPartitioner::new(None, None, None, None) + EventPartitioner::new(None, None, None, None, batch_headers) }, move || batch_settings.as_byte_size_config(), ) @@ -139,6 +144,7 @@ pub(super) struct Partitioned { pub(super) sourcetype: Option, pub(super) index: Option, pub(super) host: Option, + pub(super) headers: Vec<(HeaderName, Option)>, } #[derive(Default)] @@ -147,20 +153,23 @@ struct EventPartitioner { pub source: Option