From 9331626f64230f301938bc160317cd5888a651bc Mon Sep 17 00:00:00 2001 From: Janmejay Singh Date: Wed, 15 Apr 2026 13:17:43 +0530 Subject: [PATCH 1/6] Feat[OBE-9476] - Add support for batch-headers in HEC sink --- src/internal_events/splunk_hec.rs | 21 + src/sinks/humio/logs.rs | 5 +- .../splunk_hec/common/acknowledgements.rs | 1 + src/sinks/splunk_hec/common/request.rs | 2 + src/sinks/splunk_hec/common/service.rs | 7 + src/sinks/splunk_hec/common/util.rs | 105 ++++ src/sinks/splunk_hec/logs/config.rs | 56 +- src/sinks/splunk_hec/logs/request_builder.rs | 13 + src/sinks/splunk_hec/logs/sink.rs | 156 ++++- src/sinks/splunk_hec/logs/tests.rs | 534 +++++++++++++++++- .../splunk_hec/metrics/request_builder.rs | 1 + src/sources/splunk_hec/mod.rs | 1 + 12 files changed, 893 insertions(+), 9 deletions(-) diff --git a/src/internal_events/splunk_hec.rs b/src/internal_events/splunk_hec.rs index 4ff5838cdc..f0cabd12bb 100644 --- a/src/internal_events/splunk_hec.rs +++ b/src/internal_events/splunk_hec.rs @@ -198,6 +198,27 @@ mod sink { ); } } + + pub struct SplunkBatchHeaderValueInvalid<'a> { + pub header_name: &'a str, + } + + impl InternalEvent for SplunkBatchHeaderValueInvalid<'_> { + fn emit(self) { + warn!( + message = "Batch header value contains invalid characters. Skipping header.", + header_name = %self.header_name, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "invalid_batch_header_value", + "error_type" => error_type::PARSER_FAILED, + "stage" => error_stage::PROCESSING, + ) + .increment(1); + } + } } #[cfg(feature = "sources-splunk_hec")] diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index fbde5622ab..065f6e0458 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -26,7 +26,7 @@ use crate::{ template::Template, tls::TlsConfig, }; -use crate::sinks::splunk_hec::logs::config::TimestampConfiguration; +use crate::sinks::splunk_hec::logs::config::{BatchHeaders, TimestampConfiguration}; pub(super) const HOST: &str = "https://cloud.humio.com"; @@ -218,7 +218,8 @@ impl HumioLogsConfig { format: TimestampFormat::Native, timestamp_nanos_key: self.timestamp_nanos_key.clone(), preserve_timestamp_key: false, - }) + }), + batch_headers: BatchHeaders::default(), } } } diff --git a/src/sinks/splunk_hec/common/acknowledgements.rs b/src/sinks/splunk_hec/common/acknowledgements.rs index 20dedfc0c2..26fc036554 100644 --- a/src/sinks/splunk_hec/common/acknowledgements.rs +++ b/src/sinks/splunk_hec/common/acknowledgements.rs @@ -225,6 +225,7 @@ impl HecAckClient { None, MetadataFields::default(), false, + vec![], ) .map_err(|_| HecAckApiError::ClientBuildRequest)?; diff --git a/src/sinks/splunk_hec/common/request.rs b/src/sinks/splunk_hec/common/request.rs index f644107e46..2abf64d2a6 100644 --- a/src/sinks/splunk_hec/common/request.rs +++ b/src/sinks/splunk_hec/common/request.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use bytes::Bytes; +use http::{HeaderName, HeaderValue}; use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata}; use vector_lib::{ event::{EventFinalizers, Finalizable}, @@ -19,6 +20,7 @@ pub struct HecRequest { pub source: Option, pub sourcetype: Option, pub host: Option, + pub headers: Vec<(HeaderName, HeaderValue)>, } impl ByteSizeOf for HecRequest { diff --git a/src/sinks/splunk_hec/common/service.rs b/src/sinks/splunk_hec/common/service.rs index b44bd1f56e..dc22cb8848 100644 --- a/src/sinks/splunk_hec/common/service.rs +++ b/src/sinks/splunk_hec/common/service.rs @@ -223,6 +223,7 @@ impl HttpRequestBuilder { passthrough_token: Option>, metadata_fields: MetadataFields, auto_extract_timestamp: bool, + dyn_headers: Vec<(HeaderName, HeaderValue)>, ) -> Result, crate::Error> { let uri = match self.endpoint_target { EndpointTarget::Raw => { @@ -267,6 +268,11 @@ impl HttpRequestBuilder { crate::Error::from("Failed to access headers in http::Request builder") })?; + for (header, value) in dyn_headers { + headers.insert(header, value); + } + + // Static headers from request config take precedence over dynamic headers for (header, value) in self.headers.iter() { headers.insert(header, value.clone()); } @@ -376,6 +382,7 @@ mod tests { source: None, sourcetype: None, host: None, + headers: vec![], } } diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index ba729b859a..99cbcd75cd 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -77,6 +77,7 @@ pub fn build_http_batch_service( host: req.host, }, auto_extract_timestamp, + req.headers, ) }); future @@ -296,6 +297,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap(); @@ -340,6 +342,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap(); @@ -387,6 +390,7 @@ mod tests { None, MetadataFields::default(), false, + vec![], ) .unwrap_err(); assert_eq!(err.to_string(), "URI parse error: invalid format") @@ -442,6 +446,107 @@ mod tests { ); } } + + #[tokio::test] + async fn test_build_request_with_batch_headers() { + use http::HeaderName; + + let endpoint = "http://localhost:8888"; + let token = "token"; + let compression = Compression::None; + let events = Bytes::from("events"); + let http_request_builder = HttpRequestBuilder::new( + String::from(endpoint), + EndpointTarget::default(), + String::from(token), + compression, + IndexMap::default() + ); + + // Create batch headers + let batch_headers = vec![ + (HeaderName::from_static("x-tenant"), HeaderValue::from_static("acme")), + (HeaderName::from_static("x-region"), HeaderValue::from_static("us-east")), + ]; + + let request = http_request_builder + .build_request( + events.clone(), + "/services/collector/event", + None, + MetadataFields::default(), + false, + batch_headers, + ) + .unwrap(); + + // Verify batch headers are present + assert_eq!( + request.headers().get("X-Tenant"), + Some(&HeaderValue::from_static("acme")) + ); + assert_eq!( + request.headers().get("X-Region"), + Some(&HeaderValue::from_static("us-east")) + ); + + // Standard headers should still be present + assert_eq!( + request.headers().get("Content-Type"), + Some(&HeaderValue::from_static("application/json")) + ); + assert_eq!( + request.headers().get("Authorization"), + Some(&HeaderValue::from_static("Splunk token")) + ); + } + + #[tokio::test] + async fn test_build_request_static_headers_override_batch_headers() { + use http::HeaderName; + + let endpoint = "http://localhost:8888"; + let token = "token"; + let compression = Compression::None; + let events = Bytes::from("events"); + + // Create static headers that will override batch headers + let mut static_headers = IndexMap::new(); + static_headers.insert( + HeaderName::from_static("x-override"), + HeaderValue::from_static("static-value"), + ); + + let http_request_builder = HttpRequestBuilder::new( + String::from(endpoint), + EndpointTarget::default(), + String::from(token), + compression, + static_headers, + ); + + // Batch header with same name should be overridden + let batch_headers = vec![ + (HeaderName::from_static("x-override"), HeaderValue::from_static("dynamic-value")), + ]; + + let request = http_request_builder + .build_request( + events.clone(), + "/services/collector/event", + None, + MetadataFields::default(), + false, + batch_headers, + ) + .unwrap(); + + // Static header should override the batch header + assert_eq!( + request.headers().get("X-Override"), + Some(&HeaderValue::from_static("static-value")) + ); + } } #[cfg(all(test, feature = "splunk-integration-tests"))] diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 57ad1d6494..047520e461 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -1,9 +1,10 @@ use std::sync::Arc; +use http::HeaderName; use vector_lib::{ codecs::TextSerializerConfig, config::TimestampFormat, lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath}, sensitive_string::SensitiveString }; - +use serde_with::serde_as; use crate::{ http::HttpClient, sinks::{ @@ -20,6 +21,49 @@ use crate::{ use crate::sinks::util::http::{validate_headers, RequestConfig}; use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink}; +/// A batch http-header to be sent to Splunk HEC. +#[serde_as] +#[configurable_component] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct BatchHeader { + /// The name of the header. + #[configurable(metadata(docs::examples = "Priority"))] + pub name: String, + + /// The value of the header. + #[configurable(metadata(docs::examples = "pri"))] + pub value: ConfigValuePath, +} + +/// A batch header to be sent to Splunk HEC, represented as a key-value pair. +#[serde_as] +#[configurable_component] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct BatchHeaders(Vec); + +impl From> for BatchHeaders { + fn from(headers: Vec) -> Self { + BatchHeaders(headers) + } +} + +impl BatchHeaders { + /// Validates header names and converts to a vector of (HeaderName, ConfigValuePath) pairs. + /// Returns an error if any header name is invalid. + pub fn into_validated(self) -> Result, crate::Error> { + self.0 + .into_iter() + .map(|header| { + HeaderName::try_from(header.name.as_str()) + .map(|name| (name, header.value)) + .map_err(|e| format!("Invalid batch header name '{}': {}", header.name, e).into()) + }) + .collect() + } +} + /// Configuration for the `splunk_hec_logs` sink. #[configurable_component(sink( "splunk_hec_logs", @@ -105,6 +149,12 @@ pub struct HecLogsSinkConfig { #[serde(default)] pub batch: BatchConfig, + /// Headers to be included in each batch request to Splunk HEC. + /// Headers in request-config take precedence over these batch headers if there are any conflicts. + #[configurable(derived)] + #[serde(default)] + pub batch_headers: BatchHeaders, + #[configurable(derived)] #[serde(default)] pub request: RequestConfig, @@ -202,6 +252,7 @@ impl GenerateConfig for HecLogsSinkConfig { encoding: TextSerializerConfig::default().into(), compression: Compression::default(), batch: BatchConfig::default(), + batch_headers: BatchHeaders::default(), request: RequestConfig::default(), tls: None, acknowledgements: Default::default(), @@ -266,6 +317,7 @@ impl HecLogsSinkConfig { let request_settings = self.request.tower.into_settings(); let headers = validate_headers(&self.request.headers)?; + let batch_headers = self.batch_headers.clone().into_validated()?; let http_request_builder = Arc::new(HttpRequestBuilder::new( self.endpoint.clone(), @@ -309,6 +361,7 @@ impl HecLogsSinkConfig { endpoint_target: self.endpoint_target, auto_extract_timestamp: self.auto_extract_timestamp.unwrap_or_default(), timestamp_configuration: self.timestamp_configuration.clone(), + batch_headers, shutdown: cx.shutdown.clone(), }; @@ -372,6 +425,7 @@ mod tests { indexer_acknowledgements_enabled: false, ..Default::default() }, + batch_headers: BatchHeaders::default(), path: None, auto_extract_timestamp: None, endpoint_target: EndpointTarget::Raw, diff --git a/src/sinks/splunk_hec/logs/request_builder.rs b/src/sinks/splunk_hec/logs/request_builder.rs index 91d37d7b5e..67f594fad6 100644 --- a/src/sinks/splunk_hec/logs/request_builder.rs +++ b/src/sinks/splunk_hec/logs/request_builder.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use bytes::Bytes; +use http::{HeaderName, HeaderValue}; use vector_lib::event::{EventFinalizers, Finalizable}; use vector_lib::request_metadata::RequestMetadata; @@ -29,6 +30,7 @@ pub struct HecRequestMetadata { sourcetype: Option, index: Option, host: Option, + headers: Vec<(HeaderName, Option)>, } impl RequestBuilder<(Option, Vec)> for HecLogsRequestBuilder { @@ -65,6 +67,10 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()), index: partition.as_mut().and_then(|p| p.index.take()), host: partition.as_mut().and_then(|p| p.host.take()), + headers: partition + .as_mut() + .map(|p| std::mem::take(&mut p.headers)) + .unwrap_or_default(), }, builder, events, @@ -77,6 +83,12 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe metadata: RequestMetadata, payload: EncodeResult, ) -> Self::Request { + let headers = hec_metadata + .headers + .into_iter() + .filter_map(|(k, v)| v.map(|v| (k, v))) + .collect(); + HecRequest { body: payload.into_payload(), finalizers: hec_metadata.finalizers, @@ -85,6 +97,7 @@ impl RequestBuilder<(Option, Vec)> for HecLogsRe sourcetype: hec_metadata.sourcetype, index: hec_metadata.index, host: hec_metadata.host, + headers, metadata, } } diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index e470d64f8b..39d176a011 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -1,5 +1,7 @@ use std::{fmt::{self, Display}, sync::Arc}; +use http::{HeaderName, HeaderValue}; + use super::request_builder::HecLogsRequestBuilder; use crate::{ internal_events::SplunkEventTimestampInvalidType, @@ -17,7 +19,7 @@ use futures::future::Either; use stream_cancel::Tripwire; use vector_lib::{ config::{log_schema, LogNamespace, TimestampFormat, TimestampResolutionError}, - lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix}, + lookup::{event_path, lookup_v2::{ConfigValuePath, OptionalTargetPath}, OwnedValuePath, PathPrefix}, schema::meaning, }; use vrl::path::OwnedTargetPath; @@ -38,6 +40,7 @@ pub struct HecLogsSink { pub endpoint_target: EndpointTarget, pub auto_extract_timestamp: bool, pub timestamp_configuration: Option, + pub batch_headers: Vec<(HeaderName, ConfigValuePath)>, pub shutdown: Tripwire, } @@ -72,6 +75,7 @@ where }; let batch_settings = self.batch_settings; + let batch_headers = self.batch_headers.clone(); let run = input .map(move |event| process_log(event, &data)) .batched_partitioned( @@ -83,9 +87,10 @@ where self.source.clone(), self.index.clone(), self.host_key.clone(), + batch_headers, ) } else { - EventPartitioner::new(None, None, None, None) + EventPartitioner::new(None, None, None, None, batch_headers) }, move || batch_settings.as_byte_size_config(), ) @@ -139,6 +144,7 @@ pub(super) struct Partitioned { pub(super) sourcetype: Option, pub(super) index: Option, pub(super) host: Option, + pub(super) headers: Vec<(HeaderName, Option)>, } #[derive(Default)] @@ -147,20 +153,23 @@ struct EventPartitioner { pub source: Option