Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a2c620b
Catch `BaseException` during distributed-ucxx `write()`
pentschev Apr 17, 2026
4ce668c
Fix possible ApplicationContext leak and f-strings
pentschev Apr 17, 2026
4a3b245
Revert wrong circular reference breaking
pentschev Apr 17, 2026
5daeb14
Flush event loop before ucxx.reset()
pentschev Apr 23, 2026
d9c2732
Merge remote-tracking branch 'upstream/main' into more-distributed-uc…
pentschev Apr 23, 2026
74ee825
Attempt to break UCXXListener refcycle
pentschev Apr 23, 2026
047402e
Stop ucxx_server in UCXXListener.stop()
pentschev Apr 23, 2026
91b8d21
Replace ApplicationContext self reference for UCXListener
pentschev Apr 23, 2026
c0aa3be
Fix asyncio timeout for Python 3.14 + pytest-asyncio 1.3.0
pentschev Apr 24, 2026
4d03d73
Merge remote-tracking branch 'upstream/main' into more-distributed-uc…
pentschev Apr 24, 2026
9597e3a
Increase timeout to 120s for cupy/numba 16 MB in tag_multi tests
pentschev Apr 27, 2026
aa4cd32
Increase timeout to 240s seconds
pentschev Apr 27, 2026
a35bc22
Add timeout comment
pentschev Apr 27, 2026
7917b45
Prevent segfault when test timeout cancels mid-CUDA-transfer handler
pentschev Apr 28, 2026
0b1dc50
Merge remote-tracking branch 'upstream/main' into more-distributed-uc…
pentschev Apr 28, 2026
afa3f47
Remove unnecessary del and misleading comment from listener handler
pentschev Apr 28, 2026
68bdc70
Close client endpoint before waiting for listener handlers in CUDA tests
pentschev Apr 28, 2026
37ee61a
Do not cancel inflight requests before endpoint close
pentschev Apr 28, 2026
f4d4cb9
Cancel inflight requests and submit force-close atomically in a singl…
Apr 28, 2026
53dcb83
Fix invalid _handle usage
Apr 29, 2026
2bd2e1d
Scoped inflight request cancelation
May 12, 2026
8ab7600
Merge remote-tracking branch 'upstream/main' into more-distributed-uc…
pentschev May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cpp/include/ucxx/inflight_requests.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ class InflightRequests {
void remove(const std::shared_ptr<Request>& request);

/**
* @brief Issue cancelation of all inflight requests and clear the internal container.
* @brief Issue cancelation of inflight requests and remove them from the internal container.
*
* Issue cancelation of all inflight requests known to this object and clear the
* internal container. The total number of canceled requests is returned.
* Issue cancelation of inflight requests known to this object. The total number of
* canceled requests is returned.
*
* When `workerOnly` is `true`, only worker-scoped operations
* (`Request::isWorkerOperation() == true`, i.e. receive variants) are cancelled and
* removed; endpoint-scoped operations are left in the container untouched, on the
* assumption that the caller will follow up with `ucp_ep_close_nbx(FORCE)` which
* handles their UCT-level cleanup atomically. When `workerOnly` is `false` (the
* default), all inflight requests are cancelled.
*
* @param[in] workerOnly if `true`, cancel only worker-scoped requests.
* @returns The total number of canceled requests.
*/
size_t cancelAll();
size_t cancelAll(bool workerOnly = false);

/**
* @brief Releases the internally-tracked containers.
Expand Down
21 changes: 21 additions & 0 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,27 @@ class Request : public Component {
*/
virtual void cancel();

/**
* @brief Whether this request is worker-scoped (vs endpoint-scoped).
*
* Returns `true` for operations that are tracked by the UCP worker rather than a
* specific UCP endpoint, currently the receive variants (`TagReceive`,
* `TagReceiveWithHandle`, `AmReceive`, `StreamReceive`, `TagMultiReceive`).
*
* `Endpoint::closeBlocking()` uses this to decide which requests must be cancelled
* explicitly via `ucp_request_cancel`: worker-scoped requests are not cancelled by
* `ucp_ep_close_nbx(UCP_EP_CLOSE_FLAG_FORCE)` (which only tears down endpoint-bound
* state) and would otherwise hang forever. Endpoint-scoped requests are left to
* UCX's FORCE close, which handles their UCT-level cleanup atomically. Calling
* `ucp_request_cancel` on them in addition to FORCE close has been observed to
* leave UCT-level pending queue entries referencing freed staging buffers, which
* the next `ucp_worker_progress()` then crashes dispatching (see
* test_send_recv_multi.py CUDA segfault).
*
* @returns `true` if this request is worker-scoped, `false` if endpoint-scoped.
*/
[[nodiscard]] bool isWorkerOperation() const;

/**
* @brief Return the status of the request.
*
Expand Down
69 changes: 52 additions & 17 deletions cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,6 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
{
if (_closing.exchange(true) || _handle == nullptr) return;

size_t canceled = cancelInflightRequestsBlocking(3000000000 /* 3s */, 3);
ucxx_debug("ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, canceled %lu requests",
__func__,
this,
_handle,
canceled);

ucp_request_param_t param{};
if (_endpointErrorHandling)
param = {.op_attr_mask = UCP_OP_ATTR_FIELD_FLAGS, .flags = UCP_EP_CLOSE_FLAG_FORCE};
Expand All @@ -288,8 +281,45 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
bool submitted = false;
for (uint64_t i = 0; i < maxAttempts && !closeSuccess; ++i) {
if (!submitted) {
// Cancel WORKER-SCOPED inflight requests (tag_recv & friends) and submit
// FORCE close ATOMICALLY in a single pre-callback: no
// ucp_worker_progress() runs between them, and no ucp_request_cancel() is
// issued on endpoint-scoped operations.
//
// Why cancel anything at all (UCX FORCE close handles endpoint state):
// Receive operations (`tag_recv`, `am_recv`, ...) post requests on the
// UCP worker via `ucp_*_recv_nbx(worker, ...)`. ucp_ep_close_nbx(FORCE)
// tears down the UCP endpoint but leaves worker-scoped requests
// pending forever; an `await ep.close()` racing with an outstanding
// `await ep.recv()` would hang. See
// test_shutdown.py::test_{server,client}_shutdown.
//
// Why ONLY worker-scoped (not endpoint-scoped sends/RMA):
// Calling ucp_request_cancel() on an endpoint-scoped tag_send and then
// FORCE-closing the endpoint has been observed to leave UCT-level TCP
// pending queue entries pointing at freed staging buffers; the next
// ucp_worker_progress() then crashed dispatching them
// (uct_tcp_pending_queue_dispatch -> uct_cuda_copy_ep_get_short ->
// cuMemcpyAsync -> SIGSEGV), see test_send_recv_multi.py CUDA segfault.
// FORCE close handles endpoint-scoped requests' UCT cleanup atomically
// on its own, so we leave them alone.
//
// Why atomic with FORCE close (not as a separate pre-callback):
// When cancelAll and FORCE close were separate pre-callbacks (the
// old cancelInflightRequestsBlocking path), a full ucp_worker_progress()
// ran between them, see prior commit "Cancel inflight requests and
// submit force-close atomically in a single pre-callback".
if (!worker->registerGenericPre(
[this, &status, &param]() { status = ucp_ep_close_nbx(_handle, &param); }, period))
[this, &status, &param]() {
_inflightRequests->cancelAll(/*workerOnly=*/true);
status = ucp_ep_close_nbx(_handle, &param);
// Invalidate `_handle` synchronously and immediately, to prevent a
// time window where `_handle` points to freed UCP memory, usually
// observed in `populateDelayedSubmission()`.
_originalHandle = _handle;
_handle = nullptr;
},
period))
continue;
submitted = true;
}
Expand All @@ -306,7 +336,7 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
"endpoint: %s",
__func__,
this,
_handle,
_originalHandle,
ucs_status_string(UCS_PTR_STATUS(status)));
}
},
Expand All @@ -323,10 +353,16 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
"ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, all attempts to close timed out",
__func__,
this,
_handle);
_originalHandle != nullptr ? _originalHandle : _handle);
}
} else {
status = ucp_ep_close_nbx(_handle, &param);
// No progress thread: cancel worker-scoped inflight requests and submit FORCE
// close back-to-back, then drive progress here. Same atomicity reasoning as the
// progress-thread path above (no ucp_worker_progress() between cancel and FORCE close).
_inflightRequests->cancelAll(/*workerOnly=*/true);
status = ucp_ep_close_nbx(_handle, &param);
_originalHandle = _handle;
_handle = nullptr;
if (UCS_PTR_IS_PTR(status)) {
ucs_status_t s;
while ((s = ucp_request_check_status(status)) == UCS_INPROGRESS)
Expand All @@ -337,11 +373,12 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
"ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, Error while closing endpoint: %s",
__func__,
this,
_handle,
_originalHandle,
ucs_status_string(UCS_PTR_STATUS(status)));
}
}
ucxx_trace("ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, closed", __func__, this, _handle);
ucxx_trace(
"ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, closed", __func__, this, _originalHandle);

if (UCS_PTR_IS_PTR(status)) ucp_request_free(status);

Expand All @@ -351,14 +388,12 @@ void Endpoint::closeBlocking(uint64_t period, uint64_t maxAttempts)
ucxx_debug("ucxx::Endpoint::%s, Endpoint: %p, UCP handle: %p, calling user close callback",
__func__,
this,
_handle);
_originalHandle);
_closeCallback(_status, _closeCallbackArg);
_closeCallback = nullptr;
_closeCallbackArg = nullptr;
}
}

std::swap(_handle, _originalHandle);
}

ucp_ep_h Endpoint::getHandle() { return _handle; }
Expand Down Expand Up @@ -448,7 +483,7 @@ size_t Endpoint::cancelInflightRequestsBlocking(uint64_t period, uint64_t maxAtt
"cancel inflight requests failed",
__func__,
this,
_handle);
_originalHandle != nullptr ? _originalHandle : _handle);
} else {
canceled = _inflightRequests->cancelAll();
}
Expand Down
22 changes: 19 additions & 3 deletions cpp/src/inflight_requests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,34 @@ void InflightRequests::merge(TrackedRequests&& trackedRequests)
if (r) _canceling.insert(std::move(r));
}

size_t InflightRequests::cancelAll()
size_t InflightRequests::cancelAll(bool workerOnly)
{
decltype(_inflight) toCancel;
decltype(_inflight) toKeep;
{
std::lock_guard<std::mutex> lock(_mutex);
toCancel = std::exchange(_inflight, {});
if (workerOnly) {
// Partition: receive (worker-scoped) requests get explicit ucp_request_cancel,
// endpoint-scoped requests stay in `_inflight` to be cleaned up by FORCE close.
for (auto& r : _inflight) {
if (r && r->isWorkerOperation())
toCancel.insert(r);
else
toKeep.insert(r);
}
_inflight = std::move(toKeep);
} else {
toCancel = std::exchange(_inflight, {});
}
}

size_t total = toCancel.size();
if (total == 0) return 0;

ucxx_debug("ucxx::InflightRequests::%s, canceling %lu requests", __func__, total);
ucxx_debug("ucxx::InflightRequests::%s, canceling %lu requests (workerOnly=%d)",
__func__,
total,
workerOnly);

for (auto& r : toCancel) {
if (r) r->cancel();
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,24 @@ Request::~Request()
ucxx_trace("ucxx::Request destroyed (%s): %p", _operationName.c_str(), this);
}

bool Request::isWorkerOperation() const
{
  // Classify the request by the alternative currently held in `_requestData`.
  //
  // The receive variants are issued through ucp_tag_recv_nbx / ucp_am_recv_nbx
  // and friends, which post the request against the UCP worker instead of a
  // particular endpoint. Since ucp_ep_close_nbx(FORCE) does not cancel those,
  // closeBlocking() has to cancel them explicitly. Everything else (send / RMA)
  // is endpoint-bound and is cleaned up by FORCE close itself.
  const bool workerScoped = std::visit(
    data::dispatch{
      // Endpoint-scoped: any alternative not listed explicitly below.
      [](const auto&) -> bool { return false; },
      // Worker-scoped: the receive variants.
      [](const data::StreamReceive&) -> bool { return true; },
      [](const data::AmReceive&) -> bool { return true; },
      [](const data::TagMultiReceive&) -> bool { return true; },
      [](const data::TagReceiveWithHandle&) -> bool { return true; },
      [](const data::TagReceive&) -> bool { return true; },
    },
    _requestData);

  return workerScoped;
}

void Request::cancel()
{
std::lock_guard<std::recursive_mutex> lock(_mutex);
Expand Down
6 changes: 4 additions & 2 deletions python/distributed-ucxx/distributed_ucxx/ucxx.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,9 +498,9 @@ async def write(
for each_frame in send_frames:
await self.ep.send(each_frame)
return sum(sizes)
except ucxx.exceptions.UCXError:
except BaseException as e:
self.abort()
raise CommClosedError("While writing, the connection was closed")
raise CommClosedError("While writing, the connection was closed") from e

@log_errors
async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
Expand Down Expand Up @@ -729,6 +729,8 @@ async def serve_forever(client_ep):
self.ucxx_server = ucxx.create_listener(serve_forever, port=self._input_port)

def stop(self):
if self.ucxx_server is not None:
self.ucxx_server.close()
self.ucxx_server = None
_deregister_dask_resource(self._resource_id)

Expand Down
7 changes: 6 additions & 1 deletion python/distributed-ucxx/distributed_ucxx/utils_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION.
# SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION.
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import asyncio
import gc
import logging
import sys

Expand Down Expand Up @@ -84,6 +85,10 @@ def ucxx_loop(request):

with check_thread_leak():
yield loop

# Collect garbage to break any remaining reference cycles before reset.
gc.collect()

if ignore_alive_references:
try:
ucxx.reset()
Expand Down
5 changes: 3 additions & 2 deletions python/ucxx/ucxx/_lib_async/application_context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: BSD-3-Clause

import logging
Expand Down Expand Up @@ -311,7 +311,7 @@ def create_listener(
cb_args=(
loop,
callback_func,
self,
weakref.ref(self),
endpoint_error_handling,
connect_timeout,
listener_id,
Expand All @@ -321,6 +321,7 @@ def create_listener(
),
listener_id,
self._listener_active_clients,
ctx=self,
)
return ret

Expand Down
Loading
Loading