Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/run_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ else
fi

run_cpp_tests() {
CMD_LINE="python ${TIMEOUT_TOOL_PATH} $((10*60)) ${GTESTS_PATH}/UCXX_TEST"
CMD_LINE="python ${TIMEOUT_TOOL_PATH} $((20*60)) ${GTESTS_PATH}/UCXX_TEST"

log_command "${CMD_LINE}"
UCX_TCP_CM_REUSEADDR=y ${CMD_LINE}
Expand Down
13 changes: 13 additions & 0 deletions cpp/include/ucxx/experimental/worker_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ class WorkerBuilder final {
*/
WorkerBuilder& pythonFuture(bool enable = true);

/**
* @brief Configure request attributes querying.
*
* When enabled, each `ucxx::Request` created from the worker will have its UCP
* attributes (such as the debug string) queried immediately after submission, making
* them available via `ucxx::Request::getRequestAttributes()`. This may have
* non-negligible runtime cost and is therefore disabled by default.
*
* @param[in] enable whether request attributes querying is enabled (default: true).
* @return Reference to this builder for method chaining.
*/
WorkerBuilder& requestAttributes(bool enable = true);

/**
* @brief Configure the preferred buffer type for CUDA allocations.
*
Expand Down
9 changes: 0 additions & 9 deletions cpp/include/ucxx/internal/request_am.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,6 @@ class RecvAmMessage {
AmReceiverCallbackType receiverCallback = AmReceiverCallbackType(),
std::vector<std::byte> userHeader = {});

/**
* @brief Set the UCP request.
*
* Set the underlying UCP request (`_request` attribute) of the `RequestAm`.
*
* @param[in] request the UCP request associated to the active message receive operation.
*/
void setUcpRequest(void* request);

/**
* @brief Execute the `ucxx::Request::callback()`.
*
Expand Down
54 changes: 54 additions & 0 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <chrono>
#include <memory>
#include <string>
#include <utility>

#include <ucp/api/ucp.h>

Expand Down Expand Up @@ -37,6 +38,14 @@ namespace ucxx {
*/
class Request : public Component {
protected:
/**
* @brief Request attributes reported by `ucp_request_query`.
*/
struct Attributes {
ucs_memory_type memoryType{UCS_MEMORY_TYPE_UNKNOWN}; ///< Memory type of the request
std::string debugString{}; ///< Stored debug string
};

ucs_status_t _status{UCS_INPROGRESS}; ///< Requests status
std::string _status_msg{}; ///< Human-readable status message
void* _request{nullptr}; ///< Pointer to UCP request
Expand All @@ -54,6 +63,9 @@ class Request : public Component {
bool _enablePythonFuture{true}; ///< Whether Python future is enabled for this request
RequestCallbackUserFunction _callback{nullptr}; ///< Completion callback
RequestCallbackUserData _callbackData{nullptr}; ///< Completion callback data
Attributes _requestAttr{}; ///< Request attributes queried when request is posted; the
///< default `memoryType == UCS_MEMORY_TYPE_UNKNOWN` doubles
///< as the "not populated yet" sentinel

/**
* @brief Protected constructor of an abstract `ucxx::Request`.
Expand Down Expand Up @@ -235,6 +247,48 @@ class Request : public Component {
* @return The received user header (if applicable) or an empty string.
*/
[[nodiscard]] virtual std::string getRecvHeader();

/**
* @brief Get the requests's attributes.
*
* Returns the request attributes as a struct. The owning `ucxx::Worker` must have been
* created with request attributes querying enabled (see
* `ucxx::experimental::WorkerBuilder::requestAttributes()`); otherwise the attributes
* are never populated and this method throws. Querying the underlying UCP request is
* an implementation detail performed eagerly when the request is submitted. All
* non-status fields exposed by UCP are queried, use `getStatus()` to obtain the status.
*
* @throw ucxx::UnsupportedError if the owning worker was not built with request
* attributes querying enabled. Requires `Worker`
* created with
* `ucxx::experimental::WorkerBuilder::requestAttributes(true)`.
* @throw ucxx::NoElemError if attributes are unavailable for this specific
* request: either because UCX took an inline-completion
* path that produced no UCP request to query, or because
* the request has not completed yet. Callers can
* distinguish the latter from the former by checking
* `isCompleted()`.
*
* @return An `Attributes` containing the request attributes.
*/
[[nodiscard]] Attributes queryAttributes();

protected:
/**
* @brief Publish the UCP request handle and capture its attributes.
*
* Single critical section that stores the UCP request pointer in `_request` and, when
* the owning worker has request attributes querying enabled, immediately queries those
* attributes. The completion path frees the UCP request inside `setStatus` under the
* same `_mutex`, so this helper guarantees the query and the free are mutually
* exclusive and that there are no use-after-free in threaded progress modes.
*
* Every submit site calls this after obtaining the request handle from the corresponding
* `ucp_*_nbx` function.
*
* @param[in] request the UCP request pointer returned by a non-blocking submit.
*/
void publishRequest(void* request);
};

} // namespace ucxx
43 changes: 42 additions & 1 deletion cpp/include/ucxx/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class Worker : public Component {
protected:
bool _enableFuture{
false}; ///< Boolean identifying whether the worker was created with future capability
bool _enableRequestAttributes{
false}; ///< Whether request attributes (e.g. UCP debug info) are queried for each request
std::mutex _futuresPoolMutex{}; ///< Mutex to access the futures pool
std::queue<std::shared_ptr<Future>>
_futuresPool{}; ///< Futures pool to prevent running out of fresh futures
Expand Down Expand Up @@ -507,6 +509,19 @@ class Worker : public Component {
*/
[[nodiscard]] bool isFutureEnabled() const;

/**
* @brief Inquire if worker has been created with request attributes querying enabled.
*
* Check whether the worker has been created with request attributes querying enabled.
* When enabled, each `ucxx::Request` will have its UCP attributes (such as the debug
* string) queried immediately after submission, making them available via
* `ucxx::Request::queryAttributes()`. Querying request attributes has a
* non-negligible runtime cost and is therefore disabled by default.
*
* @returns `true` if request attributes querying is enabled, `false` otherwise.
*/
[[nodiscard]] bool isRequestAttributesEnabled() const noexcept;

/**
* @brief Get the preferred buffer type for CUDA allocations.
*
Expand Down Expand Up @@ -1000,7 +1015,7 @@ class Worker : public Component {
*
* Using a Python future may be requested by specifying `enablePythonFuture`. If a
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
* ensure the transfer has completed.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
Expand All @@ -1020,6 +1035,32 @@ class Worker : public Component {
const bool enablePythonFuture = false,
RequestCallbackUserFunction callbackFunction = nullptr,
RequestCallbackUserData callbackData = nullptr);

/**
* @brief Worker attributes reported by `ucp_worker_query`.
*/
struct Attributes {
/// Thread safety level the worker was created with.
ucs_thread_mode_t threadMode{UCS_THREAD_MODE_MULTI};
/// Maximum allowed header size for `ucp_am_send_nbx`.
size_t maxAmHeader{0};
/// Worker name used by tracing and analysis tools.
std::string name{};
/// Maximum debug-string buffer size accepted by `ucp_request_query`.
size_t maxDebugString{0};
};

/**
* @brief Get the worker's attributes.
*
* Returns the worker attributes as a struct, querying UCP via `ucp_worker_query` under
* the hood. All non-address fields exposed by UCP are queried, use `getAddress()` to
* obtain the address.
*
* @returns An `Attributes` filled with all queried fields.
* @throws ucxx::Error if an error occurred while querying worker attributes.
*/
[[nodiscard]] Attributes queryAttributes() const;
};

/**
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/experimental/worker_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct WorkerBuilder::Impl {
std::shared_ptr<Context> context;
bool enableDelayedSubmission{false};
bool enableFuture{false};
bool enableRequestAttributes{false};
BufferType cudaBufferType{BufferType::Invalid};

explicit Impl(std::shared_ptr<Context> ctx) : context(std::move(ctx)) {}
Expand All @@ -41,6 +42,12 @@ WorkerBuilder& WorkerBuilder::pythonFuture(bool enable)
return *this;
}

WorkerBuilder& WorkerBuilder::requestAttributes(bool enable)
{
_impl->enableRequestAttributes = enable;
return *this;
}

WorkerBuilder& WorkerBuilder::cudaBufferType(BufferType bufferType)
{
_impl->cudaBufferType = bufferType;
Expand All @@ -51,6 +58,7 @@ std::shared_ptr<Worker> WorkerBuilder::build() const
{
auto worker =
ucxx::createWorker(_impl->context, _impl->enableDelayedSubmission, _impl->enableFuture);
worker->_enableRequestAttributes = _impl->enableRequestAttributes;
if (_impl->cudaBufferType != BufferType::Invalid)
worker->setCudaBufferType(_impl->cudaBufferType);
return worker;
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/internal/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ RecvAmMessage::RecvAmMessage(internal::AmData* amData,
}
}

void RecvAmMessage::setUcpRequest(void* request) { _request->_request = request; }

void RecvAmMessage::callback(void* request, ucs_status_t status)
{
std::visit(data::dispatch{
Expand Down
67 changes: 62 additions & 5 deletions cpp/src/request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: BSD-3-Clause
*/
#include <chrono>
#include <cstring>
#include <memory>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -140,20 +141,18 @@ void Request::callback(void* request, ucs_status_t status)
if (_status != UCS_INPROGRESS)
ucxx_trace_req_f(_ownerString.c_str(),
this,
_request,
request,
_operationName.c_str(),
"has status already set to %d (%s), callback setting %d (%s)",
_status,
ucs_status_string(_status),
status,
ucs_status_string(status));

if (UCS_PTR_IS_PTR(_request)) ucp_request_free(request);

ucxx_trace_req_f(_ownerString.c_str(), this, _request, _operationName.c_str(), "completed");
ucxx_trace_req_f(_ownerString.c_str(), this, request, _operationName.c_str(), "completed");
setStatus(status);
ucxx_trace_req_f(
_ownerString.c_str(), this, _request, _operationName.c_str(), "isCompleted: %d", isCompleted());
_ownerString.c_str(), this, request, _operationName.c_str(), "isCompleted: %d", isCompleted());
}

void Request::process()
Expand Down Expand Up @@ -235,11 +234,69 @@ void Request::setStatus(ucs_status_t status)
_ownerString.c_str(), this, _request, _operationName.c_str(), "invoking user callback");
_callback(status, _callbackData);
}

// Free the UCP request inside the lock so it is mutually exclusive with
// `publishRequest()`/`queryRequestAttributes()` on the submit thread.
if (UCS_PTR_IS_PTR(_request)) {
ucp_request_free(_request);
_request = nullptr;
}
}
}

const std::string& Request::getOwnerString() const { return _ownerString; }

void Request::publishRequest(void* request)
{
if (!_worker->isRequestAttributesEnabled()) {
std::lock_guard<std::recursive_mutex> lock(_mutex);
_request = request;
return;
}

std::lock_guard<std::recursive_mutex> lock(_mutex);
_request = request;

if (_requestAttr.memoryType != UCS_MEMORY_TYPE_UNKNOWN) return;

ucp_request_attr_t result;

auto worker_attr = _worker->queryAttributes();

std::string debugString(worker_attr.maxDebugString, '\0');

result.field_mask = UCP_REQUEST_ATTR_FIELD_MEM_TYPE | UCP_REQUEST_ATTR_FIELD_INFO_STRING |
UCP_REQUEST_ATTR_FIELD_INFO_STRING_SIZE;

result.debug_string = debugString.data();
result.debug_string_size = debugString.size();

if (UCS_PTR_IS_PTR(_request)) {
auto queryStatus = ucp_request_query(_request, &result);
if (queryStatus == UCS_OK && result.debug_string != nullptr) {
debugString.resize(std::strlen(debugString.c_str()));
_requestAttr.debugString = std::move(debugString);
_requestAttr.memoryType = result.mem_type;
}
}
}

Request::Attributes Request::queryAttributes()
{
if (!_worker->isRequestAttributesEnabled())
throw ucxx::UnsupportedError(
"Request attributes querying is disabled on the owning worker; build the worker "
"with `ucxx::experimental::WorkerBuilder::requestAttributes(true)` to enable it");

std::lock_guard<std::recursive_mutex> lock(_mutex);

if (_requestAttr.memoryType != UCS_MEMORY_TYPE_UNKNOWN) return _requestAttr;

throw ucxx::NoElemError(
"Request attributes are not available for this request: UCX took an inline-completion "
"path with no queryable UCP request, or the request has not completed yet");
}

std::shared_ptr<Buffer> Request::getRecvBuffer() { return nullptr; }

std::string Request::getRecvHeader() { return {}; }
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ ucs_status_t RequestAm::recvCallback(void* arg,
return s;
} else {
// The request will be handled by the callback
recvAmMessage->setUcpRequest(status);
req->publishRequest(status);
amData->_registerInflightRequest(req);

{
Expand Down Expand Up @@ -470,8 +470,7 @@ void RequestAm::request()
amSend._count,
&param);

std::lock_guard<std::recursive_mutex> lock(_mutex);
_request = request;
publishRequest(request);
},
[](auto) { throw ucxx::UnsupportedError("Only send active messages can call request()"); },
},
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/request_endpoint_close.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "ucxx/request_data.h"
Expand Down Expand Up @@ -78,8 +78,7 @@ void RequestEndpointClose::request()
else
throw ucxx::Error("A valid endpoint or worker is required for a close operation.");

std::lock_guard<std::recursive_mutex> lock(_mutex);
_request = request;
publishRequest(request);
}

void RequestEndpointClose::populateDelayedSubmission()
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/request_flush.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES.
* SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include <cstdio>
Expand Down Expand Up @@ -77,8 +77,7 @@ void RequestFlush::request()
else
throw ucxx::Error("A valid endpoint or worker is required for a flush operation.");

std::lock_guard<std::recursive_mutex> lock(_mutex);
_request = request;
publishRequest(request);
}

void RequestFlush::populateDelayedSubmission()
Expand Down
Loading
Loading