Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ pub struct Cli {
#[arg(long, default_value = "realm1", global = true)]
pub realm: String,

/// Authentication ID
/// The authid to use, if authenticating.
#[arg(long, global = true)]
pub authid: Option<String>,

/// Authentication role
/// The authrole to use, if authenticating.
#[arg(long, global = true)]
pub authrole: Option<String>,

/// Secret for ticket/wampcra authentication
/// The secret to use in Challenge-Response Auth.
#[arg(long, global = true)]
pub secret: Option<String>,

/// Path to private key file for cryptosign
/// The ed25519 private key hex for cryptosign.
#[arg(long, global = true)]
pub private_key: Option<String>,

/// Ticket for ticket authentication
/// The ticket when using ticket authentication.
#[arg(long, global = true)]
pub ticket: Option<String>,

Expand Down Expand Up @@ -78,9 +78,52 @@ pub enum Commands {
procedure: String,
},
/// Subscribe to a topic
Subscribe,
Subscribe {
/// Topic to subscribe to
topic: String,

/// Number of parallel sessions to create
#[arg(long, default_value_t = 1)]
parallel: u32,

/// Maximum number of concurrent sessions
#[arg(long, default_value_t = 1)]
concurrency: usize,
},
/// Publish to a topic
Publish,
Publish {
/// Topic to publish to
topic: String,

/// Positional arguments for the publish
/// To enforce value is always a string, send value in quotes e.g. "'1'" or '"true"'
#[arg()]
args: Vec<String>,

/// Keyword argument for the publish. To enforce value is always a string, send value in quotes e.g."'1'" or '"true"'. (May be provided multiple times)
#[arg(short = 'k', long = "kwarg", value_name = "KEY=VALUE")]
kwargs: Vec<String>,

/// WAMP publish option (May be provided multiple times)
#[arg(short = 'o', long = "option", value_name = "KEY=VALUE")]
options: Vec<String>,

/// Number of times to repeat the publish per session
#[arg(long, default_value_t = 1)]
repeat: u32,

/// Number of parallel sessions to create
#[arg(long, default_value_t = 1)]
parallel: u32,

/// Maximum number of concurrent sessions
#[arg(long, default_value_t = 1)]
concurrency: usize,

/// Request acknowledgement from the broker
#[arg(long)]
acknowledge: bool,
},
/// Generate a WAMP cryptosign ed25519 keypair
Keygen {
/// Write keypair to file. Uses 'key' and 'key.pub' by default, or specify a custom name
Expand Down
4 changes: 2 additions & 2 deletions src/commands/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn run_session(
}
};

for iteration in 0..call_config.repeat {
for iteration in 1..=call_config.repeat {
let request = build_call_request(&call_config);

match session.call(request).await {
Expand Down Expand Up @@ -124,7 +124,7 @@ pub async fn handle(

let mut handles = Vec::with_capacity(call_config.parallel as usize);

for session_id in 0..call_config.parallel {
for session_id in 1..=call_config.parallel {
let permit = semaphore.clone().acquire_owned().await.unwrap();

let conn_config = conn_config.clone();
Expand Down
123 changes: 120 additions & 3 deletions src/commands/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,123 @@
use xconn::async_::session::Session;
use crate::config::{ConnectionConfig, PublishConfig};
use crate::utils::{ParsedArg, parse_arg};
use std::sync::Arc;
use tokio::sync::Semaphore;
use xconn::sync::PublishRequest;

/// Parses a "key=value" string and returns the key and parsed value.
fn parse_key_value(input: &str) -> Option<(String, ParsedArg)> {
let parts: Vec<&str> = input.splitn(2, '=').collect();
if parts.len() == 2 {
Some((parts[0].to_string(), parse_arg(parts[1])))
} else {
None
}
}

/// Builds a PublishRequest from the PublishConfig.
fn build_publish_request(config: &PublishConfig) -> PublishRequest {
let mut request = PublishRequest::new(&config.topic);

// Add positional arguments
for arg in &config.args {
request = match parse_arg(arg) {
ParsedArg::Integer(v) => request.arg(v),
ParsedArg::Float(v) => request.arg(v),
ParsedArg::Boolean(v) => request.arg(v),
ParsedArg::String(v) => request.arg(v),
};
}

// Add keyword arguments
for kwarg in &config.kwargs {
if let Some((key, value)) = parse_key_value(kwarg) {
request = match value {
ParsedArg::Integer(v) => request.kwarg(&key, v),
ParsedArg::Float(v) => request.kwarg(&key, v),
ParsedArg::Boolean(v) => request.kwarg(&key, v),
ParsedArg::String(v) => request.kwarg(&key, v),
};
}
}

// Add options
for opt in &config.options {
if let Some((key, value)) = parse_key_value(opt) {
request = match value {
ParsedArg::Integer(v) => request.option(&key, v),
ParsedArg::Float(v) => request.option(&key, v),
ParsedArg::Boolean(v) => request.option(&key, v),
ParsedArg::String(v) => request.option(&key, v),
};
}
}

// Add acknowledge option if requested
if config.acknowledge {
request = request.option("acknowledge", true);
}

request
}

/// Executes publishes for a single session: connects, runs repeated publishes, and disconnects.
async fn run_session(
conn_config: Arc<ConnectionConfig>,
publish_config: Arc<PublishConfig>,
session_id: u32,
) {
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
return;
}
};

for iteration in 1..=publish_config.repeat {
let request = build_publish_request(&publish_config);

match session.publish(request).await {
Ok(_) => {}
Err(e) => eprintln!(
"Session {} Iteration {} Publish Error: {}",
session_id, iteration, e
),
}
}

if let Err(e) = session.leave().await {
eprintln!("Session {} Error leaving: {}", session_id, e);
}
}

pub async fn handle(
conn_config: ConnectionConfig,
publish_config: PublishConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let semaphore = Arc::new(Semaphore::new(publish_config.concurrency));
let conn_config = Arc::new(conn_config);
let publish_config = Arc::new(publish_config);

let mut handles = Vec::with_capacity(publish_config.parallel as usize);

for session_id in 1..=publish_config.parallel {
let permit = semaphore.clone().acquire_owned().await.unwrap();

let conn_config = conn_config.clone();
let publish_config = publish_config.clone();

let handle = tokio::spawn(async move {
let _permit = permit;
run_session(conn_config, publish_config, session_id).await;
});

handles.push(handle);
}

for handle in handles {
let _ = handle.await;
}

pub async fn handle(_session: &Session) -> Result<(), Box<dyn std::error::Error>> {
println!("Subcommand 'publish' executed");
Ok(())
}
4 changes: 2 additions & 2 deletions src/commands/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub async fn handle(session: &Session, procedure: &str) -> Result<(), Box<dyn st
let register_request = RegisterRequest::new(procedure, registration_handler);

match session.register(register_request).await {
Ok(reg) => println!("Registered procedure {}: {:?}", procedure, reg),
Err(e) => println!("Error registering procedure: {}", e),
Ok(_) => println!("Registered procedure '{}'", procedure),
Err(e) => eprintln!("Error registering procedure: {}", e),
}

println!("Press Ctrl+C to exit");
Expand Down
113 changes: 110 additions & 3 deletions src/commands/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,113 @@
use xconn::async_::session::Session;
use crate::config::{ConnectionConfig, SubscribeConfig};
use crate::utils::{CommandOutput, wamp_async_value_to_serde};
use std::sync::Arc;
use tokio::signal;
use tokio::sync::Semaphore;
use xconn::async_::{Event, SubscribeRequest};

/// Builds a SubscribeRequest from the SubscribeConfig.
fn build_subscribe_request(config: &SubscribeConfig) -> SubscribeRequest {
// Note: SubscribeRequest doesn't support options via builder pattern
// Options would need to be added at the xconn-rust library level
SubscribeRequest::new(&config.topic, event_handler)
}

async fn event_handler(event: Event) {
let output = CommandOutput {
args: event.args.iter().map(wamp_async_value_to_serde).collect(),
kwargs: event
.kwargs
.iter()
.map(|(k, v): (_, _)| (k.clone(), wamp_async_value_to_serde(v)))
.collect(),
};

match serde_json::to_string_pretty(&output) {
Ok(json) => println!("{}", json),
Err(e) => eprintln!("Error serializing event: {}", e),
}
}

/// Runs a single subscribe session: connects, subscribes, and waits.
async fn run_session(
conn_config: Arc<ConnectionConfig>,
subscribe_config: Arc<SubscribeConfig>,
session_id: u32,
shutdown: tokio::sync::watch::Receiver<bool>,
) {
let session = match conn_config.connect().await {
Ok(s) => s,
Err(e) => {
eprintln!("Session {} Connection Error: {}", session_id, e);
return;
}
};

let request = build_subscribe_request(&subscribe_config);

match session.subscribe(request).await {
Ok(_) => {
if subscribe_config.parallel > 1 {
println!(
"Session {}: Subscribed to topic '{}'",
session_id, subscribe_config.topic
);
} else {
println!("Subscribed to topic '{}'", subscribe_config.topic);
}
}
Err(e) => {
eprintln!("Session {} Subscribe Error: {}", session_id, e);
return;
}
}

// Wait for shutdown signal
let mut shutdown = shutdown;
let _ = shutdown.changed().await;

if let Err(e) = session.leave().await {
eprintln!("Session {} Error leaving: {}", session_id, e);
}
}

pub async fn handle(
conn_config: ConnectionConfig,
subscribe_config: SubscribeConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let semaphore = Arc::new(Semaphore::new(subscribe_config.concurrency));
let conn_config = Arc::new(conn_config);
let subscribe_config = Arc::new(subscribe_config);

// Create shutdown channel
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

let mut handles = Vec::with_capacity(subscribe_config.parallel as usize);

for session_id in 1..=subscribe_config.parallel {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let conn_config = conn_config.clone();
let subscribe_config = subscribe_config.clone();
let shutdown_rx = shutdown_rx.clone();

let handle = tokio::spawn(async move {
let _permit = permit;
run_session(conn_config, subscribe_config, session_id, shutdown_rx).await;
});

handles.push(handle);
}

println!("Press Ctrl+C to exit");
signal::ctrl_c().await?;
println!("Exiting...");

// Signal all sessions to shutdown
let _ = shutdown_tx.send(true);

for handle in handles {
let _ = handle.await;
}

pub async fn handle(_session: &Session) -> Result<(), Box<dyn std::error::Error>> {
println!("Subcommand 'subscribe' executed");
Ok(())
}
21 changes: 21 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,24 @@ pub struct CallConfig {
pub parallel: u32,
pub concurrency: usize,
}

/// Configuration specific to the Publish command.
#[derive(Debug, Clone)]
pub struct PublishConfig {
pub topic: String,
pub args: Vec<String>,
pub kwargs: Vec<String>,
pub options: Vec<String>,
pub repeat: u32,
pub parallel: u32,
pub concurrency: usize,
pub acknowledge: bool,
}

/// Configuration specific to the Subscribe command.
#[derive(Debug, Clone)]
pub struct SubscribeConfig {
pub topic: String,
pub parallel: u32,
pub concurrency: usize,
}
Loading
Loading