Skip to content

Tcp End#98

Open
SHshenhao wants to merge 2 commits into
mainfrom
tcp-v3
Open

Tcp End#98
SHshenhao wants to merge 2 commits into
mainfrom
tcp-v3

Conversation

@SHshenhao
Copy link
Copy Markdown

add Tcp End

root and others added 2 commits May 14, 2026 13:24
Introduce a standalone-asio-based TcpEndpoint in dlslime/csrc/engine/tcp/
with four async communication primitives, all supporting timeout (default 30s).

Architecture highlights:
- 17-byte SessionHeader (Mooncake-aligned): {size, addr, opcode} with 3 opcodes
  (OP_SEND, OP_READ, OP_WRITE) supporting 4 primitives (recv matched passively)
- TcpContext: shared io_context + connection pool + background thread,
  multiple endpoints can share one context to reduce thread count
- TcpConnectionPool: (host, port)-keyed connection reuse, 60s idle timeout
- ServerSession: async_read callback chain (readHeader->dispatch->readBody loop)
  with 64KB chunked reads for large payloads
- Symmetric connection rendezvous (is_initiator by host:port comparison)

Async primitives:
- async_send(chunk, timeout_ms=30000): post to io_ctx, async_write, signal future
- async_recv(chunk, timeout_ms=30000): FIFO registration, ServerSession matches
  incoming OP_SEND, memcpy to user buffer, signal future
- async_read(assign, timeout_ms=30000): post OP_READ header, async_read response
  data, connection reserved until response arrives
- async_write(assign, timeout_ms=30000): post OP_WRITE header+payload via
  async_write, signal future

Timeout: SO_SNDTIMEO on socket for send/write, future.wait_for(ms) timed
busy-spin (machnet_pause) for recv/read.  All return TcpFuture with wait()
and wait_for(seconds) -> int|None.

Files: 16 new (10 in tcp/), 5 modified (CMakeLists chain + bind.cpp)
Tests: 5 Python cases (send/recv, write/read, recv timeout, send timeout,
default timeout) all pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
- Remove void* stream from all 4 async_* methods (RDMA leftover, never used)
- Remove timeout_ms from async_recv (recv timeout via future.wait_for())
- Remove ineffective SO_SNDTIMEO calls (no effect on asio::async_write)
- Update pybind11 bindings and tests to match
- Add tcp/plan.md with v3 architecture documentation

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@SHshenhao SHshenhao requested a deployment to self-hosted-rdma May 14, 2026 14:19 — with GitHub Actions Waiting
@JimyMa JimyMa self-requested a review May 14, 2026 14:22
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


root seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@JimyMa
Copy link
Copy Markdown
Contributor

JimyMa commented May 16, 2026

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new TCP transport layer for the dlslime engine, featuring an asio-based asynchronous I/O architecture, connection pooling, and Python bindings. While the implementation provides a comprehensive framework for TCP communication, the code review identified several critical security and stability issues. These include arbitrary memory access vulnerabilities in the session handler, potential use-after-free and dangling reference bugs in asynchronous callbacks, and a lack of thread synchronization in the memory pool. Furthermore, the reviewer highlighted performance concerns regarding synchronous blocking during connection establishment and inefficient busy-waiting in the future's timeout logic.

Comment on lines +75 to +81
uintptr_t addr = static_cast<uintptr_t>(header_.addr);
size_t sz = static_cast<size_t>(header_.size);
if (sz == 0) { readHeader(); return; }
// Write back raw data — no header on the response.
auto self = shared_from_this();
asio::async_write(socket_,
asio::buffer(reinterpret_cast<const void*>(addr), sz),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-critical critical

Security Vulnerability: Arbitrary Memory Read. The OP_READ handler uses header_.addr directly as a memory address to read data from. Since this address is provided by the remote peer over the network, a malicious peer could read any memory location in the process. You must validate that the requested address and size fall within a registered and authorized memory region.

Comment on lines +119 to +121
uintptr_t addr = static_cast<uintptr_t>(header_.addr);
std::memcpy(reinterpret_cast<void*>(addr),
chunk_buf_.data(), header_.size);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-critical critical

Security Vulnerability: Arbitrary Memory Write. The OP_WRITE handler uses header_.addr directly as a destination for std::memcpy. A malicious peer could overwrite any memory location in the process. You must validate that the destination address and size fall within a registered and authorized memory region.

Comment on lines +200 to +205
SessionHeader net = hdr;
hdr_hton(net);
std::array<asio::const_buffer, 2> bufs = {
asio::buffer(&net, sizeof(net)),
asio::buffer(reinterpret_cast<const void*>(src), len)
};
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Critical Bug: Use-after-free / Lifetime Issue. The net variable is a local variable within the lambda posted to io_context. However, asio::async_write is an asynchronous operation that returns immediately. By the time the actual I/O occurs, the outer lambda may have finished, and net will have been destroyed. The asio::buffer will then point to invalid memory. You should store the header in a way that its lifetime is guaranteed until the completion handler is called (e.g., inside the op state or as a member of a captured shared object).

Comment on lines +207 to +212
[conn, op, &pool](asio::error_code ec, size_t) {
op->completion_status.store(
ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release);
if (op->signal) op->signal->set_comm_done(0);
pool.returnConnection(conn);
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Dangling Reference: The completion handler for async_write captures &pool by reference. pool is a local reference variable in the async_send function. When the completion handler eventually runs, the async_send function has long since returned, making &pool a dangling reference. You should access the connection pool through the ep pointer (which is safely locked from a weak_ptr) or capture the TcpContext shared pointer.

Comment on lines +30 to +34
tcp::resolver resolver(io_ctx_);
auto endpoints = resolver.resolve(host, std::to_string(port));
tcp::socket sock(io_ctx_);
asio::error_code ec;
asio::connect(sock, endpoints, ec);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performance Issue: Synchronous Blocking in Async API. TcpConnectionPool::getConnection performs synchronous DNS resolution and socket connection. When called from TcpEndpoint::async_send (or other async methods) on the caller's thread, it will block that thread until the connection is established. This defeats the purpose of an asynchronous API, especially in Python where it might block the GIL or other event loops. These operations should be performed asynchronously using asio::ip::tcp::resolver::async_resolve and asio::ip::tcp::socket::async_connect.

Comment on lines +8 to +29
int32_t TcpMemoryPool::register_memory_region(
uintptr_t addr, uint64_t offset, size_t length,
std::optional<std::string> name) {

auto pit = ptr_to_handle_.find(addr);
if (pit != ptr_to_handle_.end()) {
int32_t h = pit->second;
if (h >= 0 && static_cast<size_t>(h) < handle_to_mr_.size()
&& handle_to_mr_[h].addr == addr
&& handle_to_mr_[h].length >= length) {
if (name.has_value()) name_to_handle_[*name] = h;
return h;
}
}

int32_t h = static_cast<int32_t>(handle_to_mr_.size());
handle_to_mr_.push_back({addr, offset, length});
handle_to_name_.push_back(name.value_or(""));
ptr_to_handle_[addr] = h;
if (name.has_value()) name_to_handle_[*name] = h;
return h;
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Thread Safety Issue: TcpMemoryPool is not thread-safe. It uses std::unordered_map and std::vector without any synchronization. Since register_memory_region can be called from multiple Python threads and get_mr_fast is called from the io_context thread, this will lead to race conditions and potential crashes. Add a mutex to protect the internal data structures.

Comment on lines +34 to +55
while (true) {
if (op_state_->signal) {
uint32_t m = op_state_->signal->get_comm_done_mask();
if ((m & op_state_->expected_mask) == op_state_->expected_mask) {
if (out) *out = op_state_->completion_status.load(
std::memory_order_acquire);
return true;
}
}
if (std::chrono::steady_clock::now() >= deadline) {
if (op_state_->signal) {
uint32_t m = op_state_->signal->get_comm_done_mask();
if ((m & op_state_->expected_mask) == op_state_->expected_mask) {
if (out) *out = op_state_->completion_status.load(
std::memory_order_acquire);
return true;
}
}
return false;
}
machnet_pause();
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Efficiency Issue: The wait_for implementation uses a spin-loop with machnet_pause(). While this provides low latency, it consumes 100% of a CPU core while waiting. For a TCP transport where latencies are typically in the millisecond range, it is much more efficient to use a condition variable or asio timers to block the thread without busy-waiting.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants