Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/fix_metric_set_memory_growth.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Add a TTL-based cache for metrics sets
- Add `expire_metrics_secs` config for Prometheus remote write sink which uses the TTL-based cache
- This fixes an issue where incremental metrics are preserved for the lifetime of Vector's runtime, which causes
indefinite memory growth

authors: GreyLilac09
9 changes: 9 additions & 0 deletions src/sinks/prometheus/remote_write/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ pub struct RemoteWriteConfig {
#[configurable(metadata(docs::advanced))]
pub tenant_id: Option<Template>,

/// The amount of time, in seconds, that an incremental metric can remain in the internal metrics
/// cache without being updated before it expires and is removed.
///
/// If unset, sending unique incremental metrics to this sink will cause indefinite memory growth.
#[serde(skip_serializing_if = "crate::serde::is_default")]
#[configurable(metadata(docs::common = false, docs::required = false))]
pub expire_metrics_secs: Option<f64>,

#[configurable(derived)]
pub tls: Option<TlsConfig>,

Expand Down Expand Up @@ -202,6 +210,7 @@ impl SinkConfig for RemoteWriteConfig {
buckets,
quantiles,
default_namespace,
expire_metrics_secs: self.expire_metrics_secs,
service,
};

Expand Down
4 changes: 3 additions & 1 deletion src/sinks/prometheus/remote_write/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub(super) struct RemoteWriteSink<S> {
pub(super) default_namespace: Option<String>,
pub(super) buckets: Vec<f64>,
pub(super) quantiles: Vec<f64>,
pub(super) expire_metrics_secs: Option<f64>,
pub(super) service: S,
}

Expand All @@ -172,10 +173,11 @@ where
let batch_settings = self.batch_settings;
let tenant_id = self.tenant_id.clone();
let service = self.service;
let expire_metrics_secs = self.expire_metrics_secs;

input
.filter_map(|event| future::ready(event.try_into_metric()))
.normalized_with_default::<PrometheusMetricNormalize>()
.normalized_with_ttl::<PrometheusMetricNormalize>(expire_metrics_secs)
.filter_map(move |event| {
future::ready(make_remote_write_event(tenant_id.as_ref(), event))
})
Expand Down
227 changes: 187 additions & 40 deletions src/sinks/util/buffer/metrics/normalize.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use indexmap::IndexMap;

use std::time::{Duration, Instant};

use vector_lib::event::{
metric::{MetricData, MetricSeries},
EventMetadata, Metric, MetricKind,
Expand Down Expand Up @@ -46,6 +48,14 @@ pub struct MetricNormalizer<N> {
}

impl<N> MetricNormalizer<N> {
/// Creates a new normalizer whose backing [`MetricSet`] evicts entries
/// according to the given [`TtlPolicy`].
///
/// Entries that are not refreshed within the policy's TTL are dropped during
/// periodic cleanup, bounding memory growth from unique incremental metrics.
pub fn with_ttl(normalizer: N, ttl: TtlPolicy) -> Self {
    Self {
        state: MetricSet::with_ttl_policy(ttl),
        normalizer,
    }
}

/// Gets a mutable reference to the current metric state for this normalizer.
pub fn get_state_mut(&mut self) -> &mut MetricSet {
&mut self.state
Expand Down Expand Up @@ -79,46 +89,178 @@ impl<N> From<N> for MetricNormalizer<N> {
}
}

type MetricEntry = (MetricData, EventMetadata);
/// Represents a stored metric entry with its data, metadata, and optional timestamp.
///
/// This replaces the former `(MetricData, EventMetadata)` tuple so each cached
/// series can also carry the [`Instant`] it was last written, which the
/// TTL-based expiry in `MetricSet` uses to drop stale entries.
#[derive(Clone, Debug)]
pub struct MetricEntry {
    /// The metric data containing the value and kind
    pub data: MetricData,
    /// Event metadata associated with this metric
    pub metadata: EventMetadata,
    /// Time of the last write to this entry; `None` when TTL tracking is disabled
    pub timestamp: Option<Instant>,
}

impl MetricEntry {
    /// Creates a new `MetricEntry` from its parts.
    pub const fn new(
        data: MetricData,
        metadata: EventMetadata,
        timestamp: Option<Instant>,
    ) -> Self {
        Self {
            data,
            metadata,
            timestamp,
        }
    }

    /// Splits a [`Metric`] into its series key and a stored entry stamped with
    /// the given timestamp.
    pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
        let (series, data, metadata) = metric.into_parts();
        (series, Self::new(data, metadata, timestamp))
    }

    /// Reassembles this entry into a [`Metric`] under the given series key.
    /// The timestamp is dropped; it only exists for cache-expiry bookkeeping.
    pub fn into_metric(self, series: MetricSeries) -> Metric {
        let Self { data, metadata, .. } = self;
        Metric::from_parts(series, data, metadata)
    }

    /// Records a new last-write timestamp for this entry.
    pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
        self.timestamp = timestamp;
    }
}

/// Configuration for automatic cleanup of expired entries.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Time-to-live for entries
    pub ttl: Duration,
    /// How often to run cleanup
    pub cleanup_interval: Duration,
    /// Last time cleanup was performed
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// Creates a new cleanup configuration with the given TTL.
    ///
    /// The cleanup interval defaults to one tenth of the TTL, but never less
    /// than 10 seconds, so that very short TTLs do not cause a cleanup scan on
    /// nearly every metric.
    pub fn new(ttl: Duration) -> Self {
        let cleanup_interval = ttl.div_f32(10.0).max(Duration::from_secs(10));
        Self {
            ttl,
            cleanup_interval,
            last_cleanup: Instant::now(),
        }
    }

    /// Returns `true` once at least `cleanup_interval` has elapsed since the
    /// last recorded cleanup.
    pub fn should_cleanup(&self) -> bool {
        self.last_cleanup.elapsed() >= self.cleanup_interval
    }

    /// Records that a cleanup pass has just been performed.
    pub fn mark_cleanup_done(&mut self) {
        self.last_cleanup = Instant::now();
    }
}

/// Metric storage for use with normalization.
///
/// This is primarily a wrapper around [`IndexMap`] (to ensure insertion order
/// is maintained) with convenience methods to make it easier to perform
/// normalization-specific operations.
#[derive(Clone, Default, Debug)]
pub struct MetricSet(IndexMap<MetricSeries, MetricEntry>);
/// normalization-specific operations. It also includes an optional TTL policy
/// to automatically expire old entries.
#[derive(Clone, Debug, Default)]
pub struct MetricSet {
inner: IndexMap<MetricSeries, MetricEntry>,
ttl_policy: Option<TtlPolicy>,
}

impl MetricSet {
/// Creates an empty `MetricSet` with the specified capacity.
///
/// The metric set will be able to hold at least `capacity` elements without reallocating. If `capacity` is 0, the
/// metric set will not allocate.
/// Creates an empty MetricSet with the specified capacity.
pub fn with_capacity(capacity: usize) -> Self {
Self(IndexMap::with_capacity(capacity))
Self {
inner: IndexMap::with_capacity(capacity),
ttl_policy: None,
}
}

/// Creates an empty MetricSet whose entries expire according to `ttl_policy`.
///
/// Entries that go unwritten for longer than the policy's TTL are removed
/// during periodic cleanup; without a policy the set grows unboundedly for
/// unique series.
pub fn with_ttl_policy(ttl_policy: TtlPolicy) -> Self {
    Self {
        inner: IndexMap::default(),
        ttl_policy: Some(ttl_policy),
    }
}

/// Gets a reference to the TTL policy configuration.
/// Returns `None` when this set was created without expiry (e.g. via `with_capacity`).
pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
    self.ttl_policy.as_ref()
}

/// Gets a mutable reference to the TTL policy configuration.
/// Returns `None` when this set was created without expiry.
pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
    self.ttl_policy.as_mut()
}

/// Perform periodic cleanup if enough time has passed since the last cleanup.
///
/// Called from the mutating entry points so expiry happens opportunistically
/// as metrics flow through, without a background task.
fn maybe_cleanup(&mut self) {
    // No policy means no expiry; otherwise ask the policy whether a pass is due.
    let due = match self.ttl_policy() {
        Some(policy) => policy.should_cleanup(),
        None => false,
    };
    if due {
        self.cleanup_expired();
        if let Some(policy) = self.ttl_policy_mut() {
            policy.mark_cleanup_done();
        }
    }
}

/// Removes expired entries based on TTL if configured.
///
/// Entries without a timestamp (stored while no policy was active) are never
/// expired.
fn cleanup_expired(&mut self) {
    let Some(policy) = &self.ttl_policy else {
        return;
    };
    let ttl = policy.ttl;
    let now = Instant::now();
    self.inner
        .retain(|_, entry| entry.timestamp.map_or(true, |ts| now.duration_since(ts) < ttl));
}

/// Returns the number of elements in the set.
pub fn len(&self) -> usize {
self.0.len()
self.inner.len()
}

/// Returns `Some(Instant::now())` when a TTL policy is configured, `None`
/// otherwise, so untracked sets pay no timestamping cost.
fn create_timestamp(&self) -> Option<Instant> {
    self.ttl_policy().map(|_| Instant::now())
}

/// Returns `true` if the set contains no elements.
/// Returns true if the set contains no elements.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.inner.is_empty()
}

/// Consumes this `MetricSet` and returns a vector of `Metric`.
pub fn into_metrics(self) -> Vec<Metric> {
self.0
/// Consumes this MetricSet and returns a vector of Metric.
pub fn into_metrics(mut self) -> Vec<Metric> {
// Always cleanup on final consumption
self.cleanup_expired();
self.inner
.into_iter()
.map(|(series, (data, metadata))| Metric::from_parts(series, data, metadata))
.map(|(series, entry)| entry.into_metric(series))
.collect()
}

/// Either pass the metric through as-is if absolute, or convert it
/// to absolute if incremental.
pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
self.maybe_cleanup();
match metric.kind() {
MetricKind::Absolute => Some(metric),
MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
Expand All @@ -128,6 +270,7 @@ impl MetricSet {
/// Either convert the metric to incremental if absolute, or
/// aggregate it with any previous value if already incremental.
pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
self.maybe_cleanup();
match metric.kind() {
MetricKind::Absolute => self.absolute_to_incremental(metric),
MetricKind::Incremental => Some(metric),
Expand All @@ -138,23 +281,21 @@ impl MetricSet {
/// state buffer to keep track of the value throughout the entire
/// application uptime.
fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
match self.0.get_mut(metric.series()) {
let timestamp = self.create_timestamp();
match self.inner.get_mut(metric.series()) {
Some(existing) => {
if existing.0.value.add(metric.value()) {
metric = metric.with_value(existing.0.value.clone());
if existing.data.value.add(metric.value()) {
metric = metric.with_value(existing.data.value.clone());
existing.update_timestamp(timestamp);
} else {
// Metric changed type, store this as the new reference value
self.0.insert(
metric.series().clone(),
(metric.data().clone(), EventMetadata::default()),
);
let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
self.inner.insert(series, entry);
}
}
None => {
self.0.insert(
metric.series().clone(),
(metric.data().clone(), EventMetadata::default()),
);
let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
self.inner.insert(series, entry);
}
}
metric.into_absolute()
Expand Down Expand Up @@ -182,42 +323,47 @@ impl MetricSet {
// introducing a small amount of lag before a metric is emitted by having to wait to see it
// again, but this is a behavior we have to observe for sinks that can only handle
// incremental updates.
match self.0.get_mut(metric.series()) {
let timestamp = self.create_timestamp();
match self.inner.get_mut(metric.series()) {
Some(reference) => {
let new_value = metric.value().clone();
// From the stored reference value, emit an increment
if metric.subtract(&reference.0) {
reference.0.value = new_value;
if metric.subtract(&reference.data) {
reference.data.value = new_value;
reference.update_timestamp(timestamp);
Some(metric.into_incremental())
} else {
// Metric changed type, store this and emit nothing
self.insert(metric);
self.insert(metric, timestamp);
None
}
}
None => {
// No reference so store this and emit nothing
self.insert(metric);
self.insert(metric, timestamp);
None
}
}
}

fn insert(&mut self, metric: Metric) {
let (series, data, metadata) = metric.into_parts();
self.0.insert(series, (data, metadata));
fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
let (series, entry) = MetricEntry::from_metric(metric, timestamp);
self.inner.insert(series, entry);
}

pub fn insert_update(&mut self, metric: Metric) {
self.maybe_cleanup();
let timestamp = self.create_timestamp();
let update = match metric.kind() {
MetricKind::Absolute => Some(metric),
MetricKind::Incremental => {
// Incremental metrics update existing entries, if present
match self.0.get_mut(metric.series()) {
match self.inner.get_mut(metric.series()) {
Some(existing) => {
let (series, data, metadata) = metric.into_parts();
if existing.0.update(&data) {
existing.1.merge(metadata);
if existing.data.update(&data) {
existing.metadata.merge(metadata);
existing.update_timestamp(timestamp);
None
} else {
warn!(message = "Metric changed type, dropping old value.", %series);
Expand All @@ -229,14 +375,15 @@ impl MetricSet {
}
};
if let Some(metric) = update {
self.insert(metric);
self.insert(metric, timestamp);
}
}

/// Removes a series from the set.
///
/// If the series existed and was removed, returns `true`. Otherwise, `false`.
pub fn remove(&mut self, series: &MetricSeries) -> bool {
self.0.shift_remove(series).is_some()
self.maybe_cleanup();
self.inner.shift_remove(series).is_some()
}
}
Loading