diff --git a/src/cli.rs b/src/cli.rs index 9d4a4a9..7d5d9f4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -12,23 +12,23 @@ pub struct Cli { #[arg(long, default_value = "realm1", global = true)] pub realm: String, - /// Authentication ID + /// The authid to use, if authenticating. #[arg(long, global = true)] pub authid: Option, - /// Authentication role + /// The authrole to use, if authenticating. #[arg(long, global = true)] pub authrole: Option, - /// Secret for ticket/wampcra authentication + /// The secret to use in Challenge-Response Auth. #[arg(long, global = true)] pub secret: Option, - /// Path to private key file for cryptosign + /// The ed25519 private key hex for cryptosign. #[arg(long, global = true)] pub private_key: Option, - /// Ticket for ticket authentication + /// The ticket when using ticket authentication. #[arg(long, global = true)] pub ticket: Option, @@ -78,9 +78,52 @@ pub enum Commands { procedure: String, }, /// Subscribe to a topic - Subscribe, + Subscribe { + /// Topic to subscribe to + topic: String, + + /// Number of parallel sessions to create + #[arg(long, default_value_t = 1)] + parallel: u32, + + /// Maximum number of concurrent sessions + #[arg(long, default_value_t = 1)] + concurrency: usize, + }, /// Publish to a topic - Publish, + Publish { + /// Topic to publish to + topic: String, + + /// Positional arguments for the publish + /// To enforce value is always a string, send value in quotes e.g. "'1'" or '"true"' + #[arg()] + args: Vec, + + /// Keyword argument for the publish. To enforce value is always a string, send value in quotes e.g."'1'" or '"true"'. (May be provided multiple times) + #[arg(short = 'k', long = "kwarg", value_name = "KEY=VALUE")] + kwargs: Vec, + + /// WAMP publish option (May be provided multiple times) + #[arg(short = 'o', long = "option", value_name = "KEY=VALUE")] + options: Vec, + + /// Number of times to repeat the publish per session + #[arg(long, default_value_t = 1)] + repeat: u32, + + /// Number of parallel sessions to create + #[arg(long, default_value_t = 1)] + parallel: u32, + + /// Maximum number of concurrent sessions + #[arg(long, default_value_t = 1)] + concurrency: usize, + + /// Request acknowledgement from the broker + #[arg(long)] + acknowledge: bool, + }, /// Generate a WAMP cryptosign ed25519 keypair Keygen { /// Write keypair to file. Uses 'key' and 'key.pub' by default, or specify a custom name diff --git a/src/commands/call.rs b/src/commands/call.rs index 6332128..bda617a 100644 --- a/src/commands/call.rs +++ b/src/commands/call.rs @@ -69,7 +69,7 @@ async fn run_session( } }; - for iteration in 0..call_config.repeat { + for iteration in 1..=call_config.repeat { let request = build_call_request(&call_config); match session.call(request).await { @@ -124,7 +124,7 @@ pub async fn handle( let mut handles = Vec::with_capacity(call_config.parallel as usize); - for session_id in 0..call_config.parallel { + for session_id in 1..=call_config.parallel { let permit = semaphore.clone().acquire_owned().await.unwrap(); let conn_config = conn_config.clone(); diff --git a/src/commands/publish.rs b/src/commands/publish.rs index d8ed5cf..d4cfefb 100644 --- a/src/commands/publish.rs +++ b/src/commands/publish.rs @@ -1,6 +1,123 @@ -use xconn::async_::session::Session; +use crate::config::{ConnectionConfig, PublishConfig}; +use crate::utils::{ParsedArg, parse_arg}; +use std::sync::Arc; +use tokio::sync::Semaphore; +use xconn::sync::PublishRequest; + +/// Parses a "key=value" string and returns the key and parsed value. +fn parse_key_value(input: &str) -> Option<(String, ParsedArg)> { + let parts: Vec<&str> = input.splitn(2, '=').collect(); + if parts.len() == 2 { + Some((parts[0].to_string(), parse_arg(parts[1]))) + } else { + None + } +} + +/// Builds a PublishRequest from the PublishConfig. +fn build_publish_request(config: &PublishConfig) -> PublishRequest { + let mut request = PublishRequest::new(&config.topic); + + // Add positional arguments + for arg in &config.args { + request = match parse_arg(arg) { + ParsedArg::Integer(v) => request.arg(v), + ParsedArg::Float(v) => request.arg(v), + ParsedArg::Boolean(v) => request.arg(v), + ParsedArg::String(v) => request.arg(v), + }; + } + + // Add keyword arguments + for kwarg in &config.kwargs { + if let Some((key, value)) = parse_key_value(kwarg) { + request = match value { + ParsedArg::Integer(v) => request.kwarg(&key, v), + ParsedArg::Float(v) => request.kwarg(&key, v), + ParsedArg::Boolean(v) => request.kwarg(&key, v), + ParsedArg::String(v) => request.kwarg(&key, v), + }; + } + } + + // Add options + for opt in &config.options { + if let Some((key, value)) = parse_key_value(opt) { + request = match value { + ParsedArg::Integer(v) => request.option(&key, v), + ParsedArg::Float(v) => request.option(&key, v), + ParsedArg::Boolean(v) => request.option(&key, v), + ParsedArg::String(v) => request.option(&key, v), + }; + } + } + + // Add acknowledge option if requested + if config.acknowledge { + request = request.option("acknowledge", true); + } + + request +} + +/// Executes publishes for a single session: connects, runs repeated publishes, and disconnects. +async fn run_session( + conn_config: Arc, + publish_config: Arc, + session_id: u32, +) { + let session = match conn_config.connect().await { + Ok(s) => s, + Err(e) => { + eprintln!("Session {} Connection Error: {}", session_id, e); + return; + } + }; + + for iteration in 1..=publish_config.repeat { + let request = build_publish_request(&publish_config); + + match session.publish(request).await { + Ok(_) => {} + Err(e) => eprintln!( + "Session {} Iteration {} Publish Error: {}", + session_id, iteration, e + ), + } + } + + if let Err(e) = session.leave().await { + eprintln!("Session {} Error leaving: {}", session_id, e); + } +} + +pub async fn handle( + conn_config: ConnectionConfig, + publish_config: PublishConfig, +) -> Result<(), Box> { + let semaphore = Arc::new(Semaphore::new(publish_config.concurrency)); + let conn_config = Arc::new(conn_config); + let publish_config = Arc::new(publish_config); + + let mut handles = Vec::with_capacity(publish_config.parallel as usize); + + for session_id in 1..=publish_config.parallel { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + + let conn_config = conn_config.clone(); + let publish_config = publish_config.clone(); + + let handle = tokio::spawn(async move { + let _permit = permit; + run_session(conn_config, publish_config, session_id).await; + }); + + handles.push(handle); + } + + for handle in handles { + let _ = handle.await; + } -pub async fn handle(_session: &Session) -> Result<(), Box> { - println!("Subcommand 'publish' executed"); Ok(()) } diff --git a/src/commands/register.rs b/src/commands/register.rs index 07e5ad6..640641e 100644 --- a/src/commands/register.rs +++ b/src/commands/register.rs @@ -25,8 +25,8 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box println!("Registered procedure {}: {:?}", procedure, reg), - Err(e) => println!("Error registering procedure: {}", e), + Ok(_) => println!("Registered procedure '{}'", procedure), + Err(e) => eprintln!("Error registering procedure: {}", e), } println!("Press Ctrl+C to exit"); diff --git a/src/commands/subscribe.rs b/src/commands/subscribe.rs index 817e12d..69799a1 100644 --- a/src/commands/subscribe.rs +++ b/src/commands/subscribe.rs @@ -1,6 +1,113 @@ -use xconn::async_::session::Session; +use crate::config::{ConnectionConfig, SubscribeConfig}; +use crate::utils::{CommandOutput, wamp_async_value_to_serde}; +use std::sync::Arc; +use tokio::signal; +use tokio::sync::Semaphore; +use xconn::async_::{Event, SubscribeRequest}; + +/// Builds a SubscribeRequest from the SubscribeConfig. +fn build_subscribe_request(config: &SubscribeConfig) -> SubscribeRequest { + // Note: SubscribeRequest doesn't support options via builder pattern + // Options would need to be added at the xconn-rust library level + SubscribeRequest::new(&config.topic, event_handler) +} + +async fn event_handler(event: Event) { + let output = CommandOutput { + args: event.args.iter().map(wamp_async_value_to_serde).collect(), + kwargs: event + .kwargs + .iter() + .map(|(k, v): (_, _)| (k.clone(), wamp_async_value_to_serde(v))) + .collect(), + }; + + match serde_json::to_string_pretty(&output) { + Ok(json) => println!("{}", json), + Err(e) => eprintln!("Error serializing event: {}", e), + } +} + +/// Runs a single subscribe session: connects, subscribes, and waits. +async fn run_session( + conn_config: Arc, + subscribe_config: Arc, + session_id: u32, + shutdown: tokio::sync::watch::Receiver, +) { + let session = match conn_config.connect().await { + Ok(s) => s, + Err(e) => { + eprintln!("Session {} Connection Error: {}", session_id, e); + return; + } + }; + + let request = build_subscribe_request(&subscribe_config); + + match session.subscribe(request).await { + Ok(_) => { + if subscribe_config.parallel > 1 { + println!( + "Session {}: Subscribed to topic '{}'", + session_id, subscribe_config.topic + ); + } else { + println!("Subscribed to topic '{}'", subscribe_config.topic); + } + } + Err(e) => { + eprintln!("Session {} Subscribe Error: {}", session_id, e); + return; + } + } + + // Wait for shutdown signal + let mut shutdown = shutdown; + let _ = shutdown.changed().await; + + if let Err(e) = session.leave().await { + eprintln!("Session {} Error leaving: {}", session_id, e); + } +} + +pub async fn handle( + conn_config: ConnectionConfig, + subscribe_config: SubscribeConfig, +) -> Result<(), Box> { + let semaphore = Arc::new(Semaphore::new(subscribe_config.concurrency)); + let conn_config = Arc::new(conn_config); + let subscribe_config = Arc::new(subscribe_config); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + let mut handles = Vec::with_capacity(subscribe_config.parallel as usize); + + for session_id in 1..=subscribe_config.parallel { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let conn_config = conn_config.clone(); + let subscribe_config = subscribe_config.clone(); + let shutdown_rx = shutdown_rx.clone(); + + let handle = tokio::spawn(async move { + let _permit = permit; + run_session(conn_config, subscribe_config, session_id, shutdown_rx).await; + }); + + handles.push(handle); + } + + println!("Press Ctrl+C to exit"); + signal::ctrl_c().await?; + println!("Exiting..."); + + // Signal all sessions to shutdown + let _ = shutdown_tx.send(true); + + for handle in handles { + let _ = handle.await; + } -pub async fn handle(_session: &Session) -> Result<(), Box> { - println!("Subcommand 'subscribe' executed"); Ok(()) } diff --git a/src/config.rs b/src/config.rs index 3c48e27..e090719 100644 --- a/src/config.rs +++ b/src/config.rs @@ -116,3 +116,24 @@ pub struct CallConfig { pub parallel: u32, pub concurrency: usize, } + +/// Configuration specific to the Publish command. +#[derive(Debug, Clone)] +pub struct PublishConfig { + pub topic: String, + pub args: Vec, + pub kwargs: Vec, + pub options: Vec, + pub repeat: u32, + pub parallel: u32, + pub concurrency: usize, + pub acknowledge: bool, +} + +/// Configuration specific to the Subscribe command. +#[derive(Debug, Clone)] +pub struct SubscribeConfig { + pub topic: String, + pub parallel: u32, + pub concurrency: usize, +} diff --git a/src/main.rs b/src/main.rs index b8dca46..095ecd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,19 +5,16 @@ mod utils; use clap::Parser; use cli::{Cli, Commands}; -use config::{CallConfig, ConnectionConfig}; +use config::{CallConfig, ConnectionConfig, PublishConfig}; #[tokio::main] async fn main() -> Result<(), Box> { let cli = Cli::parse(); - // Handle commands that don't require a connection first if let Commands::Keygen { output_file } = cli.command { return commands::keygen::handle(output_file); } - println!("Connecting to {} in realm {}", cli.url, cli.realm); - let conn_config = ConnectionConfig::from(&cli); match cli.command { @@ -46,15 +43,39 @@ async fn main() -> Result<(), Box> { commands::register::handle(&session, &procedure).await?; session.leave().await?; } - Commands::Subscribe => { - let session = conn_config.connect().await?; - commands::subscribe::handle(&session).await?; - session.leave().await?; + Commands::Subscribe { + topic, + parallel, + concurrency, + } => { + let subscribe_config = config::SubscribeConfig { + topic, + parallel, + concurrency, + }; + commands::subscribe::handle(conn_config, subscribe_config).await?; } - Commands::Publish => { - let session = conn_config.connect().await?; - commands::publish::handle(&session).await?; - session.leave().await?; + Commands::Publish { + topic, + args, + kwargs, + options, + repeat, + parallel, + concurrency, + acknowledge, + } => { + let publish_config = PublishConfig { + topic, + args, + kwargs, + options, + repeat, + parallel, + concurrency, + acknowledge, + }; + commands::publish::handle(conn_config, publish_config).await?; } Commands::Keygen { .. } => unreachable!(), // Handled above }