Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion include/ofi_tree.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*
* Copyright (c) 2015 Cray Inc. All rights reserved.
* Copyright (c) 2018 Intel Corp, Inc. All rights reserved.
*
* Copyright (C) 2026 Cornelis Networks.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
* General Public License (GPL) Version 2, available from the file
Expand Down Expand Up @@ -92,9 +93,15 @@ struct ofi_rbnode *ofi_rbmap_search(struct ofi_rbmap *map, void *key,
int (*compare)(struct ofi_rbmap *map, void *key, void *data));
int ofi_rbmap_insert(struct ofi_rbmap *map, void *key, void *data,
struct ofi_rbnode **node);
int ofi_rbmap_insert_at(struct ofi_rbmap *map, void *key, void *data,
struct ofi_rbnode **ret_node,
struct ofi_rbnode *prealloc);
void ofi_rbmap_delete(struct ofi_rbmap *map, struct ofi_rbnode *node);
int ofi_rbmap_find_delete(struct ofi_rbmap *map, void *key);
int ofi_rbmap_empty(struct ofi_rbmap *map);

struct ofi_rbnode *ofi_rbnode_alloc(struct ofi_rbmap *map);
void ofi_rbnode_free(struct ofi_rbmap *map, struct ofi_rbnode *node);


#endif /* OFI_TREE_H_ */
10 changes: 9 additions & 1 deletion man/fi_opx.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,15 @@ OPX is not compatible with Open MPI 4.1.x PML/BTL.
*FI_OPX_RZV_MIN_PAYLOAD_BYTES*
: Integer. The minimum length in bytes where rendezvous will be used.
For messages smaller than this threshold, the send will first try to be completed using eager or multi-packet eager.
Value must be between 64 and 65536. Defaults to 16385.
Value must be between 64 and 65536.
The default, which applies to all memory types, is selected at build time based on the device-memory (HMEM) backend
that OPX was configured with:

- 4096 — HMEM builds with CUDA support
- 8192 — HMEM builds with AMD ROCR support
- 16385 — host-only builds (one byte above the multi-packet eager maximum),
which effectively disables rendezvous for any payload that fits in
multi-packet eager.

*FI_OPX_MP_EAGER_DISABLE*
: Boolean (1/0, on/off, true/false, yes/no). Disables multi-packet eager. Defaults to 0.
Expand Down
17 changes: 12 additions & 5 deletions prov/opx/include/rdma/opx/fi_opx_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ void fi_opx_handle_recv_rts_hfisvc(const union opx_hfi1_packet_hdr *const hdr,
uint64_t xfer_len = hdr->rzv_rts.message_length;

#ifdef OPX_TRACER_ENABLED
const uint64_t trace_slid_len = OPX_TRACE_PACK_SLID_LEN(lid, xfer_len);
const uint64_t trace_slid_len = OPX_TRACE_PACK_SLID_LEN(OPX_TRACE_GET_SLID(hdr, hfi1_type), xfer_len);
OPX_TRACE_RX_BEGIN(OPX_TRACE_EVENT_RX_RZV_RTS_HFISVC, trace_slid_len, origin_tag);
#endif

Expand All @@ -1132,7 +1132,15 @@ void fi_opx_handle_recv_rts_hfisvc(const union opx_hfi1_packet_hdr *const hdr,
uint64_t do_dmabuf = (uint64_t) (context->flags & FI_OPX_CQ_CONTEXT_DMABUF_HMEM);
if (do_dmabuf) {
opx_mr = ((struct fi_opx_hmem_info *) context->hmem_info_qws)->dmabuf.opx_mr;
if (opx_mr->hfisvc.state != OPX_MR_HFISVC_STATE_OPENED) {
if (opx_mr->hfisvc.state != OPX_MR_HFISVC_STATE_OPENED && OFI_LIKELY(xfer_len <= recv_len)) {
context->byte_counter = niov;
context->len = xfer_len;
context->data = ofi_data;
context->tag = origin_tag;
context->next = NULL;
context->flags |= FI_RECV | FI_OPX_HFI_BTH_OPCODE_GET_CQ_FLAG(opcode) |
FI_OPX_HFI_BTH_OPCODE_GET_MSG_FLAG(opcode);

int deferred_rc = opx_hfisvc_deferred_recv_rts_enqueue(opx_ep, context, niov, recv_buf,
&payload->rendezvous.hfisvc.iovs[0]);
if (OFI_UNLIKELY(deferred_rc)) {
Expand Down Expand Up @@ -4441,13 +4449,12 @@ static inline ssize_t fi_opx_ep_tx_send_internal(struct fid_ep *ep, const void *
return -FI_EAGAIN;
}

/* If hmem_iface != FI_HMEM_SYSTEM, we skip MP EGR because RZV yields better performance for devices */
if (total_len <= opx_tx->mp_eager_max_payload_bytes && is_contiguous &&
!fi_opx_hfi1_tx_is_shm(opx_ep, addr, caps) && (caps & FI_TAGGED) && hmem_iface == FI_HMEM_SYSTEM) {
!fi_opx_hfi1_tx_is_shm(opx_ep, addr, caps) && (caps & FI_TAGGED)) {
assert(total_len >= opx_tx->mp_eager_min_payload_bytes);
rc = opx_hfi1_tx_send_try_mp_egr(ep, buf, len, addr, tag, context, data, lock_required,
override_flags, tx_op_flags, caps, reliability,
do_cq_completion, FI_HMEM_SYSTEM, 0ul, OPX_HMEM_NO_HANDLE,
do_cq_completion, hmem_iface, hmem_device, hmem_handle,
hfi1_type, ctx_sharing);
if (OFI_LIKELY(rc == FI_SUCCESS)) {
OPX_TRACE_TX_END_SUCCESS(OPX_TRACE_EVENT_TX_SEND, 0, 0);
Expand Down
4 changes: 2 additions & 2 deletions prov/opx/include/rdma/opx/opx_hmem_domain.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024-2025 Cornelis Networks.
* Copyright (C) 2024-2026 Cornelis Networks.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -41,7 +41,7 @@

#define OPX_HMEM_NO_LOCK_ON_CLEANUP (0)

#define OPX_HMEM_DEV_REG_SEND_THRESHOLD_DEFAULT (4096)
#define OPX_HMEM_DEV_REG_SEND_THRESHOLD_DEFAULT (512)
#define OPX_HMEM_DEV_REG_RECV_THRESHOLD_DEFAULT (OPX_HFI1_PKT_SIZE)
#define OPX_HMEM_DEV_REG_THRESHOLD_MAX (OPX_HFI1_PKT_SIZE)
#define OPX_HMEM_DEV_REG_THRESHOLD_MIN (0)
Expand Down
43 changes: 42 additions & 1 deletion prov/opx/src/fi_opx_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -3845,14 +3845,55 @@ fi_opx_ep_rx_process_context_noinline(struct fi_opx_ep *opx_ep, const uint64_t s
struct fi_context *op_context = (struct fi_context *) context->err_entry.op_context;
struct fi_opx_hfi1_ue_packet *claimed_pkt = op_context->internal[0];

const unsigned is_shm = claimed_pkt->is_shm;
const unsigned is_shm = claimed_pkt->is_shm;
uint8_t is_mp_eager = (FI_OPX_HFI_BTH_OPCODE_BASE_OPCODE(claimed_pkt->hdr.bth.opcode) ==
FI_OPX_HFI_BTH_OPCODE_MSG_MP_EAGER_FIRST);

opx_ep_complete_receive_operation(
ep, &claimed_pkt->hdr, (union fi_opx_hfi1_packet_payload *) &claimed_pkt->payload,
claimed_pkt->hdr.match.ofi_tag, context, claimed_pkt->hdr.bth.opcode, OPX_MULTI_RECV_FALSE,
is_shm, rx_op_flags & (FI_OPX_CQ_CONTEXT_HMEM | FI_OPX_CQ_CONTEXT_DMABUF_HMEM), lock_required,
reliability, hfi1_type);

if (is_mp_eager) {
/*
* A claimed multi-packet eager message starts from an unexpected FIRST
* packet, just like fi_opx_ep_process_context_match_ue_packets(). The
* FIRST packet initializes context->byte_counter, but the remaining NTH
* packets are matched through mp_egr_queue. If we skip that bookkeeping
* here, FI_CLAIM receives can wait forever after MPI_Mprobe claims the
* FIRST packet of a multi-packet eager message.
*/
opx_lid_t slid;
if (hfi1_type & (OPX_HFI1_WFR | OPX_HFI1_MIXED_9B)) {
slid = (opx_lid_t) __be16_to_cpu24((__be16) claimed_pkt->hdr.lrh_9B.slid);
} else {
slid = (opx_lid_t) __le24_to_cpu((__le24) (claimed_pkt->hdr.lrh_16B.slid20 << 20) |
(claimed_pkt->hdr.lrh_16B.slid));
}

const uint64_t mp_egr_id =
OPX_GET_MP_EGR_ID(claimed_pkt->hdr.reliability.psn, slid, claimed_pkt->subctxt_rx);

fi_opx_ep_rx_process_pending_mp_eager_ue(ep, context, mp_egr_id, is_shm, lock_required,
reliability, hfi1_type);

if (context->byte_counter) {
context->mp_egr_id = mp_egr_id;
slist_insert_tail((struct slist_entry *) context, &opx_ep->rx->mp_egr_queue.mq);
} else {
FI_OPX_DEBUG_COUNTERS_INC(
opx_ep->debug_counters.mp_eager.recv_completed_process_context);

context->next = NULL;
if (OFI_UNLIKELY(context->err_entry.err == FI_ETRUNC)) {
slist_insert_tail((struct slist_entry *) context, opx_ep->rx->cq_err_ptr);
} else {
fi_opx_enqueue_completed(opx_ep->rx->cq_completed_ptr, context, lock_required);
}
}
}

/* ... and prepend the claimed uepkt to the ue free list.
claimed_pkt->next should have been set to NULL at the time we
stored it in context->claim */
Expand Down
18 changes: 18 additions & 0 deletions prov/opx/src/fi_opx_mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,24 @@ static inline int fi_opx_mr_reg_internal(struct fid *fid, const struct iovec *io
return -errno;
}

#if HAVE_HFISVC
/* HFISVC pins the memory region eagerly at registration time via the
* kernel hfi1_mem_region_pin path. A NULL base address cannot be pinned
* (the kernel returns EFAULT), and the HFISVC cmd_mr_open API truncates
* its length argument to 32 bits, so any length exceeding UINT32_MAX
* would silently register only the low 4 GiB of the requested range.
* Reject these cases up front with a libfabric-level error rather than
* letting them propagate to a kernel-side failure or a silent
* truncation. TODO: document this limit along with HFISVC */
if (opx_domain->use_hfisvc && (!iov->iov_base || iov->iov_len > UINT32_MAX)) {
FI_WARN(fi_opx_global.prov, FI_LOG_MR,
"HFISVC cannot register MR with iov_base=%p iov_len=%zu (NULL base or len > UINT32_MAX is unsupported by HFISVC pinning)\n",
iov->iov_base, iov->iov_len);
errno = FI_EOPNOTSUPP;
return -errno;
}
#endif

struct fi_opx_mr *opx_mr;
__attribute__((__unused__)) uint64_t hmem_device = 0UL;
__attribute__((__unused__)) enum fi_hmem_iface hmem_iface = FI_HMEM_SYSTEM;
Expand Down
36 changes: 22 additions & 14 deletions prov/opx/src/fi_opx_tid_cache.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2017-2020 Amazon.com, Inc. or its affiliates. All rights reserved.
* Copyright (C) 2022-2024 Cornelis Networks.
* Copyright (C) 2022-2026 Cornelis Networks.
*
* Copyright (c) 2016-2017 Cray Inc. All rights reserved.
* Copyright (c) 2017-2019 Intel Corporation, Inc. All rights reserved.
Expand Down Expand Up @@ -633,7 +633,11 @@ int opx_tid_cache_crte(struct ofi_mr_cache *cache, const struct ofi_mr_info *inf
}

OPX_DEBUG_ENTRY(info);
/* drop the mm lock across alloc/register */
/* drop the mm lock across alloc/register and rbnode pre-allocation
* to avoid deadlock: malloc() can trigger madvise() which OPAL
* intercepts, calling back into ofi_import_monitor_notify() which
* needs mm_lock.
*/
pthread_mutex_unlock(&mm_lock);
struct opx_mr_tid_info *tid_info;
int ret = opx_mr_entry_alloc_init(cache, info, opx_ep, entry, &tid_info);
Expand All @@ -648,18 +652,22 @@ int opx_tid_cache_crte(struct ofi_mr_cache *cache, const struct ofi_mr_info *inf
ret = opx_register_tid_region_retryable(cache, (uint64_t) info->iov.iov_base,
MIN(info->iov.iov_len, register_max_len), info->iface, info->device,
opx_ep, tid_info);

/* re-acquire mm_lock */
pthread_mutex_lock(&mm_lock);

if (ret) {
/* Failed, tid_info->ninfo will be zero */
FI_DBG(fi_opx_global.prov, FI_LOG_MR,
"opx_register_tid_region failed with return code %d (%s), FREE node %p\n", ret, strerror(ret),
(*entry)->node);
FI_DBG(fi_opx_global.prov, FI_LOG_MR, "opx_register_tid_region failed with return code %d (%s)\n", ret,
strerror(ret));
goto error;
}

struct ofi_rbnode *rbnode = ofi_rbnode_alloc(&cache->tree);
if (OFI_UNLIKELY(!rbnode)) {
FI_DBG(fi_opx_global.prov, FI_LOG_MR, "Failed to alloc rbnode, return -FI_ENOMEM\n");
goto error;
}

/* re-acquire mm_lock */
pthread_mutex_lock(&mm_lock);

FI_DBG(fi_opx_global.prov, FI_LOG_MR,
"NEW vaddr [%#lx - %#lx] length %lu, tid vaddr [%#lx - %#lx] , tid length %lu\n",
(uint64_t) info->iov.iov_base, (uint64_t) info->iov.iov_base + (uint64_t) info->iov.iov_len,
Expand All @@ -669,11 +677,12 @@ int opx_tid_cache_crte(struct ofi_mr_cache *cache, const struct ofi_mr_info *inf
(*entry)->info.iov.iov_base = (void *) tid_info->tid_vaddr;
(*entry)->info.iov.iov_len = tid_info->tid_length;

ret = ofi_rbmap_insert(&cache->tree, (void *) &(*entry)->info, (void *) *entry, &(*entry)->node);
ret = ofi_rbmap_insert_at(&cache->tree, (void *) &(*entry)->info, (void *) *entry, &(*entry)->node, rbnode);

if (OFI_UNLIKELY(ret)) {
FI_DBG(fi_opx_global.prov, FI_LOG_MR, "ofi_rbmap_insert returned %d (%s) %p\n", ret, strerror(ret),
(*entry)->node);
ofi_rbnode_free(&cache->tree, rbnode);
goto error;
}
cache->cached_cnt++;
Expand All @@ -682,9 +691,8 @@ int opx_tid_cache_crte(struct ofi_mr_cache *cache, const struct ofi_mr_info *inf
ret = ofi_monitor_subscribe(monitor, info->iov.iov_base, info->iov.iov_len, &(*entry)->hmem_info);
if (OFI_UNLIKELY(ret)) {
opx_mr_uncache_entry_storage(cache, *entry);
cache->uncached_cnt++;
cache->uncached_size += (*entry)->info.iov.iov_len;
FI_WARN(fi_opx_global.prov, FI_LOG_MR, "MONITOR SUBSCRIBE FAILED UNCACHED ERROR\n");
FI_WARN(fi_opx_global.prov, FI_LOG_MR, "MONITOR SUBSCRIBE FAILED UNCACHED ERROR %d (%s)\n", ret,
strerror(ret));
goto error;
}
OPX_DEBUG_EXIT((*entry), OPX_TID_CACHE_ENTRY_FOUND);
Expand All @@ -696,7 +704,7 @@ int opx_tid_cache_crte(struct ofi_mr_cache *cache, const struct ofi_mr_info *inf
(char *) info->iov.iov_base + info->iov.iov_len, info->iov.iov_len, info->iov.iov_len);
tid_info->npairs = 0; /* error == no tid pairs */
OPX_DEBUG_EXIT((*entry), OPX_TID_CACHE_ENTRY_NOT_FOUND);
return ret; // TODO - handle case for free
return ret; // entry freed by caller
}

__OPX_FORCE_INLINE__
Expand Down
31 changes: 27 additions & 4 deletions prov/util/src/util_mr_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
* Copyright (c) 2020 Cisco Systems, Inc. All rights reserved.
* (C) Copyright 2020 Hewlett Packard Enterprise Development LP
* Copyright (C) 2026 Cornelis Networks.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -268,6 +269,12 @@ void ofi_mr_cache_delete(struct ofi_mr_cache *cache, struct ofi_mr_entry *entry)
* new entry, then check under lock that a conflict with another thread
* hasn't occurred. If a conflict occurred, we return -EAGAIN and
* restart the entire operation.
*
* The rbnode for the tree insertion is also pre-allocated outside the
* lock for the same reason: ofi_rbmap_insert() may call malloc() which
* can trigger madvise() via the allocator, and an intercepting memory
* monitor (e.g. OPAL) would call back into ofi_import_monitor_notify()
* which needs mm_lock -- deadlocking the thread.
*/
static int
util_mr_cache_create(struct ofi_mr_cache *cache, struct ofi_mr_info *info,
Expand Down Expand Up @@ -300,6 +307,12 @@ util_mr_cache_create(struct ofi_mr_cache *cache, struct ofi_mr_info *info,
assert(ofi_iov_within(&(*info).iov, &(*entry)->info.iov));
*info = (*entry)->info;

struct ofi_rbnode *rbnode = ofi_rbnode_alloc(&cache->tree);
if (OFI_UNLIKELY(!rbnode)) {
ret = -FI_ENOMEM;
goto free;
}

pthread_mutex_lock(&mm_lock);
cur = ofi_mr_rbt_find(&cache->tree, info);
if (cur) {
Expand All @@ -311,11 +324,14 @@ util_mr_cache_create(struct ofi_mr_cache *cache, struct ofi_mr_info *info,
cache->uncached_cnt++;
cache->uncached_size += info->iov.iov_len;
} else {
if (ofi_rbmap_insert(&cache->tree, (void *) &(*entry)->info,
(void *) *entry, &(*entry)->node)) {
ret = -FI_ENOMEM;
ret = ofi_rbmap_insert_at(&cache->tree,
(void *) &(*entry)->info,
(void *) *entry, &(*entry)->node,
rbnode);
if (ret)
goto unlock;
}
rbnode = NULL; /* now owned by the tree */

cache->cached_cnt++;
cache->cached_size += info->iov.iov_len;

Expand All @@ -328,10 +344,17 @@ util_mr_cache_create(struct ofi_mr_cache *cache, struct ofi_mr_info *info,
cache->uncached_size += (*entry)->info.iov.iov_len;
}
}
/* ofi_rbnode_free() only returns the node to the tree free list (never
* allocates), so it is safe to call while holding mm_lock.
*/
if (rbnode)
ofi_rbnode_free(&cache->tree, rbnode);
pthread_mutex_unlock(&mm_lock);
return 0;

unlock:
if (rbnode)
ofi_rbnode_free(&cache->tree, rbnode);
pthread_mutex_unlock(&mm_lock);
free:
util_mr_free_entry(cache, *entry);
Expand Down
Loading
Loading