Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/rrg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ pub use ping::Ping;
pub use startup::Startup;

pub use request::{ParseRequestError, Request, RequestId};
pub use response::{Item, LogBuilder, Parcel, ResponseBuilder, ResponseId, Sink};
pub use response::{Item, LogBuilder, Parcel, ResponseId, Sink};
101 changes: 13 additions & 88 deletions crates/rrg/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ impl<I: Item> From<I> for PreparedItem<I> {
/// [`Item`]: crate::response::Item
pub struct Reply<I: Item> {
/// A unique request identifier for which this item was yielded.
request_id: RequestId,
pub request_id: RequestId,
/// A unique response identifier of this item.
response_id: ResponseId,
pub response_id: ResponseId,
/// An actual item that the action yielded.
item: PreparedItem<I>,
pub item: PreparedItem<I>,
}

impl<I: Item> Reply<I> {
Expand Down Expand Up @@ -112,13 +112,17 @@ impl<I: Item> Reply<I> {
/// succeeded and error details in case it did not.
pub struct Status {
/// A unique request identifier for which this status is generated.
request_id: RequestId,
pub request_id: RequestId,
/// A unique response identifier of this status.
response_id: ResponseId,
pub response_id: ResponseId,
/// Number of bytes sent during execution of the action.
pub network_bytes_sent: u64,
/// Total real (wall) time execution of the action took.
pub real_time: std::time::Duration,
/// Number of items that have been rejected by filters.
filtered_out_count: u32,
pub filtered_out_count: u32,
/// The action execution status.
result: Result<(), crate::session::Error>,
pub result: Result<(), crate::session::Error>,
}

impl Status {
Expand Down Expand Up @@ -199,94 +203,13 @@ impl<'r, 'a> Log<'r, 'a> {
}
}

/// An action reply message after applying filters to the contained item.
pub enum FilteredReply<I: Item> {
/// Item passed the filters and can be sent as a reply.
Accepted(Reply<I>),
/// Item was rejected by the filters.
Rejected,
/// Error occurred when applying filters to the item.
Error(crate::filter::Error),
}

// TODO(@panhania): We should have some kind of end-to-end test that verifies
// that all responses sent by RRG are consecutive integers.

/// A unique identifier of a response.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct ResponseId(pub(super) u64);

/// Response factory for building many responses to a single request.
pub struct ResponseBuilder {
/// A unique request identifier for which we build responses.
request_id: RequestId,
/// The response identifier assigned to the next generated response.
next_response_id: ResponseId,
/// Filters to apply to the results before they are sent.
filters: FilterSet,
/// Number of items that have been rejected by filters.
filtered_out_count: u32,
}

impl ResponseBuilder {

/// Creates a new response builder for the specified request.
pub fn new(request_id: RequestId) -> ResponseBuilder {
ResponseBuilder {
request_id,
// Response identifiers that GRR agents use start at 1. The server
// assumes this to determine the number of expected messages when
// the status message is received. Thus, we have to replicate the
// behaviour of the existing GRR agent and start at 1 as well.
next_response_id: ResponseId(1),
filters: FilterSet::empty(),
filtered_out_count: 0,
}
}

/// Creates a new response builder that will filter results.
pub fn with_filters(mut self, filters: FilterSet) -> ResponseBuilder {
self.filters = filters;
self
}

/// Builds a new status response for the given action outcome.
pub fn status(self, result: crate::session::Result<()>) -> Status {
Status {
request_id: self.request_id,
// Because this method consumes the builder, we do not need to
// increment the response id.
response_id: self.next_response_id,
filtered_out_count: self.filtered_out_count,
result,
}
}

/// Builds a new reply response for the given action item.
pub fn reply<I: Item>(&mut self, item: PreparedItem<I>) -> FilteredReply<I>
where
I: Item,
{
match self.filters.eval(&item.proto) {
Ok(true) => {
let response_id = self.next_response_id;
self.next_response_id.0 += 1;

FilteredReply::Accepted(Reply {
request_id: self.request_id.clone(),
response_id,
item,
})
}
Ok(false) => {
self.filtered_out_count += 1;
FilteredReply::Rejected
}
Err(error) => FilteredReply::Error(error),
}
}
}

/// Log factory for building many log responses to a single request.
pub struct LogBuilder {
/// A unique request identifier for which we build log responses.
Expand Down Expand Up @@ -447,6 +370,8 @@ impl From<Status> for rrg_proto::rrg::Status {
proto.set_error(error.into());
}

proto.set_network_bytes_sent(status.network_bytes_sent);
proto.set_real_time(status.real_time.into());
proto.set_filtered_out_count(status.filtered_out_count);

proto
Expand Down
64 changes: 52 additions & 12 deletions crates/rrg/src/session/fleetspeak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ pub struct FleetspeakSession<'a, 'fs> {
args: &'a crate::args::Args,
/// Filestore of the current process (if available).
filestore: Option<&'fs crate::filestore::Filestore>,
/// A builder for responses sent through Fleetspeak to the GRR server.
response_builder: crate::ResponseBuilder,
/// The response identifier assigned to the next generated response.
next_response_id: crate::ResponseId,
/// Filters to apply to the results before they are sent.
filters: crate::filter::FilterSet,
/// Number of items that have been rejected by filters.
filtered_out_count: u32,
/// Number of bytes sent since the session was created.
network_bytes_sent: u64,
/// Number of bytes we are allowed to send within the session.
Expand Down Expand Up @@ -59,16 +63,21 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> {

info!("received request '{request_id}'");

let response_builder = crate::ResponseBuilder::new(request_id);
// Response identifiers that GRR agents use start at 1. The server
// assumes this to determine the number of expected messages when the
// status message is received. Thus, we have to replicate the behaviour
// of the existing GRR agent and start at 1 as well.
let next_response_id = crate::ResponseId(1);

let status = match request {
Ok(mut request) => {
let filters = request.take_filters();
let mut session = FleetspeakSession {
request_id,
args,
filestore,
response_builder: response_builder.with_filters(filters),
next_response_id,
filters: request.take_filters(),
filtered_out_count: 0,
network_bytes_sent: 0,
network_bytes_limit: request.network_bytes_limit(),
real_time_start: std::time::Instant::now(),
Expand All @@ -78,11 +87,29 @@ impl<'a, 'fs> FleetspeakSession<'a, 'fs> {
let result = crate::log::ResponseLogger::new(&request)
.context(|| crate::action::dispatch(&mut session, request));

session.response_builder.status(result)
crate::response::Status {
request_id: request_id,
// Because status is the last response to be sent (`session`
// is dropped at the end of this scope), we do not need to
// increment the response id.
response_id: session.next_response_id,
network_bytes_sent: session.network_bytes_sent,
real_time: session.real_time_start.elapsed(),
filtered_out_count: session.filtered_out_count,
result,
}
},
Err(error) => {
error!("invalid request '{request_id}': {error}");
response_builder.status(Err(error.into()))

crate::response::Status {
request_id,
response_id: next_response_id,
network_bytes_sent: 0,
real_time: std::time::Duration::ZERO,
filtered_out_count: 0,
result: Err(crate::session::Error::from(error)),
}
}
};

Expand Down Expand Up @@ -142,11 +169,24 @@ impl<'a, 'fs> crate::session::Session for FleetspeakSession<'a, 'fs> {
{
let item = crate::response::PreparedItem::from(item);

use crate::response::FilteredReply::*;
let reply = match self.response_builder.reply(item) {
Accepted(reply) => reply,
Rejected => return Ok(()),
Error(error) => return Err(error.into()),
let reply = match self.filters.eval(item.as_proto()) {
Ok(true) => {
let response_id = self.next_response_id;
self.next_response_id.0 += 1;

crate::response::Reply {
request_id: self.request_id,
response_id,
item,
}
}
Ok(false) => {
self.filtered_out_count += 1;
return Ok(())
}
Err(error) => {
return Err(error.into())
}
};

self.network_bytes_sent += reply.send_unaccounted() as u64;
Expand Down
Loading