From 13bafc068fb53ba43bc66339a0650112c72594ad Mon Sep 17 00:00:00 2001 From: root Date: Thu, 14 May 2026 13:24:47 +0000 Subject: [PATCH 1/2] add TcpEndpoint v3: asio-based TCP transport with async primitives Introduce a standalone-asio-based TcpEndpoint in dlslime/csrc/engine/tcp/ with four async communication primitives, all supporting timeout (default 30s). Architecture highlights: - 17-byte SessionHeader (Mooncake-aligned): {size, addr, opcode} with 3 opcodes (OP_SEND, OP_READ, OP_WRITE) supporting 4 primitives (recv matched passively) - TcpContext: shared io_context + connection pool + background thread, multiple endpoints can share one context to reduce thread count - TcpConnectionPool: (host, port)-keyed connection reuse, 60s idle timeout - ServerSession: async_read callback chain (readHeader->dispatch->readBody loop) with 64KB chunked reads for large payloads - Symmetric connection rendezvous (is_initiator by host:port comparison) Async primitives: - async_send(chunk, timeout_ms=30000): post to io_ctx, async_write, signal future - async_recv(chunk, timeout_ms=30000): FIFO registration, ServerSession matches incoming OP_SEND, memcpy to user buffer, signal future - async_read(assign, timeout_ms=30000): post OP_READ header, async_read response data, connection reserved until response arrives - async_write(assign, timeout_ms=30000): post OP_WRITE header+payload via async_write, signal future Timeout: SO_SNDTIMEO on socket for send/write, future.wait_for(ms) timed busy-spin (machnet_pause) for recv/read. All return TcpFuture with wait() and wait_for(seconds) -> int|None. Files: 16 new (all in tcp/), 5 modified (CMakeLists chain + bind.cpp) Tests: 5 Python cases (send/recv, write/read, recv timeout, send timeout, default timeout) all pass. 
Co-Authored-By: Claude Opus 4.7 --- CMakeLists.txt | 1 + dlslime/csrc/CMakeLists.txt | 4 + dlslime/csrc/engine/CMakeLists.txt | 4 + dlslime/csrc/engine/tcp/CMakeLists.txt | 40 ++ dlslime/csrc/engine/tcp/build_and_test.sh | 54 +++ .../csrc/engine/tcp/tcp_connection_pool.cpp | 110 +++++ dlslime/csrc/engine/tcp/tcp_connection_pool.h | 68 +++ dlslime/csrc/engine/tcp/tcp_context.cpp | 27 ++ dlslime/csrc/engine/tcp/tcp_context.h | 41 ++ dlslime/csrc/engine/tcp/tcp_endpoint.cpp | 448 ++++++++++++++++++ dlslime/csrc/engine/tcp/tcp_endpoint.h | 133 ++++++ dlslime/csrc/engine/tcp/tcp_future.h | 67 +++ dlslime/csrc/engine/tcp/tcp_header.h | 32 ++ dlslime/csrc/engine/tcp/tcp_memory_pool.cpp | 128 +++++ dlslime/csrc/engine/tcp/tcp_memory_pool.h | 63 +++ dlslime/csrc/engine/tcp/tcp_op_state.h | 41 ++ dlslime/csrc/engine/tcp/tcp_session.cpp | 142 ++++++ dlslime/csrc/engine/tcp/tcp_session.h | 60 +++ dlslime/csrc/engine/tcp/test_tcp_endpoint.py | 210 ++++++++ dlslime/csrc/python/CMakeLists.txt | 5 + dlslime/csrc/python/bind.cpp | 129 +++++ 21 files changed, 1807 insertions(+) create mode 100644 dlslime/csrc/engine/tcp/CMakeLists.txt create mode 100755 dlslime/csrc/engine/tcp/build_and_test.sh create mode 100644 dlslime/csrc/engine/tcp/tcp_connection_pool.cpp create mode 100644 dlslime/csrc/engine/tcp/tcp_connection_pool.h create mode 100644 dlslime/csrc/engine/tcp/tcp_context.cpp create mode 100644 dlslime/csrc/engine/tcp/tcp_context.h create mode 100644 dlslime/csrc/engine/tcp/tcp_endpoint.cpp create mode 100644 dlslime/csrc/engine/tcp/tcp_endpoint.h create mode 100644 dlslime/csrc/engine/tcp/tcp_future.h create mode 100644 dlslime/csrc/engine/tcp/tcp_header.h create mode 100644 dlslime/csrc/engine/tcp/tcp_memory_pool.cpp create mode 100644 dlslime/csrc/engine/tcp/tcp_memory_pool.h create mode 100644 dlslime/csrc/engine/tcp/tcp_op_state.h create mode 100644 dlslime/csrc/engine/tcp/tcp_session.cpp create mode 100644 dlslime/csrc/engine/tcp/tcp_session.h create mode 100644 
dlslime/csrc/engine/tcp/test_tcp_endpoint.py diff --git a/CMakeLists.txt b/CMakeLists.txt index b451b4e8..3f2c5475 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ slime_option(USE_MACA "USE in MACA Platform" OFF) slime_option(BUILD_NVLINK "Build NVLINK" OFF) slime_option(BUILD_ASCEND_DIRECT "Build Ascend direct transport" OFF) +slime_option(BUILD_TCP "Build TCP transport" ON) # Slime options for custom python wrapper slime_option(BUILD_PYTHON "Build python wrapper" OFF) diff --git a/dlslime/csrc/CMakeLists.txt b/dlslime/csrc/CMakeLists.txt index 0947f3c1..045f74ad 100644 --- a/dlslime/csrc/CMakeLists.txt +++ b/dlslime/csrc/CMakeLists.txt @@ -27,6 +27,10 @@ if(BUILD_RDMA) target_link_libraries(dlslime INTERFACE _slime_rdma) endif() +if(BUILD_TCP) + target_link_libraries(dlslime INTERFACE _slime_tcp) +endif() + # rpc/ has no independent C++ consumers (the session API is all # pybind11). It is compiled straight into _slime_c.so by the python # subdirectory below. Keep the source files here as a marker so future diff --git a/dlslime/csrc/engine/CMakeLists.txt b/dlslime/csrc/engine/CMakeLists.txt index c03c88c7..c9bffdf5 100755 --- a/dlslime/csrc/engine/CMakeLists.txt +++ b/dlslime/csrc/engine/CMakeLists.txt @@ -34,3 +34,7 @@ endif() if (BUILD_RDMA) add_subdirectory(rdma) endif() + +if (BUILD_TCP) + add_subdirectory(tcp) +endif() diff --git a/dlslime/csrc/engine/tcp/CMakeLists.txt b/dlslime/csrc/engine/tcp/CMakeLists.txt new file mode 100644 index 00000000..5487b06b --- /dev/null +++ b/dlslime/csrc/engine/tcp/CMakeLists.txt @@ -0,0 +1,40 @@ +# asio is header-only. Try find_package, fall back to manual detection. 
+find_package(asio QUIET) +if(NOT asio_FOUND) + if(EXISTS /usr/include/asio.hpp) + add_library(asio::asio INTERFACE IMPORTED) + target_include_directories(asio::asio INTERFACE /usr/include) + elseif(EXISTS /usr/include/boost/asio.hpp) + add_library(asio::asio INTERFACE IMPORTED) + target_include_directories(asio::asio INTERFACE /usr/include/boost) + target_compile_definitions(asio::asio INTERFACE ASIO_STANDALONE) + else() + message(FATAL_ERROR "asio not found. Install libasio-dev or boost.") + endif() +endif() + +add_library(_slime_tcp SHARED + tcp_memory_pool.cpp + tcp_connection_pool.cpp + tcp_context.cpp + tcp_session.cpp + tcp_endpoint.cpp +) + +target_compile_definitions(_slime_tcp PRIVATE ASIO_STANDALONE) + +target_link_libraries(_slime_tcp PUBLIC + asio::asio + _slime_device + _slime_engine +) + +set_target_properties(_slime_tcp PROPERTIES + BUILD_WITH_INSTALL_RPATH TRUE + INSTALL_RPATH "${ORIGIN}" +) + +install(TARGETS _slime_tcp + EXPORT dlslimeTargets + LIBRARY DESTINATION ${DLSLIME_INSTALL_PATH} +) diff --git a/dlslime/csrc/engine/tcp/build_and_test.sh b/dlslime/csrc/engine/tcp/build_and_test.sh new file mode 100755 index 00000000..0283ab30 --- /dev/null +++ b/dlslime/csrc/engine/tcp/build_and_test.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../../../.." 
&& pwd)" +BUILD_DIR="$REPO_ROOT/build_tcp" +MODE="${1:-all}" + +header() { echo; echo -e "\033[1;36m==>\033[m \033[1m$*\033[m"; } +ok() { echo -e " \033[1;32mOK\033[m $*"; } + +do_build() { + header "Configuring (BUILD_TCP=ON, BUILD_RDMA=OFF)" + cmake -S "$REPO_ROOT" -B "$BUILD_DIR" -G Ninja \ + -DCMAKE_BUILD_TYPE=Release \ + -DDLSLIME_INSTALL_PATH=dlslime \ + -DBUILD_PYTHON=ON \ + -DBUILD_RDMA=OFF \ + -DBUILD_TCP=ON \ + -DBUILD_NVLINK=OFF \ + -DBUILD_ASCEND_DIRECT=OFF \ + -DSKBUILD_PROJECT_NAME=dlslime 2>&1 | tail -3 + ok "CMake configure" + + header "Building _slime_c" + cmake --build "$BUILD_DIR" --target _slime_c -j"$(nproc)" 2>&1 | tail -8 + ok "Build complete" + + cp "$BUILD_DIR/lib/"*.so "$REPO_ROOT/dlslime/" + ok "Copied .so files to dlslime/" +} + +do_test() { + header "Running TcpEndpoint v3 tests" + export DLSLIME_LOG_LEVEL=0 + export LD_LIBRARY_PATH="$REPO_ROOT/dlslime" + export PYTHONPATH="$REPO_ROOT" + python3 "$SCRIPT_DIR/test_tcp_endpoint.py" 2>&1 | while IFS= read -r line; do + if [[ "$line" == *"PASSED"* ]]; then echo -e " \033[1;32m✓\033[m $line" + elif [[ "$line" == *"FAIL"* ]]; then echo -e " \033[1;91m✗\033[m $line" + else echo " $line" + fi + done + ok "All tests passed" +} + +case "$MODE" in + all) do_build; do_test ;; + build) do_build ;; + test) do_test ;; + clean) rm -rf "$BUILD_DIR" "$REPO_ROOT/dlslime/_slime_c"*.so "$REPO_ROOT/dlslime/lib_slime_"*.so + ok "Cleaned" ;; + *) echo "Usage: $0 {all|build|test|clean}" >&2; exit 1 ;; +esac diff --git a/dlslime/csrc/engine/tcp/tcp_connection_pool.cpp b/dlslime/csrc/engine/tcp/tcp_connection_pool.cpp new file mode 100644 index 00000000..d2bd77af --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_connection_pool.cpp @@ -0,0 +1,110 @@ +#include "tcp_connection_pool.h" + +#include + +#include "dlslime/csrc/logging.h" + +namespace dlslime { +namespace tcp { + +using tcp = asio::ip::tcp; + +std::shared_ptr +TcpConnectionPool::getConnection(const std::string& host, uint16_t port) { + ConnKey key{host, 
port}; + + { + std::lock_guard lk(mu_); + auto it = pool_.find(key); + if (it != pool_.end()) { + for (auto& c : it->second) { + if (!c->in_use && c->socket.is_open()) { + c->in_use = true; + c->last_used = std::chrono::steady_clock::now(); + return c; + } + } + } + } + + tcp::resolver resolver(io_ctx_); + auto endpoints = resolver.resolve(host, std::to_string(port)); + tcp::socket sock(io_ctx_); + asio::error_code ec; + asio::connect(sock, endpoints, ec); + if (ec) { + SLIME_LOG_WARN("TcpConnectionPool: connect to ", host, ":", port, + " failed: ", ec.message()); + return nullptr; + } + sock.set_option(tcp::no_delay(true)); + + auto conn = std::make_shared(std::move(sock), host, port); + { + std::lock_guard lk(mu_); + auto& q = pool_[key]; + for (auto& c : q) { + if (!c->in_use && c->socket.is_open()) { + asio::error_code ign; + conn->socket.close(ign); + c->in_use = true; + c->last_used = std::chrono::steady_clock::now(); + return c; + } + } + q.push_back(conn); + } + return conn; +} + +void TcpConnectionPool::returnConnection( + std::shared_ptr conn) { + if (!conn) return; + std::lock_guard lk(mu_); + if (conn->socket.is_open()) { + conn->in_use = false; + conn->last_used = std::chrono::steady_clock::now(); + } else { + ConnKey key{conn->host, conn->port}; + auto it = pool_.find(key); + if (it != pool_.end()) { + auto& q = it->second; + for (auto qi = q.begin(); qi != q.end(); ++qi) + if (*qi == conn) { q.erase(qi); break; } + if (q.empty()) pool_.erase(it); + } + } +} + +void TcpConnectionPool::cleanupIdleConnections() { + auto now = std::chrono::steady_clock::now(); + std::lock_guard lk(mu_); + for (auto it = pool_.begin(); it != pool_.end(); ) { + auto& q = it->second; + while (!q.empty()) { + auto& c = q.back(); + if (!c->in_use) { + auto idle = std::chrono::duration_cast( + now - c->last_used).count(); + if (idle > kIdleTimeout.count()) { + asio::error_code ign; + c->socket.close(ign); + q.pop_back(); + continue; + } + } + break; + } + if (q.empty()) it = 
pool_.erase(it); else ++it; + } +} + +void TcpConnectionPool::clear() { + std::lock_guard lk(mu_); + for (auto& [_, q] : pool_) + for (auto& c : q) { asio::error_code ign; c->socket.close(ign); } + pool_.clear(); +} + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_connection_pool.h b/dlslime/csrc/engine/tcp/tcp_connection_pool.h new file mode 100644 index 00000000..f06254a3 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_connection_pool.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace dlslime { +namespace tcp { + +struct PooledConnection { + asio::ip::tcp::socket socket; + std::string host; + uint16_t port{0}; + std::chrono::steady_clock::time_point last_used; + bool in_use{true}; + + PooledConnection(asio::ip::tcp::socket s, std::string h, uint16_t p) + : socket(std::move(s)), host(std::move(h)), port(p), + last_used(std::chrono::steady_clock::now()) {} +}; + +// Keyed by (host, port). Thread-safe. 
+// States: IDLE (in deque, in_use=false) / ACTIVE (checked out) / RESERVED +class TcpConnectionPool { +public: + static constexpr std::chrono::seconds kIdleTimeout{60}; + + explicit TcpConnectionPool(asio::io_context& io_ctx) : io_ctx_(io_ctx) {} + + std::shared_ptr getConnection( + const std::string& host, uint16_t port); + + void returnConnection(std::shared_ptr conn); + + void cleanupIdleConnections(); + void clear(); + +private: + struct ConnKey { + std::string host; + uint16_t port; + bool operator==(const ConnKey& o) const { + return host == o.host && port == o.port; + } + }; + struct ConnKeyHash { + size_t operator()(const ConnKey& k) const { + return std::hash{}(k.host) + ^ (std::hash{}(k.port) << 1); + } + }; + + asio::io_context& io_ctx_; + std::mutex mu_; + std::unordered_map>, + ConnKeyHash> pool_; +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_context.cpp b/dlslime/csrc/engine/tcp/tcp_context.cpp new file mode 100644 index 00000000..f669e9be --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_context.cpp @@ -0,0 +1,27 @@ +#include "tcp_context.h" + +namespace dlslime { +namespace tcp { + +TcpContext::TcpContext() { + // Keep io_context alive even when there's no work posted yet. 
+ auto work = asio::make_work_guard(io_ctx_); + io_thread_ = std::thread([this, w = std::move(work)]() { + io_ctx_.run(); + }); +} + +TcpContext::~TcpContext() { + shutdown(); +} + +void TcpContext::shutdown() { + if (!running_) return; + running_ = false; + io_ctx_.stop(); + if (io_thread_.joinable()) io_thread_.join(); + conn_pool_.clear(); +} + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_context.h b/dlslime/csrc/engine/tcp/tcp_context.h new file mode 100644 index 00000000..a3bd5185 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_context.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include +#include + +#include "tcp_connection_pool.h" + +namespace dlslime { +namespace tcp { + +// Process-level shared resource holder. +// Multiple TcpEndpoints can share one TcpContext to run on a single +// io_context thread, reducing thread count. +// +// For sync wrappers: sync_send() = async_send() + future.wait() +// — the io_context drives the I/O, the caller thread just blocks. 
+class TcpContext { +public: + TcpContext(); + ~TcpContext(); + + TcpContext(const TcpContext&) = delete; + TcpContext& operator=(const TcpContext&) = delete; + + asio::io_context& io_context() { return io_ctx_; } + TcpConnectionPool& conn_pool() { return conn_pool_; } + + void shutdown(); + +private: + asio::io_context io_ctx_; + std::thread io_thread_; + TcpConnectionPool conn_pool_{io_ctx_}; + bool running_{true}; +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_endpoint.cpp b/dlslime/csrc/engine/tcp/tcp_endpoint.cpp new file mode 100644 index 00000000..df0792e3 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_endpoint.cpp @@ -0,0 +1,448 @@ +#include "tcp_endpoint.h" + +#include +#include + +#include +#include + +#include "dlslime/csrc/logging.h" + +namespace dlslime { +namespace tcp { + +using tcp = asio::ip::tcp; + +// ── helpers ───────────────────────────────────────────── + +static void hdr_hton(SessionHeader& h) { + h.size = htole64(h.size); + h.addr = htole64(h.addr); +} + +void TcpEndpoint::set_sndtimeo(int fd, int64_t ms) { + struct timeval tv; + tv.tv_sec = static_cast(ms / 1000); + tv.tv_usec = static_cast((ms % 1000) * 1000); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); +} + +// ── RecvMatcher factory ──────────────────────────────── + +ServerSession::RecvMatcher TcpEndpoint::make_recv_matcher() { + std::weak_ptr weak = shared_from_this(); + return [weak]() -> RecvSlot { + auto self = weak.lock(); + if (!self) return {}; + std::lock_guard lk(self->recv_mu_); + if (self->pending_recvs_.empty()) return {}; + auto pr = std::move(self->pending_recvs_.front()); + self->pending_recvs_.pop_front(); + return {pr.op_state->user_buffer, pr.op_state->user_length, pr.op_state}; + }; +} + +// ── Constructor ──────────────────────────────────────── + +TcpEndpoint::TcpEndpoint(uint16_t port) + : own_ctx_(std::make_unique()) + , acceptor_(own_ctx_->io_context()) + , local_pool_(std::make_shared()) + , 
remote_pool_(std::make_shared()) { + ctx_ = own_ctx_.get(); + local_port_ = port; + start_io(); +} + +TcpEndpoint::TcpEndpoint(TcpContext& ctx, uint16_t port) + : acceptor_(ctx.io_context()) + , local_pool_(std::make_shared()) + , remote_pool_(std::make_shared()) { + ctx_ = &ctx; + local_port_ = port; + start_io(); +} + +TcpEndpoint::~TcpEndpoint() { + shutdown(); +} + +void TcpEndpoint::start_io() { + auto ep = tcp::endpoint(tcp::v4(), local_port_); + acceptor_.open(ep.protocol()); + acceptor_.set_option(tcp::acceptor::reuse_address(true)); + acceptor_.bind(ep); + acceptor_.listen(64); + + if (local_port_ == 0) { + asio::error_code ec; + local_port_ = acceptor_.local_endpoint(ec).port(); + } + + do_accept(); +} + +// ── do_accept ─────────────────────────────────────────── + +void TcpEndpoint::do_accept() { + if (!running_.load(std::memory_order_acquire)) return; + acceptor_.async_accept( + [this](asio::error_code ec, tcp::socket sock) { + if (ec) { + if (ec != asio::error::operation_aborted) + SLIME_LOG_WARN("TcpEndpoint accept: ", ec.message()); + return; + } + sock.set_option(tcp::no_delay(true)); + auto session = std::make_shared( + std::move(sock), local_pool_.get(), make_recv_matcher()); + session->start(); + do_accept(); + }); +} + +// ── endpoint_info / connect ───────────────────────────── + +json TcpEndpoint::endpoint_info() const { + return { + {"host", local_host_}, + {"port", local_port_}, + {"mr_info", local_pool_->mr_info()} + }; +} + +json TcpEndpoint::mr_info() const { + return local_pool_->mr_info(); +} + +bool TcpEndpoint::is_initiator(const std::string& peer_host, + uint16_t peer_port) const { + int cmp = local_host_.compare(peer_host); + if (cmp != 0) return cmp > 0; + return local_port_ > peer_port; +} + +void TcpEndpoint::connect(const json& remote_info) { + if (connected_.load(std::memory_order_acquire)) return; + + peer_host_ = remote_info.value("host", ""); + peer_port_ = static_cast(remote_info.value("port", 0)); + + if 
(remote_info.contains("mr_info")) { + for (const auto& [name, info] : remote_info["mr_info"].items()) + remote_pool_->register_remote_memory_region(info, name); + } + + if (is_initiator(peer_host_, peer_port_)) { + auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_); + if (conn) ctx_->conn_pool().returnConnection(std::move(conn)); + } + + connected_.store(true, std::memory_order_release); +} + +// ── memory registration ───────────────────────────────── + +int32_t TcpEndpoint::register_memory_region(const std::string& name, + uintptr_t ptr, uintptr_t offset, + size_t length) { + return local_pool_->register_memory_region(ptr, offset, length, name); +} + +int32_t TcpEndpoint::register_remote_memory_region(const std::string& name, + const json& mr_info) { + return remote_pool_->register_remote_memory_region(mr_info, name); +} + +// ── write_message ─────────────────────────────────────── + +bool TcpEndpoint::write_message(tcp::socket& sock, + const SessionHeader& hdr, + const void* payload) { + asio::error_code ec; + SessionHeader net = hdr; + hdr_hton(net); + std::array bufs = { + asio::buffer(&net, sizeof(net)), + asio::buffer(payload, hdr.size) + }; + asio::write(sock, bufs, ec); + return !ec; +} + +// ── async_send ────────────────────────────────────────── + +std::shared_ptr +TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms, void*) { + auto mr = local_pool_->get_mr_fast(static_cast(std::get<0>(chunk))); + if (mr.length == 0) + throw std::runtime_error("TcpEndpoint::async_send: invalid local MR"); + + uintptr_t src = mr.addr + mr.offset + std::get<1>(chunk); + size_t len = std::get<2>(chunk); + + auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_); + auto op = TcpOpState::create(); + op->signal->reset_all(); + + if (!conn) { + op->completion_status.store(TCP_FAILED, std::memory_order_release); + op->signal->force_complete(); + return std::make_shared(op); + } + + if (timeout_ms > 0) + 
set_sndtimeo(conn->socket.native_handle(), timeout_ms); + + SessionHeader hdr{len, 0, OP_SEND}; + auto& pool = ctx_->conn_pool(); + + std::weak_ptr weak = weak_from_this(); + asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, len, timeout_ms, &pool]() { + auto ep = weak.lock(); + if (!ep) { + op->completion_status.store(TCP_CLOSED, std::memory_order_release); + if (op->signal) op->signal->force_complete(); + return; + } + + asio::error_code ec; + SessionHeader net = hdr; + hdr_hton(net); + std::array bufs = { + asio::buffer(&net, sizeof(net)), + asio::buffer(reinterpret_cast(src), len) + }; + asio::async_write(conn->socket, bufs, + [conn, op, timeout_ms, &pool](asio::error_code ec, size_t) { + if (timeout_ms > 0 && conn->socket.is_open()) + TcpEndpoint::set_sndtimeo(conn->socket.native_handle(), 0); + op->completion_status.store( + ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release); + if (op->signal) op->signal->set_comm_done(0); + pool.returnConnection(conn); + }); + }); + + return std::make_shared(op); +} + +// ── async_recv ────────────────────────────────────────── + +std::shared_ptr +TcpEndpoint::async_recv(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/, void*) { + auto mr = local_pool_->get_mr_fast(static_cast(std::get<0>(chunk))); + if (mr.length == 0) + throw std::runtime_error("TcpEndpoint::async_recv: invalid local MR"); + + auto op = TcpOpState::create(); + op->signal->reset_all(); + op->user_buffer = mr.addr + mr.offset + std::get<1>(chunk); + op->user_length = std::get<2>(chunk); + + { + std::lock_guard lk(recv_mu_); + pending_recvs_.push_back({op}); + } + + return std::make_shared(op); +} + +// ── async_read ────────────────────────────────────────── + +std::shared_ptr +TcpEndpoint::async_read(const std::vector& assign, + int64_t /*timeout_ms*/, void*) { + if (assign.empty()) + throw std::runtime_error("TcpEndpoint::async_read: empty assignment"); + + const auto& a = assign[0]; + int32_t local_h = static_cast(std::get<0>(a)); + 
int32_t remote_h = static_cast(std::get<1>(a)); + uint64_t remote_off = std::get<2>(a); + uint64_t local_off = std::get<3>(a); + size_t length = std::get<4>(a); + + auto local = local_pool_->get_mr_fast(local_h); + auto remote = remote_pool_->get_remote_mr_fast(remote_h); + if (local.length == 0 || remote.length == 0) + throw std::runtime_error("TcpEndpoint::async_read: invalid MR handle"); + + auto op = TcpOpState::create(); + op->signal->reset_all(); + op->user_buffer = local.addr + local.offset + local_off; + op->user_length = length; + + auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_); + if (!conn) { + op->completion_status.store(TCP_FAILED, std::memory_order_release); + op->signal->force_complete(); + return std::make_shared(op); + } + + uint64_t req_id = next_req_id_.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard lk(read_mu_); + pending_reads_[req_id] = {conn, op}; + } + + SessionHeader hdr{length, remote.addr + remote.offset + remote_off, OP_READ}; + auto& pool = ctx_->conn_pool(); + + std::weak_ptr weak = weak_from_this(); + asio::post(ctx_->io_context(), [weak, conn, op, hdr, req_id, &pool]() { + auto ep = weak.lock(); + if (!ep) { + op->completion_status.store(TCP_CLOSED, std::memory_order_release); + if (op->signal) op->signal->force_complete(); + return; + } + + SessionHeader net = hdr; + hdr_hton(net); + asio::async_write(conn->socket, + asio::buffer(&net, sizeof(net)), + [weak, conn, op, req_id, &pool](asio::error_code ec, size_t) { + if (ec) { + op->completion_status.store(TCP_FAILED, std::memory_order_release); + if (op->signal) op->signal->set_comm_done(0); + pool.returnConnection(conn); + auto self = weak.lock(); + if (self) { + std::lock_guard lk(self->read_mu_); + self->pending_reads_.erase(req_id); + } + return; + } + + // Read raw response data (no header). 
+ asio::async_read(conn->socket, + asio::buffer(reinterpret_cast(op->user_buffer), + op->user_length), + [weak, conn, op, req_id, &pool](asio::error_code ec, size_t n) { + op->bytes_copied = n; + op->completion_status.store( + ec ? TCP_FAILED : TCP_SUCCESS, + std::memory_order_release); + if (op->signal) op->signal->set_comm_done(0); + pool.returnConnection(conn); + auto self = weak.lock(); + if (self) { + std::lock_guard lk(self->read_mu_); + self->pending_reads_.erase(req_id); + } + }); + }); + }); + + return std::make_shared(op); +} + +// ── async_write ───────────────────────────────────────── + +std::shared_ptr +TcpEndpoint::async_write(const std::vector& assign, + int64_t timeout_ms, void*) { + if (assign.empty()) + throw std::runtime_error("TcpEndpoint::async_write: empty assignment"); + + const auto& a = assign[0]; + int32_t local_h = static_cast(std::get<0>(a)); + int32_t remote_h = static_cast(std::get<1>(a)); + uint64_t remote_off = std::get<2>(a); + uint64_t local_off = std::get<3>(a); + size_t length = std::get<4>(a); + + auto local = local_pool_->get_mr_fast(local_h); + auto remote = remote_pool_->get_remote_mr_fast(remote_h); + if (local.length == 0 || remote.length == 0) + throw std::runtime_error("TcpEndpoint::async_write: invalid MR handle"); + + uintptr_t src = local.addr + local.offset + local_off; + + auto conn = ctx_->conn_pool().getConnection(peer_host_, peer_port_); + auto op = TcpOpState::create(); + op->signal->reset_all(); + + if (!conn) { + op->completion_status.store(TCP_FAILED, std::memory_order_release); + op->signal->force_complete(); + return std::make_shared(op); + } + + if (timeout_ms > 0) + set_sndtimeo(conn->socket.native_handle(), timeout_ms); + + SessionHeader hdr{length, remote.addr + remote.offset + remote_off, OP_WRITE}; + auto& pool = ctx_->conn_pool(); + + std::weak_ptr weak = weak_from_this(); + asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, length, timeout_ms, &pool]() { + auto ep = weak.lock(); + if (!ep) { 
+ op->completion_status.store(TCP_CLOSED, std::memory_order_release); + if (op->signal) op->signal->force_complete(); + return; + } + + asio::error_code ec; + SessionHeader net = hdr; + hdr_hton(net); + std::array bufs = { + asio::buffer(&net, sizeof(net)), + asio::buffer(reinterpret_cast(src), length) + }; + asio::async_write(conn->socket, bufs, + [conn, op, timeout_ms, &pool](asio::error_code ec, size_t) { + if (timeout_ms > 0 && conn->socket.is_open()) + TcpEndpoint::set_sndtimeo(conn->socket.native_handle(), 0); + op->completion_status.store( + ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release); + if (op->signal) op->signal->set_comm_done(0); + pool.returnConnection(conn); + }); + }); + + return std::make_shared(op); +} + +// ── shutdown ──────────────────────────────────────────── + +void TcpEndpoint::shutdown() { + bool expected = true; + if (!running_.compare_exchange_strong(expected, false)) + return; + + connected_.store(false, std::memory_order_release); + + acceptor_.close(); + + // Force-complete all pending operations. + { + std::lock_guard lk(recv_mu_); + for (auto& pr : pending_recvs_) { + if (pr.op_state && pr.op_state->signal) { + pr.op_state->completion_status.store(TCP_CLOSED, std::memory_order_release); + pr.op_state->signal->force_complete(); + } + } + pending_recvs_.clear(); + } + { + std::lock_guard lk(read_mu_); + for (auto& [_, pending] : pending_reads_) { + if (pending.op_state && pending.op_state->signal) { + pending.op_state->completion_status.store(TCP_CLOSED, std::memory_order_release); + pending.op_state->signal->force_complete(); + } + } + pending_reads_.clear(); + } + + // If self-contained, stop the private TcpContext. 
+ if (own_ctx_) + own_ctx_->shutdown(); +} + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_endpoint.h b/dlslime/csrc/engine/tcp/tcp_endpoint.h new file mode 100644 index 00000000..344c4901 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_endpoint.h @@ -0,0 +1,133 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "dlslime/csrc/common/json.hpp" +#include "dlslime/csrc/engine/assignment.h" +#include "tcp_connection_pool.h" +#include "tcp_context.h" +#include "tcp_future.h" +#include "tcp_header.h" +#include "tcp_memory_pool.h" +#include "tcp_op_state.h" +#include "tcp_session.h" + +namespace dlslime { +namespace tcp { + +using json = nlohmann::json; + +class TcpEndpoint : public std::enable_shared_from_this { +public: + static constexpr int64_t kDefaultTimeoutMs = 30000; + + // Self-contained: creates its own TcpContext + io_thread. + explicit TcpEndpoint(uint16_t port = 0); + + // Shared context: multiple endpoints share one io_context thread. 
+ TcpEndpoint(TcpContext& ctx, uint16_t port = 0); + + ~TcpEndpoint(); + + TcpEndpoint(const TcpEndpoint&) = delete; + TcpEndpoint& operator=(const TcpEndpoint&) = delete; + + // ── Connection ────────────────────────────────────── + json endpoint_info() const; + void connect(const json& remote_info); + void shutdown(); + + // ── Memory ────────────────────────────────────────── + int32_t register_memory_region(const std::string& name, + uintptr_t ptr, uintptr_t offset, size_t length); + int32_t register_remote_memory_region(const std::string& name, + const json& mr_info); + json mr_info() const; + + // ── Async I/O (all return Future; I/O on io_context thread) ── + + std::shared_ptr async_send( + const chunk_tuple_t& chunk, + int64_t timeout_ms = kDefaultTimeoutMs, + void* stream = nullptr); + + std::shared_ptr async_recv( + const chunk_tuple_t& chunk, + int64_t timeout_ms = kDefaultTimeoutMs, + void* stream = nullptr); + + std::shared_ptr async_read( + const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs, + void* stream = nullptr); + + std::shared_ptr async_write( + const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs, + void* stream = nullptr); + + // ── Accessors ─────────────────────────────────────── + void setId(int64_t id) { id_.store(id, std::memory_order_relaxed); } + int64_t getId() const { return id_.load(std::memory_order_relaxed); } + bool is_connected() const { return connected_.load(std::memory_order_acquire); } + +private: + // ── io_context management ─────────────────────────── + void start_io(); + void do_accept(); + ServerSession::RecvMatcher make_recv_matcher(); + + // ── helpers ───────────────────────────────────────── + bool is_initiator(const std::string& peer_host, uint16_t peer_port) const; + bool write_message(asio::ip::tcp::socket& sock, + const SessionHeader& hdr, const void* payload); + static void set_sndtimeo(int fd, int64_t ms); + + // ── identity ──────────────────────────────────────── + std::atomic 
id_{-1}; + std::string local_host_{"0.0.0.0"}; + uint16_t local_port_{0}; + std::string peer_host_; + uint16_t peer_port_{0}; + std::atomic connected_{false}; + + // ── asio core ─────────────────────────────────────── + TcpContext* ctx_{nullptr}; + std::unique_ptr own_ctx_; // if self-contained + asio::ip::tcp::acceptor acceptor_; + std::atomic running_{true}; + + // ── memory ────────────────────────────────────────── + std::shared_ptr local_pool_; + std::shared_ptr remote_pool_; + + // ── recv matching ─────────────────────────────────── + struct PendingRecv { + std::shared_ptr op_state; + }; + std::mutex recv_mu_; + std::deque pending_recvs_; + + // ── read matching (connections reserved for response) ── + struct PendingRead { + std::shared_ptr conn; + std::shared_ptr op_state; + }; + std::mutex read_mu_; + std::unordered_map pending_reads_; + std::atomic next_req_id_{1}; +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_future.h b/dlslime/csrc/engine/tcp/tcp_future.h new file mode 100644 index 00000000..dcf53a4e --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_future.h @@ -0,0 +1,67 @@ +#pragma once + +#include +#include +#include +#include + +#include "dlslime/csrc/common/pause.h" +#include "dlslime/csrc/device/device_future.h" +#include "tcp_op_state.h" + +namespace dlslime { +namespace tcp { + +class TcpFuture : public DeviceFuture { +public: + explicit TcpFuture(std::shared_ptr op) + : op_state_(std::move(op)) { + if (!op_state_) + throw std::runtime_error("TcpFuture: null op_state"); + } + + int32_t wait() const override { + if (op_state_->signal) + op_state_->signal->wait_comm_done_cpu(op_state_->expected_mask); + return op_state_->completion_status.load(std::memory_order_acquire); + } + + // Block up to timeout_ms milliseconds. Returns true iff completed; + // writes status to *out. On timeout the operation is still in-flight. 
+ bool wait_for(int64_t timeout_ms, int32_t* out) const { + auto deadline = std::chrono::steady_clock::now() + + std::chrono::milliseconds(timeout_ms); + while (true) { + if (op_state_->signal) { + uint32_t m = op_state_->signal->get_comm_done_mask(); + if ((m & op_state_->expected_mask) == op_state_->expected_mask) { + if (out) *out = op_state_->completion_status.load( + std::memory_order_acquire); + return true; + } + } + if (std::chrono::steady_clock::now() >= deadline) { + if (op_state_->signal) { + uint32_t m = op_state_->signal->get_comm_done_mask(); + if ((m & op_state_->expected_mask) == op_state_->expected_mask) { + if (out) *out = op_state_->completion_status.load( + std::memory_order_acquire); + return true; + } + } + return false; + } + machnet_pause(); + } + } + +protected: + std::shared_ptr op_state_; +}; + +class TcpSendFuture : public TcpFuture { public: using TcpFuture::TcpFuture; }; +class TcpRecvFuture : public TcpFuture { public: using TcpFuture::TcpFuture; }; +class TcpReadWriteFuture : public TcpFuture { public: using TcpFuture::TcpFuture; }; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_header.h b/dlslime/csrc/engine/tcp/tcp_header.h new file mode 100644 index 00000000..313187d6 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_header.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +namespace dlslime { +namespace tcp { + +// 17-byte wire header, referenced from Mooncake SessionHeader. 
+// offset size field +// 0 8 size payload byte count (htole64 / le64toh) +// 8 8 addr remote buffer virtual address +// 16 1 opcode SEND=0x00 READ=0x01 WRITE=0x02 + +#pragma pack(push, 1) +struct SessionHeader { + uint64_t size; + uint64_t addr; + uint8_t opcode; +}; +#pragma pack(pop) + +static_assert(sizeof(SessionHeader) == 17, "SessionHeader must be 17 bytes"); + +enum OpCode : uint8_t { + OP_SEND = 0x00, // header + payload → peer recv matches + OP_READ = 0x01, // header only → peer reads local memory → sends data back + OP_WRITE = 0x02, // header + payload → peer writes to local memory +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_memory_pool.cpp b/dlslime/csrc/engine/tcp/tcp_memory_pool.cpp new file mode 100644 index 00000000..1b540775 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_memory_pool.cpp @@ -0,0 +1,128 @@ +#include "tcp_memory_pool.h" + +namespace dlslime { +namespace tcp { + +// ── local MR ──────────────────────────────────────────── + +int32_t TcpMemoryPool::register_memory_region( + uintptr_t addr, uint64_t offset, size_t length, + std::optional name) { + + auto pit = ptr_to_handle_.find(addr); + if (pit != ptr_to_handle_.end()) { + int32_t h = pit->second; + if (h >= 0 && static_cast(h) < handle_to_mr_.size() + && handle_to_mr_[h].addr == addr + && handle_to_mr_[h].length >= length) { + if (name.has_value()) name_to_handle_[*name] = h; + return h; + } + } + + int32_t h = static_cast(handle_to_mr_.size()); + handle_to_mr_.push_back({addr, offset, length}); + handle_to_name_.push_back(name.value_or("")); + ptr_to_handle_[addr] = h; + if (name.has_value()) name_to_handle_[*name] = h; + return h; +} + +int32_t TcpMemoryPool::unregister_memory_region(int32_t handle) { + if (handle < 0 || static_cast(handle) >= handle_to_mr_.size()) + return -1; + auto& mr = handle_to_mr_[handle]; + auto& s = handle_to_name_[handle]; + ptr_to_handle_.erase(mr.addr); + if (!s.empty()) name_to_handle_.erase(s); + mr = {}; 
+ s.clear(); + return 0; +} + +// ── remote MR ─────────────────────────────────────────── + +int32_t TcpMemoryPool::register_remote_memory_region( + const json& mr_info, std::optional name) { + + std::string mr_name = name.value_or(mr_info.value("name", "")); + + if (!mr_name.empty()) { + auto it = remote_name_to_handle_.find(mr_name); + if (it != remote_name_to_handle_.end()) { + int32_t h = it->second; + auto& rm = remote_handle_to_mr_[h]; + rm.addr = mr_info.value("addr", 0UL); + rm.offset = mr_info.value("offset", 0UL); + rm.length = mr_info.value("length", 0UL); + return h; + } + } + + int32_t h = static_cast(remote_handle_to_mr_.size()); + remote_handle_to_mr_.push_back({ + mr_info.value("addr", 0UL), + mr_info.value("offset", 0UL), + mr_info.value("length", 0UL) + }); + remote_handle_to_name_.push_back(mr_name); + if (!mr_name.empty()) remote_name_to_handle_[mr_name] = h; + return h; +} + +int32_t TcpMemoryPool::unregister_remote_memory_region(int32_t handle) { + if (handle < 0 || static_cast(handle) >= remote_handle_to_mr_.size()) + return -1; + auto& s = remote_handle_to_name_[handle]; + if (!s.empty()) remote_name_to_handle_.erase(s); + remote_handle_to_mr_[handle] = {}; + s.clear(); + return 0; +} + +// ── fast lookup ───────────────────────────────────────── + +TcpMr TcpMemoryPool::get_mr_fast(int32_t handle) const { + if (handle < 0 || static_cast(handle) >= handle_to_mr_.size()) + return {}; + return handle_to_mr_[handle]; +} + +TcpMr TcpMemoryPool::get_remote_mr_fast(int32_t handle) const { + if (handle < 0 || static_cast(handle) >= remote_handle_to_mr_.size()) + return {}; + return remote_handle_to_mr_[handle]; +} + +int32_t TcpMemoryPool::get_mr_handle(const std::string& name) const { + auto it = name_to_handle_.find(name); + return it != name_to_handle_.end() ? 
it->second : -1; +} + +int32_t TcpMemoryPool::get_remote_mr_handle(const std::string& name) const { + auto it = remote_name_to_handle_.find(name); + return it != remote_name_to_handle_.end() ? it->second : -1; +} + +// ── serialization ─────────────────────────────────────── + +json TcpMemoryPool::mr_info() const { + json j = json::object(); + for (const auto& [name, h] : name_to_handle_) + if (h >= 0 && static_cast(h) < handle_to_mr_.size() + && handle_to_mr_[h].length > 0) + j[name] = handle_to_mr_[h].json_info(name); + return j; +} + +json TcpMemoryPool::remote_mr_info() const { + json j = json::object(); + for (const auto& [name, h] : remote_name_to_handle_) + if (h >= 0 && static_cast(h) < remote_handle_to_mr_.size() + && remote_handle_to_mr_[h].length > 0) + j[name] = remote_handle_to_mr_[h].json_info(name); + return j; +} + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_memory_pool.h b/dlslime/csrc/engine/tcp/tcp_memory_pool.h new file mode 100644 index 00000000..249f30cb --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_memory_pool.h @@ -0,0 +1,63 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "dlslime/csrc/common/json.hpp" + +namespace dlslime { +namespace tcp { + +using json = nlohmann::json; + +struct TcpMr { + uintptr_t addr{0}; + uint64_t offset{0}; + size_t length{0}; + + json json_info(const std::string& name) const { + return {{"name", name}, {"addr", addr}, + {"offset", offset}, {"length", length}}; + } +}; + +// Pure-bookkeeping pool. No hardware registration needed for TCP. 
+class TcpMemoryPool { +public: + TcpMemoryPool() = default; + + int32_t register_memory_region(uintptr_t addr, uint64_t offset, + size_t length, + std::optional name = std::nullopt); + int32_t unregister_memory_region(int32_t handle); + + int32_t register_remote_memory_region(const json& mr_info, + std::optional name = std::nullopt); + int32_t unregister_remote_memory_region(int32_t handle); + + TcpMr get_mr_fast(int32_t handle) const; + TcpMr get_remote_mr_fast(int32_t handle) const; + int32_t get_mr_handle(const std::string& name) const; + int32_t get_remote_mr_handle(const std::string& name) const; + + json mr_info() const; + json remote_mr_info() const; + +private: + // local MRs + std::unordered_map name_to_handle_; + std::unordered_map ptr_to_handle_; + std::vector handle_to_mr_; + std::vector handle_to_name_; + + // remote MRs + std::unordered_map remote_name_to_handle_; + std::vector remote_handle_to_mr_; + std::vector remote_handle_to_name_; +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_op_state.h b/dlslime/csrc/engine/tcp/tcp_op_state.h new file mode 100644 index 00000000..dbf89a2a --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_op_state.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include + +#include "dlslime/csrc/device/device_api.h" +#include "dlslime/csrc/device/signal.h" + +namespace dlslime { +namespace tcp { + +enum Status : int32_t { + TCP_SUCCESS = 0, + TCP_FAILED = -1, + TCP_TIMEOUT = -2, + TCP_CLOSED = -3, +}; + +// One per in-flight operation. The io_context thread (or caller for sync +// ops) signals completion via DeviceSignal; the future's wait() spins on +// wait_comm_done_cpu(). 
+struct TcpOpState { + std::shared_ptr signal; + uint32_t expected_mask{1}; + std::atomic completion_mask{0}; + std::atomic completion_status{TCP_SUCCESS}; + + uintptr_t user_buffer{0}; + size_t user_length{0}; + size_t bytes_copied{0}; + + static std::shared_ptr create() { + auto s = std::make_shared(); + s->signal = dlslime::device::createSignal(false); + return s; + } +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_session.cpp b/dlslime/csrc/engine/tcp/tcp_session.cpp new file mode 100644 index 00000000..57e2d13a --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_session.cpp @@ -0,0 +1,142 @@ +#include "tcp_session.h" + +#include +#include +#include + +#include +#include + +#include "dlslime/csrc/logging.h" + +namespace dlslime { +namespace tcp { + +// ── helpers ───────────────────────────────────────────── + +static void hdr_to_net(SessionHeader& hdr) { + hdr.size = htole64(hdr.size); + hdr.addr = htole64(hdr.addr); +} + +static void hdr_to_host(SessionHeader& hdr) { + hdr.size = le64toh(hdr.size); + hdr.addr = le64toh(hdr.addr); +} + +static bool is_fatal(asio::error_code ec) { + return ec && ec != asio::error::eof; +} + +// ── ServerSession ─────────────────────────────────────── + +ServerSession::ServerSession(asio::ip::tcp::socket socket, + TcpMemoryPool* local_pool, + RecvMatcher recv_matcher) + : socket_(std::move(socket)) + , local_pool_(local_pool) + , recv_matcher_(std::move(recv_matcher)) {} + +void ServerSession::start() { + readHeader(); +} + +void ServerSession::readHeader() { + auto self = shared_from_this(); + header_ = {}; + asio::async_read(socket_, asio::buffer(&header_, sizeof(header_)), + [this, self](asio::error_code ec, size_t /*n*/) { + if (ec) { + if (is_fatal(ec)) + SLIME_LOG_WARN("ServerSession::readHeader ", ec.message()); + return; // connection closed, session ends + } + hdr_to_host(header_); + transferred_ = 0; + dispatch(); + }); +} + +void ServerSession::dispatch() { + switch 
(header_.opcode) { + case OP_SEND: + if (header_.size == 0) { readHeader(); return; } + chunk_buf_.resize(header_.size); + readBody(header_.size); + break; + + case OP_WRITE: + if (header_.size == 0) { readHeader(); return; } + chunk_buf_.resize(header_.size); + readBody(header_.size); + break; + + case OP_READ: { + uintptr_t addr = static_cast(header_.addr); + size_t sz = static_cast(header_.size); + if (sz == 0) { readHeader(); return; } + // Write back raw data — no header on the response. + auto self = shared_from_this(); + asio::async_write(socket_, + asio::buffer(reinterpret_cast(addr), sz), + [this, self](asio::error_code ec, size_t /*n*/) { + if (ec && is_fatal(ec)) + SLIME_LOG_WARN("ServerSession READ response ", ec.message()); + readHeader(); + }); + break; + } + + default: + SLIME_LOG_WARN("ServerSession: unknown opcode ", + static_cast(header_.opcode)); + readHeader(); + break; + } +} + +void ServerSession::readBody(uint64_t remaining) { + auto self = shared_from_this(); + size_t chunk = std::min(static_cast(remaining), kDefaultChunkSize); + + if (chunk == 0) { + if (header_.opcode == OP_SEND) { + auto slot = recv_matcher_(); + if (slot.buffer && slot.length > 0) { + size_t n = std::min(static_cast(header_.size), + slot.length); + std::memcpy(reinterpret_cast(slot.buffer), + chunk_buf_.data(), n); + if (slot.op_state) { + slot.op_state->bytes_copied = n; + slot.op_state->completion_status.store( + TCP_SUCCESS, std::memory_order_release); + if (slot.op_state->signal) + slot.op_state->signal->set_comm_done(0); + } + } + } else if (header_.opcode == OP_WRITE) { + uintptr_t addr = static_cast(header_.addr); + std::memcpy(reinterpret_cast(addr), + chunk_buf_.data(), header_.size); + } + readHeader(); + return; + } + + size_t offset = transferred_; + asio::async_read(socket_, + asio::buffer(chunk_buf_.data() + offset, chunk), + [this, self, remaining](asio::error_code ec, size_t n) { + if (ec) { + if (is_fatal(ec)) + SLIME_LOG_WARN("ServerSession::readBody ", 
ec.message()); + return; + } + transferred_ += n; + readBody(remaining - n); + }); +} + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/tcp_session.h b/dlslime/csrc/engine/tcp/tcp_session.h new file mode 100644 index 00000000..470cb186 --- /dev/null +++ b/dlslime/csrc/engine/tcp/tcp_session.h @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "tcp_header.h" +#include "tcp_memory_pool.h" +#include "tcp_op_state.h" + +namespace dlslime { +namespace tcp { + +class TcpConnectionPool; + +constexpr size_t kDefaultChunkSize = 65536; // 64KB + +// ── RecvSlot: returned by RecvMatcher when a SEND matches a pending recv ── +struct RecvSlot { + uintptr_t buffer{0}; + size_t length{0}; + std::shared_ptr op_state; +}; + +// ── ServerSession: handles incoming requests on one connection ── +// +// Lifecycle: start() → readHeader → dispatch → readBody/writeBody ↻ +// Persistent — one session handles many transfers on the same connection. +// Referenced from Mooncake ServerSession. +class ServerSession : public std::enable_shared_from_this { +public: + using RecvMatcher = std::function; + + ServerSession(asio::ip::tcp::socket socket, + TcpMemoryPool* local_pool, + RecvMatcher recv_matcher); + + void start(); + +private: + void readHeader(); + void dispatch(); + void readBody(uint64_t remaining); + + asio::ip::tcp::socket socket_; + TcpMemoryPool* local_pool_; + RecvMatcher recv_matcher_; + SessionHeader header_{}; + uint64_t transferred_{0}; + std::vector chunk_buf_; +}; + +} // namespace tcp +} // namespace dlslime diff --git a/dlslime/csrc/engine/tcp/test_tcp_endpoint.py b/dlslime/csrc/engine/tcp/test_tcp_endpoint.py new file mode 100644 index 00000000..4d6f85b2 --- /dev/null +++ b/dlslime/csrc/engine/tcp/test_tcp_endpoint.py @@ -0,0 +1,210 @@ +"""End-to-end test for TcpEndpoint v3 async primitives with timeout. + +Usage: + LD_LIBRARY_PATH=dlslime PYTHONPATH=. 
DLSLIME_LOG_LEVEL=0 python3 \ + dlslime/csrc/engine/tcp/test_tcp_endpoint.py +""" + +import ctypes +import threading +import time + +from dlslime import TcpEndpoint, TcpMemoryPool + + +def _sync_run(fn_a, fn_b): + b = threading.Barrier(2) + ta = threading.Thread(target=lambda: (b.wait(), fn_a()), daemon=True) + tb = threading.Thread(target=lambda: (b.wait(), fn_b()), daemon=True) + ta.start(); tb.start() + ta.join(); tb.join() + + +def test_async_send_recv(): + """Two endpoints async_send/async_recv each other.""" + print("=== test_async_send_recv ===") + + buf_a = ctypes.create_string_buffer(4096) + buf_b = ctypes.create_string_buffer(4096) + + ep_a = TcpEndpoint(10001) + ep_b = TcpEndpoint(10002) + h_a = ep_a.register_memory_region("a", ctypes.addressof(buf_a), 0, 4096) + h_b = ep_b.register_memory_region("b", ctypes.addressof(buf_b), 0, 4096) + info_a = ep_a.endpoint_info() + info_b = ep_b.endpoint_info() + + def run_a(): + ep_a.connect(info_b) + print(" A connected") + ctypes.memmove(ctypes.addressof(buf_a), b"hello", 5) + st = ep_a.async_send((h_a, 0, 5)).wait() + assert st == 0, f"send failed: {st}" + print(" A sent 5 bytes") + st = ep_a.async_recv((h_a, 0, 5)).wait() + assert st == 0, f"recv failed: {st}" + assert bytes(buf_a[:5]) == b"world" + print(" A recv'd: world") + ep_a.shutdown() + + def run_b(): + ep_b.connect(info_a) + print(" B connected") + st = ep_b.async_recv((h_b, 0, 5)).wait() + assert st == 0 and bytes(buf_b[:5]) == b"hello" + print(" B recv'd: hello") + ctypes.memmove(ctypes.addressof(buf_b), b"world", 5) + st = ep_b.async_send((h_b, 0, 5)).wait() + assert st == 0 + print(" B sent 5 bytes") + ep_b.shutdown() + + _sync_run(run_a, run_b) + print(" PASSED\n") + + +def test_async_write_read(): + """A writes to B's buffer, then reads from B's buffer.""" + print("=== test_async_write_read ===") + + buf_a = ctypes.create_string_buffer(4096) + buf_b = ctypes.create_string_buffer(4096) + addr_a = ctypes.addressof(buf_a) + + ep_a = TcpEndpoint(0) + 
def test_recv_timeout():
    """recv times out when peer never sends."""
    print("=== test_recv_timeout ===")

    recv_buf = ctypes.create_string_buffer(64)

    ep_a = TcpEndpoint(10003)
    h_a = ep_a.register_memory_region("a", ctypes.addressof(recv_buf), 0, 64)
    ep_b = TcpEndpoint(10004)

    def run_a():
        # Post a recv that nobody will ever satisfy and wait briefly for it.
        ep_a.connect(ep_b.endpoint_info())
        pending = ep_a.async_recv((h_a, 0, 5), timeout_ms=300)
        result = pending.wait_for(0.3)
        print(f" recv wait_for(0.3s): {result} (expected None)")
        assert result is None, f"Expected None (timeout), got {result}"
        ep_a.shutdown()

    def run_b():
        # Stay connected past A's deadline without sending anything.
        ep_b.connect(ep_a.endpoint_info())
        time.sleep(1.5)
        ep_b.shutdown()

    _sync_run(run_a, run_b)
    print(" PASSED\n")
def test_default_timeout():
    """async_send uses kDefaultTimeoutMs=30000 when timeout_ms not given."""
    print("=== test_default_timeout ===")

    size = 128
    buf_a = ctypes.create_string_buffer(size)
    buf_b = ctypes.create_string_buffer(size)

    ep_a = TcpEndpoint(10007)
    ep_b = TcpEndpoint(10008)
    h_a = ep_a.register_memory_region("a", ctypes.addressof(buf_a), 0, size)
    h_b = ep_b.register_memory_region("b", ctypes.addressof(buf_b), 0, size)

    def run_a():
        # Sender: relies on the default timeout (no timeout_ms argument).
        ep_a.connect(ep_b.endpoint_info())
        ctypes.memmove(ctypes.addressof(buf_a), b"test!", 5)
        st = ep_a.async_send((h_a, 0, 5)).wait()
        assert st == 0, f"default timeout send failed: {st}"
        print(f" async_send with default timeout: status={st}")
        ep_a.shutdown()

    def run_b():
        # Receiver: consumes the five bytes so the send can complete.
        ep_b.connect(ep_a.endpoint_info())
        st = ep_b.async_recv((h_b, 0, 5)).wait()
        assert st == 0
        ep_b.shutdown()

    _sync_run(run_a, run_b)
    print(" PASSED\n")
b/dlslime/csrc/python/CMakeLists.txt index 389be03a..1584babc 100755 --- a/dlslime/csrc/python/CMakeLists.txt +++ b/dlslime/csrc/python/CMakeLists.txt @@ -67,6 +67,11 @@ if (BUILD_ASCEND_DIRECT) ) endif() +if (BUILD_TCP) + target_compile_definitions(_slime_c PRIVATE BUILD_TCP) + list(APPEND _slime_c_link_libraries _slime_tcp) +endif() + # Ops moved to NanoCCL - link to NanoCCL if needed # if (BUILD_INTRA_OPS OR BUILD_INTER_OPS) # if (BUILD_INTRA_OPS) diff --git a/dlslime/csrc/python/bind.cpp b/dlslime/csrc/python/bind.cpp index b5a56591..0b1efc90 100644 --- a/dlslime/csrc/python/bind.cpp +++ b/dlslime/csrc/python/bind.cpp @@ -27,6 +27,12 @@ #include "dlslime/csrc/engine/ascend_direct/ascend_remote_memory_pool.h" #endif +#ifdef BUILD_TCP +#include "dlslime/csrc/engine/tcp/tcp_endpoint.h" +#include "dlslime/csrc/engine/tcp/tcp_future.h" +#include "dlslime/csrc/engine/tcp/tcp_memory_pool.h" +#endif + #include "dlslime/csrc/device/signal.h" #ifdef BUILD_RDMA @@ -89,6 +95,12 @@ namespace py = pybind11; #define BUILD_RPC_ENABLED false #endif +#ifdef BUILD_TCP +#define BUILD_TCP_ENABLED true +#else +#define BUILD_TCP_ENABLED false +#endif + // Ops moved to NanoCCL #define BUILD_INTRA_OPS_ENABLED false #define BUILD_INTER_OPS_ENABLED false @@ -102,6 +114,7 @@ PYBIND11_MODULE(_slime_c, m) EXPOSE_BUILD_FLAG(m, BUILD_INTRA_OPS); EXPOSE_BUILD_FLAG(m, BUILD_INTER_OPS); EXPOSE_BUILD_FLAG(m, BUILD_RPC); + EXPOSE_BUILD_FLAG(m, BUILD_TCP); m.def("discover_topology", &dlslime::topology::discoverTopology, @@ -512,6 +525,122 @@ PYBIND11_MODULE(_slime_c, m) py::call_guard()); #endif +#ifdef BUILD_TCP + // ========================================================================= + // TCP Transport + // ========================================================================= + py::class_>( + m, "SlimeTcpSendFuture") + .def("wait", &dlslime::tcp::TcpSendFuture::wait, + py::call_guard()) + .def("wait_for", + [](const dlslime::tcp::TcpSendFuture& self, double sec) -> py::object { + int32_t 
st = 0; + int64_t ms = static_cast(sec * 1000.0); + if (ms < 0) ms = 0; + if (self.wait_for(ms, &st)) return py::cast(st); + return py::none(); + }, + py::arg("timeout_seconds")); + + py::class_>( + m, "SlimeTcpRecvFuture") + .def("wait", &dlslime::tcp::TcpRecvFuture::wait, + py::call_guard()) + .def("wait_for", + [](const dlslime::tcp::TcpRecvFuture& self, double sec) -> py::object { + int32_t st = 0; + int64_t ms = static_cast(sec * 1000.0); + if (ms < 0) ms = 0; + if (self.wait_for(ms, &st)) return py::cast(st); + return py::none(); + }, + py::arg("timeout_seconds")); + + py::class_>( + m, "SlimeTcpReadWriteFuture") + .def("wait", &dlslime::tcp::TcpReadWriteFuture::wait, + py::call_guard()) + .def("wait_for", + [](const dlslime::tcp::TcpReadWriteFuture& self, double sec) -> py::object { + int32_t st = 0; + int64_t ms = static_cast(sec * 1000.0); + if (ms < 0) ms = 0; + if (self.wait_for(ms, &st)) return py::cast(st); + return py::none(); + }, + py::arg("timeout_seconds")); + + py::class_>( + m, "TcpMemoryPool") + .def(py::init<>()) + .def("register_memory_region", + [](dlslime::tcp::TcpMemoryPool& self, uintptr_t addr, uint64_t offset, + size_t length, py::object name_obj) { + std::optional name; + if (!name_obj.is_none()) name = name_obj.cast(); + return self.register_memory_region(addr, offset, length, name); + }, + py::arg("addr"), py::arg("offset"), py::arg("length"), + py::arg("name") = py::none()) + .def("register_remote_memory_region", + [](dlslime::tcp::TcpMemoryPool& self, const json& mr_info, + py::object name_obj) { + std::optional name; + if (!name_obj.is_none()) name = name_obj.cast(); + return self.register_remote_memory_region(mr_info, name); + }, + py::arg("mr_info"), py::arg("name") = py::none()) + .def("get_mr_handle", &dlslime::tcp::TcpMemoryPool::get_mr_handle, + py::arg("name")) + .def("mr_info", &dlslime::tcp::TcpMemoryPool::mr_info); + + py::class_>( + m, "TcpEndpoint") + .def(py::init(), py::arg("port") = 0) + .def("connect", 
&dlslime::tcp::TcpEndpoint::connect, + py::arg("remote_info"), py::call_guard()) + .def("endpoint_info", &dlslime::tcp::TcpEndpoint::endpoint_info) + .def("mr_info", &dlslime::tcp::TcpEndpoint::mr_info) + .def("shutdown", &dlslime::tcp::TcpEndpoint::shutdown, + py::call_guard()) + .def("register_memory_region", + &dlslime::tcp::TcpEndpoint::register_memory_region, + py::arg("name"), py::arg("data_ptr"), py::arg("offset"), py::arg("length"), + py::call_guard()) + .def("register_remote_memory_region", + &dlslime::tcp::TcpEndpoint::register_remote_memory_region, + py::arg("name"), py::arg("mr_info"), py::call_guard()) + .def("async_send", + py::overload_cast( + &dlslime::tcp::TcpEndpoint::async_send), + py::arg("chunk"), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::arg("stream") = nullptr, + py::call_guard()) + .def("async_recv", + py::overload_cast( + &dlslime::tcp::TcpEndpoint::async_recv), + py::arg("chunk"), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::arg("stream") = nullptr, + py::call_guard()) + .def("async_read", + py::overload_cast&, int64_t, void*>( + &dlslime::tcp::TcpEndpoint::async_read), + py::arg("assign"), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::arg("stream") = nullptr, + py::call_guard()) + .def("async_write", + py::overload_cast&, int64_t, void*>( + &dlslime::tcp::TcpEndpoint::async_write), + py::arg("assign"), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::arg("stream") = nullptr, + py::call_guard()); +#endif // BUILD_TCP + // Ops moved to NanoCCL - Python bindings should be in NanoCCL's Python module // #ifdef BUILD_INTRA_OPS // py::class_(m, "AllToAllIntraLLBuffer") From 4211aca3b76b11d2e85df2ff1877f17d0f0a93c4 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 14 May 2026 14:11:39 +0000 Subject: [PATCH 2/2] clean up TcpEndpoint API: remove dead void* stream and unused timeout_ms - Remove void* stream from all 4 
async_* methods (RDMA leftover, never used) - Remove timeout_ms from async_recv (recv timeout via future.wait_for()) - Remove ineffective SO_SNDTIMEO calls (no effect on asio::async_write) - Update pybind11 bindings and tests to match - Add tcp/plan.md with v3 architecture documentation Co-Authored-By: Claude Opus 4.7 --- dlslime/csrc/engine/tcp/plan.md | 731 +++++++++++++++++++ dlslime/csrc/engine/tcp/tcp_endpoint.cpp | 36 +- dlslime/csrc/engine/tcp/tcp_endpoint.h | 31 +- dlslime/csrc/engine/tcp/test_tcp_endpoint.py | 2 +- dlslime/csrc/python/bind.cpp | 14 +- 5 files changed, 757 insertions(+), 57 deletions(-) create mode 100755 dlslime/csrc/engine/tcp/plan.md diff --git a/dlslime/csrc/engine/tcp/plan.md b/dlslime/csrc/engine/tcp/plan.md new file mode 100755 index 00000000..720ed8e4 --- /dev/null +++ b/dlslime/csrc/engine/tcp/plan.md @@ -0,0 +1,731 @@ +# DLSlime TcpEndpoint v3 Primitives 架构与实现计划 + +**分支**: `tcp-v3` | **基准**: `main` | **日期**: 2026-05-14 + +--- + +## 1. 架构设计 + +### 1.1 总体架构 + +``` +┌──────────────────────────────────────────────────────────────┐ +│ Python 调用者线程 │ +│ ep.async_send(chunk, timeout_ms=30000) → Future │ +│ ep.async_recv(chunk, timeout_ms=30000) → Future │ +│ ep.async_read(assign, timeout_ms=30000) → Future │ +│ ep.async_write(assign, timeout_ms=30000) → Future │ +│ │ │ +│ │ post lambda │ +│ ▼ │ +│ ┌──────────────────────┐ ┌─────────────────────────────┐ │ +│ │ asio::io_context │ │ TcpConnectionPool │ │ +│ │ (单后台线程) │◄───│ (host, port) → deque │ │ +│ │ │ │ IDLE / ACTIVE / RESERVED │ │ +│ │ async_write ────────┼───►│ 60s 空闲超时 │ │ +│ │ async_read ◄────────┼───►│ │ │ +│ │ async_accept ───────┼───►│ ServerSession │ │ +│ │ │ │ (readHeader→dispatch→ │ │ +│ │ │ │ readBody→readHeader 循环) │ │ +│ └──────────────────────┘ └─────────────────────────────┘ │ +└──────────────────────────────────────────────────────────────┘ +``` + +### 1.2 线程模型 + +| 角色 | 线程 | 职责 | +|------|------|------| +| io_context | 1 个 daemon 线程 | `io_ctx_.run()` — 所有 asio async 
I/O 回调 | +| 调用者 | N 个 Python 线程 | 调 async_* → 立即返回 Future;wait() 自旋阻塞 | +| accept | io_context | `async_accept` 回调链,每连接创建 ServerSession | + +### 1.3 asio 操作模型 + +``` +调用者线程 io_context 线程 +────────── ────────────── +async_send(chunk, 5000): + ├─ getConnection() [sync, fast] ┌─ async_write(header+payload) + ├─ SO_SNDTIMEO=5s │ → 归还连接 → signal op_state + ├─ asio::post(lambda) ──────────────► │ + └─ return Future ◄─── signal ────────┘ + +async_recv(chunk, 5000): + ├─ pending_recvs_.push(op_state) ┌─ ServerSession::dispatch(OP_SEND) + └─ return Future │ → pop pending_recvs_ + │ │ → memcpy → signal op_state + └── wait_for(5.0) ── timeout? ──┘ + +async_read(assign, 5000): + ├─ getConnection() [RESERVE] ┌─ async_write(OP_READ header) + ├─ asio::post(lambda) ──────────────► │ → async_read(response data) + └─ return Future ◄─── signal ────────┘ → 归还连接 → signal op_state + +async_write(assign, 5000): + ├─ getConnection() [sync, fast] ┌─ async_write(header+payload) + ├─ SO_SNDTIMEO=5s │ → 归还连接 → signal op_state + ├─ asio::post(lambda) ──────────────► │ + └─ return Future ◄─── signal ────────┘ +``` + +--- + +## 2. 线协议设计 + +### 2.1 SessionHeader (17 字节,对齐 Mooncake) + +``` +偏移 大小 字段 +0 8 size (payload 字节数, little-endian: htole64 / le64toh) +8 8 addr (远端 buffer 虚拟地址) +16 1 opcode (操作码) +───────────────── + 17 bytes total +``` + +### 2.2 为什么 3 个 opcode 支持 4 个原语? 
+ +OP_SEND 同时承载 `async_send`(发起方主动 push 数据)和 `async_recv`(接收方 +被动等待)。recv 方不在线上发送任何操作码——它只是向本地 `pending_recvs_` 队列注册 +一个 buffer,然后对端 ServerSession 在收到 OP_SEND 时通过 `RecvMatcher` 回调 pop +队列前端、memcpy 数据并 signal op_state。 + +这与 Mooncake 的设计一致:ServerSession::dispatch(OP_SEND) 先分块读取 payload, +然后通过 recv_matcher_ 匹配本地注册的 recv buffer。不需要独立的 recv opcode—— +SEND 到达本身就隐含了"有一端在等待"的语义。 + +OP_READ 和 OP_WRITE 各需独立 opcode,因为服务端 dispatch 分支逻辑完全不同: +- OP_READ:读取本地内存后异步写回原始数据(无 header) +- OP_WRITE:读取 payload 后 memcpy 到 hdr.addr + +如果有 4 个 opcode(比如独立的 OP_RECV),反而增加冗余——OP_RECV 在语义上等于 +"我准备好接收了",但这已在连接建立时通过 endpoint_info 交换 MR 信息隐式表达, +不需要每个操作发一次。 + +| opcode | 值 | 线格式 | 远端 ServerSession 动作 | DLSlime 原语 | +|--------|-----|--------|------------------------|-------------| +| `OP_SEND` | 0x00 | header{sz, 0, 0x00} + payload | 读 payload → recv_matcher pop → memcpy → signal | **async_send** (发起) / **async_recv** (被动) | +| `OP_READ` | 0x01 | 仅 header{sz, addr, 0x01} | 从本地 addr 读 sz 字节 → async_write 原始数据发回 | **async_read** (调用者 pull) | +| `OP_WRITE` | 0x02 | header{sz, addr, 0x02} + payload | 读 payload → memcpy 到本地 addr | **async_write** (调用者 push) | + +### 2.3 四个原语在线上的完整流程 + +``` +async_send(chunk): + 调用者: getConnection → post to io_ctx → return Future + io_ctx: async_write(sock, [header{OP_SEND}|payload]) + → on_complete: returnConnection → signal op_state + 对端 ServerSession: async_read(header) → dispatch(OP_SEND) + → chunk_buf_.resize → readBody 分块读 payload → recv_matcher_() + → pop pending_recv → memcpy → signal recv op_state + +async_recv(chunk): + 调用者: pending_recvs_.push({buffer, op_state}) → return Future → wait_for(timeout) + (无 opcode 在线路上 — recv 是 SEND 的被动消费方) + +async_read(assign): + 调用者: getConnection(RESERVED) → post to io_ctx → return Future + io_ctx: async_write(sock, header{OP_READ, sz, remote_addr}) + → async_read(sock, user_buffer, sz) + → on_complete: returnConnection → signal op_state + 对端 ServerSession: async_read(header) → dispatch(OP_READ) + → async_write(sock, 
local[addr], sz) → readHeader 继续 + +async_write(assign): + 调用者: getConnection → post to io_ctx → return Future + io_ctx: async_write(sock, [header{OP_WRITE, sz, remote_addr}|payload]) + → on_complete: returnConnection → signal op_state + 对端 ServerSession: async_read(header) → dispatch(OP_WRITE) + → chunk_buf_.resize → readBody 分块读 payload → memcpy 到 addr +``` + +--- + +## 3. 接口设计 + +### 3.1 C++ TcpEndpoint 公共接口 + +```cpp +class TcpEndpoint : public std::enable_shared_from_this { +public: + // 默认超时 30 秒 + static constexpr int64_t kDefaultTimeoutMs = 30000; + + // ── 构造 ── + + // 【主构造】每个 endpoint 内部自动创建 TcpContext, 调用者无需关心。 + // 这是最常用的场景: 一个 endpoint = 一个 peer 连接。 + explicit TcpEndpoint(uint16_t port = 0); + + // 【次构造】注入外部共享 TcpContext, 用于多 endpoint 复用单 io_context 线程 + // 的高级优化场景 (如 PeerAgent 连接 N 个 peer 时节省 N-1 个线程)。 + // 仅在明确需要跨 endpoint 共享资源时使用。 + TcpEndpoint(TcpContext& ctx, uint16_t port = 0); + + // ── 连接 ── + json endpoint_info() const; // {host, port, mr_info} + void connect(const json& remote_info); + void shutdown(); + + // ── 内存 ── + int32_t register_memory_region(const std::string& name, + uintptr_t ptr, uintptr_t offset, size_t length); + int32_t register_remote_memory_region(const std::string& name, + const json& mr_info); + json mr_info() const; + + // ── 异步通信原语 (全部返回 Future, I/O 在 io_context 线程) ── + // + // timeout_ms 由调用者通过 future.wait_for() 控制实际操作时限; + // 方法签名的 timeout_ms 仅作为 op_state 的提示值传入。 + // recv 的超时完全由 future.wait_for() 控制, 不需要 timeout_ms 参数。 + + // 双边发送 + std::shared_ptr async_send( + const chunk_tuple_t& chunk, + int64_t timeout_ms = kDefaultTimeoutMs); + + // 双边接收 (超时通过 future.wait_for()) + std::shared_ptr async_recv( + const chunk_tuple_t& chunk); + + // 单边读 + std::shared_ptr async_read( + const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs); + + // 单边写 + std::shared_ptr async_write( + const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs); + + // ── 访问器 ── + void setId(int64_t id); + int64_t getId() const; 
+ bool is_connected() const; +}; +``` + +### 3.2 C++ TcpFuture 接口 + +```cpp +class TcpFuture : public DeviceFuture { +public: + // 无限期阻塞等待 + int32_t wait() const override; + + // 限时等待: timeout_ms 毫秒, 成功返回 true 并写 *out + // 超时返回 false (操作仍在进行, 可重试) + bool wait_for(int64_t timeout_ms, int32_t* out) const; +}; + +class TcpSendFuture : public TcpFuture { }; +class TcpRecvFuture : public TcpFuture { }; +class TcpReadWriteFuture : public TcpFuture { }; +``` + +### 3.3 Python 接口 + +```python +from dlslime import TcpEndpoint, TcpMemoryPool + +pool = TcpMemoryPool() +buf = ctypes.create_string_buffer(4096) +h = pool.register_memory_region(ctypes.addressof(buf), 0, 4096, "buf") + +ep = TcpEndpoint(port=0) # 0 = 随机端口 +info = ep.endpoint_info() # {'host': '...', 'port': N, 'mr_info': {...}} + +ep.connect(peer_info) + +# ── 异步原语, 默认 30s 超时 ── +fut = ep.async_send((h, 0, 128)) # 30s 默认超时 +fut = ep.async_send((h, 0, 128), 5000) # 5s 超时 +status = fut.wait() # 阻塞直到完成, 返回 0=成功 + +fut = ep.async_recv((h, 0, 128)) # 超时通过 future 控制 +result = fut.wait_for(3.0) # 3 秒超时, 返回 int 或 None + +fut = ep.async_read([(local_h, remote_h, 0, 0, 128)]) +fut = ep.async_write([(local_h, remote_h, 0, 0, 128)]) + +ep.shutdown() +``` + +--- + +## 4. 通信原语设计详解 + +### 4.1 async_send(chunk, timeout_ms = 30000) + +**语义**: 将本地注册内存的数据异步发送到对端。对端必须已调用 `async_recv()` 注册接收缓冲区。 + +**调用者线程**: +1. `local_pool_->get_mr_fast(mr_key)` — resolve 本地 MR +2. `conn_pool_.getConnection(peer_host_, peer_port_)` — 获取或创建 TCP 连接 +3. `TcpOpState::create()` + `signal->reset_all()` — 创建完成信号 +4. 如果 `timeout_ms > 0`: `setsockopt(fd, SO_SNDTIMEO, timeout_ms)` +5. `asio::post(io_ctx_, lambda)` — 提交到 io_context +6. 立即返回 `TcpSendFuture(op_state)` + +**io_context 线程**: +1. `hdr_hton()` — 字节序转换 header +2. `asio::async_write(sock, [header_buf, payload_buf], callback)` — gather write +3. callback: + - 如果 `timeout_ms > 0`: 恢复 `SO_SNDTIMEO = 0` + - `op->completion_status = ec ? 
TCP_FAILED : TCP_SUCCESS` + - `conn_pool_.returnConnection(conn)` + - `op->signal->set_comm_done(0)` + +**超时行为**: socket 写超时 → write 失败 → completion_status = TCP_FAILED。调用者 `future.wait()` 得到 -1。 + +### 4.2 async_recv(chunk) + +**语义**: 注册接收意图。当对端 `async_send()` 的数据到达时,io_context 线程自动匹配并 memcpy 到注册的 buffer。 + +**调用者线程**: +1. `local_pool_->get_mr_fast(mr_key)` — resolve 本地 MR +2. `TcpOpState::create()` + 设置 `user_buffer`, `user_length` +3. `pending_recvs_.push_back({op_state})` — FIFO 入队 +4. 立即返回 `TcpRecvFuture(op_state)` + +**io_context 线程** (ServerSession::dispatch, OP_SEND 分支): +1. `readBody()` — 分块读取 payload 到 `chunk_buf_` +2. `RecvSlot slot = recv_matcher_()` — pop FIFO 前端 +3. `memcpy(slot.buffer, chunk_buf_.data(), min(payload_len, slot.length))` +4. `slot.op_state->completion_status = TCP_SUCCESS` +5. `slot.op_state->signal->set_comm_done(0)` + +**超时行为**: 调用者使用 `future.wait_for(timeout_ms)` 限时等待。超时返回 None,但 recv 保留在队列中——后续到达的 SEND 仍会完成它(调用者可重试)。 + +### 4.3 async_read(assign, timeout_ms = 30000) + +**语义**: 从对端的注册内存异步读取数据。两步异步操作:发 OP_READ header → 收原始响应数据。 + +**调用者线程**: +1. resolve local + remote MRs +2. `conn_pool_.getConnection(peer_host_, peer_port_)` — RESERVE 连接 +3. `TcpOpState::create()` + 设置 `user_buffer`, `user_length` +4. `asio::post(io_ctx_, lambda)` — 提交到 io_context +5. 立即返回 `TcpReadWriteFuture(op_state)` + +**io_context 线程**: +1. `hdr_hton()` → `asio::async_write(sock, header_buf, callback_1)` +2. callback_1: 如果写失败 → signal TCP_FAILED + returnConnection +3. `asio::async_read(sock, user_buffer_buf, callback_2)` +4. callback_2: + - `op->completion_status = ec ? TCP_FAILED : TCP_SUCCESS` + - `conn_pool_.returnConnection(conn)` + - `op->signal->set_comm_done(0)` + +**对端 ServerSession** (OP_READ 分支): +1. 从 `hdr.addr` 读取 `hdr.size` 字节本地内存 +2. `asio::async_write(sock, raw_data, callback)` — 直接写回原始数据(无 header) +3. 
`readHeader()` — 继续监听下个请求 + +**超时行为**: `future.wait_for(timeout_ms)`。连接在整个读取期间被 RESERVED,超时后操作继续在后台运行。 + +### 4.4 async_write(assign, timeout_ms = 30000) + +**语义**: 将本地注册内存的数据异步写入对端注册内存。 + +与 `async_send` 相同的 post+async_write 模式,区别: +- header.opcode = OP_WRITE +- header.addr = remote_addr(对端目标 buffer 地址) +- 对端 ServerSession dispatch(OP_WRITE) → readBody → memcpy 到 `hdr.addr` + +**超时行为**: 同 async_send — SO_SNDTIMEO + future.wait_for()。 + +--- + +## 5. 连接池设计 + +### 5.1 状态机 + +``` + getConnection() + [不存在] ────────────────────────► [ACTIVE] (in_use=true) + │ + returnConnection() + │ + ▼ + [IDLE] (in_use=false, 在 deque 中) ──► 60s 无使用 → cleanupIdleConnections() → 关闭 + │ + │ getConnection() 命中 + ▼ + [ACTIVE] (in_use=true, 离开 deque) +``` + +### 5.2 接口 + +```cpp +class TcpConnectionPool { + // 获取 IDLE 连接或创建新 TCP 连接 + std::shared_ptr getConnection(host, port); + + // 归还连接到 IDLE 状态 (或关闭, 如果 socket 已断开) + void returnConnection(std::shared_ptr conn); + + // 淘汰超过 kIdleTimeout (60s) 的空闲连接 + void cleanupIdleConnections(); + + // 关闭所有连接 (shutdown 时调用) + void clear(); +}; +``` + +--- + +## 6. ServerSession 设计 + +### 6.1 生命周期 + +``` +acceptor.async_accept(socket) + → ServerSession(socket, local_pool, recv_matcher) + → session->start() + → readHeader() ──────────────────────────────────────┐ + → async_read(sock, 17B header) │ + → hdr_to_host() │ + → dispatch() │ + ├─ OP_SEND: chunk_buf_.resize → readBody() │ + │ → memcpy → recv_matcher_() → signal │ + ├─ OP_WRITE: chunk_buf_.resize → readBody() │ + │ → memcpy → hdr.addr │ + └─ OP_READ: async_write(sock, local[addr]) │ + → readHeader() ──────────────────────────────────┘ +``` + +### 6.2 RecvMatcher + +```cpp +// ServerSession 持有的回调, 由 TcpEndpoint 注入 +using RecvMatcher = std::function; + +// TcpEndpoint::make_recv_matcher(): +// 返回一个 lambda, 持有 weak_ptr +// 在 recv_mu_ 下 pop pending_recvs_ 队列前端 +// 返回 {buffer, length, op_state} +``` + +--- + +## 7. 
文件结构 + +### 新建文件 + +``` +dlslime/csrc/engine/tcp/ +├── CMakeLists.txt # asio 依赖 + _slime_tcp 共享库 +├── tcp_header.h # 17B SessionHeader + 3 opcodes +├── tcp_memory_pool.h/.cpp # 纯簿记 (addr, offset, length) +├── tcp_context.h/.cpp # 共享 io_context + connection_pool + thread +├── tcp_session.h/.cpp # ServerSession (accept 端) + 分块 I/O +├── tcp_connection_pool.h/.cpp # (host, port) 连接池 +├── tcp_op_state.h # 操作状态 (signal + atomic status) +├── tcp_future.h # TcpFuture 层次 (header-only) +├── tcp_endpoint.h/.cpp # TcpEndpoint: async_send/recv/read/write +├── build_and_test.sh # 一键构建+测试 +└── test_tcp_endpoint.py # Python 端到端测试 (5 用例) +``` + +### 修改文件 + +| 文件 | 变更 | +|------|------| +| `CMakeLists.txt` | `slime_option(BUILD_TCP "Build TCP transport" ON)` | +| `dlslime/csrc/engine/CMakeLists.txt` | `if(BUILD_TCP) add_subdirectory(tcp) endif()` | +| `dlslime/csrc/CMakeLists.txt` | `if(BUILD_TCP) target_link_libraries(dlslime INTERFACE _slime_tcp) endif()` | +| `dlslime/csrc/python/CMakeLists.txt` | `if(BUILD_TCP) target_compile_definitions + list(APPEND ... _slime_tcp) endif()` | +| `dlslime/csrc/python/bind.cpp` | `#ifdef BUILD_TCP` — TcpEndpoint, TcpMemoryPool, TcpFuture pybind11 bindings | + +--- + +## 8. 
超时机制总结 + +| 原语 | 超时位置 | 默认值 | 实现方式 | +|------|---------|--------|---------| +| async_send | socket write | 30000ms | `setsockopt(SO_SNDTIMEO)` + `future.wait_for()` | +| async_recv | 等待数据到达 | 30000ms | `future.wait_for(timeout_ms)` — 定时自旋轮询 signal | +| async_read | 等待远端响应 | 30000ms | `future.wait_for(timeout_ms)` — 定时自旋轮询 signal | +| async_write | socket write | 30000ms | `setsockopt(SO_SNDTIMEO)` + `future.wait_for()` | + +**wait_for 实现**: +```cpp +bool TcpFuture::wait_for(int64_t timeout_ms, int32_t* out) const { + auto deadline = steady_clock::now() + milliseconds(timeout_ms); + while (true) { + if (signal->get_comm_done_mask() matches expected_mask) { + *out = completion_status; return true; + } + if (steady_clock::now() >= deadline) { + // last check before declaring timeout + if (signal->get_comm_done_mask() matches expected_mask) { + *out = completion_status; return true; + } + return false; + } + machnet_pause(); // CPU relax + } +} +``` + +--- + +## 11. 实现步骤 + +| 阶段 | 文件 | 说明 | +|------|------|------| +| 1. 分支 | `git checkout -b tcp-v3 main` | 基于 main 创建新分支 | +| 2. 头文件 | tcp_header.h, tcp_op_state.h | 17B header + 3 opcodes + op state | +| 3. 内存池 | tcp_memory_pool.h/.cpp | 纯簿记, 无硬件注册 | +| 4. Future | tcp_future.h | header-only, wait + wait_for | +| 5. Context | tcp_context.h/.cpp | 共享 io_context + connection_pool + thread | +| 6. 连接池 | tcp_connection_pool.h/.cpp | get/return/cleanup/clear | +| 7. Session | tcp_session.h/.cpp | ServerSession async_read 回调链 | +| 8. 端点 | tcp_endpoint.h/.cpp | async_send/recv/read/write | +| 9. 构建 | CMakeLists 链 + bind.cpp | BUILD_TCP + pybind11 | +| 10. 测试 | test_tcp_endpoint.py | 5 用例 + timeout 测试 | +| 11. 脚本 | build_and_test.sh | 一键构建+测试 | +| 12. 提交 | git commit | 单 commit, 清晰消息 | + +--- + +## 9. 
send/recv 设计深度分析 + +### 核心矛盾:RDMA vs TCP 的 send/recv 语义差异 + +RDMA 的 send/recv 是**硬件匹配**的: +- 发送方 post Send WR → 硬件从本地 buffer 取数据 → 发到对端 RQ +- 接收方 post Recv WR → 硬件在 RQ 上预置 WQE (buffer地址 + 长度) +- 硬件按**FIFO 顺序**匹配:第 N 个到达的 SEND 消费第 N 个预置的 RECV +- 如果 SEND 到达时没有 RECV → RNR NAK (Receiver Not Ready) → 发送方重试 +- 如果 SEND 数据量 > RECV buffer → 截断或报错 + +TCP **没有硬件匹配**,所有匹配逻辑必须在软件中实现。这带来了三个核心问题: + +| 问题 | RDMA 方案 | TCP 需要解决 | +|------|---------|------------| +| 匹配: 哪个 SEND 对哪个 RECV? | 硬件 RQ FIFO | 软件队列或 tag 匹配 | +| 顺序: SEND 先到还是 RECV 先到? | 硬件 RNR 重试 | 缓冲或拒绝 | +| 大小: 发送量 > 接收 buffer? | 截断/报错 | 截断或分片 | + +### 三种匹配策略 + +#### 策略 A: FIFO 队列匹配(v3 plan 默认) + +``` +recv(chunk) → pending_recvs_.push_back({buffer, op_state}) +ServerSession dispatch(OP_SEND): + payload = readBody() + slot = recv_matcher_() // pop front + memcpy(slot.buffer, payload, min(len, slot.length)) + signal slot.op_state +``` + +**优点**: 实现简单,与 RDMA 语义一致,足够支持双端点 ping-pong 通信。 +**缺点**: 严格 FIFO——调用者无法指定"这个 recv 对应后面第 N 个 send"。多 slot 场景(如 SlimeRPC 的 slotted mailbox)无法用 FIFO 区分。 + +#### 策略 B: Tag 匹配(Gloo 风格) + +``` +wire: [header{OP_SEND, sz, tag}] + payload +recv(tag, buffer) → pending_recvs_[tag].push({buffer, op_state}) +ServerSession dispatch(OP_SEND): + payload = readBody() + slot = pending_recvs_[hdr.addr_as_tag].pop() + memcpy(slot.buffer, payload) +``` + +**优点**: 灵活,支持多路复用——一个 TCP 连接可以承载多个逻辑流(如 RPC slot)。 +**缺点**: header.addr 字段被复用为 tag(牺牲了 addr 的原始语义),协议复杂度增加。 + +#### 策略 C: Slot 预注册(Gloo Buffer 风格) + +``` +每个 Pair 预先创建 N 个 slot buffer: + pair.createSendBuffer(slot=0, ptr, size) + pair.createRecvBuffer(slot=1, ptr, size) +wire: [header{OP_SEND, sz, 0, slot}] + payload +ServerSession: 直接 lookup slot → memcpy +``` + +**优点**: 零队列开销,O(1) slot 查找,SlimeRPC 天然适配。 +**缺点**: 需要预注册 slot(与当前 DLSlime MR 模型不兼容),灵活度低。 + +### 推荐策略:分层渐进 + +``` +Phase 1 (v3) — FIFO 基础: + pending_recvs_ = deque<{buffer, op_state}> + wire: header{OP_SEND, sz, addr=0} + 匹配: 严格 FIFO + 足够: 双端点 ping-pong、简单 RPC + +Phase 2 — 缓冲早到 SEND: + early_sends_ = 
deque<{payload_data}> + 如果 dispatch(OP_SEND) 时 pending_recvs_ 为空: + → 缓存 payload 到 early_sends_(带大小上限) + → 下次 recv() 先检查 early_sends_ 再入队 + 避免数据丢失 + +Phase 3 — Tag 匹配 (如需要): + 扩展 header: 用 2 字节 reserved 字段承载 tag + pending_recvs_ = map> + 支持多路复用 +``` + +### send/recv 与 read/write 的本质区别 + +很多人混淆 send/recv 和 write/read: + +| | send/recv | write/read | +|---|---|---| +| 语义 | **双边**:双方都需要显式操作 | **单边**:一方发起,另一方无感知 | +| 数据方向 | send=push, recv=pull (被动) | write=push to remote addr, read=pull from remote addr | +| 远端参与 | recv 方必须预先注册 buffer | 远端 ServerSession 自动处理,无需注册 | +| 寻址方式 | **无地址**(匹配决定目标 buffer) | **有地址**(header.addr 指定远端 buffer) | +| RDMA 类比 | ibv_post_send / ibv_post_recv | ibv_post_send with RDMA_WRITE/RDMA_READ | + +核心洞察:**send/recv 的"地址"是隐式的——通过匹配关系决定; +write/read 的"地址"是显式的——header.addr 直接指向远端内存。** + +这就是为什么 v3 plan 中: +- OP_SEND: header.addr = 0(不使用),通过 FIFO 匹配目标 buffer +- OP_WRITE: header.addr = remote_addr(直接指定远端目标地址) +- OP_READ: header.addr = remote_addr(直接指定远端源地址) + +### v3 实现策略 + +v3 采用策略 A(FIFO),但为策略 C(slot)预留空间: + +```cpp +// 当前: deque — 简单 FIFO +std::deque pending_recvs_; + +// Phase 3 可演进为: map — tag 匹配 +// std::unordered_map> pending_recvs_; +// 同时扩展 header: 用 reserved 字段承载 tag + +void TcpEndpoint::async_recv(const chunk_tuple_t& chunk, + int64_t timeout_ms, void*) { + // resolve MR → op_state → push to FIFO + // Phase 3: push to pending_recvs_[tag] instead +} + +ServerSession::dispatch(OP_SEND): + readBody() → chunk_buf_ + RecvSlot slot = recv_matcher_() + if (slot.buffer == 0): + // Phase 2: buffer early send to early_sends_ + return + memcpy(slot.buffer, chunk_buf_, min(payload_len, slot.length)) + signal slot.op_state +``` + +**recv timeout 语义**(区别于 socket timeout): +- SO_RCVTIMEO 是 socket 级超时(读数据超时) +- `future.wait_for()` 是**注册后等待匹配**的超时 +- 超时后 recv 保留在队列中:后续 SEND 仍可完成它(调用者可重试 wait_for) + +## 12. 
验证计划 + +```bash +# 构建 +./dlslime/csrc/engine/tcp/build_and_test.sh build + +# 测试 +./dlslime/csrc/engine/tcp/build_and_test.sh test + +# 全流程 +./dlslime/csrc/engine/tcp/build_and_test.sh +``` + +**测试用例**: +1. `test_async_send_recv` — A async_send → B async_recv, B async_send → A async_recv +2. `test_async_write_read` — A async_write → B buffer, A async_read → verify +3. `test_recv_timeout` — async_recv + wait_for(0.3s) → None (无对端发送) +4. `test_send_timeout` — async_send(timeout_ms=10000) 参数 +5. `test_default_timeout` — async_send() 无参数 → 使用 30000ms 默认值 + +## 10. TcpContext 设计 — 为同步通信和资源共享做准备 + +### 使用优先级 + +TcpContext 类始终存在,ctx_ 成员始终非空。但构造方式有两种优先级: + +| 优先级 | 构造 | 场景 | 占比 | +|--------|------|------|------| +| **主** | `TcpEndpoint(port)` | 单 endpoint, 内部自动 new TcpContext | ~90% | +| **次** | `TcpEndpoint(ctx, port)` | 多 endpoint 共享 io_context 线程 | ~10% | + +**默认路径**:调用者无需感知 TcpContext——每个 endpoint 构造时内部 `make_unique()`, +自动创建 io_context + 后台线程 + 连接池。代码最简洁。 + +**高级路径**:当 PeerAgent 连接 N 个 peer 时,可手动创建一个 TcpContext 并注入到 N 个 +TcpEndpoint,将 N 个线程合并为 1 个。TcpContext 也用于测试中精确控制 io_context 生命周期。 + +两种路径不互斥——同一进程可混合使用。TcpContext 类永不删除,ctx_ 成员永不删除。 + +### TcpContext 接口 + +```cpp +class TcpContext { +public: + TcpContext(); // 创建 io_context + 启动后台线程 + ~TcpContext(); // stop + join + clear pool + + asio::io_context& io_context() { return io_ctx_; } + TcpConnectionPool& conn_pool() { return conn_pool_; } + void shutdown(); + +private: + asio::io_context io_ctx_; + std::thread io_thread_; + TcpConnectionPool conn_pool_{io_ctx_}; + bool running_{true}; +}; +``` + +### TcpEndpoint 与 TcpContext 的关系 + +```cpp +class TcpEndpoint { + // 【主构造】自包含 — 内部创建 TcpContext + explicit TcpEndpoint(uint16_t port = 0) + : own_ctx_(std::make_unique()) // 自动创建 + , acceptor_(own_ctx_->io_context()) + , ... { + ctx_ = own_ctx_.get(); // ctx_ → 内部 context + } + + // 【次构造】共享 — 注入外部 TcpContext + TcpEndpoint(TcpContext& ctx, uint16_t port = 0) + : acceptor_(ctx.io_context()) + , ... 
{ + ctx_ = &ctx; // ctx_ → 外部 context, own_ctx_ = nullptr + } + +private: + TcpContext* ctx_{nullptr}; // 始终非空 + std::unique_ptr own_ctx_; // 仅主构造时非空 + // ... +}; +``` + +### 为同步通信预留 + +有了共享 TcpContext,同步包装器可以不依赖单个 endpoint 的事件循环: + +```cpp +// 未来 sync_send: 调 async_send + 立刻 future.wait() +std::shared_ptr sync_send(TcpEndpoint& ep, + const chunk_tuple_t& chunk, + int64_t timeout_ms = 30000) { + auto fut = ep.async_send(chunk, timeout_ms); + fut->wait(); // 阻塞调用者线程直到 io_context 完成 + return fut; +} +``` + +同步版本只是 async + wait() 的语法糖,不需要独立的底层实现。 \ No newline at end of file diff --git a/dlslime/csrc/engine/tcp/tcp_endpoint.cpp b/dlslime/csrc/engine/tcp/tcp_endpoint.cpp index df0792e3..eda64004 100644 --- a/dlslime/csrc/engine/tcp/tcp_endpoint.cpp +++ b/dlslime/csrc/engine/tcp/tcp_endpoint.cpp @@ -20,13 +20,6 @@ static void hdr_hton(SessionHeader& h) { h.addr = htole64(h.addr); } -void TcpEndpoint::set_sndtimeo(int fd, int64_t ms) { - struct timeval tv; - tv.tv_sec = static_cast(ms / 1000); - tv.tv_usec = static_cast((ms % 1000) * 1000); - setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); -} - // ── RecvMatcher factory ──────────────────────────────── ServerSession::RecvMatcher TcpEndpoint::make_recv_matcher() { @@ -173,7 +166,7 @@ bool TcpEndpoint::write_message(tcp::socket& sock, // ── async_send ────────────────────────────────────────── std::shared_ptr -TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms, void*) { +TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms) { auto mr = local_pool_->get_mr_fast(static_cast(std::get<0>(chunk))); if (mr.length == 0) throw std::runtime_error("TcpEndpoint::async_send: invalid local MR"); @@ -191,14 +184,11 @@ TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms, void*) { return std::make_shared(op); } - if (timeout_ms > 0) - set_sndtimeo(conn->socket.native_handle(), timeout_ms); - SessionHeader hdr{len, 0, OP_SEND}; auto& pool = ctx_->conn_pool(); std::weak_ptr 
weak = weak_from_this(); - asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, len, timeout_ms, &pool]() { + asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, len, &pool]() { auto ep = weak.lock(); if (!ep) { op->completion_status.store(TCP_CLOSED, std::memory_order_release); @@ -214,9 +204,7 @@ TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms, void*) { asio::buffer(reinterpret_cast(src), len) }; asio::async_write(conn->socket, bufs, - [conn, op, timeout_ms, &pool](asio::error_code ec, size_t) { - if (timeout_ms > 0 && conn->socket.is_open()) - TcpEndpoint::set_sndtimeo(conn->socket.native_handle(), 0); + [conn, op, &pool](asio::error_code ec, size_t) { op->completion_status.store( ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release); if (op->signal) op->signal->set_comm_done(0); @@ -230,7 +218,7 @@ TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t timeout_ms, void*) { // ── async_recv ────────────────────────────────────────── std::shared_ptr -TcpEndpoint::async_recv(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/, void*) { +TcpEndpoint::async_recv(const chunk_tuple_t& chunk) { auto mr = local_pool_->get_mr_fast(static_cast(std::get<0>(chunk))); if (mr.length == 0) throw std::runtime_error("TcpEndpoint::async_recv: invalid local MR"); @@ -252,7 +240,7 @@ TcpEndpoint::async_recv(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/, void std::shared_ptr TcpEndpoint::async_read(const std::vector& assign, - int64_t /*timeout_ms*/, void*) { + int64_t /*timeout_ms*/) { if (assign.empty()) throw std::runtime_error("TcpEndpoint::async_read: empty assignment"); @@ -315,7 +303,6 @@ TcpEndpoint::async_read(const std::vector& assign, return; } - // Read raw response data (no header). 
asio::async_read(conn->socket, asio::buffer(reinterpret_cast(op->user_buffer), op->user_length), @@ -342,7 +329,7 @@ TcpEndpoint::async_read(const std::vector& assign, std::shared_ptr TcpEndpoint::async_write(const std::vector& assign, - int64_t timeout_ms, void*) { + int64_t /*timeout_ms*/) { if (assign.empty()) throw std::runtime_error("TcpEndpoint::async_write: empty assignment"); @@ -370,14 +357,11 @@ TcpEndpoint::async_write(const std::vector& assign, return std::make_shared(op); } - if (timeout_ms > 0) - set_sndtimeo(conn->socket.native_handle(), timeout_ms); - SessionHeader hdr{length, remote.addr + remote.offset + remote_off, OP_WRITE}; auto& pool = ctx_->conn_pool(); std::weak_ptr weak = weak_from_this(); - asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, length, timeout_ms, &pool]() { + asio::post(ctx_->io_context(), [weak, conn, op, hdr, src, length, &pool]() { auto ep = weak.lock(); if (!ep) { op->completion_status.store(TCP_CLOSED, std::memory_order_release); @@ -393,9 +377,7 @@ TcpEndpoint::async_write(const std::vector& assign, asio::buffer(reinterpret_cast(src), length) }; asio::async_write(conn->socket, bufs, - [conn, op, timeout_ms, &pool](asio::error_code ec, size_t) { - if (timeout_ms > 0 && conn->socket.is_open()) - TcpEndpoint::set_sndtimeo(conn->socket.native_handle(), 0); + [conn, op, &pool](asio::error_code ec, size_t) { op->completion_status.store( ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release); if (op->signal) op->signal->set_comm_done(0); @@ -417,7 +399,6 @@ void TcpEndpoint::shutdown() { acceptor_.close(); - // Force-complete all pending operations. { std::lock_guard lk(recv_mu_); for (auto& pr : pending_recvs_) { @@ -439,7 +420,6 @@ void TcpEndpoint::shutdown() { pending_reads_.clear(); } - // If self-contained, stop the private TcpContext. 
if (own_ctx_) own_ctx_->shutdown(); } diff --git a/dlslime/csrc/engine/tcp/tcp_endpoint.h b/dlslime/csrc/engine/tcp/tcp_endpoint.h index 344c4901..29996b58 100644 --- a/dlslime/csrc/engine/tcp/tcp_endpoint.h +++ b/dlslime/csrc/engine/tcp/tcp_endpoint.h @@ -9,7 +9,6 @@ #include #include #include -#include #include #include @@ -32,10 +31,10 @@ class TcpEndpoint : public std::enable_shared_from_this { public: static constexpr int64_t kDefaultTimeoutMs = 30000; - // Self-contained: creates its own TcpContext + io_thread. + // 【主构造】自包含 TcpContext (最常用) explicit TcpEndpoint(uint16_t port = 0); - // Shared context: multiple endpoints share one io_context thread. + // 【次构造】共享 TcpContext (多 endpoint 复用单 io_context 线程) TcpEndpoint(TcpContext& ctx, uint16_t port = 0); ~TcpEndpoint(); @@ -55,27 +54,26 @@ class TcpEndpoint : public std::enable_shared_from_this { const json& mr_info); json mr_info() const; - // ── Async I/O (all return Future; I/O on io_context thread) ── + // ── Async I/O (all return Future immediately; I/O runs on io_context thread) ── + // Bilateral send. timeout_ms is only a hint (SO_SNDTIMEO is not set); bound the wait via future.wait_for(). std::shared_ptr async_send( const chunk_tuple_t& chunk, - int64_t timeout_ms = kDefaultTimeoutMs, - void* stream = nullptr); + int64_t timeout_ms = kDefaultTimeoutMs); + // Bilateral recv. Timeout via future.wait_for(). std::shared_ptr async_recv( - const chunk_tuple_t& chunk, - int64_t timeout_ms = kDefaultTimeoutMs, - void* stream = nullptr); + const chunk_tuple_t& chunk); + // Unilateral read: request remote to send data from registered buffer. std::shared_ptr async_read( const std::vector& assign, - int64_t timeout_ms = kDefaultTimeoutMs, - void* stream = nullptr); + int64_t timeout_ms = kDefaultTimeoutMs); + // Unilateral write: push data to remote registered buffer. 
std::shared_ptr async_write( const std::vector& assign, - int64_t timeout_ms = kDefaultTimeoutMs, - void* stream = nullptr); + int64_t timeout_ms = kDefaultTimeoutMs); // ── Accessors ─────────────────────────────────────── void setId(int64_t id) { id_.store(id, std::memory_order_relaxed); } @@ -83,16 +81,13 @@ class TcpEndpoint : public std::enable_shared_from_this { bool is_connected() const { return connected_.load(std::memory_order_acquire); } private: - // ── io_context management ─────────────────────────── void start_io(); void do_accept(); ServerSession::RecvMatcher make_recv_matcher(); - // ── helpers ───────────────────────────────────────── bool is_initiator(const std::string& peer_host, uint16_t peer_port) const; bool write_message(asio::ip::tcp::socket& sock, const SessionHeader& hdr, const void* payload); - static void set_sndtimeo(int fd, int64_t ms); // ── identity ──────────────────────────────────────── std::atomic id_{-1}; @@ -104,7 +99,7 @@ class TcpEndpoint : public std::enable_shared_from_this { // ── asio core ─────────────────────────────────────── TcpContext* ctx_{nullptr}; - std::unique_ptr own_ctx_; // if self-contained + std::unique_ptr own_ctx_; asio::ip::tcp::acceptor acceptor_; std::atomic running_{true}; @@ -119,7 +114,7 @@ class TcpEndpoint : public std::enable_shared_from_this { std::mutex recv_mu_; std::deque pending_recvs_; - // ── read matching (connections reserved for response) ── + // ── read matching ─────────────────────────────────── struct PendingRead { std::shared_ptr conn; std::shared_ptr op_state; diff --git a/dlslime/csrc/engine/tcp/test_tcp_endpoint.py b/dlslime/csrc/engine/tcp/test_tcp_endpoint.py index 4d6f85b2..9a406326 100644 --- a/dlslime/csrc/engine/tcp/test_tcp_endpoint.py +++ b/dlslime/csrc/engine/tcp/test_tcp_endpoint.py @@ -130,7 +130,7 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) - fut = ep_a.async_recv((h_a, 0, 5), timeout_ms=300) + fut = ep_a.async_recv((h_a, 0, 5)) result = 
fut.wait_for(0.3) print(f" recv wait_for(0.3s): {result} (expected None)") assert result is None, f"Expected None (timeout), got {result}" diff --git a/dlslime/csrc/python/bind.cpp b/dlslime/csrc/python/bind.cpp index 0b1efc90..e865c0e9 100644 --- a/dlslime/csrc/python/bind.cpp +++ b/dlslime/csrc/python/bind.cpp @@ -612,32 +612,26 @@ PYBIND11_MODULE(_slime_c, m) &dlslime::tcp::TcpEndpoint::register_remote_memory_region, py::arg("name"), py::arg("mr_info"), py::call_guard()) .def("async_send", - py::overload_cast( + py::overload_cast( &dlslime::tcp::TcpEndpoint::async_send), py::arg("chunk"), py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::arg("stream") = nullptr, py::call_guard()) .def("async_recv", - py::overload_cast( - &dlslime::tcp::TcpEndpoint::async_recv), + &dlslime::tcp::TcpEndpoint::async_recv, py::arg("chunk"), - py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::arg("stream") = nullptr, py::call_guard()) .def("async_read", - py::overload_cast&, int64_t, void*>( + py::overload_cast&, int64_t>( &dlslime::tcp::TcpEndpoint::async_read), py::arg("assign"), py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::arg("stream") = nullptr, py::call_guard()) .def("async_write", - py::overload_cast&, int64_t, void*>( + py::overload_cast&, int64_t>( &dlslime::tcp::TcpEndpoint::async_write), py::arg("assign"), py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::arg("stream") = nullptr, py::call_guard()); #endif // BUILD_TCP