Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions girolle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ bench = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lapin = "2.5.3"
lapin = "4.6.0"
futures = "0.3.31"
girolle_macro = { path = "../girolle_macro", version = "2" }
tokio-executor-trait = "2.1.3"
tokio-reactor-trait = "1.1.0"
serde_yaml = "0.9.34+deprecated"
regex = "1.11.1"
dashmap = "6.1"
Expand Down
26 changes: 13 additions & 13 deletions girolle/src/amqp/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub(crate) async fn create_service_channel(
let rpc_queue = format!("rpc-{}", service_name);
let queue = incomming_channel
.queue_declare(
&rpc_queue,
rpc_queue.as_str().into(),
QueueDeclareOptions {
durable: true,
..Default::default()
Expand All @@ -45,9 +45,9 @@ pub(crate) async fn create_service_channel(
.await?;
incomming_channel
.queue_bind(
&rpc_queue,
rpc_exchange,
&routing_key,
rpc_queue.as_str().into(),
rpc_exchange.into(),
routing_key.as_str().into(),
QueueBindOptions::default(),
FieldTable::default(),
)
Expand All @@ -71,7 +71,7 @@ pub(crate) async fn create_message_channel(
response_arguments.insert("x-expires".into(), QUEUE_TTL.into());
response_channel
.queue_declare(
rpc_queue_reply,
rpc_queue_reply.into(),
QueueDeclareOptions {
durable: true,
..Default::default()
Expand All @@ -84,9 +84,9 @@ pub(crate) async fn create_message_channel(
.await?;
response_channel
.queue_bind(
rpc_queue_reply,
rpc_exchange,
&id.to_string(),
rpc_queue_reply.into(),
rpc_exchange.into(),
id.to_string().into(),
QueueBindOptions::default(),
response_arguments,
)
Expand All @@ -110,7 +110,7 @@ pub(crate) async fn create_event_consumer_channel(

channel
.exchange_declare(
&exchange,
exchange.as_str().into(),
ExchangeKind::Topic,
ExchangeDeclareOptions {
durable: true,
Expand All @@ -122,7 +122,7 @@ pub(crate) async fn create_event_consumer_channel(

channel
.queue_declare(
queue_name,
queue_name.into(),
QueueDeclareOptions {
durable: true,
..Default::default()
Expand All @@ -133,9 +133,9 @@ pub(crate) async fn create_event_consumer_channel(

channel
.queue_bind(
queue_name,
&exchange,
event_type,
queue_name.into(),
exchange.as_str().into(),
event_type.into(),
QueueBindOptions::default(),
FieldTable::default(),
)
Expand Down
17 changes: 7 additions & 10 deletions girolle/src/amqp/connection.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use lapin::{types::FieldTable, Connection, ConnectionProperties};
use lapin::uri::AMQPUri;
use lapin::{Connection, ConnectionProperties};
use tracing::{error, info};

/// Open an AMQP connection wired up to the Tokio executor and reactor,
/// with a custom heartbeat advertised in the client properties.
/// Open an AMQP connection on the ambient Tokio runtime, applying a
/// custom heartbeat (in seconds) to the negotiated connection.
pub(crate) async fn get_connection(
amqp_uri: String,
heartbeat_value: u16,
) -> Result<Connection, lapin::Error> {
let mut connection_options = ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let mut client_properties_custom = FieldTable::default();
client_properties_custom.insert("heartbeat".into(), heartbeat_value.into());
connection_options.client_properties = client_properties_custom;
match Connection::connect(&amqp_uri, connection_options).await {
let mut uri: AMQPUri = amqp_uri.parse().map_err(std::io::Error::other)?;
uri.query.heartbeat = Some(heartbeat_value);
match Connection::connect_uri(uri, ConnectionProperties::default()).await {
Ok(connection) => {
info!("Connected to RabbitMQ");
Ok(connection)
Expand Down
4 changes: 2 additions & 2 deletions girolle/src/amqp/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub(crate) async fn publish(
tokio::spawn(async move {
rpc_channel_clone
.basic_publish(
&rpc_exchange_clone,
&reply_to_id,
rpc_exchange_clone.as_str().into(),
reply_to_id.as_str().into(),
BasicPublishOptions::default(),
payload
.to_string()
Expand Down
14 changes: 7 additions & 7 deletions girolle/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl RpcClient {
let consumer = self
.reply_channel
.basic_consume(
&reply_queue_name,
"girolle_consumer",
reply_queue_name.as_str().into(),
"girolle_consumer".into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
Expand Down Expand Up @@ -183,8 +183,8 @@ impl RpcClient {
tokio::spawn(async move {
channel_clone
.basic_publish(
&exchange_clone,
&routing_key,
exchange_clone.as_str().into(),
routing_key.as_str().into(),
BasicPublishOptions {
mandatory: false,
immediate: false,
Expand Down Expand Up @@ -279,8 +279,8 @@ impl RpcClient {

/// Close the reply channel and the underlying AMQP connection.
pub async fn close(&self) -> Result<(), lapin::Error> {
self.reply_channel.close(200, "Goodbye").await?;
self.conn.close(200, "Goodbye").await?;
self.reply_channel.close(200, "Goodbye".into()).await?;
self.conn.close(200, "Goodbye".into()).await?;
Ok(())
}
}
Expand All @@ -296,7 +296,7 @@ impl TargetService {
}

fn close(&self) -> Result<(), lapin::Error> {
executor::block_on(self.channel.close(200, "Goodbye"))?;
executor::block_on(self.channel.close(200, "Goodbye".into()))?;
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions girolle/src/service/caller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ impl RpcCallerCore {

let consumer = reply_channel
.basic_consume(
&reply_queue_name,
"girolle_in_service_replies",
reply_queue_name.as_str().into(),
"girolle_in_service_replies".into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
Expand Down Expand Up @@ -159,8 +159,8 @@ impl RpcCallerCore {
let publish_result = self
.publish_channel
.basic_publish(
&self.rpc_exchange,
&routing_key,
self.rpc_exchange.as_str().into(),
routing_key.as_str().into(),
BasicPublishOptions::default(),
payload.to_string().as_bytes(),
properties,
Expand Down
6 changes: 3 additions & 3 deletions girolle/src/service/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl EventDispatcherCore {
if !self.declared_exchanges.contains(&exchange) {
self.publish_channel
.exchange_declare(
&exchange,
exchange.as_str().into(),
ExchangeKind::Topic,
ExchangeDeclareOptions {
durable: true,
Expand Down Expand Up @@ -91,8 +91,8 @@ impl EventDispatcherCore {

self.publish_channel
.basic_publish(
&exchange,
event_type,
exchange.as_str().into(),
event_type.into(),
BasicPublishOptions::default(),
&payload_bytes,
properties,
Expand Down
8 changes: 4 additions & 4 deletions girolle/src/service/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ pub(super) async fn run(
.await?;
let consumer = event_channel
.basic_consume(
&queue_name,
"girolle_event_consumer",
queue_name.as_str().into(),
"girolle_event_consumer".into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
Expand Down Expand Up @@ -199,8 +199,8 @@ pub(super) async fn run(

let consumer = rpc_channel
.basic_consume(
&rpc_queue,
"girolle_consumer_incomming",
rpc_queue.as_str().into(),
"girolle_consumer_incomming".into(),
BasicConsumeOptions::default(),
FieldTable::default(),
)
Expand Down
Loading