Skip to content
Open
4 changes: 1 addition & 3 deletions lib/vector-core/src/tls/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ impl MaybeTlsListener {
{
Ok(listener)
} else {
Err(TlsError::Connect {
source: std::io::ErrorKind::ConnectionRefused.into(),
})
Err(TlsError::DisallowedPeer)
}
} else {
Ok(listener)
Expand Down
8 changes: 8 additions & 0 deletions lib/vector-core/src/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub enum TlsError {
TcpBind { source: tokio::io::Error },
#[snafu(display("{}", source))]
Connect { source: tokio::io::Error },
#[snafu(display("Connection rejected: origin IP not in permit_origin list"))]
DisallowedPeer,
#[snafu(display("Could not get peer address: {}", source))]
PeerAddress { source: std::io::Error },
#[snafu(display("Security Framework Error: {}", source))]
Expand All @@ -134,6 +136,12 @@ pub enum TlsError {
CaStackPush { source: ErrorStack },
}

impl TlsError {
pub fn is_fatal(&self) -> bool {
!matches!(self, TlsError::DisallowedPeer)
}
}

impl MaybeTlsStream<TcpStream> {
pub fn peer_addr(&self) -> std::result::Result<SocketAddr, std::io::Error> {
match self {
Expand Down
26 changes: 26 additions & 0 deletions src/internal_events/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,32 @@ impl InternalEvent for HttpDecompressError<'_> {
}
}

#[cfg(feature = "sources-utils-http")]
pub struct HttpBadPeerConnectionError<'a> {
pub error: &'a dyn std::fmt::Display,
}

#[cfg(feature = "sources-utils-http")]
impl InternalEvent for HttpBadPeerConnectionError<'_> {
fn emit(self) {
warn!(
message = "Rejected connection from bad peer.",
error = %self.error,
error_code = "bad_peer",
error_type = error_type::CONNECTION_FAILED,
stage = error_stage::RECEIVING,
internal_log_rate_limit = true,
);
counter!(
"component_errors_total",
"error_code" => "bad_peer",
"error_type" => error_type::CONNECTION_FAILED,
"stage" => error_stage::RECEIVING,
)
.increment(1);
}
}

pub struct HttpInternalError<'a> {
pub message: &'a str,
}
Expand Down
129 changes: 127 additions & 2 deletions src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use std::time::Duration;
use std::{convert::Infallible, fmt, net::SocketAddr};

use futures::FutureExt;
use futures::{FutureExt, StreamExt};
use hyper::{service::make_service_fn, Server};
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::ipallowlist::IpAllowlistConfig;
use vector_lib::lookup::owned_value_path;
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::tls::MaybeTlsIncomingStream;
use vrl::value::Kind;

use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
use crate::internal_events::HttpBadPeerConnectionError;
use crate::{
codecs::DecodingConfig,
config::{
Expand Down Expand Up @@ -104,6 +106,9 @@ pub struct AwsKinesisFirehoseConfig {
#[configurable(derived)]
#[serde(default)]
keepalive: KeepaliveConfig,

#[configurable(derived)]
pub permit_origin: Option<IpAllowlistConfig>,
}

const fn access_keys_example() -> [&'static str; 2] {
Expand Down Expand Up @@ -181,6 +186,8 @@ impl SourceConfig for AwsKinesisFirehoseConfig {

let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
let listener = tls.bind(&self.address).await?;
let listener = listener
.with_allowlist(self.permit_origin.clone().map(Into::into));

let keepalive_settings = self.keepalive.clone();
let shutdown = cx.shutdown;
Expand All @@ -200,7 +207,21 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
futures_util::future::ok::<_, Infallible>(svc)
});

Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
Server::builder(hyper::server::accept::from_stream(
listener.accept_stream().filter_map(|result| async move {
match result {
Ok(stream) => Some(Ok::<_, Infallible>(stream)),
Err(err) => {
if err.is_fatal() {
warn!(message = "Fatal error accepting connection.", error = %err);
} else {
emit!(HttpBadPeerConnectionError { error: &err });
}
None
}
}
}),
))
.serve(make_svc)
.with_graceful_shutdown(shutdown.map(|_| ()))
.await
Expand Down Expand Up @@ -261,6 +282,7 @@ impl GenerateConfig for AwsKinesisFirehoseConfig {
acknowledgements: Default::default(),
log_namespace: None,
keepalive: Default::default(),
permit_origin: None,
})
.unwrap()
}
Expand Down Expand Up @@ -354,6 +376,7 @@ mod tests {
acknowledgements: true.into(),
log_namespace: Some(log_namespace),
keepalive: Default::default(),
permit_origin: None,
}
.build(cx)
.await
Expand Down Expand Up @@ -937,4 +960,106 @@ mod tests {
.get("aws_kinesis_firehose_access_key")
.is_none());
}

#[tokio::test]
async fn permit_origin_blocks_non_matching_ip() {
Comment thread
yoonlee-s1 marked this conversation as resolved.
use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig};
use tokio::time::{timeout, Duration};
use futures::StreamExt;

let (sender, mut recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
let address = next_addr();
let cx = SourceContext::new_test(sender, None);

let permit_origin = Some(IpAllowlistConfig(vec![
IpNetConfig("10.0.0.1/32".parse().unwrap()),
]));

tokio::spawn(async move {
AwsKinesisFirehoseConfig {
address,
tls: None,
access_key: None,
access_keys: None,
store_access_key: false,
record_compression: Compression::None,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: true.into(),
log_namespace: None,
keepalive: Default::default(),
permit_origin,
}
.build(cx)
.await
.unwrap()
.await
.unwrap()
});
wait_for_tcp(address).await;

// Send from localhost — should be blocked by allowlist
let _ = reqwest::Client::new()
.post(format!("http://{}", address))
.header("x-amz-firehose-protocol-version", "1.0")
.header("x-amz-firehose-request-id", REQUEST_ID)
.header("x-amz-firehose-source-arn", SOURCE_ARN)
.header("content-type", "application/json")
.body(r#"{"requestId":"test","timestamp":1234567890,"records":[{"data":"dGVzdA=="}]}"#)
.send()
.await;

let result = timeout(Duration::from_millis(200), recv.next()).await;
assert!(result.is_err(), "expected no events from blocked IP");
}

#[tokio::test]
async fn permit_origin_allows_matching_ip() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Often it is possible to avoid so much duplication. One way I can think of is to create a helper that accepts permit-origin fragment (or in case of toml config-parsing tests, accepts the entire toml string) and returns the receive channel. Then the test can simply recv the message or assert timeout.

use vector_lib::ipallowlist::{IpAllowlistConfig, IpNetConfig};

let (sender, _recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
let address = next_addr();
let cx = SourceContext::new_test(sender, None);

let permit_origin = Some(IpAllowlistConfig(vec![
IpNetConfig("127.0.0.1/32".parse().unwrap()),
]));

tokio::spawn(async move {
AwsKinesisFirehoseConfig {
address,
tls: None,
access_key: None,
access_keys: None,
store_access_key: false,
record_compression: Compression::None,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: true.into(),
log_namespace: None,
keepalive: Default::default(),
permit_origin,
}
.build(cx)
.await
.unwrap()
.await
.unwrap()
});
wait_for_tcp(address).await;

// Send from localhost — should be accepted by allowlist
let response = reqwest::Client::new()
.post(format!("http://{}", address))
.header("x-amz-firehose-protocol-version", "1.0")
.header("x-amz-firehose-request-id", REQUEST_ID)
.header("x-amz-firehose-source-arn", SOURCE_ARN)
.header("content-type", "application/json")
.body(r#"{"requestId":"test","timestamp":1234567890,"records":[{"data":"dGVzdA=="}]}"#)
.send()
.await;

assert!(response.is_ok(), "expected connection to be accepted for allowed IP");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert the message was received (don't ignore _recv).

}

}
27 changes: 24 additions & 3 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc};
use bytes::{Buf, Bytes};
use chrono::{serde::ts_milliseconds, DateTime, Utc};
use flate2::read::{MultiGzDecoder, ZlibDecoder};
use futures::FutureExt;
use futures::{FutureExt, StreamExt};
use http::StatusCode;
use hyper::service::make_service_fn;
use hyper::Server;
Expand All @@ -37,6 +37,7 @@ use tracing::Span;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::config::{LegacyKey, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::ipallowlist::IpAllowlistConfig;
use vector_lib::event::{BatchNotifier, BatchStatus};
use vector_lib::internal_event::{EventsReceived, Registered};
use vector_lib::lookup::owned_value_path;
Expand All @@ -55,7 +56,7 @@ use crate::{
SourceContext, SourceOutput,
},
event::Event,
internal_events::{HttpBytesReceived, HttpDecompressError, StreamClosedError},
internal_events::{HttpBadPeerConnectionError, HttpBytesReceived, HttpDecompressError, StreamClosedError},
schema,
serde::{bool_or_struct, default_decoding, default_framing_message_based},
sources::{self, util::ErrorMessage},
Expand Down Expand Up @@ -141,6 +142,9 @@ pub struct DatadogAgentConfig {
#[configurable(derived)]
#[serde(default)]
keepalive: KeepaliveConfig,

#[configurable(derived)]
pub permit_origin: Option<IpAllowlistConfig>,
}

impl GenerateConfig for DatadogAgentConfig {
Expand All @@ -159,6 +163,7 @@ impl GenerateConfig for DatadogAgentConfig {
parse_ddtags: false,
log_namespace: Some(false),
keepalive: KeepaliveConfig::default(),
permit_origin: None,
})
.unwrap()
}
Expand Down Expand Up @@ -190,6 +195,8 @@ impl SourceConfig for DatadogAgentConfig {
self.parse_ddtags,
);
let listener = tls.bind(&self.address).await?;
let listener = listener
.with_allowlist(self.permit_origin.clone().map(Into::into));
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
let shutdown = cx.shutdown;
Expand Down Expand Up @@ -223,7 +230,21 @@ impl SourceConfig for DatadogAgentConfig {
futures_util::future::ok::<_, Infallible>(svc)
});

Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
Server::builder(hyper::server::accept::from_stream(
listener.accept_stream().filter_map(|result| async move {
match result {
Ok(stream) => Some(Ok::<_, Infallible>(stream)),
Err(err) => {
if err.is_fatal() {
warn!(message = "Fatal error accepting connection.", error = %err);
} else {
emit!(HttpBadPeerConnectionError { error: &err });
}
None
}
}
}),
))
.serve(make_svc)
.with_graceful_shutdown(shutdown.map(|_| ()))
.await
Expand Down
Loading