Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ path = "src/error_sender.rs"
[[example]]
name = "simple_fib"
path = "src/simple_fib.rs"

[[example]]
name = "proxy_service"
path = "src/proxy_service.rs"
28 changes: 28 additions & 0 deletions examples/src/proxy_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Demonstrates an in-service RPC proxy: the `proxy.hello` method calls
//! `video.hello` from inside its handler via `ctx.rpc.call(...)`.
//!
//! Run alongside `simple_macro` (or any service exposing `video.hello`):
//!
//! ```sh
//! cargo run --example simple_macro
//! cargo run --example proxy_service
//! ```

use girolle::prelude::*;

#[girolle]
async fn proxy_hello(ctx: RpcContext, name: String) -> String {
let result = ctx
.rpc
.call("video", "hello", Payload::new().arg(name))
.await
.expect("rpc call to video.hello failed");
serde_json::from_value(result).expect("video.hello did not return a String")
}

fn main() {
let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap();
let _ = RpcService::new(conf, "proxy")
.register(proxy_hello)
.start();
}
1 change: 1 addition & 0 deletions girolle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio-executor-trait = "2.1.3"
tokio-reactor-trait = "1.1.0"
serde_yaml = "0.9.34+deprecated"
regex = "1.11.1"
dashmap = "6.1"
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions girolle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub mod nameko_utils;
pub mod prelude;
mod rpc_client;
pub use rpc_client::RpcClient;
mod rpc_core;
mod rpc_service;
pub use rpc_service::RpcService;
mod rpc_task;
Expand Down
189 changes: 189 additions & 0 deletions girolle/src/rpc_core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
//! In-service async RPC core.
//!
//! Holds the per-service shared state needed to act as an RPC client from
//! within a service handler: a reply queue + consumer, a correlation map
//! mapping `correlation_id -> oneshot::Sender<PayloadResult>`, and a
//! publish channel.
//!
//! [`RpcCallerCore`] is constructed once in [`crate::rpc_service`] startup
//! and shared (via `Arc`) by every inbound delivery's `RpcCaller`.

use crate::config::Config;
use crate::error::GirolleError;
use crate::payload::{Payload, PayloadResult};
use crate::queue::create_message_channel;
use crate::types::GirolleResult;
use dashmap::DashMap;
use lapin::message::DeliveryResult;
use lapin::options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions};
use lapin::types::{AMQPValue, FieldArray, FieldTable, ShortString};
use lapin::{BasicProperties, Connection};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::error;
use uuid::Uuid;

const NAMEKO_AMQP_URI: &str = "nameko.AMQP_URI";
const NAMEKO_CALL_ID_STACK: &str = "nameko.call_id_stack";

pub(crate) struct RpcCallerCore {
publish_channel: lapin::Channel,
reply_routing_key: String,
rpc_exchange: String,
pending: Arc<DashMap<String, oneshot::Sender<PayloadResult>>>,
identifier: String,
amqp_uri: String,
}

impl RpcCallerCore {
/// Set up the in-service reply queue + consumer and return a shared core.
///
/// Call once per `RpcService` instance. The returned `Arc` is cloned into
/// the `RpcCaller` handed to every inbound delivery.
pub(crate) async fn new(
conn: &Connection,
conf: &Config,
identifier: Uuid,
) -> GirolleResult<Arc<Self>> {
let reply_queue_name = format!("rpc.listener-{}", identifier);
let reply_channel = create_message_channel(
conn,
&reply_queue_name,
conf.prefetch_count(),
&identifier,
conf.rpc_exchange(),
)
.await?;
let publish_channel = conn.create_channel().await?;

let pending: Arc<DashMap<String, oneshot::Sender<PayloadResult>>> =
Arc::new(DashMap::new());

let consumer = reply_channel
.basic_consume(
&reply_queue_name,
"girolle_in_service_replies",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;

let pending_consumer = Arc::clone(&pending);
let reply_channel_for_ack = reply_channel.clone();
consumer.set_delegate(move |delivery: DeliveryResult| {
let pending = Arc::clone(&pending_consumer);
let reply_channel = reply_channel_for_ack.clone();
async move {
if let Ok(Some(delivery)) = delivery {
let correlation_id = delivery
.properties
.correlation_id()
.clone()
.map(|s| s.to_string());
let parsed: Result<PayloadResult, _> =
serde_json::from_slice(&delivery.data);
match parsed {
Ok(payload) => match correlation_id {
Some(cid) => {
if let Some((_, tx)) = pending.remove(&cid) {
let _ = tx.send(payload);
}
}
None => error!(
"in-service reply: missing correlation id, dropping"
),
},
Err(e) => {
error!(error = %e, "in-service reply: deserialization failed");
}
}
let _ = reply_channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await;
}
}
});

Ok(Arc::new(Self {
publish_channel,
reply_routing_key: identifier.to_string(),
rpc_exchange: conf.rpc_exchange().to_string(),
pending,
identifier: identifier.to_string(),
amqp_uri: conf.AMQP_URI().to_string(),
}))
}

/// Issue an outbound RPC call from inside a service handler and await
/// its reply.
///
/// `parent_headers` carries the inbound delivery's headers; we propagate
/// `nameko.call_id_stack` through if present and seed it with our own
/// service identifier otherwise. `nameko.AMQP_URI` is always stamped
/// with this service's URI.
pub(crate) async fn call(
&self,
parent_headers: &FieldTable,
service: &str,
method: &str,
payload: Payload,
) -> GirolleResult<Value> {
let correlation_id = Uuid::new_v4().to_string();
let routing_key = format!("{}.{}", service, method);

let mut outbound_headers = parent_headers.clone();
outbound_headers.insert(
ShortString::from(NAMEKO_AMQP_URI),
AMQPValue::LongString(self.amqp_uri.clone().into()),
);
let stack_key = ShortString::from(NAMEKO_CALL_ID_STACK);
if outbound_headers.inner().get(&stack_key).is_none() {
outbound_headers.insert(
stack_key,
AMQPValue::FieldArray(FieldArray::from(vec![AMQPValue::LongString(
self.identifier.clone().into(),
)])),
);
}

let properties = BasicProperties::default()
.with_reply_to(self.reply_routing_key.clone().into())
.with_correlation_id(correlation_id.clone().into())
.with_content_type("application/json".into())
.with_content_encoding("utf-8".into())
.with_headers(outbound_headers)
.with_priority(0);

let (tx, rx) = oneshot::channel::<PayloadResult>();
self.pending.insert(correlation_id.clone(), tx);

let publish_result = self
.publish_channel
.basic_publish(
&self.rpc_exchange,
&routing_key,
BasicPublishOptions::default(),
payload.to_string().as_bytes(),
properties,
)
.await;

if let Err(e) = publish_result {
self.pending.remove(&correlation_id);
return Err(GirolleError::LapinError(e));
}

let result = rx.await.map_err(|_| {
GirolleError::ServiceMissingError(
"in-service RPC reply channel dropped before a reply arrived"
.to_string(),
)
})?;

match result.get_error() {
Some(remote) => Err(remote.convert_to_girolle_error()),
None => Ok(result.get_result()),
}
}
}
11 changes: 9 additions & 2 deletions girolle/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
nameko_utils::{compute_deliver, delivery_to_message_properties, get_id, publish},
payload::{Payload, PayloadResult},
queue::{create_service_channel, get_connection},
rpc_core::RpcCallerCore,
rpc_task::RpcTask,
types::{EventDispatcher, RpcCaller, RpcContext},
};
Expand Down Expand Up @@ -272,6 +273,9 @@ async fn rpc_service(
conf.rpc_exchange(),
)
.await?;
// Stand up the in-service RPC core: reply queue, correlation map,
// and a publish channel used by ctx.rpc.call().
let rpc_caller_core = RpcCallerCore::new(&conn, conf, id).await?;
// Start a consumer.
let consumer = rpc_channel
.basic_consume(
Expand All @@ -288,7 +292,7 @@ async fn rpc_service(
service_name: service_name.to_string(),
semaphore: Semaphore::new(conf.max_workers() as usize),
parent_calls_tracked: conf.parent_calls_tracked() as usize,
rpc_caller: RpcCaller::placeholder(),
rpc_caller: RpcCaller::from_core(rpc_caller_core),
event_dispatcher: EventDispatcher::placeholder(),
});
consumer.set_delegate(move |delivery: DeliveryResult| {
Expand Down Expand Up @@ -331,13 +335,16 @@ async fn rpc_service(
(Some(rpc_task_struct), _) => {
let incomming_data: Payload = serde_json::from_slice(&delivery.data)
.expect("Can't deserialize incomming data");
let rpc = shared_data
.rpc_caller
.with_parent_headers(inbound_headers.clone());
let ctx = RpcContext {
service_name: incommig_service.to_string(),
method_name: incomming_method.to_string(),
correlation_id: correlation_id.clone(),
reply_to: reply_to_id.clone(),
headers: inbound_headers,
rpc: shared_data.rpc_caller.clone(),
rpc,
events: shared_data.event_dispatcher.clone(),
};
compute_deliver(
Expand Down
58 changes: 52 additions & 6 deletions girolle/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::error::GirolleError;
use crate::payload::Payload;
use crate::rpc_core::RpcCallerCore;
use lapin::types::FieldTable;
use serde_json::Value;
use std::future::Future;
Expand All @@ -21,16 +22,61 @@ pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
/// Capability handle exposed on [`RpcContext`] that lets a service handler
/// call other services as an RPC client.
///
/// In this release the caller is a placeholder; the in-service async RPC
/// core (shared reply queue, correlation map) lands in a follow-up change.
#[derive(Clone, Debug, Default)]
/// Each inbound delivery receives an `RpcCaller` derived from the service's
/// shared in-service RPC core. The derivation captures the parent
/// delivery's AMQP headers so that outbound calls propagate
/// `nameko.call_id_stack` correctly.
///
/// A default-constructed (or `placeholder`) `RpcCaller` has no underlying
/// core; calling [`RpcCaller::call`] on it returns an error. This shape is
/// useful for unit-testing handlers without standing up a broker.
#[derive(Clone, Default)]
pub struct RpcCaller {
_private: (),
inner: Option<Arc<RpcCallerCore>>,
parent_headers: FieldTable,
}

impl std::fmt::Debug for RpcCaller {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RpcCaller")
.field("active", &self.inner.is_some())
.finish()
}
}

impl RpcCaller {
pub(crate) fn placeholder() -> Self {
Self { _private: () }
pub(crate) fn from_core(core: Arc<RpcCallerCore>) -> Self {
Self {
inner: Some(core),
parent_headers: FieldTable::default(),
}
}

pub(crate) fn with_parent_headers(&self, headers: FieldTable) -> Self {
Self {
inner: self.inner.clone(),
parent_headers: headers,
}
}

/// Invoke `<service>.<method>` and await the reply.
///
/// Returns the decoded JSON result on success, or a [`GirolleError`]
/// reconstructed from the remote service's error on failure.
pub async fn call(
&self,
service: &str,
method: &str,
payload: Payload,
) -> GirolleResult<Value> {
let core = self.inner.as_ref().ok_or_else(|| {
GirolleError::ServiceMissingError(
"RpcCaller has no in-service core; ctx.rpc.call requires a running RpcService"
.to_string(),
)
})?;
core.call(&self.parent_headers, service, method, payload)
.await
}
}

Expand Down
Loading