From b4d123c147208ae4a1f841ac251b46451017c417 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 02:36:39 -0700 Subject: [PATCH 01/12] splunk source --- src/sources/splunk_hec/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index bddf118e5..8c35e9646 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -33,6 +33,8 @@ use vector_lib::{ EstimatedJsonEncodedSizeOf, }; use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStream}; +use vector_lib::ipallowlist::IpAllowlistConfig; + use vrl::path::OwnedTargetPath; use vrl::value::{kind::Collection, Kind}; use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply}; @@ -117,6 +119,9 @@ pub struct SplunkConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } impl_generate_config_from_default!(SplunkConfig); @@ -132,6 +137,7 @@ impl Default for SplunkConfig { store_hec_token: false, log_namespace: None, keepalive: Default::default(), + permit_origin: None, } } } @@ -170,6 +176,8 @@ impl SourceConfig for SplunkConfig { .or_else(finish_err); let listener = tls.bind(&self.address).await?; + let listener = listener + .with_allowlist(self.permit_origin.clone().map(Into::into)); let keepalive_settings = self.keepalive.clone(); Ok(Box::pin(async move { @@ -1333,6 +1341,7 @@ mod tests { store_hec_token, log_namespace: None, keepalive: Default::default(), + permit_origin: None, } .build(cx) .await From 50c1a4d4be7ef0aa9586ad983b950cc40dd8f1d8 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 02:38:05 -0700 Subject: [PATCH 02/12] firehose source --- src/sources/aws_kinesis_firehose/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index 298bce5df..83e1785bb 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -9,6 +9,7 @@ use tracing::Span; use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig}; use vector_lib::config::{LegacyKey, LogNamespace}; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::lookup::owned_value_path; use vector_lib::sensitive_string::SensitiveString; use vector_lib::tls::MaybeTlsIncomingStream; @@ -104,6 +105,9 @@ pub struct AwsKinesisFirehoseConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } const fn access_keys_example() -> [&'static str; 2] { @@ -181,6 +185,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig { let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?; let listener = tls.bind(&self.address).await?; + let listener = listener + .with_allowlist(self.permit_origin.clone().map(Into::into)); let keepalive_settings = self.keepalive.clone(); let shutdown = cx.shutdown; @@ -261,6 +267,7 @@ impl GenerateConfig for AwsKinesisFirehoseConfig { acknowledgements: Default::default(), log_namespace: None, keepalive: Default::default(), + permit_origin: None, }) .unwrap() } @@ -354,6 +361,7 @@ mod tests { acknowledgements: true.into(), log_namespace: Some(log_namespace), keepalive: Default::default(), + permit_origin: None, } .build(cx) .await From e2b0a0144d787a5cfcaea744c50a5f2f5f477732 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 02:43:38 -0700 Subject: [PATCH 03/12] all http push models inherit allowlist feature from http/prelude.rs - heroku, prometheus pushgateway and remote_write --- src/sources/heroku_logs.rs | 7 +++++++ src/sources/http_server.rs | 8 ++++++++ src/sources/prometheus/pushgateway.rs | 7 +++++++ src/sources/prometheus/remote_write.rs | 9 +++++++++ src/sources/util/http/prelude.rs | 5 +++++ 5 files changed, 36 insertions(+) diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 55ea1d80b..fafc2f4f9 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -18,6 +18,7 @@ use vrl::value::{kind::Collection, Kind}; use warp::http::{HeaderMap, StatusCode}; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::{ config::{LegacyKey, LogNamespace}, schema::Definition, @@ -92,6 +93,9 @@ pub struct LogplexConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } impl LogplexConfig { @@ -162,6 +166,7 @@ impl Default for LogplexConfig { acknowledgements: SourceAcknowledgementsConfig::default(), log_namespace: None, keepalive: KeepaliveConfig::default(), + permit_origin: None, } } } @@ -202,6 +207,7 @@ impl SourceConfig for LogplexConfig { cx, self.acknowledgements, self.keepalive.clone(), + self.permit_origin.clone(), ) } @@ -474,6 +480,7 @@ mod tests { acknowledgements: acknowledgements.into(), log_namespace: None, keepalive: Default::default(), + permit_origin: None, } .build(context) .await diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index ae0787023..cf48a26c5 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -14,6 +14,7 @@ use vector_lib::codecs::{ NewlineDelimitedDecoderConfig, }; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; use vector_lib::{ config::{DataType, LegacyKey, LogNamespace}, @@ -174,6 +175,9 @@ pub struct SimpleHttpConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } impl SimpleHttpConfig { @@ -285,6 +289,7 @@ impl Default for SimpleHttpConfig { acknowledgements: SourceAcknowledgementsConfig::default(), log_namespace: None, keepalive: KeepaliveConfig::default(), + permit_origin: None, } } } @@ -384,6 +389,7 @@ impl SourceConfig for SimpleHttpConfig { cx, self.acknowledgements, self.keepalive.clone(), + self.permit_origin.clone(), ) } @@ -610,6 +616,7 @@ mod tests { acknowledgements: acknowledgements.into(), log_namespace: None, keepalive: Default::default(), + permit_origin: None, } .build(context) .await @@ -667,6 +674,7 @@ mod tests { acknowledgements: acknowledgements.into(), log_namespace: None, keepalive: Default::default(), + permit_origin: None, } .build(context) .await diff --git a/src/sources/prometheus/pushgateway.rs b/src/sources/prometheus/pushgateway.rs index 537487dd5..52f36e263 100644 --- a/src/sources/prometheus/pushgateway.rs +++ b/src/sources/prometheus/pushgateway.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use itertools::Itertools; use vector_lib::config::LogNamespace; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use warp::http::HeaderMap; use super::parser; @@ -70,6 +71,9 @@ pub struct PrometheusPushgatewayConfig { /// meaningfully aggregated. #[serde(default = "crate::serde::default_false")] aggregate_metrics: bool, + + #[configurable(derived)] + pub permit_origin: Option, } impl GenerateConfig for PrometheusPushgatewayConfig { @@ -81,6 +85,7 @@ impl GenerateConfig for PrometheusPushgatewayConfig { acknowledgements: SourceAcknowledgementsConfig::default(), aggregate_metrics: false, keepalive: KeepaliveConfig::default(), + permit_origin: None, }) .unwrap() } @@ -104,6 +109,7 @@ impl SourceConfig for PrometheusPushgatewayConfig { cx, self.acknowledgements, self.keepalive.clone(), + self.permit_origin.clone(), ) } @@ -374,6 +380,7 @@ mod test { acknowledgements: SourceAcknowledgementsConfig::default(), keepalive: KeepaliveConfig::default(), aggregate_metrics: true, + permit_origin: None, }; let source = source .build(SourceContext::new_test(tx, None)) diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index e4a8c9bff..66c774985 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use prost::Message; use vector_lib::config::LogNamespace; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::prometheus::parser::proto; use warp::http::{HeaderMap, StatusCode}; @@ -50,6 +51,9 @@ pub struct PrometheusRemoteWriteConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } impl PrometheusRemoteWriteConfig { @@ -61,6 +65,7 @@ impl PrometheusRemoteWriteConfig { auth: None, acknowledgements: false.into(), keepalive: KeepaliveConfig::default(), + permit_origin: None, } } } @@ -73,6 +78,7 @@ impl GenerateConfig for PrometheusRemoteWriteConfig { auth: None, acknowledgements: SourceAcknowledgementsConfig::default(), keepalive: KeepaliveConfig::default(), + permit_origin: None, }) .unwrap() } @@ -94,6 +100,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig { cx, self.acknowledgements, self.keepalive.clone(), + self.permit_origin.clone(), ) } @@ -192,6 +199,7 @@ mod test { tls: tls.clone(), acknowledgements: SourceAcknowledgementsConfig::default(), keepalive: KeepaliveConfig::default(), + permit_origin: None, }; let source = source .build(SourceContext::new_test(tx, None)) @@ -285,6 +293,7 @@ mod test { tls: None, acknowledgements: SourceAcknowledgementsConfig::default(), keepalive: KeepaliveConfig::default(), + permit_origin: None, }; let source = source .build(SourceContext::new_test(tx, None)) diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 1e452882a..c6560c373 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -27,6 +27,8 @@ use warp::{ Filter, }; +use vector_lib::ipallowlist::IpAllowlistConfig; + use crate::{ config::SourceContext, http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer}, @@ -83,6 +85,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { cx: SourceContext, acknowledgements: SourceAcknowledgementsConfig, keepalive_settings: KeepaliveConfig, + permit_origin: Option, ) -> crate::Result { let tls = MaybeTlsSettings::from_config(tls, true)?; let protocol = tls.http_protocol_name(); @@ -220,6 +223,8 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let listener = tls.bind(&address).await.map_err(|err| { error!("An error occurred: {:?}.", err); })?; + let listener = listener + .with_allowlist(permit_origin.map(Into::into)); Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) .serve(make_svc) From a6c0ba87e9bebc21198fde5707169606373b1b5b Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 02:53:11 -0700 Subject: [PATCH 04/12] datadog agent allowlist --- src/sources/datadog_agent/mod.rs | 7 +++++++ src/sources/datadog_agent/tests.rs | 3 +++ 2 files changed, 10 insertions(+) diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 5d53300fb..8588910fe 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -37,6 +37,7 @@ use tracing::Span; use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig}; use vector_lib::config::{LegacyKey, LogNamespace}; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::event::{BatchNotifier, BatchStatus}; use vector_lib::internal_event::{EventsReceived, Registered}; use vector_lib::lookup::owned_value_path; @@ -141,6 +142,9 @@ pub struct DatadogAgentConfig { #[configurable(derived)] #[serde(default)] keepalive: KeepaliveConfig, + + #[configurable(derived)] + pub permit_origin: Option, } impl GenerateConfig for DatadogAgentConfig { @@ -159,6 +163,7 @@ impl GenerateConfig for DatadogAgentConfig { parse_ddtags: false, log_namespace: Some(false), keepalive: KeepaliveConfig::default(), + permit_origin: None, }) .unwrap() } @@ -190,6 +195,8 @@ impl SourceConfig for DatadogAgentConfig { self.parse_ddtags, ); let listener = tls.bind(&self.address).await?; + let listener = listener + .with_allowlist(self.permit_origin.clone().map(Into::into)); let acknowledgements = cx.do_acknowledgements(self.acknowledgements); let filters = source.build_warp_filters(cx.out, acknowledgements, self)?; let shutdown = cx.shutdown; diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index f328c8861..8c68becfd 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -1578,6 +1578,7 @@ fn test_config_outputs_with_disabled_data_types() { parse_ddtags: false, log_namespace: Some(false), keepalive: Default::default(), + permit_origin: None, }; let outputs: Vec = config @@ -2020,6 +2021,7 @@ fn test_config_outputs() { parse_ddtags: false, log_namespace: Some(false), keepalive: Default::default(), + permit_origin: None, }; let mut outputs = config @@ -2527,6 +2529,7 @@ impl ValidatableComponent for DatadogAgentConfig { parse_ddtags: false, log_namespace: Some(false), keepalive: Default::default(), + permit_origin: None, }; let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into(); From 6c86c723e69c7fd9d4247f04677b270ab07c2cdc Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 03:01:58 -0700 Subject: [PATCH 05/12] opentelemetry http/gRPC allowlist --- src/sources/opentelemetry/http.rs | 5 +++++ src/sources/opentelemetry/mod.rs | 7 +++++++ src/sources/opentelemetry/tests.rs | 4 ++++ src/sources/util/grpc/mod.rs | 5 +++++ 4 files changed, 21 insertions(+) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index f7472fb94..cadc50f70 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -28,6 +28,8 @@ use warp::{ filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response, Filter, Reply, }; +use vector_lib::ipallowlist::IpAllowlistConfig; + use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer}; use crate::sources::http_server::HttpConfigParamKind; use crate::sources::util::add_headers; @@ -57,8 +59,11 @@ pub(crate) async fn run_http_server( filters: BoxedFilter<(Response,)>, shutdown: ShutdownSignal, keepalive_settings: KeepaliveConfig, + permit_origin: Option, ) -> crate::Result<()> { let listener = tls_settings.bind(&address).await?; + let listener = listener + .with_allowlist(permit_origin.map(Into::into)); let routes = filters.recover(handle_rejection); info!(message = "Building HTTP server.", address = %address); diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index 63acffd26..ff750040a 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -20,6 +20,7 @@ use vector_lib::opentelemetry::logs::{ use tonic::transport::server::RoutesBuilder; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::internal_event::{BytesReceived, EventsReceived, Protocol}; use vector_lib::opentelemetry::proto::collector::{ logs::v1::logs_service_server::LogsServiceServer, @@ -72,6 +73,9 @@ pub struct OpentelemetryConfig { #[configurable(metadata(docs::hidden))] #[serde(default)] log_namespace: Option, + + #[configurable(derived)] + pub permit_origin: Option, } /// Configuration for the `opentelemetry` gRPC server. @@ -149,6 +153,7 @@ impl GenerateConfig for OpentelemetryConfig { http: Option::from(example_http_config()), acknowledgements: Default::default(), log_namespace: None, + permit_origin: None, }) .unwrap() } @@ -211,6 +216,7 @@ impl SourceConfig for OpentelemetryConfig { grpc_tls_settings, builder.routes(), cx.shutdown.clone(), + self.permit_origin.clone(), ) .map_err(|error| { error!(message = "Source future failed.", %error); @@ -239,6 +245,7 @@ impl SourceConfig for OpentelemetryConfig { filters, cx.shutdown, http_config.keepalive.clone(), + self.permit_origin.clone(), )) } else { None diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 4c8b43704..c3616dfd0 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1136,6 +1136,7 @@ async fn http_headers() { http: http_config, acknowledgements: Default::default(), log_namespace: Default::default(), + permit_origin: None, }; let schema_definitions = source .outputs(LogNamespace::Legacy) @@ -1243,6 +1244,7 @@ async fn only_http_config() { http: http_config, acknowledgements: Default::default(), log_namespace: Default::default(), + permit_origin: None, }; let schema_definitions = source .outputs(LogNamespace::Legacy) @@ -1500,6 +1502,7 @@ pub async fn build_otlp_test_env( http: http_config, acknowledgements: Default::default(), log_namespace, + permit_origin: None, }; let (sender, output, _) = new_source(EventStatus::Delivered, event_name.to_string()); @@ -1537,6 +1540,7 @@ pub async fn build_only_grpc_otlp_test_env( http: http_config, acknowledgements: Default::default(), log_namespace, + permit_origin: None, }; let (sender, output, _) = new_source(EventStatus::Delivered, event_name.to_string()); diff --git a/src/sources/util/grpc/mod.rs b/src/sources/util/grpc/mod.rs index 0c19e9795..a378efdd8 100644 --- a/src/sources/util/grpc/mod.rs +++ b/src/sources/util/grpc/mod.rs @@ -1,3 +1,5 @@ +use vector_lib::ipallowlist::IpAllowlistConfig; + use crate::{ internal_events::{GrpcServerRequestReceived, GrpcServerResponseSent}, shutdown::{ShutdownSignal, ShutdownSignalToken}, @@ -68,10 +70,13 @@ pub async fn run_grpc_server_with_routes( tls_settings: MaybeTlsSettings, routes: Routes, shutdown: ShutdownSignal, + permit_origin: Option, ) -> crate::Result<()> { let span = Span::current(); let (tx, rx) = tokio::sync::oneshot::channel::(); let listener = tls_settings.bind(&address).await?; + let listener = listener + .with_allowlist(permit_origin.map(Into::into)); let stream = listener.accept_stream(); info!(%address, "Building gRPC server."); From c9c27b0d6396cb554cf36bf80fb90c292cd79d24 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 03:18:30 -0700 Subject: [PATCH 06/12] UDP allowlist --- src/sources/socket/udp.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index a1b39c2fb..a0d2eea13 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -2,6 +2,7 @@ use super::default_host_key; use bytes::BytesMut; use chrono::Utc; use futures::StreamExt; +use ipnet::IpNet; use listenfd::ListenFd; use tokio_util::codec::FramedRead; use vector_lib::codecs::{ @@ -9,6 +10,7 @@ use vector_lib::codecs::{ StreamDecodingError, }; use vector_lib::configurable::configurable_component; +use vector_lib::ipallowlist::IpAllowlistConfig; use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}; use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; use vector_lib::{ @@ -84,6 +86,9 @@ pub struct UdpConfig { #[serde(default)] #[configurable(metadata(docs::hidden))] pub log_namespace: Option, + + #[configurable(derived)] + pub permit_origin: Option, } fn default_port_key() -> OptionalValuePath { @@ -125,6 +130,7 @@ impl UdpConfig { framing: None, decoding: default_decoding(), log_namespace: None, + permit_origin: None, } } @@ -164,6 +170,8 @@ pub(super) fn udp( max_length = std::cmp::min(max_length, receive_buffer_bytes); } + let origin_allowlist: Option> = config.permit_origin.clone().map(Into::into); + let bytes_received = register!(BytesReceived::from(Protocol::UDP)); info!(message = "Listening.", address = %config.address); @@ -196,6 +204,12 @@ pub(super) fn udp( } }; + if let Some(ref allowlist) = origin_allowlist { + if !allowlist.iter().any(|net| net.contains(&address.ip())) { + continue; + } + } + bytes_received.emit(ByteSize(byte_size)); let payload = buf.split_to(byte_size); let truncated = byte_size == max_length + 1; From 57a68f8435cb738edb92dc40a53fba26f87e8353 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 10:26:26 -0700 Subject: [PATCH 07/12] create a loop to wait for a valid source or throw an error. If not in allowlist, loop and wait --- lib/vector-core/src/tls/incoming.rs | 45 ++++++++++++++++------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/lib/vector-core/src/tls/incoming.rs b/lib/vector-core/src/tls/incoming.rs index d13c10717..8873e1c04 100644 --- a/lib/vector-core/src/tls/incoming.rs +++ b/lib/vector-core/src/tls/incoming.rs @@ -83,28 +83,33 @@ pub struct MaybeTlsListener { impl MaybeTlsListener { pub async fn accept(&mut self) -> crate::tls::Result> { - let listener = self - .listener - .accept() - .await - .map(|(stream, peer_addr)| { - MaybeTlsIncomingStream::new(stream, peer_addr, self.acceptor.clone()) - }) - .context(IncomingListenerSnafu)?; - - if let Some(origin_filter) = &self.origin_filter { - if origin_filter - .iter() - .any(|net| net.contains(&listener.peer_addr().ip())) - { - Ok(listener) - } else { - Err(TlsError::Connect { - source: std::io::ErrorKind::ConnectionRefused.into(), + loop { + let listener = self + .listener + .accept() + .await + .map(|(stream, peer_addr)| { + MaybeTlsIncomingStream::new(stream, peer_addr, self.acceptor.clone()) }) + .context(IncomingListenerSnafu)?; + + if let Some(origin_filter) = &self.origin_filter { + if origin_filter + .iter() + .any(|net| net.contains(&listener.peer_addr().ip())) + { + return Ok(listener); + } else { + warn!( + message = "Rejected connection from non-allowed origin.", + peer_addr = %listener.peer_addr(), + ); + drop(listener); + continue; + } + } else { + return Ok(listener); } - } else { - Ok(listener) } } From 1e8e038e302fd7db639cca891db679ffa0c5f6cd Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 9 Apr 2026 11:07:18 -0700 Subject: [PATCH 08/12] create unit test to verify allowlist block works --- src/sources/aws_kinesis_firehose/mod.rs | 52 ++++++++ src/sources/datadog_agent/tests.rs | 46 ++++++++ src/sources/heroku_logs.rs | 47 ++++++++ src/sources/http_server.rs | 54 +++++++++ src/sources/opentelemetry/tests.rs | 44 +++++++ src/sources/prometheus/pushgateway.rs | 41 +++++++ src/sources/prometheus/remote_write.rs | 40 +++++++ src/sources/socket/mod.rs | 150 ++++++++++++++++++++++++ src/sources/splunk_hec/mod.rs | 46 ++++++++ 9 files changed, 520 insertions(+) diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index 83e1785bb..7d7b496ee 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -945,4 +945,56 @@ mod tests { .get("aws_kinesis_firehose_access_key") .is_none()); } + + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let (sender, mut recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let cx = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + AwsKinesisFirehoseConfig { + address, + tls: None, + access_key: None, + access_keys: None, + store_access_key: false, + record_compression: Compression::None, + framing: default_framing_message_based(), + decoding: default_decoding(), + acknowledgements: true.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(cx) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}", address)) + .header("x-amz-firehose-protocol-version", "1.0") + .header("x-amz-firehose-request-id", REQUEST_ID) + .header("x-amz-firehose-source-arn", SOURCE_ARN) + .header("content-type", "application/json") + .body(r#"{"requestId":"test","timestamp":1234567890,"records":[{"data":"dGVzdA=="}]}"#) + .send() + .await; + + let result = timeout(Duration::from_millis(200), recv.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } } diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 8c68becfd..261af8f78 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -2563,4 +2563,50 @@ impl ValidatableComponent for DatadogAgentConfig { } } +#[tokio::test] +async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + + let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let mut recv = recv; + let address = next_addr(); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + let config = toml::from_str::(&format!( + indoc! { r#" + address = "{}" + compression = "none" + store_api_key = false + "#}, + address + )) + .unwrap(); + + // Override permit_origin since toml deserialization defaults to None + let config = DatadogAgentConfig { + permit_origin, + ..config + }; + + let context = SourceContext::new_test(sender, None); + tokio::spawn(async move { + config.build(context).await.unwrap().await.unwrap(); + }); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/v1/input/", address)) + .body(r#"[{"message":"blocked"}]"#) + .send() + .await; + + let result = timeout(Duration::from_millis(200), recv.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); +} + register_validatable_component!(DatadogAgentConfig); diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index fafc2f4f9..e924f9d31 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -817,4 +817,51 @@ mod tests { assert_eq!(definitions, Some(expected_definition)) } + + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let (sender, mut recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let context = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + LogplexConfig { + address, + query_parameters: vec![], + tls: None, + auth: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + acknowledgements: false.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(context) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/events", address)) + .header("Logplex-Msg-Count", "1") + .body("test heroku blocked") + .send() + .await; + + let result = timeout(Duration::from_millis(200), recv.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index cf48a26c5..f9a82d2a4 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1794,5 +1794,59 @@ mod tests { assert_eq!(events.len(), 1); } + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let (sender, mut recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let context = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + SimpleHttpConfig { + address, + headers: vec![], + encoding: None, + query_parameters: vec![], + response_code: StatusCode::OK, + tls: None, + auth: None, + strict_path: true, + path_key: OptionalValuePath::from(owned_value_path!("path")), + host_key: OptionalValuePath::from(owned_value_path!("host")), + path: "/".to_owned(), + method: HttpMethod::Post, + framing: None, + decoding: None, + acknowledgements: false.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/", address)) + .body("blocked") + .send() + .await; + + let result = timeout(Duration::from_millis(200), recv.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } + register_validatable_component!(SimpleHttpConfig); } diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index c3616dfd0..38769f9f4 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1588,6 +1588,50 @@ fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> ObjectMap { ) } +#[tokio::test] +async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + + let http_addr = next_addr(); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + let config = OpentelemetryConfig { + grpc: None, + http: Some(HttpConfig { + address: http_addr, + tls: Default::default(), + keepalive: Default::default(), + headers: Default::default(), + }), + acknowledgements: Default::default(), + log_namespace: None, + permit_origin, + }; + + let (sender, mut output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); + let server = config + .build(SourceContext::new_test(sender, None)) + .await + .expect("Failed to build source"); + tokio::spawn(server); + test_util::wait_for_tcp(http_addr).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/v1/logs", http_addr)) + .header("Content-Type", "application/x-protobuf") + .body(vec![0u8; 0]) + .send() + .await; + + let result = timeout(Duration::from_millis(200), output.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); +} + fn current_time_and_nanos() -> (SystemTime, u64) { let time = SystemTime::now(); let nanos = time diff --git a/src/sources/prometheus/pushgateway.rs b/src/sources/prometheus/pushgateway.rs index 52f36e263..ea4232597 100644 --- a/src/sources/prometheus/pushgateway.rs +++ b/src/sources/prometheus/pushgateway.rs @@ -494,4 +494,45 @@ mod test { }) .await; } + + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let address = test_util::next_addr(); + let (tx, mut rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + let source = PrometheusPushgatewayConfig { + address, + auth: None, + tls: None, + acknowledgements: SourceAcknowledgementsConfig::default(), + keepalive: KeepaliveConfig::default(), + aggregate_metrics: true, + permit_origin, + }; + let source = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(source); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}:{}/metrics/job/test", address.ip(), address.port())) + .header("Content-Type", "text/plain") + .body("test_metric 42") + .send() + .await; + + let result = timeout(Duration::from_millis(200), rx.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } } diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 66c774985..51a2fdf01 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -347,6 +347,45 @@ mod test { vector_lib::assert_event_data_eq!(expected, output); } + + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let address = test_util::next_addr(); + let (tx, mut rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + let source = PrometheusRemoteWriteConfig { + address, + auth: None, + tls: None, + acknowledgements: SourceAcknowledgementsConfig::default(), + keepalive: KeepaliveConfig::default(), + permit_origin, + }; + let source = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(source); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/api/v1/write", address)) + .body("test") + .send() + .await; + + let result = timeout(Duration::from_millis(200), rx.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } } #[cfg(all(test, feature = "prometheus-integration-tests"))] @@ -384,6 +423,7 @@ mod integration_tests { tls: None, acknowledgements: SourceAcknowledgementsConfig::default(), keepalive: KeepaliveConfig::default(), + permit_origin: None, }; let events = run_and_assert_source_compliance( diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index bcc8d7605..18c4b68a3 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -1791,4 +1791,154 @@ mod test { }) .await; } + + //////// PERMIT_ORIGIN / ALLOWLIST TESTS //////// + + fn make_allowlist(cidrs: &[&str]) -> Option { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + Some(IpAllowlistConfig( + cidrs + .iter() + .map(|s| IpNetConfig(s.parse().unwrap())) + .collect(), + )) + } + + #[tokio::test] + async fn tcp_permit_origin_allows_matching_ip() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, mut rx) = SourceSender::new_test(); + let addr = next_addr(); + + let mut config = TcpConfig::from_address(addr.into()); + config.permit_origin = make_allowlist(&["127.0.0.1/32"]); + + let server = SocketConfig::from(config) + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(server); + + wait_for_tcp(addr).await; + send_lines(addr, vec!["allowed".to_owned()].into_iter()) + .await + .unwrap(); + + let event = rx.next().await.unwrap(); + assert_eq!( + event.as_log()[log_schema().message_key().unwrap().to_string()], + "allowed".into() + ); + }) + .await; + } + + #[tokio::test] + async fn tcp_permit_origin_blocks_non_matching_ip() { + let (tx, mut rx) = SourceSender::new_test(); + let addr = next_addr(); + + let mut config = TcpConfig::from_address(addr.into()); + // Only allow 10.0.0.1 — localhost (127.0.0.1) should be rejected + config.permit_origin = make_allowlist(&["10.0.0.1/32"]); + + let server = SocketConfig::from(config) + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(server); + + wait_for_tcp(addr).await; + // Connection from localhost should be accepted at TCP level but rejected by allowlist + let _ = send_lines(addr, vec!["blocked".to_owned()].into_iter()).await; + + // No events should have been received within a reasonable window + let result = timeout(Duration::from_millis(200), rx.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } + + #[tokio::test] + async fn udp_permit_origin_allows_matching_ip() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, rx) = SourceSender::new_test(); + let address = next_addr(); + + let mut config = UdpConfig::from_address(address.into()); + config.permit_origin = make_allowlist(&["127.0.0.1/32"]); + + let address = init_udp_with_config(tx, config).await; + + send_lines_udp(address, vec!["allowed".to_string()]); + let events = collect_n(rx, 1).await; + + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "allowed".into() + ); + }) + .await; + } + + #[tokio::test] + async fn udp_permit_origin_blocks_non_matching_ip() { + let (tx, mut rx) = SourceSender::new_test(); + let address = next_addr(); + + let mut config = UdpConfig::from_address(address.into()); + // Only allow 10.0.0.1 — localhost (127.0.0.1) should be rejected + config.permit_origin = make_allowlist(&["10.0.0.1/32"]); + + let address = init_udp_with_config(tx, config).await; + + send_lines_udp(address, vec!["blocked".to_string()]); + + // No events should have been received within a reasonable window + let result = timeout(Duration::from_millis(200), rx.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } + + #[tokio::test] + async fn udp_permit_origin_allows_cidr_range() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, rx) = SourceSender::new_test(); + let address = next_addr(); + + let mut config = UdpConfig::from_address(address.into()); + // 127.0.0.0/8 covers all of 127.x.x.x + config.permit_origin = make_allowlist(&["127.0.0.0/8"]); + + let address = init_udp_with_config(tx, config).await; + + send_lines_udp(address, vec!["cidr_allowed".to_string()]); + let events = collect_n(rx, 1).await; + + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "cidr_allowed".into() + ); + }) + .await; + } + + #[tokio::test] + async fn udp_no_permit_origin_allows_all() { + assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async { + let (tx, rx) = SourceSender::new_test(); + let address = next_addr(); + + let mut config = UdpConfig::from_address(address.into()); + config.permit_origin = None; + + let address = init_udp_with_config(tx, config).await; + + send_lines_udp(address, vec!["no_filter".to_string()]); + let events = collect_n(rx, 1).await; + + assert_eq!( + events[0].as_log()[log_schema().message_key().unwrap().to_string()], + "no_filter".into() + ); + }) + .await; + } } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 8c35e9646..2477de43d 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -2818,5 +2818,51 @@ mod tests { } } + #[tokio::test] + async fn permit_origin_blocks_non_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + use tokio::time::{timeout, Duration}; + use futures::StreamExt; + + let (sender, mut recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let cx = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("10.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + SplunkConfig { + address, + token: Some(TOKEN.to_owned().into()), + valid_tokens: None, + tls: None, + acknowledgements: Default::default(), + store_hec_token: false, + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(cx) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be blocked by allowlist + let _ = reqwest::Client::new() + .post(format!("http://{}/services/collector", address)) + .header("Authorization", format!("Splunk {}", TOKEN)) + .body(r#"{"event":"blocked"}"#) + .send() + .await; + + let result = timeout(Duration::from_millis(200), recv.next()).await; + assert!(result.is_err(), "expected no events from blocked IP"); + } + register_validatable_component!(SplunkConfig); } From ce681e6d4c836e8fde18b01c56edb94fdecaa873 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Fri, 10 Apr 2026 13:20:12 -0700 Subject: [PATCH 09/12] positive happy path works --- src/sources/aws_kinesis_firehose/mod.rs | 50 ++++++++++++++++++++++++ src/sources/datadog_agent/tests.rs | 42 ++++++++++++++++++++ src/sources/heroku_logs.rs | 44 +++++++++++++++++++++ src/sources/http_server.rs | 51 +++++++++++++++++++++++++ src/sources/opentelemetry/tests.rs | 42 ++++++++++++++++++++ src/sources/prometheus/pushgateway.rs | 38 ++++++++++++++++++ src/sources/prometheus/remote_write.rs | 36 +++++++++++++++++ src/sources/splunk_hec/mod.rs | 43 +++++++++++++++++++++ 8 files changed, 346 insertions(+) diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index 7d7b496ee..ece8ced40 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -997,4 +997,54 @@ mod tests { let result = timeout(Duration::from_millis(200), recv.next()).await; assert!(result.is_err(), "expected no events from blocked IP"); } + +#[tokio::test] +async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let cx = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + AwsKinesisFirehoseConfig { + address, + tls: None, + access_key: None, + access_keys: None, + store_access_key: false, + record_compression: Compression::None, + framing: default_framing_message_based(), + decoding: default_decoding(), + acknowledgements: true.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(cx) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}", address)) + .header("x-amz-firehose-protocol-version", "1.0") + .header("x-amz-firehose-request-id", REQUEST_ID) + .header("x-amz-firehose-source-arn", SOURCE_ARN) + .header("content-type", "application/json") + .body(r#"{"requestId":"test","timestamp":1234567890,"records":[{"data":"dGVzdA=="}]}"#) + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); +} + } diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 261af8f78..8b53d949e 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -2609,4 +2609,46 @@ async fn permit_origin_blocks_non_matching_ip() { assert!(result.is_err(), "expected no events from blocked IP"); } +#[tokio::test] +async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + let config = toml::from_str::(&format!( + indoc! { r#" + address = "{}" + compression = "none" + store_api_key = false + "#}, + address + )) + .unwrap(); + + let config = DatadogAgentConfig { + permit_origin, + ..config + }; + + let context = SourceContext::new_test(sender, None); + tokio::spawn(async move { + config.build(context).await.unwrap().await.unwrap(); + }); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/v1/input/", address)) + .body(r#"[{"message":"allowed"}]"#) + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); +} + register_validatable_component!(DatadogAgentConfig); diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index e924f9d31..203d40a41 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -864,4 +864,48 @@ mod tests { let result = timeout(Duration::from_millis(200), recv.next()).await; assert!(result.is_err(), "expected no events from blocked IP"); } + + #[tokio::test] + async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let context = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + LogplexConfig { + address, + query_parameters: vec![], + tls: None, + auth: None, + framing: default_framing_message_based(), + decoding: default_decoding(), + acknowledgements: false.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(context) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/events", address)) + .header("Logplex-Msg-Count", "1") + .body("test heroku allowed") + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); + } } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index f9a82d2a4..7b431f626 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1848,5 +1848,56 @@ mod tests { assert!(result.is_err(), "expected no events from blocked IP"); } + #[tokio::test] + async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let context = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + SimpleHttpConfig { + address, + headers: vec![], + encoding: None, + query_parameters: vec![], + response_code: StatusCode::OK, + tls: None, + auth: None, + strict_path: true, + path_key: OptionalValuePath::from(owned_value_path!("path")), + host_key: OptionalValuePath::from(owned_value_path!("host")), + path: "/".to_owned(), + method: HttpMethod::Post, + framing: None, + decoding: None, + acknowledgements: false.into(), + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(context) + .await + .unwrap() + .await + .unwrap(); + }); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/", address)) + .body("allowed") + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); + } + register_validatable_component!(SimpleHttpConfig); } diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 38769f9f4..f47060171 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1632,6 +1632,48 @@ async fn permit_origin_blocks_non_matching_ip() { assert!(result.is_err(), "expected no events from blocked IP"); } +#[tokio::test] +async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let http_addr = next_addr(); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + let config = OpentelemetryConfig { + grpc: None, + http: Some(HttpConfig { + address: http_addr, + tls: Default::default(), + keepalive: Default::default(), + headers: Default::default(), + }), + acknowledgements: Default::default(), + log_namespace: None, + permit_origin, + }; + + let (sender, _output, _) = new_source(EventStatus::Delivered, LOGS.to_string()); + let server = config + .build(SourceContext::new_test(sender, None)) + .await + .expect("Failed to build source"); + tokio::spawn(server); + test_util::wait_for_tcp(http_addr).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/v1/logs", http_addr)) + .header("Content-Type", "application/x-protobuf") + .body(vec![0u8; 0]) + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); +} + fn current_time_and_nanos() -> (SystemTime, u64) { let time = SystemTime::now(); let nanos = time diff --git a/src/sources/prometheus/pushgateway.rs b/src/sources/prometheus/pushgateway.rs index ea4232597..15591a2ac 100644 --- a/src/sources/prometheus/pushgateway.rs +++ b/src/sources/prometheus/pushgateway.rs @@ -535,4 +535,42 @@ mod test { let result = timeout(Duration::from_millis(200), rx.next()).await; assert!(result.is_err(), "expected no events from blocked IP"); } + + #[tokio::test] + async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let address = test_util::next_addr(); + let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + let source = PrometheusPushgatewayConfig { + address, + auth: None, + tls: None, + acknowledgements: SourceAcknowledgementsConfig::default(), + keepalive: KeepaliveConfig::default(), + aggregate_metrics: true, + permit_origin, + }; + let source = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(source); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}:{}/metrics/job/test", address.ip(), address.port())) + .header("Content-Type", "text/plain") + .body("test_metric 42") + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); + } } diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 51a2fdf01..22b54ad42 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -386,6 +386,42 @@ mod test { let result = timeout(Duration::from_millis(200), rx.next()).await; assert!(result.is_err(), "expected no events from blocked IP"); } + + #[tokio::test] + async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let address = test_util::next_addr(); + let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + let source = PrometheusRemoteWriteConfig { + address, + auth: None, + tls: None, + acknowledgements: SourceAcknowledgementsConfig::default(), + keepalive: KeepaliveConfig::default(), + permit_origin, + }; + let source = source + .build(SourceContext::new_test(tx, None)) + .await + .unwrap(); + tokio::spawn(source); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/api/v1/write", address)) + .body("test") + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); + } } #[cfg(all(test, feature = "prometheus-integration-tests"))] diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 2477de43d..5649223d9 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -2864,5 +2864,48 @@ mod tests { assert!(result.is_err(), "expected no events from blocked IP"); } + #[tokio::test] + async fn permit_origin_allows_matching_ip() { + use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig}; + + let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered); + let address = next_addr(); + let cx = SourceContext::new_test(sender, None); + + let permit_origin = Some(IpAllowlistConfig(vec![ + IpNetConfig("127.0.0.1/32".parse().unwrap()), + ])); + + tokio::spawn(async move { + SplunkConfig { + address, + token: Some(TOKEN.to_owned().into()), + valid_tokens: None, + tls: None, + acknowledgements: Default::default(), + store_hec_token: false, + log_namespace: None, + keepalive: Default::default(), + permit_origin, + } + .build(cx) + .await + .unwrap() + .await + .unwrap() + }); + wait_for_tcp(address).await; + + // Send from localhost — should be accepted by allowlist + let response = reqwest::Client::new() + .post(format!("http://{}/services/collector", address)) + .header("Authorization", format!("Splunk {}", TOKEN)) + .body(r#"{"event":"allowed"}"#) + .send() + .await; + + assert!(response.is_ok(), "expected connection to be accepted for allowed IP"); + } + register_validatable_component!(SplunkConfig); } From e935100657ddf6566db26190194c9b733ac5b3c9 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Wed, 15 Apr 2026 13:28:58 -0700 Subject: [PATCH 10/12] working http errors. metrics logged correctly for http --- lib/vector-core/src/tls/incoming.rs | 43 ++++++++++++----------------- lib/vector-core/src/tls/mod.rs | 8 ++++++ src/internal_events/http.rs | 26 +++++++++++++++++ src/sources/util/http/prelude.rs | 20 ++++++++++++-- 4 files changed, 69 insertions(+), 28 deletions(-) diff --git a/lib/vector-core/src/tls/incoming.rs b/lib/vector-core/src/tls/incoming.rs index 8873e1c04..d1a5f9a1f 100644 --- a/lib/vector-core/src/tls/incoming.rs +++ b/lib/vector-core/src/tls/incoming.rs @@ -83,33 +83,26 @@ pub struct MaybeTlsListener { impl MaybeTlsListener { pub async fn accept(&mut self) -> crate::tls::Result> { - loop { - let listener = self - .listener - .accept() - .await - .map(|(stream, peer_addr)| { - MaybeTlsIncomingStream::new(stream, peer_addr, self.acceptor.clone()) - }) - .context(IncomingListenerSnafu)?; - - if let Some(origin_filter) = &self.origin_filter { - if origin_filter - .iter() - .any(|net| net.contains(&listener.peer_addr().ip())) - { - return Ok(listener); - } else { - warn!( - message = "Rejected connection from non-allowed origin.", - peer_addr = %listener.peer_addr(), - ); - drop(listener); - continue; - } + let listener = self + .listener + .accept() + .await + .map(|(stream, peer_addr)| { + MaybeTlsIncomingStream::new(stream, peer_addr, self.acceptor.clone()) + }) + .context(IncomingListenerSnafu)?; + + if let Some(origin_filter) = &self.origin_filter { + if origin_filter + .iter() + .any(|net| net.contains(&listener.peer_addr().ip())) + { + Ok(listener) } else { - return Ok(listener); + Err(TlsError::DisallowedPeer) } + } else { + Ok(listener) } } diff --git a/lib/vector-core/src/tls/mod.rs b/lib/vector-core/src/tls/mod.rs index 9e065cb21..3b300f839 100644 --- a/lib/vector-core/src/tls/mod.rs +++ b/lib/vector-core/src/tls/mod.rs @@ -115,6 +115,8 @@ pub enum TlsError { TcpBind { source: tokio::io::Error }, #[snafu(display("{}", source))] Connect { source: tokio::io::Error }, + #[snafu(display("Connection rejected: origin IP not in permit_origin list"))] + DisallowedPeer, #[snafu(display("Could not get peer address: {}", source))] PeerAddress { source: std::io::Error }, #[snafu(display("Security Framework Error: {}", source))] @@ -134,6 +136,12 @@ pub enum TlsError { CaStackPush { source: ErrorStack }, } +impl TlsError { + pub fn is_fatal(&self) -> bool { + !matches!(self, TlsError::DisallowedPeer) + } +} + impl MaybeTlsStream { pub fn peer_addr(&self) -> std::result::Result { match self { diff --git a/src/internal_events/http.rs b/src/internal_events/http.rs index 72876ed39..72430f498 100644 --- a/src/internal_events/http.rs +++ b/src/internal_events/http.rs @@ -133,6 +133,32 @@ impl InternalEvent for HttpDecompressError<'_> { } } +#[cfg(feature = "sources-utils-http")] +pub struct HttpBadPeerConnectionError<'a> { + pub error: &'a dyn std::fmt::Display, +} + +#[cfg(feature = "sources-utils-http")] +impl InternalEvent for HttpBadPeerConnectionError<'_> { + fn emit(self) { + warn!( + message = "Rejected connection from bad peer.", + error = %self.error, + error_code = "bad_peer", + error_type = error_type::CONNECTION_FAILED, + stage = error_stage::RECEIVING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", + "error_code" => "bad_peer", + "error_type" => error_type::CONNECTION_FAILED, + "stage" => error_stage::RECEIVING, + ) + .increment(1); + } +} + pub struct HttpInternalError<'a> { pub message: &'a str, } diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index c6560c373..a7eba525c 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -7,7 +7,7 @@ use std::{ }; use bytes::Bytes; -use futures::{FutureExt, TryFutureExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use hyper::{service::make_service_fn, Server}; use tokio::net::TcpStream; use tower::ServiceBuilder; @@ -33,7 +33,7 @@ use crate::{ config::SourceContext, http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer}, internal_events::{ - HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError, + HttpBadPeerConnectionError, HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError, }, sources::util::http::HttpMethod, tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig}, @@ -226,7 +226,21 @@ pub trait HttpSource: Clone + Send + Sync + 'static { let listener = listener .with_allowlist(permit_origin.map(Into::into)); - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + Server::builder(hyper::server::accept::from_stream( + listener.accept_stream().filter_map(|result| async move { + match result { + Ok(stream) => Some(Ok::<_, Infallible>(stream)), + Err(err) => { + if err.is_fatal() { + warn!(message = "Fatal error accepting connection.", error = %err); + } else { + emit!(HttpBadPeerConnectionError { error: &err }); + } + None + } + } + }), + )) .serve(make_svc) .with_graceful_shutdown(cx.shutdown.map(|_| ())) .await From ade833ba2394d1916205f863bfca0d431f2f17c4 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 16 Apr 2026 15:11:08 -0700 Subject: [PATCH 11/12] splunk_hec --- src/sources/splunk_hec/mod.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 5649223d9..8ff42a90b 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -10,7 +10,7 @@ use std::{ use bytes::{Buf, Bytes}; use chrono::{DateTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use http::StatusCode; use hyper::{service::make_service_fn, Server}; use serde::Serialize; @@ -51,7 +51,7 @@ use crate::{ event::{Event, LogEvent, Value}, http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer}, internal_events::{ - EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError, + EventsReceived, HttpBadPeerConnectionError, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError, }, serde::bool_or_struct, source_sender::ClosedError, @@ -196,7 +196,21 @@ impl SourceConfig for SplunkConfig { futures_util::future::ok::<_, Infallible>(svc) }); - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + Server::builder(hyper::server::accept::from_stream( + listener.accept_stream().filter_map(|result| async move { + match result { + Ok(stream) => Some(Ok::<_, Infallible>(stream)), + Err(err) => { + if err.is_fatal() { + warn!(message = "Fatal error accepting connection.", error = %err); + } else { + emit!(HttpBadPeerConnectionError { error: &err }); + } + None + } + } + }), + )) .serve(make_svc) .with_graceful_shutdown(shutdown.map(|_| ())) .await From d15e34e146e41a1d8a53fe0b8e02c7aadded9473 Mon Sep 17 00:00:00 2001 From: Yoon Lee Date: Thu, 16 Apr 2026 15:24:39 -0700 Subject: [PATCH 12/12] aws_kinesis_firehose, datadog_agent, opentelemetry sources to continue process after allowlist rejection --- src/sources/aws_kinesis_firehose/mod.rs | 19 +++++++++++++++++-- src/sources/datadog_agent/mod.rs | 20 +++++++++++++++++--- src/sources/opentelemetry/http.rs | 20 +++++++++++++++++--- 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index ece8ced40..c94c28ee9 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -1,7 +1,7 @@ use std::time::Duration; use std::{convert::Infallible, fmt, net::SocketAddr}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use hyper::{service::make_service_fn, Server}; use tokio::net::TcpStream; use tower::ServiceBuilder; @@ -16,6 +16,7 @@ use vector_lib::tls::MaybeTlsIncomingStream; use vrl::value::Kind; use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer}; +use crate::internal_events::HttpBadPeerConnectionError; use crate::{ codecs::DecodingConfig, config::{ @@ -206,7 +207,21 @@ impl SourceConfig for AwsKinesisFirehoseConfig { futures_util::future::ok::<_, Infallible>(svc) }); - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + Server::builder(hyper::server::accept::from_stream( + listener.accept_stream().filter_map(|result| async move { + match result { + Ok(stream) => Some(Ok::<_, Infallible>(stream)), + Err(err) => { + if err.is_fatal() { + warn!(message = "Fatal error accepting connection.", error = %err); + } else { + emit!(HttpBadPeerConnectionError { error: &err }); + } + None + } + } + }), + )) .serve(make_svc) .with_graceful_shutdown(shutdown.map(|_| ())) .await diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 8588910fe..ca74f22ce 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -24,7 +24,7 @@ use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc}; use bytes::{Buf, Bytes}; use chrono::{serde::ts_milliseconds, DateTime, Utc}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; -use futures::FutureExt; +use futures::{FutureExt, StreamExt}; use http::StatusCode; use hyper::service::make_service_fn; use hyper::Server; @@ -56,7 +56,7 @@ use crate::{ SourceContext, SourceOutput, }, event::Event, - internal_events::{HttpBytesReceived, HttpDecompressError, StreamClosedError}, + internal_events::{HttpBadPeerConnectionError, HttpBytesReceived, HttpDecompressError, StreamClosedError}, schema, serde::{bool_or_struct, default_decoding, default_framing_message_based}, sources::{self, util::ErrorMessage}, @@ -230,7 +230,21 @@ impl SourceConfig for DatadogAgentConfig { futures_util::future::ok::<_, Infallible>(svc) }); - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + Server::builder(hyper::server::accept::from_stream( + listener.accept_stream().filter_map(|result| async move { + match result { + Ok(stream) => Some(Ok::<_, Infallible>(stream)), + Err(err) => { + if err.is_fatal() { + warn!(message = "Fatal error accepting connection.", error = %err); + } else { + emit!(HttpBadPeerConnectionError { error: &err }); + } + None + } + } + }), + )) .serve(make_svc) .with_graceful_shutdown(shutdown.map(|_| ())) .await diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index cadc50f70..5e65256b8 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -2,7 +2,7 @@ use std::time::Duration; use std::{convert::Infallible, net::SocketAddr}; use bytes::Bytes; -use futures_util::FutureExt; +use futures_util::{FutureExt, StreamExt}; use http::StatusCode; use hyper::{service::make_service_fn, Server}; use prost::Message; @@ -36,7 +36,7 @@ use crate::sources::util::add_headers; use crate::{ event::Event, http::build_http_trace_layer, - internal_events::{EventsReceived, StreamClosedError}, + internal_events::{EventsReceived, HttpBadPeerConnectionError, StreamClosedError}, shutdown::ShutdownSignal, sources::util::{decode, ErrorMessage}, tls::MaybeTlsSettings, @@ -83,7 +83,21 @@ pub(crate) async fn run_http_server( futures_util::future::ok::<_, Infallible>(svc) }); - Server::builder(hyper::server::accept::from_stream(listener.accept_stream())) + Server::builder(hyper::server::accept::from_stream( + listener.accept_stream().filter_map(|result| async move { + match result { + Ok(stream) => Some(Ok::<_, Infallible>(stream)), + Err(err) => { + if err.is_fatal() { + warn!(message = "Fatal error accepting connection.", error = %err); + } else { + emit!(HttpBadPeerConnectionError { error: &err }); + } + None + } + } + }), + )) .serve(make_svc) .with_graceful_shutdown(shutdown.map(|_| ())) .await?;