Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = ["girolle", "girolle_macro", "examples"]

[workspace.package]
version = "2.0.0"
version = "2.1.0"

[workspace.dependencies]
serde_json = "1.0.140"
Expand Down
2 changes: 1 addition & 1 deletion girolle/benches/macro.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

use girolle::nameko_utils::build_inputs_fn_service;
use girolle::__macro_support::build_inputs_fn_service;
use girolle::prelude::*;
use serde_json::Value;

Expand Down
6 changes: 6 additions & 0 deletions girolle/src/__macro_support.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Stable path for symbols that the `#[girolle]` attribute macro
//! emits into user code. The expanded code references
//! `::girolle::__macro_support::*`; nothing else should depend on
//! this module.

pub use crate::service::args::build_inputs_fn_service;
90 changes: 19 additions & 71 deletions girolle/src/queue.rs → girolle/src/amqp/channel.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,23 @@
/// # queue
///
/// This module contains functions to create queues and channels for the RPC communication.
//! Channel/queue/exchange declarations.
//!
//! Each helper opens a fresh `lapin::Channel` on an existing
//! `Connection`, declares the queue and any exchanges/bindings the
//! caller will need, applies QoS, and returns the channel ready to use.

use lapin::{
options::{BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
types::FieldTable,
Connection, ConnectionProperties, ExchangeKind,
Connection, ExchangeKind,
};
use tracing::{error, info};
use tracing::info;
use uuid::Uuid;

/// # QUEUE_TTL
const QUEUE_TTL: u32 = 300000;

pub(crate) async fn get_connection(
amqp_uri: String,
heartbeat_value: u16,
) -> Result<lapin::Connection, lapin::Error> {
let mut connection_options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let mut client_properties_custom = FieldTable::default();
client_properties_custom.insert("heartbeat".into(), heartbeat_value.into());
connection_options.client_properties = client_properties_custom;
match Connection::connect(&amqp_uri, connection_options).await {
Ok(connection) => {
info!("Connected to RabbitMQ");
Ok(connection)
}
Err(e) => {
error!("Failed to connect to RabbitMQ with error:{}", e);
Err(e)
}
}
}
/// TTL applied to per-client reply queues (5 min). After this long
/// without a consumer, the broker reclaims the queue.
const QUEUE_TTL: u32 = 300_000;

/// # create_service_channel
///
/// This function creates a channel for a service.
///
/// ## Arguments
///
/// * `service_name` - A string slice that holds the name of the service.
/// * `amqp_uri` - A string that holds the URI of the AMQP server.
/// * `heartbeat_value` - A u16 that holds the heartbeat value.
/// * `prefetch_count` - A u16 that holds the prefetch count.
/// * `rpc_exchange` - A string slice that holds the name of the exchange.
///
/// ## Returns
///
/// A lapin::Result<lapin::Channel> that holds the channel.
/// Declare a service's inbound RPC queue (`rpc-<service>`), bind it to
/// the RPC exchange with `<service>.*`, and apply the requested QoS.
pub(crate) async fn create_service_channel(
conn: &Connection,
service_name: &str,
Expand Down Expand Up @@ -86,21 +55,9 @@ pub(crate) async fn create_service_channel(
Ok(incomming_channel)
}

/// # create_message_channel
///
/// This function creates a channel for a message.
///
/// ## Arguments
///
/// * `rpc_queue_reply` - A string slice that holds the name of the queue.
/// * `id` - A string slice that holds the id of the message.
/// * `amqp_uri` - A string that holds the URI of the AMQP server.
/// * `heartbeat_value` - A u16 that holds the heartbeat value.
/// * `rpc_exchange` - A string slice that holds the name of the exchange.
///
/// ## Returns
///
/// A lapin::Result<lapin::Channel> that holds the channel.
/// Declare a per-client reply queue bound to the RPC exchange under
/// the client's identifier as the routing key. Used by both the
/// standalone [`crate::client::RpcClient`] and the in-service caller.
pub(crate) async fn create_message_channel(
conn: &Connection,
rpc_queue_reply: &str,
Expand All @@ -112,7 +69,6 @@ pub(crate) async fn create_message_channel(
let response_channel = conn.create_channel().await?;
let mut response_arguments = FieldTable::default();
response_arguments.insert("x-expires".into(), QUEUE_TTL.into());
// Need to clone the response_arguments because the queue_declare function takes ownership of the FieldTable
response_channel
.queue_declare(
rpc_queue_reply,
Expand All @@ -138,17 +94,9 @@ pub(crate) async fn create_message_channel(
Ok(response_channel)
}

/// # create_event_consumer_channel
///
/// Sets up a channel for consuming Nameko-style events emitted by
/// `source_service`. The function declares the source's `{source}.events`
/// topic exchange (idempotently — so the consumer can come up before any
/// publisher), declares a durable consumer queue, binds it to the exchange
/// with `event_type` as the routing key, and applies `prefetch_count`.
///
/// The exchange and queue declarations match the Nameko EventDispatcher
/// publisher's conventions, so a Girolle consumer can subscribe to events
/// emitted by a Python Nameko service and vice versa.
/// Declare an event consumer's `{source}.events` topic exchange and a
/// durable queue bound to it on `event_type`. The exchange is declared
/// idempotently so a consumer can come up before any publisher.
pub(crate) async fn create_event_consumer_channel(
conn: &Connection,
queue_name: &str,
Expand Down
26 changes: 26 additions & 0 deletions girolle/src/amqp/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use lapin::{types::FieldTable, Connection, ConnectionProperties};
use tracing::{error, info};

/// Open an AMQP connection wired up to the Tokio executor and reactor,
/// with a custom heartbeat advertised in the client properties.
pub(crate) async fn get_connection(
amqp_uri: String,
heartbeat_value: u16,
) -> Result<Connection, lapin::Error> {
let mut connection_options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let mut client_properties_custom = FieldTable::default();
client_properties_custom.insert("heartbeat".into(), heartbeat_value.into());
connection_options.client_properties = client_properties_custom;
match Connection::connect(&amqp_uri, connection_options).await {
Ok(connection) => {
info!("Connected to RabbitMQ");
Ok(connection)
}
Err(e) => {
error!("Failed to connect to RabbitMQ with error:{}", e);
Err(e)
}
}
}
112 changes: 112 additions & 0 deletions girolle/src/amqp/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! Manipulation of Nameko-specific AMQP headers.
//!
//! In particular `nameko.call_id_stack`: every hop appends
//! `<routing_key>.<id>`, and the stack is truncated to a configurable
//! window so it doesn't grow unbounded across long call chains.

use crate::error::GirolleError;
use crate::protocol::NAMEKO_CALL_ID_STACK;
use lapin::message::Delivery;
use lapin::types::{AMQPValue, FieldTable, LongString, ShortString};
use lapin::BasicProperties;
use tracing::error;
use uuid::Uuid;

fn set_current_call_id(function_name: &str, id: &str) -> AMQPValue {
let rpc_id = format!("{}.{}", function_name, id);
AMQPValue::LongString(LongString::from(rpc_id.as_bytes()))
}

#[test]
fn test_set_current_call_id() {
let function_name = "package_cg_asset.get_filepaths_from_tags";
let id = "4c5615e2-9367-46aa-8f90-b87e89723fa0";
let rpc_id = set_current_call_id(function_name, id);
assert_eq!(
rpc_id,
AMQPValue::LongString(LongString::from(
"package_cg_asset.get_filepaths_from_tags.4c5615e2-9367-46aa-8f90-b87e89723fa0"
.as_bytes()
))
);
}

/// Append a fresh call id to `nameko.call_id_stack` and truncate the
/// stack to `parent_calls_tracked` entries (Nameko semantics).
pub(crate) fn insert_new_id_to_call_id(
mut headers: FieldTable,
function_name: &str,
id: &str,
parent_calls_tracked: usize,
) -> FieldTable {
let inner_headers = headers.inner();
let call_id_stack_slice = inner_headers
.get(NAMEKO_CALL_ID_STACK)
.unwrap()
.as_array()
.unwrap()
.as_slice();
let mut call_id_stack = call_id_stack_slice.to_vec();
Comment on lines +43 to +49

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

insert_new_id_to_call_id uses unwrap() on the nameko.call_id_stack header and on its type conversions. If a caller sends headers without that key (or with an unexpected type), the service will panic. Consider treating a missing/invalid stack as an empty stack and seeding it instead of panicking.

Suggested change
let call_id_stack_slice = inner_headers
.get(NAMEKO_CALL_ID_STACK)
.unwrap()
.as_array()
.unwrap()
.as_slice();
let mut call_id_stack = call_id_stack_slice.to_vec();
let mut call_id_stack = inner_headers
.get(NAMEKO_CALL_ID_STACK)
.and_then(|value| value.as_array())
.map(|array| array.as_slice().to_vec())
.unwrap_or_default();

Copilot uses AI. Check for mistakes.
call_id_stack.push(set_current_call_id(function_name, id));

if call_id_stack.len() > parent_calls_tracked {
call_id_stack = call_id_stack[call_id_stack.len() - parent_calls_tracked..].to_vec();
}

let to_amqp = AMQPValue::FieldArray(call_id_stack.into());
let key_field = ShortString::from(NAMEKO_CALL_ID_STACK);
headers.insert(key_field, to_amqp);
headers
}

/// Read a required `ShortString` header into an owned `String`,
/// panicking if the header is absent.
pub(crate) fn get_id(opt_id: &Option<ShortString>, id_name: &str) -> String {
match opt_id {
Some(id) => id.to_string(),
None => {
error!("{}: None", id_name);
panic!("{}: None", id_name)
}
Comment on lines +66 to +70

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_id panics when an expected header is missing. In the consumer path this can bring down the whole service on a single malformed message. Prefer returning a Result<String, GirolleError> and letting callers decide how to handle missing fields (log + ack/nack).

Copilot uses AI. Check for mistakes.
}
}

#[test]
fn test_get_id() {
let id = get_id(&Some(ShortString::from("id")), "id");
assert_eq!(id, "id".to_string());
}

/// Build the `BasicProperties` of a reply message from an inbound
/// delivery: copies the correlation id, sets reply-to to the caller's
/// reply queue, and updates `nameko.call_id_stack` for the next hop.
pub(crate) fn delivery_to_message_properties(
delivery: &Delivery,
id: &Uuid,
rpc_queue: &str,
parent_calls_tracked: usize,
) -> Result<BasicProperties, GirolleError> {
let opt_routing_key = delivery.routing_key.to_string();
let correlation_id = get_id(delivery.properties.correlation_id(), "correlation_id");
let opt_headers = delivery.properties.headers();
let headers = match opt_headers {
Some(h) => insert_new_id_to_call_id(
h.clone(),
&opt_routing_key,
&id.to_string(),
parent_calls_tracked,
),
None => {
error!("No headers found in delivery properties");
return Err(GirolleError::MissingHeader);
}
};
Ok(BasicProperties::default()
.with_correlation_id(correlation_id.into())
.with_content_type("application/json".into())
.with_reply_to(rpc_queue.into())
.with_content_encoding("utf-8".into())
.with_headers(headers)
.with_delivery_mode(2)
.with_priority(0))
}
12 changes: 12 additions & 0 deletions girolle/src/amqp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! AMQP transport layer.
//!
//! Owns every direct interaction with `lapin`: opening connections,
//! declaring channels/queues/exchanges, manipulating Nameko-specific
//! AMQP headers, and publishing reply messages. Higher layers
//! (`service`, `client`) talk to the broker exclusively through this
//! module.

pub(crate) mod channel;
pub(crate) mod connection;
pub(crate) mod headers;
pub(crate) mod publish;
35 changes: 35 additions & 0 deletions girolle/src/amqp/publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::protocol::PayloadResult;
use lapin::options::BasicPublishOptions;
use lapin::{BasicProperties, Channel};

/// Publish an RPC reply on `rpc_exchange` with `reply_to_id` as the
/// routing key. Spawned as a fire-and-forget task so the consumer
/// loop is not blocked on the broker's confirm.
pub(crate) async fn publish(
rpc_channel: &Channel,
payload: PayloadResult,
properties: BasicProperties,
reply_to_id: String,
rpc_exchange: &str,
) -> lapin::Result<()> {
let rpc_channel_clone = rpc_channel.clone();
let rpc_exchange_clone = rpc_exchange.to_string();
tokio::spawn(async move {
rpc_channel_clone
.basic_publish(
&rpc_exchange_clone,
&reply_to_id,
BasicPublishOptions::default(),
payload
.to_string()
.expect("can't serialize payload")
.as_bytes(),
properties,
)
.await
.unwrap()
.await
.unwrap();
Comment on lines +15 to +32

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using expect("can't serialize payload") inside this library helper will panic the process on serialization failures (e.g., if PayloadResult structure changes). Prefer returning the serialization error as part of the lapin::Result (or mapping it into GirolleError) so callers can handle it.

Suggested change
let rpc_channel_clone = rpc_channel.clone();
let rpc_exchange_clone = rpc_exchange.to_string();
tokio::spawn(async move {
rpc_channel_clone
.basic_publish(
&rpc_exchange_clone,
&reply_to_id,
BasicPublishOptions::default(),
payload
.to_string()
.expect("can't serialize payload")
.as_bytes(),
properties,
)
.await
.unwrap()
.await
.unwrap();
let payload = payload.to_string().map_err(|err| {
lapin::Error::from(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("can't serialize payload: {err}"),
))
})?;
let rpc_channel_clone = rpc_channel.clone();
let rpc_exchange_clone = rpc_exchange.to_string();
tokio::spawn(async move {
if let Ok(confirm) = rpc_channel_clone
.basic_publish(
&rpc_exchange_clone,
&reply_to_id,
BasicPublishOptions::default(),
payload.as_bytes(),
properties,
)
.await
{
let _ = confirm.await;
}

Copilot uses AI. Check for mistakes.
Comment on lines +29 to +32

Copilot AI Apr 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spawned publish task uses unwrap() on both the initial publish and the broker confirm future. If the broker closes the channel or the confirm is negative, this will panic in a background task and the caller will still get Ok(()). Consider handling these errors explicitly (log + return error or signal failure) instead of panicking.

Copilot uses AI. Check for mistakes.
});
Ok(())
}
Loading
Loading