diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3d331f3..0698e1e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -42,3 +42,7 @@ path = "src/error_sender.rs" [[example]] name = "simple_fib" path = "src/simple_fib.rs" + +[[example]] +name = "proxy_service" +path = "src/proxy_service.rs" diff --git a/examples/src/proxy_service.rs b/examples/src/proxy_service.rs new file mode 100644 index 0000000..01dfcf6 --- /dev/null +++ b/examples/src/proxy_service.rs @@ -0,0 +1,28 @@ +//! Demonstrates an in-service RPC proxy: the `proxy.hello` method calls +//! `video.hello` from inside its handler via `ctx.rpc.call(...)`. +//! +//! Run alongside `simple_macro` (or any service exposing `video.hello`): +//! +//! ```sh +//! cargo run --example simple_macro +//! cargo run --example proxy_service +//! ``` + +use girolle::prelude::*; + +#[girolle] +async fn proxy_hello(ctx: RpcContext, name: String) -> String { + let result = ctx + .rpc + .call("video", "hello", Payload::new().arg(name)) + .await + .expect("rpc call to video.hello failed"); + serde_json::from_value(result).expect("video.hello did not return a String") +} + +fn main() { + let conf: Config = Config::with_yaml_defaults("staging/config.yml".to_string()).unwrap(); + let _ = RpcService::new(conf, "proxy") + .register(proxy_hello) + .start(); +} diff --git a/girolle/Cargo.toml b/girolle/Cargo.toml index 1222bc0..f0f6348 100644 --- a/girolle/Cargo.toml +++ b/girolle/Cargo.toml @@ -20,6 +20,7 @@ tokio-executor-trait = "2.1.3" tokio-reactor-trait = "1.1.0" serde_yaml = "0.9.34+deprecated" regex = "1.11.1" +dashmap = "6.1" serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } diff --git a/girolle/src/lib.rs b/girolle/src/lib.rs index a16059a..9e73923 100644 --- a/girolle/src/lib.rs +++ b/girolle/src/lib.rs @@ -55,6 +55,7 @@ pub mod nameko_utils; pub mod prelude; mod rpc_client; pub use rpc_client::RpcClient; +mod rpc_core; mod rpc_service; pub use rpc_service::RpcService; mod rpc_task; diff --git a/girolle/src/rpc_core.rs b/girolle/src/rpc_core.rs new file mode 100644 index 0000000..7e89f0a --- /dev/null +++ b/girolle/src/rpc_core.rs @@ -0,0 +1,189 @@ +//! In-service async RPC core. +//! +//! Holds the per-service shared state needed to act as an RPC client from +//! within a service handler: a reply queue + consumer, a correlation map +//! mapping `correlation_id -> oneshot::Sender`, and a +//! publish channel. +//! +//! [`RpcCallerCore`] is constructed once in [`crate::rpc_service`] startup +//! and shared (via `Arc`) by every inbound delivery's `RpcCaller`. + +use crate::config::Config; +use crate::error::GirolleError; +use crate::payload::{Payload, PayloadResult}; +use crate::queue::create_message_channel; +use crate::types::GirolleResult; +use dashmap::DashMap; +use lapin::message::DeliveryResult; +use lapin::options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions}; +use lapin::types::{AMQPValue, FieldArray, FieldTable, ShortString}; +use lapin::{BasicProperties, Connection}; +use serde_json::Value; +use std::sync::Arc; +use tokio::sync::oneshot; +use tracing::error; +use uuid::Uuid; + +const NAMEKO_AMQP_URI: &str = "nameko.AMQP_URI"; +const NAMEKO_CALL_ID_STACK: &str = "nameko.call_id_stack"; + +pub(crate) struct RpcCallerCore { + publish_channel: lapin::Channel, + reply_routing_key: String, + rpc_exchange: String, + pending: Arc>>, + identifier: String, + amqp_uri: String, +} + +impl RpcCallerCore { + /// Set up the in-service reply queue + consumer and return a shared core. + /// + /// Call once per `RpcService` instance. The returned `Arc` is cloned into + /// the `RpcCaller` handed to every inbound delivery. + pub(crate) async fn new( + conn: &Connection, + conf: &Config, + identifier: Uuid, + ) -> GirolleResult> { + let reply_queue_name = format!("rpc.listener-{}", identifier); + let reply_channel = create_message_channel( + conn, + &reply_queue_name, + conf.prefetch_count(), + &identifier, + conf.rpc_exchange(), + ) + .await?; + let publish_channel = conn.create_channel().await?; + + let pending: Arc>> = + Arc::new(DashMap::new()); + + let consumer = reply_channel + .basic_consume( + &reply_queue_name, + "girolle_in_service_replies", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + let pending_consumer = Arc::clone(&pending); + let reply_channel_for_ack = reply_channel.clone(); + consumer.set_delegate(move |delivery: DeliveryResult| { + let pending = Arc::clone(&pending_consumer); + let reply_channel = reply_channel_for_ack.clone(); + async move { + if let Ok(Some(delivery)) = delivery { + let correlation_id = delivery + .properties + .correlation_id() + .clone() + .map(|s| s.to_string()); + let parsed: Result = + serde_json::from_slice(&delivery.data); + match parsed { + Ok(payload) => match correlation_id { + Some(cid) => { + if let Some((_, tx)) = pending.remove(&cid) { + let _ = tx.send(payload); + } + } + None => error!( + "in-service reply: missing correlation id, dropping" + ), + }, + Err(e) => { + error!(error = %e, "in-service reply: deserialization failed"); + } + } + let _ = reply_channel + .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) + .await; + } + } + }); + + Ok(Arc::new(Self { + publish_channel, + reply_routing_key: identifier.to_string(), + rpc_exchange: conf.rpc_exchange().to_string(), + pending, + identifier: identifier.to_string(), + amqp_uri: conf.AMQP_URI().to_string(), + })) + } + + /// Issue an outbound RPC call from inside a service handler and await + /// its reply. + /// + /// `parent_headers` carries the inbound delivery's headers; we propagate + /// `nameko.call_id_stack` through if present and seed it with our own + /// service identifier otherwise. `nameko.AMQP_URI` is always stamped + /// with this service's URI. + pub(crate) async fn call( + &self, + parent_headers: &FieldTable, + service: &str, + method: &str, + payload: Payload, + ) -> GirolleResult { + let correlation_id = Uuid::new_v4().to_string(); + let routing_key = format!("{}.{}", service, method); + + let mut outbound_headers = parent_headers.clone(); + outbound_headers.insert( + ShortString::from(NAMEKO_AMQP_URI), + AMQPValue::LongString(self.amqp_uri.clone().into()), + ); + let stack_key = ShortString::from(NAMEKO_CALL_ID_STACK); + if outbound_headers.inner().get(&stack_key).is_none() { + outbound_headers.insert( + stack_key, + AMQPValue::FieldArray(FieldArray::from(vec![AMQPValue::LongString( + self.identifier.clone().into(), + )])), + ); + } + + let properties = BasicProperties::default() + .with_reply_to(self.reply_routing_key.clone().into()) + .with_correlation_id(correlation_id.clone().into()) + .with_content_type("application/json".into()) + .with_content_encoding("utf-8".into()) + .with_headers(outbound_headers) + .with_priority(0); + + let (tx, rx) = oneshot::channel::(); + self.pending.insert(correlation_id.clone(), tx); + + let publish_result = self + .publish_channel + .basic_publish( + &self.rpc_exchange, + &routing_key, + BasicPublishOptions::default(), + payload.to_string().as_bytes(), + properties, + ) + .await; + + if let Err(e) = publish_result { + self.pending.remove(&correlation_id); + return Err(GirolleError::LapinError(e)); + } + + let result = rx.await.map_err(|_| { + GirolleError::ServiceMissingError( + "in-service RPC reply channel dropped before a reply arrived" + .to_string(), + ) + })?; + + match result.get_error() { + Some(remote) => Err(remote.convert_to_girolle_error()), + None => Ok(result.get_result()), + } + } +} diff --git a/girolle/src/rpc_service.rs b/girolle/src/rpc_service.rs index 680b4f9..2f22e72 100644 --- a/girolle/src/rpc_service.rs +++ b/girolle/src/rpc_service.rs @@ -4,6 +4,7 @@ use crate::{ nameko_utils::{compute_deliver, delivery_to_message_properties, get_id, publish}, payload::{Payload, PayloadResult}, queue::{create_service_channel, get_connection}, + rpc_core::RpcCallerCore, rpc_task::RpcTask, types::{EventDispatcher, RpcCaller, RpcContext}, }; @@ -272,6 +273,9 @@ async fn rpc_service( conf.rpc_exchange(), ) .await?; + // Stand up the in-service RPC core: reply queue, correlation map, + // and a publish channel used by ctx.rpc.call(). + let rpc_caller_core = RpcCallerCore::new(&conn, conf, id).await?; // Start a consumer. let consumer = rpc_channel .basic_consume( @@ -288,7 +292,7 @@ async fn rpc_service( service_name: service_name.to_string(), semaphore: Semaphore::new(conf.max_workers() as usize), parent_calls_tracked: conf.parent_calls_tracked() as usize, - rpc_caller: RpcCaller::placeholder(), + rpc_caller: RpcCaller::from_core(rpc_caller_core), event_dispatcher: EventDispatcher::placeholder(), }); consumer.set_delegate(move |delivery: DeliveryResult| { @@ -331,13 +335,16 @@ async fn rpc_service( (Some(rpc_task_struct), _) => { let incomming_data: Payload = serde_json::from_slice(&delivery.data) .expect("Can't deserialize incomming data"); + let rpc = shared_data + .rpc_caller + .with_parent_headers(inbound_headers.clone()); let ctx = RpcContext { service_name: incommig_service.to_string(), method_name: incomming_method.to_string(), correlation_id: correlation_id.clone(), reply_to: reply_to_id.clone(), headers: inbound_headers, - rpc: shared_data.rpc_caller.clone(), + rpc, events: shared_data.event_dispatcher.clone(), }; compute_deliver( diff --git a/girolle/src/types.rs b/girolle/src/types.rs index 21ef72f..c244c3d 100644 --- a/girolle/src/types.rs +++ b/girolle/src/types.rs @@ -1,5 +1,6 @@ use crate::error::GirolleError; use crate::payload::Payload; +use crate::rpc_core::RpcCallerCore; use lapin::types::FieldTable; use serde_json::Value; use std::future::Future; @@ -21,16 +22,61 @@ pub type BoxFuture = Pin + Send>>; /// Capability handle exposed on [`RpcContext`] that lets a service handler /// call other services as an RPC client. /// -/// In this release the caller is a placeholder; the in-service async RPC -/// core (shared reply queue, correlation map) lands in a follow-up change. -#[derive(Clone, Debug, Default)] +/// Each inbound delivery receives an `RpcCaller` derived from the service's +/// shared in-service RPC core. The derivation captures the parent +/// delivery's AMQP headers so that outbound calls propagate +/// `nameko.call_id_stack` correctly. +/// +/// A default-constructed (or `placeholder`) `RpcCaller` has no underlying +/// core; calling [`RpcCaller::call`] on it returns an error. This shape is +/// useful for unit-testing handlers without standing up a broker. +#[derive(Clone, Default)] pub struct RpcCaller { - _private: (), + inner: Option>, + parent_headers: FieldTable, +} + +impl std::fmt::Debug for RpcCaller { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RpcCaller") + .field("active", &self.inner.is_some()) + .finish() + } } impl RpcCaller { - pub(crate) fn placeholder() -> Self { - Self { _private: () } + pub(crate) fn from_core(core: Arc) -> Self { + Self { + inner: Some(core), + parent_headers: FieldTable::default(), + } + } + + pub(crate) fn with_parent_headers(&self, headers: FieldTable) -> Self { + Self { + inner: self.inner.clone(), + parent_headers: headers, + } + } + + /// Invoke `.` and await the reply. + /// + /// Returns the decoded JSON result on success, or a [`GirolleError`] + /// reconstructed from the remote service's error on failure. + pub async fn call( + &self, + service: &str, + method: &str, + payload: Payload, + ) -> GirolleResult { + let core = self.inner.as_ref().ok_or_else(|| { + GirolleError::ServiceMissingError( + "RpcCaller has no in-service core; ctx.rpc.call requires a running RpcService" + .to_string(), + ) + })?; + core.call(&self.parent_headers, service, method, payload) + .await } }