Skip to content

Max/asyncread asyncwrite nym client#6318

Open
mfahampshire wants to merge 24 commits intodevelopfrom
max/asyncread-asyncwrite-nym-client
Open

Max/asyncread asyncwrite nym client#6318
mfahampshire wants to merge 24 commits intodevelopfrom
max/asyncread-asyncwrite-nym-client

Conversation

@mfahampshire
Copy link
Contributor

@mfahampshire mfahampshire commented Jan 11, 2026

Breaking up #6129 into more reasonably sized chunks.

This is the first which is basically the original addition of AsyncRead/AsyncWrite to the core client.


This change is Reviewable

@mfahampshire mfahampshire marked this pull request as draft January 11, 2026 21:41
@vercel
Copy link

vercel bot commented Jan 11, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
nym-explorer-v2 Ready Ready Preview, Comment Feb 2, 2026 8:46pm
2 Skipped Deployments
Project Deployment Actions Updated (UTC)
docs-nextra Ignored Ignored Preview Feb 2, 2026 8:46pm
nym-node-status Ignored Ignored Preview Feb 2, 2026 8:46pm

Request Review

) -> Result<Vec<u64>> {
let mut request_ids = Vec::with_capacity(3);

for _ in 1..=3 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that this changes semantics of the original call. the former was performing all 3 pings concurrently. the updated variant does it sequentially - make sure this is what you want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops - fixed this!

self
}
#[allow(clippy::expect_used)]
pub fn serialized_size(&self) -> u64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem to be used anywhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its used in sdk/rust/nym-sdk/src/mixnet/native_client.rs - removed expect_used.

}
}

pub struct InputMessageCodec;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure InputMessage is the right abstraction for the AsyncRead/AsyncWrite impl? if you're concerned about reading/writing bytes, why should you care about internal TransmissionLane (that's only used for backpressure)?

wouldn't it be sufficient to just use the inbuilt LengthDelimitedCodec wrapped around simple Vec<u8> and then call normal "send" within the client on that message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we remove the InputMessage format, don't we lose info (e.g. if its a reply)? I can have a go at it, but been thinking it through, and feels like we lose something there?

}

// MAX TODO implement for v1 as well for back compat? - this was added in the original asyncread/write work when we only had one v
impl Serialize for MixPacket {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given we have to implement serde for internal MixPacket makes me think we chose wrong abstraction. client really shouldn't be concerned about this guy. it's just a wrapper for data that's eventually going to get encoded in websocket messages for the gateway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed need for serde

// TEMP UNTIL FURTHER REFACTORING
pub use preparer::payload::NymPayloadBuilder;

fn make_bincode_serializer() -> impl bincode::Options {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how to feel about using bincode inside sphinx itself...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved it out of the nym-sphinx crate

Poll::Ready(Ok(()))
} else {
let written = buf.capacity();
buf.put_slice(&self._read.buffer[..written]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use split method on the BytesMut

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


let msg_size = msg.serialized_size();

let mut fut = pin!(self.client_input.send(msg));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're creating a new send future every single time you call poll_write without letting the previous one resolve. you will have to actually store the future on the MixnetClient itself and check for its existence in subsequent calls

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a random thought (don't know if it's a good idea!): what if AsyncRead / AsyncWrite was changed to the simplest possible variant where they only read/write bytes to internal client buffers without any encoding. but there would be a dedicated internal client task/thread that periodically would check those buffers and send/recv from the mixnet on a timer based on their contents

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're creating a new send future every single time you call poll_write without letting the previous one resolve. you will have to actually store the future on the MixnetClient itself and check for its existence in subsequent calls

This should be fixed now :)

self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
Sink::poll_flush(self, cx).map_err(|_| std::io::Error::other("failed to flush the sink"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if calling the internal Sink impl for this guy is a good call

self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::prelude::v1::Result<(), std::io::Error>> {
AsyncWrite::poll_flush(self, cx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't have anything substantial to do here, it's fine to just return an Ok(()) straightaway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment it's just checking the write buffer doesn't have anything to send, otherwise just returning Ok(()) (after fixing the pending future problem this code has changed a bit)


let msg_size = msg.serialized_size();

let mut fut = pin!(self.client_input.send(msg));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same problem as with AsyncRead - this future will never have time to resolve unless it was already ready and won't wake the waker

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants