Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/vector-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ tokio = { version = "1.43.0", default-features = false, features = ["net"] }
tokio-util = { version = "0.7.0", default-features = false, features = ["time"] }
tower = { version = "0.4", default-features = false, features = ["util"] }
tracing = { version = "0.1.34", default-features = false }
metrics.workspace = true
twox-hash = { version = "2.1.0", default-features = false, features = ["xxhash64"] }
vector-common = { path = "../vector-common" }
vector-core = { path = "../vector-core" }

[dev-dependencies]
metrics-util = "0.18"
proptest = "1.5"
rand = "0.8.5"
rand_distr = "0.4.3"
tokio = { version = "1.43.0", features = ["macros", "rt", "time", "test-util"] }
Comment thread
janmejay-s1 marked this conversation as resolved.
96 changes: 96 additions & 0 deletions lib/vector-stream/src/partitioned_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use futures::stream::{Fuse, Stream, StreamExt};
use metrics::{gauge, Gauge};
use pin_project::pin_project;
use tokio_util::time::{delay_queue::Key, DelayQueue};
use twox_hash::XxHash64;
Expand Down Expand Up @@ -201,6 +202,10 @@ where
#[pin]
/// The stream this `Batcher` wraps
stream: Fuse<St>,
/// Gauge tracking the number of open batches
m_open: Gauge,
/// Gauge tracking the number of closed batches waiting to be emitted
m_closed: Gauge,
}

impl<St, Prt, C, F, B> PartitionedBatcher<St, Prt, ExpirationQueue<Prt::Key>, C, F, B>
Expand All @@ -221,6 +226,8 @@ where
timer: ExpirationQueue::new(timeout),
partitioner,
stream: stream.fuse(),
m_open: gauge!("open_batches"),
m_closed: gauge!("closed_batches"),
}
}
}
Expand All @@ -243,6 +250,8 @@ where
timer,
partitioner,
stream: stream.fuse(),
m_open: gauge!("open_batches"),
m_closed: gauge!("closed_batches"),
}
}
}
Expand All @@ -267,6 +276,7 @@ where
let mut this = self.project();
loop {
if !this.closed_batches.is_empty() {
this.m_closed.decrement(1.0);
return Poll::Ready(this.closed_batches.pop());
}
match this.stream.as_mut().poll_next(cx) {
Expand All @@ -280,6 +290,8 @@ where
.remove(&item_key)
.expect("batch should exist if it is set to expire");
this.closed_batches.push((item_key, batch.take_batch()));
this.m_open.decrement(1.0);
this.m_closed.increment(1.0);
}
},
Poll::Ready(None) => {
Expand All @@ -289,12 +301,15 @@ where
// continue looping so the caller can drain them all before
// we finish.
if !this.batches.is_empty() {
let batch_count = this.batches.len() as f64;
this.timer.clear();
this.closed_batches.extend(
this.batches
.drain()
.map(|(key, mut batch)| (key, batch.take_batch())),
);
this.m_open.decrement(batch_count);
this.m_closed.increment(batch_count);
continue;
}
return Poll::Ready(None);
Expand All @@ -309,6 +324,7 @@ where
let batch = (this.state)();
this.batches.insert(item_key.clone(), batch);
this.timer.insert(item_key.clone());
this.m_open.increment(1.0);
this.batches
.get_mut(&item_key)
.expect("batch has just been inserted so should exist")
Expand All @@ -321,6 +337,7 @@ where
// next iteration.
this.closed_batches
.push((item_key.clone(), batch.take_batch()));
this.m_closed.increment(1.0);

// The batch for this partition key was set to
// expire, but now it's overflowed and must be
Expand All @@ -337,6 +354,8 @@ where
.push((item_key.clone(), batch.take_batch()));
this.batches.remove(&item_key);
this.timer.remove(&item_key);
this.m_open.decrement(1.0);
this.m_closed.increment(1.0);
}
}
}
Expand Down Expand Up @@ -696,4 +715,81 @@ mod test {

f(&mut cx)
}

#[tokio::test]
async fn gauge_values_track_batch_counts() {
    use metrics_util::debugging::DebuggingRecorder;

    // Install a debugging recorder so gauge interactions can be inspected
    // after the batcher has been drained.
    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    // `with_local_recorder` scopes the recorder to this closure, avoiding
    // global-recorder state leaking between tests.
    metrics::with_local_recorder(&recorder, || {
        let noop_waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&noop_waker);

        // Three partitions (keys 1, 2, 3) with two items each: every
        // partition fills exactly one batch at item_limit = 2.
        let items = vec![1u64, 2, 3, 1, 2, 3];
        let stream = stream::iter(items);

        let partitioner = TestPartitioner {
            key_space: NonZeroU8::new(10).unwrap(),
        };
        let settings = BatcherSettings::new(
            Duration::from_secs(1),
            NonZeroUsize::new(100).unwrap(),
            NonZeroUsize::new(2).unwrap(), // 2 items per batch triggers full
        );

        let mut batcher = PartitionedBatcher::new(stream, partitioner, move || {
            settings.as_byte_size_config()
        });
        let mut batcher = Pin::new(&mut batcher);

        // Drain the batcher: items 1, 2, 3 each open a batch, their second
        // occurrences fill those batches, and each closed batch is then
        // emitted by a subsequent poll. Stop on either Pending or Ready(None).
        let mut batches_received = 0;
        while let Poll::Ready(Some(_)) = batcher.as_mut().poll_next(&mut cx) {
            batches_received += 1;
        }

        // All three partitions must have produced exactly one batch each.
        assert_eq!(batches_received, 3);

        // Snapshot the recorded metrics and confirm both gauges were touched.
        let snapshot = snapshotter.snapshot();
        let gauges = snapshot.into_hashmap();
        let has_gauge = |wanted: &str| {
            gauges
                .keys()
                .any(|composite_key| composite_key.key().name() == wanted)
        };

        assert!(
            has_gauge("open_batches"),
            "open_batches gauge should have been recorded"
        );
        assert!(
            has_gauge("closed_batches"),
            "closed_batches gauge should have been recorded"
        );
    });
}
}
21 changes: 21 additions & 0 deletions src/internal_events/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,27 @@ mod sink {
);
}
}

/// Internal event emitted when a Splunk HEC batch header value contains
/// characters that are not valid in an HTTP header, causing the sink to
/// skip that header.
pub struct SplunkBatchHeaderValueInvalid<'a> {
    /// Name of the header whose value failed validation.
    pub header_name: &'a str,
}

impl InternalEvent for SplunkBatchHeaderValueInvalid<'_> {
    /// Logs a rate-limited warning naming the offending header and bumps
    /// `component_errors_total` so the skipped header is visible in the
    /// standard component error metrics.
    fn emit(self) {
        warn!(
            message = "Batch header value contains invalid characters. Skipping header.",
            header_name = %self.header_name,
            internal_log_rate_limit = true,
        );
        // Tagged as a processing-stage parser failure so dashboards can
        // distinguish this from transport or encoding errors.
        counter!(
            "component_errors_total",
            "error_code" => "invalid_batch_header_value",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
}

#[cfg(feature = "sources-splunk_hec")]
Expand Down
5 changes: 3 additions & 2 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
template::Template,
tls::TlsConfig,
};
use crate::sinks::splunk_hec::logs::config::TimestampConfiguration;
use crate::sinks::splunk_hec::logs::config::{BatchHeaders, TimestampConfiguration};

pub(super) const HOST: &str = "https://cloud.humio.com";

Expand Down Expand Up @@ -218,7 +218,8 @@ impl HumioLogsConfig {
format: TimestampFormat::Native,
timestamp_nanos_key: self.timestamp_nanos_key.clone(),
preserve_timestamp_key: false,
})
}),
batch_headers: BatchHeaders::default(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/splunk_hec/common/acknowledgements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ impl HecAckClient {
None,
MetadataFields::default(),
false,
vec![],
)
.map_err(|_| HecAckApiError::ClientBuildRequest)?;

Expand Down
2 changes: 2 additions & 0 deletions src/sinks/splunk_hec/common/request.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use bytes::Bytes;
use http::{HeaderName, HeaderValue};
use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_lib::{
event::{EventFinalizers, Finalizable},
Expand All @@ -19,6 +20,7 @@ pub struct HecRequest {
pub source: Option<String>,
pub sourcetype: Option<String>,
pub host: Option<String>,
pub headers: Vec<(HeaderName, HeaderValue)>,
}

impl ByteSizeOf for HecRequest {
Expand Down
7 changes: 7 additions & 0 deletions src/sinks/splunk_hec/common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl HttpRequestBuilder {
passthrough_token: Option<Arc<str>>,
metadata_fields: MetadataFields,
auto_extract_timestamp: bool,
dyn_headers: Vec<(HeaderName, HeaderValue)>,
) -> Result<Request<Bytes>, crate::Error> {
let uri = match self.endpoint_target {
EndpointTarget::Raw => {
Expand Down Expand Up @@ -267,6 +268,11 @@ impl HttpRequestBuilder {
crate::Error::from("Failed to access headers in http::Request builder")
})?;

for (header, value) in dyn_headers {
headers.insert(header, value);
}

// Static headers from request config take precedence over dynamic headers
for (header, value) in self.headers.iter() {
headers.insert(header, value.clone());
}
Expand Down Expand Up @@ -376,6 +382,7 @@ mod tests {
source: None,
sourcetype: None,
host: None,
headers: vec![],
}
}

Expand Down
Loading