From 25275c16bb44647189bb89f6c93bda551776a247 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 15:28:15 +0100 Subject: [PATCH 1/4] fix: changed internal architecture to avoid high memory usage Signed-off-by: Gabriele Baldoni --- Cargo.lock | 66 +++++++++++++- Cargo.toml | 1 + README.md | 2 +- examples/builder.rs | 2 +- examples/layer.rs | 2 +- examples/shutdown.rs | 2 +- src/builder.rs | 41 ++++++++- src/lib.rs | 205 ++++++++++++------------------------------- 8 files changed, 161 insertions(+), 160 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 765d4a9..37b7348 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,6 +114,18 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -184,6 +196,19 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.12" @@ -379,9 +404,19 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.120" +version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" +checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] [[package]] name = "log" @@ -454,6 +489,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.8" @@ -720,7 +764,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -763,6 +807,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "sct" version = "0.7.0" @@ -882,6 +932,15 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "syn" version = "1.0.89" @@ -1059,6 +1118,7 @@ dependencies = [ name = "tracing-loki" version = "0.2.5" dependencies = [ + "flume", "loki-api", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3012d58..c4dcfb3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ tracing-log = "0.1.2" tracing-serde = "0.1.3" tracing-subscriber = "0.3.9" url = "2.2.2" +flume = "0.11.1" [dev-dependencies] tokio = { version = "1.17.0", features = ["macros", "rt-multi-thread"] } diff --git a/README.md b/README.md index b62fc92..8f98588 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ async fn main() -> Result<(), tracing_loki::Error> { // The background task needs to be spawned so the logs actually get // delivered. - tokio::spawn(task); + tokio::spawn(task.start()); tracing::info!( task = "tracing_setup", diff --git a/examples/builder.rs b/examples/builder.rs index e7bdfc5..627c019 100644 --- a/examples/builder.rs +++ b/examples/builder.rs @@ -18,7 +18,7 @@ fn tracing_setup() -> Result<(), Box> { .with(layer) .with(Layer::new()) .init(); - tokio::spawn(task); + tokio::spawn(task.start()); Ok(()) } diff --git a/examples/layer.rs b/examples/layer.rs index 447575b..6d175ce 100644 --- a/examples/layer.rs +++ b/examples/layer.rs @@ -19,7 +19,7 @@ fn tracing_setup() -> Result<(), Box> { .with(layer) .with(Layer::new()) .init(); - tokio::spawn(task); + tokio::spawn(task.start()); Ok(()) } diff --git a/examples/shutdown.rs b/examples/shutdown.rs index 8f4ee6e..95b7b85 100644 --- a/examples/shutdown.rs +++ b/examples/shutdown.rs @@ -19,7 +19,7 @@ fn tracing_setup( .with(layer) .with(Layer::new()) .init(); - Ok((controller, tokio::spawn(task))) + Ok((controller, tokio::spawn(task.start()))) } #[tokio::main(flavor = "current_thread")] diff --git a/src/builder.rs b/src/builder.rs index 71449d4..e756175 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -7,8 +7,11 @@ use super::FormattedLabels; use super::Layer; use std::collections::hash_map; use std::collections::HashMap; +use std::time::Duration; use url::Url; +const DEFAULT_BACKGROUD_TASK_BACKOFF: u64 = 500; + /// Create a [`Builder`] for constructing a [`Layer`] and its corresponding /// [`BackgroundTask`]. /// @@ -23,6 +26,7 @@ pub fn builder() -> Builder { labels: FormattedLabels::new(), extra_fields: HashMap::new(), http_headers, + backoff: Duration::from_millis(DEFAULT_BACKGROUD_TASK_BACKOFF), } } @@ -35,6 +39,7 @@ pub struct Builder { labels: FormattedLabels, extra_fields: HashMap, http_headers: reqwest::header::HeaderMap, + backoff: Duration, } impl Builder { @@ -143,6 +148,26 @@ impl Builder { } Ok(self) } + + /// Set the backoff used by the backgroud process. + /// + /// # Example + /// + /// ``` + /// # use tracing_loki::Error; + /// # use std::time::Duration; + /// # fn main() -> Result<(), Error> { + /// let builder = tracing_loki::builder() + /// // Set the period of pushing to Loki. + /// .backoff(Duration::from_millis(100) + /// # Ok(()) + /// # } + /// ``` + pub fn backoff(mut self, backoff: Duration) -> Builder { + self.backoff = backoff; + self + } + /// Build the tracing [`Layer`] and its corresponding [`BackgroundTask`]. /// /// The `loki_url` is the URL of the Loki server, like @@ -164,7 +189,13 @@ impl Builder { sender, extra_fields: self.extra_fields, }, - BackgroundTask::new(loki_url, self.http_headers, receiver, &self.labels)?, + BackgroundTask::new( + loki_url, + self.http_headers, + receiver, + &self.labels, + self.backoff, + )?, )) } /// Build the tracing [`Layer`], [`BackgroundTask`] and its @@ -196,7 +227,13 @@ impl Builder { extra_fields: self.extra_fields, }, BackgroundTaskController { sender }, - BackgroundTask::new(loki_url, self.http_headers, receiver, &self.labels)?, + BackgroundTask::new( + loki_url, + self.http_headers, + receiver, + &self.labels, + self.backoff, + )?, )) } } diff --git a/src/lib.rs b/src/lib.rs index f0b01a3..b8f4490 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ //! //! // The background task needs to be spawned so the logs actually get //! // delivered. -//! tokio::spawn(task); +//! tokio::spawn(task.start()); //! //! tracing::info!( //! task = "tracing_setup", @@ -53,24 +53,16 @@ compile_error!( /// Use this to avoid depending on a potentially-incompatible `url` version yourself. pub extern crate url; +use flume::{Receiver, Sender}; use loki_api::logproto as loki; use loki_api::prost; use serde::Serialize; -use std::cmp; use std::collections::HashMap; use std::error; use std::fmt; -use std::future::Future; use std::mem; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; use std::time::Duration; use std::time::SystemTime; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::Stream; -use tracing::instrument::WithSubscriber; use tracing_core::field::Field; use tracing_core::field::Visit; use tracing_core::span::Attributes; @@ -87,7 +79,6 @@ use url::Url; use labels::FormattedLabels; use level_map::LevelMap; use log_support::SerializeEventFieldMapStrippingLog; -use no_subscriber::NoSubscriber; use ErrorInner as ErrorI; pub use builder::builder; @@ -103,11 +94,8 @@ mod no_subscriber; #[doc = include_str!("../README.md")] struct ReadmeDoctests; -fn event_channel() -> ( - mpsc::Sender>, - mpsc::Receiver>, -) { - mpsc::channel(512) +fn event_channel() -> (Sender>, Receiver>) { + flume::bounded(512) } /// The error type for constructing a [`Layer`]. @@ -195,7 +183,7 @@ impl fmt::Display for ErrorInner { /// /// // The background task needs to be spawned so the logs actually get /// // delivered. -/// tokio::spawn(task); +/// tokio::spawn(task.start()); /// /// tracing::info!( /// task = "tracing_setup", @@ -230,9 +218,10 @@ pub fn layer( /// See the crate's root documentation for an example. pub struct Layer { extra_fields: HashMap, - sender: mpsc::Sender>, + sender: Sender>, } +#[allow(dead_code)] struct LokiEvent { trigger_send: bool, timestamp: SystemTime, @@ -378,18 +367,7 @@ impl SendQueue { self.sending.clear(); len } - fn on_send_result(&mut self, result: Result<(), ()>) { - match result { - Ok(()) => self.sending.clear(), - Err(()) => { - self.sending.append(&mut self.to_send); - mem::swap(&mut self.sending, &mut self.to_send); - } - } - } - fn should_send(&self) -> bool { - self.to_send.iter().any(|e| e.trigger_send) - } + fn prepare_sending(&mut self) -> loki::StreamAdapter { if !self.sending.is_empty() { panic!("can only prepare sending while no request is in flight"); @@ -441,26 +419,23 @@ impl error::Error for BadRedirect {} /// See the crate's root documentation for an example. pub struct BackgroundTask { loki_url: Url, - receiver: ReceiverStream>, + receiver: Receiver>, queues: LevelMap, buffer: Buffer, http_client: reqwest::Client, - backoff_count: u32, - backoff: Option>>, - quitting: bool, - send_task: - Option>> + Send + 'static>>>, + backoff: Duration, } impl BackgroundTask { fn new( loki_url: Url, http_headers: reqwest::header::HeaderMap, - receiver: mpsc::Receiver>, + receiver: Receiver>, labels: &FormattedLabels, + backoff: Duration, ) -> Result { Ok(BackgroundTask { - receiver: ReceiverStream::new(receiver), + receiver, loki_url: loki_url .join("loki/api/v1/push") .map_err(|_| Error(ErrorI::InvalidLokiUrl))?, @@ -483,126 +458,54 @@ impl BackgroundTask { })) .build() .expect("reqwest client builder"), - backoff_count: 0, - backoff: None, - quitting: false, - send_task: None, + backoff, }) } - fn backoff_time(&self) -> (bool, Duration) { - let backoff_time = if self.backoff_count >= 1 { - Duration::from_millis( - 500u64 - .checked_shl(self.backoff_count - 1) - .unwrap_or(u64::MAX), - ) - } else { - Duration::from_millis(0) - }; - ( - backoff_time >= Duration::from_secs(30), - cmp::min(backoff_time, Duration::from_secs(600)), - ) - } -} - -impl Future for BackgroundTask { - type Output = (); - fn poll(mut self: Pin<&mut BackgroundTask>, cx: &mut Context<'_>) -> Poll<()> { - let mut default_guard = tracing::subscriber::set_default(NoSubscriber::default()); - while let Poll::Ready(maybe_maybe_item) = Pin::new(&mut self.receiver).poll_next(cx) { - match maybe_maybe_item { - Some(Some(item)) => self.queues[item.level].push(item), - Some(None) => self.quitting = true, // Explicit close. - None => self.quitting = true, // The sender was dropped. - } - } - - let mut backing_off = if let Some(backoff) = &mut self.backoff { - matches!(Pin::new(backoff).poll(cx), Poll::Pending) - } else { - false - }; - if !backing_off { - self.backoff = None; - } + /// Does something + pub async fn start(mut self) { loop { - if let Some(send_task) = &mut self.send_task { - match Pin::new(send_task).poll(cx) { - Poll::Ready(res) => { - if let Err(e) = &res { - let (drop_outstanding, backoff_time) = self.backoff_time(); - drop(default_guard); - tracing::error!( - error_count = self.backoff_count + 1, - ?backoff_time, - error = %e, - "couldn't send logs to loki", - ); - default_guard = - tracing::subscriber::set_default(NoSubscriber::default()); - if drop_outstanding { - let num_dropped: usize = - self.queues.values_mut().map(|q| q.drop_outstanding()).sum(); - drop(default_guard); - tracing::error!( - num_dropped, - "dropped outstanding messages due to sending errors", - ); - default_guard = - tracing::subscriber::set_default(NoSubscriber::default()); - } - self.backoff = Some(Box::pin(tokio::time::sleep(backoff_time))); - self.backoff_count += 1; - backing_off = true; - } else { - self.backoff_count = 0; - } - let res = res.map_err(|_| ()); - for q in self.queues.values_mut() { - q.on_send_result(res); - } - self.send_task = None; + // get everything form the channel + while let Ok(msg) = self.receiver.try_recv() { + match msg { + Some(event) => { + // push somewhere + self.queues[event.level].push(event); + } + None => { + // explicit exit + return; } - Poll::Pending => {} } } - if self.send_task.is_none() - && !backing_off - && self.queues.values().any(|q| q.should_send()) + // send the things to loki + let streams = self + .queues + .values_mut() + .map(|q| q.prepare_sending()) + .filter(|s| !s.entries.is_empty()) + .collect(); + let body = self + .buffer + .encode(&loki::PushRequest { streams }) + .to_owned(); + let request_builder = self.http_client.post(self.loki_url.clone()); + if let Ok(res) = request_builder + .header(reqwest::header::CONTENT_TYPE, "application/x-snappy") + .body(body) + .send() + .await { - let streams = self - .queues - .values_mut() - .map(|q| q.prepare_sending()) - .filter(|s| !s.entries.is_empty()) - .collect(); - let body = self - .buffer - .encode(&loki::PushRequest { streams }) - .to_owned(); - let request_builder = self.http_client.post(self.loki_url.clone()); - self.send_task = Some(Box::pin( - async move { - request_builder - .header(reqwest::header::CONTENT_TYPE, "application/x-snappy") - .body(body) - .send() - .await? - .error_for_status()?; - Ok(()) - } - .with_subscriber(NoSubscriber::default()), - )); - } else { - break; + if let Err(e) = res.error_for_status() { + tracing::error!("Cannot send {e:?}"); + } } - } - if self.quitting && self.send_task.is_none() { - Poll::Ready(()) - } else { - Poll::Pending + // drop sent messages + self.queues.values_mut().for_each(|q| { + q.drop_outstanding(); + }); + // sleep before next iteration + tokio::time::sleep(self.backoff).await; } } } @@ -646,13 +549,13 @@ impl Buffer { /// /// It'll still try to send all available data and then quit. pub struct BackgroundTaskController { - sender: mpsc::Sender>, + sender: Sender>, } impl BackgroundTaskController { /// Shut down the associated `BackgroundTask`. pub async fn shutdown(&self) { // Ignore the error. If no one is listening, it already shut down. - let _ = self.sender.send(None).await; + let _ = self.sender.send_async(None).await; } } From c8bc943decce209373a136c89b05f12f5fcfa145 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 16:00:13 +0100 Subject: [PATCH 2/4] chore: fix docstring example Signed-off-by: Gabriele Baldoni --- src/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/builder.rs b/src/builder.rs index e756175..205815e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -159,7 +159,7 @@ impl Builder { /// # fn main() -> Result<(), Error> { /// let builder = tracing_loki::builder() /// // Set the period of pushing to Loki. - /// .backoff(Duration::from_millis(100) + /// .backoff(Duration::from_millis(100)); /// # Ok(()) /// # } /// ``` From 47e30f487e6aa46e288a9c88f095e13f2156c641 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 16:16:09 +0100 Subject: [PATCH 3/4] feat(channel-size): adding channel size configuration in builder Signed-off-by: Gabriele Baldoni --- src/builder.rs | 27 +++++++++++++++++++++++++-- src/lib.rs | 4 ++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 205815e..3ef7f9c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -11,6 +11,7 @@ use std::time::Duration; use url::Url; const DEFAULT_BACKGROUD_TASK_BACKOFF: u64 = 500; +const DEFAULT_CHANNEL_CAP: usize = 512; /// Create a [`Builder`] for constructing a [`Layer`] and its corresponding /// [`BackgroundTask`]. @@ -27,6 +28,7 @@ pub fn builder() -> Builder { extra_fields: HashMap::new(), http_headers, backoff: Duration::from_millis(DEFAULT_BACKGROUD_TASK_BACKOFF), + channel_cap: DEFAULT_CHANNEL_CAP, } } @@ -40,6 +42,7 @@ pub struct Builder { extra_fields: HashMap, http_headers: reqwest::header::HeaderMap, backoff: Duration, + channel_cap: usize, } impl Builder { @@ -168,6 +171,26 @@ impl Builder { self } + /// Set the size of the internal event channel. + /// This has an impact on RAM usage. + /// + /// # Example + /// + /// ``` + /// # use tracing_loki::Error; + /// # use std::time::Duration; + /// # fn main() -> Result<(), Error> { + /// let builder = tracing_loki::builder() + /// // Set the period of pushing to Loki. + /// .channel_cap(1024); + /// # Ok(()) + /// # } + /// ``` + pub fn channel_cap(mut self, channel_cap: usize) -> Builder { + self.channel_cap = channel_cap; + self + } + /// Build the tracing [`Layer`] and its corresponding [`BackgroundTask`]. /// /// The `loki_url` is the URL of the Loki server, like @@ -183,7 +206,7 @@ impl Builder { /// /// See the crate's root documentation for an example. pub fn build_url(self, loki_url: Url) -> Result<(Layer, BackgroundTask), Error> { - let (sender, receiver) = event_channel(); + let (sender, receiver) = event_channel(self.channel_cap); Ok(( Layer { sender, @@ -220,7 +243,7 @@ impl Builder { self, loki_url: Url, ) -> Result<(Layer, BackgroundTaskController, BackgroundTask), Error> { - let (sender, receiver) = event_channel(); + let (sender, receiver) = event_channel(self.channel_cap); Ok(( Layer { sender: sender.clone(), diff --git a/src/lib.rs b/src/lib.rs index b8f4490..b4f4c7d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,8 +94,8 @@ mod no_subscriber; #[doc = include_str!("../README.md")] struct ReadmeDoctests; -fn event_channel() -> (Sender>, Receiver>) { - flume::bounded(512) +fn event_channel(cap: usize) -> (Sender>, Receiver>) { + flume::bounded(cap) } /// The error type for constructing a [`Layer`]. From 217e18a7b80d1fb026097af9a1545e86ad499c89 Mon Sep 17 00:00:00 2001 From: Gabriele Baldoni Date: Wed, 4 Dec 2024 16:33:07 +0100 Subject: [PATCH 4/4] fix: remove API change for BackgroudTask Signed-off-by: Gabriele Baldoni --- README.md | 2 +- examples/builder.rs | 2 +- examples/layer.rs | 2 +- examples/shutdown.rs | 2 +- src/builder.rs | 40 ++++++++++++++++++++++++---------------- src/lib.rs | 12 +++++++++--- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 8f98588..b62fc92 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ async fn main() -> Result<(), tracing_loki::Error> { // The background task needs to be spawned so the logs actually get // delivered. - tokio::spawn(task.start()); + tokio::spawn(task); tracing::info!( task = "tracing_setup", diff --git a/examples/builder.rs b/examples/builder.rs index 627c019..e7bdfc5 100644 --- a/examples/builder.rs +++ b/examples/builder.rs @@ -18,7 +18,7 @@ fn tracing_setup() -> Result<(), Box> { .with(layer) .with(Layer::new()) .init(); - tokio::spawn(task.start()); + tokio::spawn(task); Ok(()) } diff --git a/examples/layer.rs b/examples/layer.rs index 6d175ce..447575b 100644 --- a/examples/layer.rs +++ b/examples/layer.rs @@ -19,7 +19,7 @@ fn tracing_setup() -> Result<(), Box> { .with(layer) .with(Layer::new()) .init(); - tokio::spawn(task.start()); + tokio::spawn(task); Ok(()) } diff --git a/examples/shutdown.rs b/examples/shutdown.rs index 95b7b85..8f4ee6e 100644 --- a/examples/shutdown.rs +++ b/examples/shutdown.rs @@ -19,7 +19,7 @@ fn tracing_setup( .with(layer) .with(Layer::new()) .init(); - Ok((controller, tokio::spawn(task.start()))) + Ok((controller, tokio::spawn(task))) } #[tokio::main(flavor = "current_thread")] diff --git a/src/builder.rs b/src/builder.rs index 3ef7f9c..9f19ae1 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,3 +1,5 @@ +use crate::BackgroundTaskFuture; + use super::event_channel; use super::BackgroundTask; use super::BackgroundTaskController; @@ -205,20 +207,23 @@ impl Builder { /// appending `/loki/api/v1/push`. /// /// See the crate's root documentation for an example. - pub fn build_url(self, loki_url: Url) -> Result<(Layer, BackgroundTask), Error> { + pub fn build_url(self, loki_url: Url) -> Result<(Layer, BackgroundTaskFuture), Error> { let (sender, receiver) = event_channel(self.channel_cap); Ok(( Layer { sender, extra_fields: self.extra_fields, }, - BackgroundTask::new( - loki_url, - self.http_headers, - receiver, - &self.labels, - self.backoff, - )?, + Box::pin( + BackgroundTask::new( + loki_url, + self.http_headers, + receiver, + &self.labels, + self.backoff, + )? + .start(), + ), )) } /// Build the tracing [`Layer`], [`BackgroundTask`] and its @@ -242,7 +247,7 @@ impl Builder { pub fn build_controller_url( self, loki_url: Url, - ) -> Result<(Layer, BackgroundTaskController, BackgroundTask), Error> { + ) -> Result<(Layer, BackgroundTaskController, BackgroundTaskFuture), Error> { let (sender, receiver) = event_channel(self.channel_cap); Ok(( Layer { @@ -250,13 +255,16 @@ impl Builder { extra_fields: self.extra_fields, }, BackgroundTaskController { sender }, - BackgroundTask::new( - loki_url, - self.http_headers, - receiver, - &self.labels, - self.backoff, - )?, + Box::pin( + BackgroundTask::new( + loki_url, + self.http_headers, + receiver, + &self.labels, + self.backoff, + )? + .start(), + ), )) } } diff --git a/src/lib.rs b/src/lib.rs index b4f4c7d..ee5a741 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ //! //! // The background task needs to be spawned so the logs actually get //! // delivered. -//! tokio::spawn(task.start()); +//! tokio::spawn::(task); //! //! tracing::info!( //! task = "tracing_setup", @@ -60,7 +60,9 @@ use serde::Serialize; use std::collections::HashMap; use std::error; use std::fmt; +use std::future::Future; use std::mem; +use std::pin::Pin; use std::time::Duration; use std::time::SystemTime; use tracing_core::field::Field; @@ -84,6 +86,9 @@ use ErrorInner as ErrorI; pub use builder::builder; pub use builder::Builder; +/// Wrapper around the future running in the [`BackgroundTask`] +pub type BackgroundTaskFuture = Pin + Send + Sync>>; + mod builder; mod labels; mod level_map; @@ -164,6 +169,7 @@ impl fmt::Display for ErrorInner { /// ```rust /// use tracing_subscriber::layer::SubscriberExt; /// use tracing_subscriber::util::SubscriberInitExt; +/// use tracing_loki::BackgroundTaskFuture; /// use url::Url; /// /// #[tokio::main] @@ -183,7 +189,7 @@ impl fmt::Display for ErrorInner { /// /// // The background task needs to be spawned so the logs actually get /// // delivered. -/// tokio::spawn(task.start()); +/// tokio::spawn::(task); /// /// tracing::info!( /// task = "tracing_setup", @@ -198,7 +204,7 @@ pub fn layer( loki_url: Url, labels: HashMap, extra_fields: HashMap, -) -> Result<(Layer, BackgroundTask), Error> { +) -> Result<(Layer, BackgroundTaskFuture), Error> { let mut builder = builder(); for (key, value) in labels { builder = builder.label(key, value)?;