From 941698c666d7ef4eb6d918e54045be0424964d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Fri, 13 Mar 2026 22:46:35 +0100 Subject: [PATCH 1/3] Refactor handling of the status response. --- crates/rrg/src/lib.rs | 2 +- crates/rrg/src/response.rs | 95 ++-------------------------- crates/rrg/src/session/fleetspeak.rs | 60 ++++++++++++++---- 3 files changed, 56 insertions(+), 101 deletions(-) diff --git a/crates/rrg/src/lib.rs b/crates/rrg/src/lib.rs index 27209ab2..24318bc1 100644 --- a/crates/rrg/src/lib.rs +++ b/crates/rrg/src/lib.rs @@ -29,4 +29,4 @@ pub use ping::Ping; pub use startup::Startup; pub use request::{ParseRequestError, Request, RequestId}; -pub use response::{Item, LogBuilder, Parcel, ResponseBuilder, ResponseId, Sink}; +pub use response::{Item, LogBuilder, Parcel, ResponseId, Sink}; diff --git a/crates/rrg/src/response.rs b/crates/rrg/src/response.rs index 9452465f..d3a1da16 100644 --- a/crates/rrg/src/response.rs +++ b/crates/rrg/src/response.rs @@ -64,11 +64,11 @@ impl From for PreparedItem { /// [`Item`]: crate::response::Item pub struct Reply { /// A unique request identifier for which this item was yielded. - request_id: RequestId, + pub request_id: RequestId, /// A unique response identifier of this item. - response_id: ResponseId, + pub response_id: ResponseId, /// An actual item that the action yielded. - item: PreparedItem, + pub item: PreparedItem, } impl Reply { @@ -112,13 +112,13 @@ impl Reply { /// succeeded and error details in case it did not. pub struct Status { /// A unique request identifier for which this status is generated. - request_id: RequestId, + pub request_id: RequestId, /// A unique response identifier of this status. - response_id: ResponseId, + pub response_id: ResponseId, /// Number of items that have been rejected by filters. - filtered_out_count: u32, + pub filtered_out_count: u32, /// The action execution status. - result: Result<(), crate::session::Error>, + pub result: Result<(), crate::session::Error>, } impl Status { @@ -199,16 +199,6 @@ impl<'r, 'a> Log<'r, 'a> { } } -/// An action reply message after applying filters to the contained item. -pub enum FilteredReply { - /// Item passed the filters and can be sent as a reply. - Accepted(Reply), - /// Item was rejected by the filters. - Rejected, - /// Error occurred when applying filters to the item. - Error(crate::filter::Error), -} - // TODO(@panhania): We should have some kind of end-to-end test that verifies // that all responses sent by RRG are consecutive integers. @@ -216,77 +206,6 @@ pub enum FilteredReply { #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct ResponseId(pub(super) u64); -/// Response factory for building many responses to a single request. -pub struct ResponseBuilder { - /// A unique request identifier for which we build responses. - request_id: RequestId, - /// The response identifier assigned to the next generated response. - next_response_id: ResponseId, - /// Filters to apply to the results before they are sent. - filters: FilterSet, - /// Number of items that have been rejected by filters. - filtered_out_count: u32, -} - -impl ResponseBuilder { - - /// Creates a new response builder for the specified request. - pub fn new(request_id: RequestId) -> ResponseBuilder { - ResponseBuilder { - request_id, - // Response identifiers that GRR agents use start at 1. The server - // assumes this to determine the number of expected messages when - // the status message is received. Thus, we have to replicate the - // behaviour of the existing GRR agent and start at 1 as well. - next_response_id: ResponseId(1), - filters: FilterSet::empty(), - filtered_out_count: 0, - } - } - - /// Creates a new response builder that will filter results. - pub fn with_filters(mut self, filters: FilterSet) -> ResponseBuilder { - self.filters = filters; - self - } - - /// Builds a new status response for the given action outcome. - pub fn status(self, result: crate::session::Result<()>) -> Status { - Status { - request_id: self.request_id, - // Because this method consumes the builder, we do not need to - // increment the response id. - response_id: self.next_response_id, - filtered_out_count: self.filtered_out_count, - result, - } - } - - /// Builds a new reply response for the given action item. - pub fn reply(&mut self, item: PreparedItem) -> FilteredReply - where - I: Item, - { - match self.filters.eval(&item.proto) { - Ok(true) => { - let response_id = self.next_response_id; - self.next_response_id.0 += 1; - - FilteredReply::Accepted(Reply { - request_id: self.request_id.clone(), - response_id, - item, - }) - } - Ok(false) => { - self.filtered_out_count += 1; - FilteredReply::Rejected - } - Err(error) => FilteredReply::Error(error), - } - } -} - /// Log factory for building many log responses to a single request. pub struct LogBuilder { /// A unique request identifier for which we build log responses. diff --git a/crates/rrg/src/session/fleetspeak.rs b/crates/rrg/src/session/fleetspeak.rs index a4179cd5..10454751 100644 --- a/crates/rrg/src/session/fleetspeak.rs +++ b/crates/rrg/src/session/fleetspeak.rs @@ -12,8 +12,12 @@ pub struct FleetspeakSession<'a, 'fs> { args: &'a crate::args::Args, /// Filestore of the current process (if available). filestore: Option<&'fs crate::filestore::Filestore>, - /// A builder for responses sent through Fleetspeak to the GRR server. - response_builder: crate::ResponseBuilder, + /// The response identifier assigned to the next generated response. + next_response_id: crate::ResponseId, + /// Filters to apply to the results before they are sent. + filters: crate::filter::FilterSet, + /// Number of items that have been rejected by filters. + filtered_out_count: u32, /// Number of bytes sent since the session was created. network_bytes_sent: u64, /// Number of bytes we are allowed to send within the session. @@ -59,16 +63,21 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { info!("received request '{request_id}'"); - let response_builder = crate::ResponseBuilder::new(request_id); + // Response identifiers that GRR agents use start at 1. The server + // assumes this to determine the number of expected messages when the + // status message is received. Thus, we have to replicate the behaviour + // of the existing GRR agent and start at 1 as well. + let next_response_id = crate::ResponseId(1); let status = match request { Ok(mut request) => { - let filters = request.take_filters(); let mut session = FleetspeakSession { request_id, args, filestore, - response_builder: response_builder.with_filters(filters), + next_response_id, + filters: request.take_filters(), + filtered_out_count: 0, network_bytes_sent: 0, network_bytes_limit: request.network_bytes_limit(), real_time_start: std::time::Instant::now(), @@ -78,11 +87,25 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { let result = crate::log::ResponseLogger::new(&request) .context(|| crate::action::dispatch(&mut session, request)); - session.response_builder.status(result) + crate::response::Status { + request_id: request_id, + // Because status is the last response to be sent (`session` + // is dropped at the end of this scope), we do not need to + // increment the response id. + response_id: session.next_response_id, + filtered_out_count: session.filtered_out_count, + result, + } }, Err(error) => { error!("invalid request '{request_id}': {error}"); - response_builder.status(Err(error.into())) + + crate::response::Status { + request_id, + response_id: next_response_id, + filtered_out_count: 0, + result: Err(crate::session::Error::from(error)), + } } }; @@ -142,11 +165,24 @@ impl<'a, 'fs> crate::session::Session for FleetspeakSession<'a, 'fs> { { let item = crate::response::PreparedItem::from(item); - use crate::response::FilteredReply::*; - let reply = match self.response_builder.reply(item) { - Accepted(reply) => reply, - Rejected => return Ok(()), - Error(error) => return Err(error.into()), + let reply = match self.filters.eval(item.as_proto()) { + Ok(true) => { + let response_id = self.next_response_id; + self.next_response_id.0 += 1; + + crate::response::Reply { + request_id: self.request_id, + response_id, + item, + } + } + Ok(false) => { + self.filtered_out_count += 1; + return Ok(()) + } + Err(error) => { + return Err(error.into()) + } }; self.network_bytes_sent += reply.send_unaccounted() as u64; From 76f5576db977c635217ebc1b1e982a240568f6d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Fri, 13 Mar 2026 22:53:00 +0100 Subject: [PATCH 2/3] Add action network bytes statistic into status. --- crates/rrg/src/response.rs | 3 +++ crates/rrg/src/session/fleetspeak.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/crates/rrg/src/response.rs b/crates/rrg/src/response.rs index d3a1da16..49b57b9d 100644 --- a/crates/rrg/src/response.rs +++ b/crates/rrg/src/response.rs @@ -115,6 +115,8 @@ pub struct Status { pub request_id: RequestId, /// A unique response identifier of this status. pub response_id: ResponseId, + /// Number of bytes sent during execution of the action. + pub network_bytes_sent: u64, /// Number of items that have been rejected by filters. pub filtered_out_count: u32, /// The action execution status. @@ -366,6 +368,7 @@ impl From for rrg_proto::rrg::Status { proto.set_error(error.into()); } + proto.set_network_bytes_sent(status.network_bytes_sent); proto.set_filtered_out_count(status.filtered_out_count); proto diff --git a/crates/rrg/src/session/fleetspeak.rs b/crates/rrg/src/session/fleetspeak.rs index 10454751..07fc0c12 100644 --- a/crates/rrg/src/session/fleetspeak.rs +++ b/crates/rrg/src/session/fleetspeak.rs @@ -93,6 +93,7 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { // is dropped at the end of this scope), we do not need to // increment the response id. response_id: session.next_response_id, + network_bytes_sent: session.network_bytes_sent, filtered_out_count: session.filtered_out_count, result, } @@ -103,6 +104,7 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { crate::response::Status { request_id, response_id: next_response_id, + network_bytes_sent: 0, filtered_out_count: 0, result: Err(crate::session::Error::from(error)), } From 509e0a1945bd0cc223e74027e39609d268eaa831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Fri, 13 Mar 2026 23:02:20 +0100 Subject: [PATCH 3/3] Add action real time statistic into status. --- crates/rrg/src/response.rs | 3 +++ crates/rrg/src/session/fleetspeak.rs | 2 ++ 2 files changed, 5 insertions(+) diff --git a/crates/rrg/src/response.rs b/crates/rrg/src/response.rs index 49b57b9d..5443f682 100644 --- a/crates/rrg/src/response.rs +++ b/crates/rrg/src/response.rs @@ -117,6 +117,8 @@ pub struct Status { pub response_id: ResponseId, /// Number of bytes sent during execution of the action. pub network_bytes_sent: u64, + /// Total real (wall) time execution of the action took. + pub real_time: std::time::Duration, /// Number of items that have been rejected by filters. pub filtered_out_count: u32, /// The action execution status. @@ -369,6 +371,7 @@ impl From for rrg_proto::rrg::Status { } proto.set_network_bytes_sent(status.network_bytes_sent); + proto.set_real_time(status.real_time.into()); proto.set_filtered_out_count(status.filtered_out_count); proto diff --git a/crates/rrg/src/session/fleetspeak.rs b/crates/rrg/src/session/fleetspeak.rs index 07fc0c12..9461c433 100644 --- a/crates/rrg/src/session/fleetspeak.rs +++ b/crates/rrg/src/session/fleetspeak.rs @@ -94,6 +94,7 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { // increment the response id. response_id: session.next_response_id, network_bytes_sent: session.network_bytes_sent, + real_time: session.real_time_start.elapsed(), filtered_out_count: session.filtered_out_count, result, } @@ -105,6 +106,7 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> { request_id, response_id: next_response_id, network_bytes_sent: 0, + real_time: std::time::Duration::ZERO, filtered_out_count: 0, result: Err(crate::session::Error::from(error)), }