diff --git a/girolle/Cargo.toml b/girolle/Cargo.toml index 90d9653..6a6e6ae 100644 --- a/girolle/Cargo.toml +++ b/girolle/Cargo.toml @@ -13,11 +13,9 @@ bench = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lapin = "2.5.3" +lapin = "4.6.0" futures = "0.3.31" girolle_macro = { path = "../girolle_macro", version = "2" } -tokio-executor-trait = "2.1.3" -tokio-reactor-trait = "1.1.0" serde_yaml = "0.9.34+deprecated" regex = "1.11.1" dashmap = "6.1" diff --git a/girolle/src/amqp/channel.rs b/girolle/src/amqp/channel.rs index 9c51d04..22f08f5 100644 --- a/girolle/src/amqp/channel.rs +++ b/girolle/src/amqp/channel.rs @@ -31,7 +31,7 @@ pub(crate) async fn create_service_channel( let rpc_queue = format!("rpc-{}", service_name); let queue = incomming_channel .queue_declare( - &rpc_queue, + rpc_queue.as_str().into(), QueueDeclareOptions { durable: true, ..Default::default() @@ -45,9 +45,9 @@ pub(crate) async fn create_service_channel( .await?; incomming_channel .queue_bind( - &rpc_queue, - rpc_exchange, - &routing_key, + rpc_queue.as_str().into(), + rpc_exchange.into(), + routing_key.as_str().into(), QueueBindOptions::default(), FieldTable::default(), ) @@ -71,7 +71,7 @@ pub(crate) async fn create_message_channel( response_arguments.insert("x-expires".into(), QUEUE_TTL.into()); response_channel .queue_declare( - rpc_queue_reply, + rpc_queue_reply.into(), QueueDeclareOptions { durable: true, ..Default::default() @@ -84,9 +84,9 @@ pub(crate) async fn create_message_channel( .await?; response_channel .queue_bind( - rpc_queue_reply, - rpc_exchange, - &id.to_string(), + rpc_queue_reply.into(), + rpc_exchange.into(), + id.to_string().into(), QueueBindOptions::default(), response_arguments, ) @@ -110,7 +110,7 @@ pub(crate) async fn create_event_consumer_channel( channel .exchange_declare( - &exchange, + exchange.as_str().into(), ExchangeKind::Topic, ExchangeDeclareOptions { durable: true, @@ -122,7 +122,7 @@ pub(crate) async fn create_event_consumer_channel( channel .queue_declare( - queue_name, + queue_name.into(), QueueDeclareOptions { durable: true, ..Default::default() @@ -133,9 +133,9 @@ pub(crate) async fn create_event_consumer_channel( channel .queue_bind( - queue_name, - &exchange, - event_type, + queue_name.into(), + exchange.as_str().into(), + event_type.into(), QueueBindOptions::default(), FieldTable::default(), ) diff --git a/girolle/src/amqp/connection.rs b/girolle/src/amqp/connection.rs index 34d7a30..5f5744e 100644 --- a/girolle/src/amqp/connection.rs +++ b/girolle/src/amqp/connection.rs @@ -1,19 +1,16 @@ -use lapin::{types::FieldTable, Connection, ConnectionProperties}; +use lapin::uri::AMQPUri; +use lapin::{Connection, ConnectionProperties}; use tracing::{error, info}; -/// Open an AMQP connection wired up to the Tokio executor and reactor, -/// with a custom heartbeat advertised in the client properties. +/// Open an AMQP connection on the ambient Tokio runtime, applying a +/// custom heartbeat (in seconds) to the negotiated connection. pub(crate) async fn get_connection( amqp_uri: String, heartbeat_value: u16, ) -> Result { - let mut connection_options = ConnectionProperties::default() - .with_executor(tokio_executor_trait::Tokio::current()) - .with_reactor(tokio_reactor_trait::Tokio); - let mut client_properties_custom = FieldTable::default(); - client_properties_custom.insert("heartbeat".into(), heartbeat_value.into()); - connection_options.client_properties = client_properties_custom; - match Connection::connect(&amqp_uri, connection_options).await { + let mut uri: AMQPUri = amqp_uri.parse().map_err(std::io::Error::other)?; + uri.query.heartbeat = Some(heartbeat_value); + match Connection::connect_uri(uri, ConnectionProperties::default()).await { Ok(connection) => { info!("Connected to RabbitMQ"); Ok(connection) diff --git a/girolle/src/amqp/publish.rs b/girolle/src/amqp/publish.rs index e531f64..56e7f9d 100644 --- a/girolle/src/amqp/publish.rs +++ b/girolle/src/amqp/publish.rs @@ -17,8 +17,8 @@ pub(crate) async fn publish( tokio::spawn(async move { rpc_channel_clone .basic_publish( - &rpc_exchange_clone, - &reply_to_id, + rpc_exchange_clone.as_str().into(), + reply_to_id.as_str().into(), BasicPublishOptions::default(), payload .to_string() diff --git a/girolle/src/client/mod.rs b/girolle/src/client/mod.rs index a299995..7bc4caf 100644 --- a/girolle/src/client/mod.rs +++ b/girolle/src/client/mod.rs @@ -93,8 +93,8 @@ impl RpcClient { let consumer = self .reply_channel .basic_consume( - &reply_queue_name, - "girolle_consumer", + reply_queue_name.as_str().into(), + "girolle_consumer".into(), BasicConsumeOptions::default(), FieldTable::default(), ) @@ -183,8 +183,8 @@ impl RpcClient { tokio::spawn(async move { channel_clone .basic_publish( - &exchange_clone, - &routing_key, + exchange_clone.as_str().into(), + routing_key.as_str().into(), BasicPublishOptions { mandatory: false, immediate: false, @@ -279,8 +279,8 @@ impl RpcClient { /// Close the reply channel and the underlying AMQP connection. pub async fn close(&self) -> Result<(), lapin::Error> { - self.reply_channel.close(200, "Goodbye").await?; - self.conn.close(200, "Goodbye").await?; + self.reply_channel.close(200, "Goodbye".into()).await?; + self.conn.close(200, "Goodbye".into()).await?; Ok(()) } } @@ -296,7 +296,7 @@ impl TargetService { } fn close(&self) -> Result<(), lapin::Error> { - executor::block_on(self.channel.close(200, "Goodbye"))?; + executor::block_on(self.channel.close(200, "Goodbye".into()))?; Ok(()) } diff --git a/girolle/src/service/caller.rs b/girolle/src/service/caller.rs index 3cc38b4..a72dc80 100644 --- a/girolle/src/service/caller.rs +++ b/girolle/src/service/caller.rs @@ -63,8 +63,8 @@ impl RpcCallerCore { let consumer = reply_channel .basic_consume( - &reply_queue_name, - "girolle_in_service_replies", + reply_queue_name.as_str().into(), + "girolle_in_service_replies".into(), BasicConsumeOptions::default(), FieldTable::default(), ) @@ -159,8 +159,8 @@ impl RpcCallerCore { let publish_result = self .publish_channel .basic_publish( - &self.rpc_exchange, - &routing_key, + self.rpc_exchange.as_str().into(), + routing_key.as_str().into(), BasicPublishOptions::default(), payload.to_string().as_bytes(), properties, diff --git a/girolle/src/service/dispatcher.rs b/girolle/src/service/dispatcher.rs index be1257c..7df38d2 100644 --- a/girolle/src/service/dispatcher.rs +++ b/girolle/src/service/dispatcher.rs @@ -63,7 +63,7 @@ impl EventDispatcherCore { if !self.declared_exchanges.contains(&exchange) { self.publish_channel .exchange_declare( - &exchange, + exchange.as_str().into(), ExchangeKind::Topic, ExchangeDeclareOptions { durable: true, @@ -91,8 +91,8 @@ impl EventDispatcherCore { self.publish_channel .basic_publish( - &exchange, - event_type, + exchange.as_str().into(), + event_type.into(), BasicPublishOptions::default(), &payload_bytes, properties, diff --git a/girolle/src/service/runtime.rs b/girolle/src/service/runtime.rs index 19508d8..a20766b 100644 --- a/girolle/src/service/runtime.rs +++ b/girolle/src/service/runtime.rs @@ -136,8 +136,8 @@ pub(super) async fn run( .await?; let consumer = event_channel .basic_consume( - &queue_name, - "girolle_event_consumer", + queue_name.as_str().into(), + "girolle_event_consumer".into(), BasicConsumeOptions::default(), FieldTable::default(), ) @@ -199,8 +199,8 @@ pub(super) async fn run( let consumer = rpc_channel .basic_consume( - &rpc_queue, - "girolle_consumer_incomming", + rpc_queue.as_str().into(), + "girolle_consumer_incomming".into(), BasicConsumeOptions::default(), FieldTable::default(), )