Skip to content

FlagCX P2P Engine optimization#478

Open
mikethegoblin wants to merge 11 commits into
flagos-ai:mainfrom
mikethegoblin:p2p_optimization
Open

FlagCX P2P Engine optimization#478
mikethegoblin wants to merge 11 commits into
flagos-ai:mainfrom
mikethegoblin:p2p_optimization

Conversation

@mikethegoblin
Copy link
Copy Markdown
Collaborator

PR Category

UIL & PAL

PR Types

New Features

PR Description

@MC952-arch
Copy link
Copy Markdown
Collaborator

Please rebase the code

…be independently adjusted using environment variables.
@leoda1 leoda1 force-pushed the p2p_optimization branch 2 times, most recently from 3ca2228 to 0d4e277 Compare May 29, 2026 03:16
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Reworks the FlagCX P2P engine for higher RDMA throughput: replaces the per-engine async worker with a shared per-IB-device FlagcxWorkerPool that owns a shared CQ, sharded slice queues, multiple QPs per connection, and centralized notification polling. The IBRC P2P adaptor is reworked from single-QP/per-comm-CQ to negotiated multi-QP using the pool's shared CQ, and exposes new optional testBatch / igetBatch net-adaptor hooks. Adds a configurable FlagcxP2pGlobalConfig populated from many new FLAGCX_P2P_* environment variables.

Changes:

  • Introduce FlagcxWorkerPool (shared CQ, sharded slice queues, worker threads, notification thread owner) and route vectored READ/WRITE through it via FlagcxSlices.
  • Negotiate numQps per connection (min(local, remote), max kFlagcxP2pMaxQpsPerEngine = 8); replace per-comm CQ with the pool's shared CQ; redesign request as flagcxP2pSliceReq with a FlagcxTransferTask completion counter.
  • Add FlagcxP2pGlobalConfig + ~18 new env-var knobs; extend flagcxNetAdaptor_latest with optional testBatch and igetBatch; drop iputSignal support and the per-comm scratchpad MR.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 11 comments.

File Description
flagcx/include/flagcx_p2p.h Adds FlagcxSlice/FlagcxTransferTask, kFlagcxP2pMaxQpsPerEngine, and global config struct/accessors.
flagcx/core/flagcx_p2p.cc Implements global config + worker pool; rewrites vectored READ/WRITE to build slices and submit via the pool; adds MR-id→base-addr map; moves notification thread into the pool.
flagcx/adaptor/net/ibrc_p2p_adaptor.cc Switches P2P adaptor to multi-QP with shared CQ, slice-based posting, flagcxP2pSliceBatch; replaces per-request CQ poll with task completion; adds testBatch/igetBatch; removes iputSignal and scratchpad MR.
flagcx/adaptor/include/flagcx_net_adaptor.h Extends flagcxNetAdaptor_latest with optional testBatch and igetBatch function pointers.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +2031 to 2092
if (conn == NULL || numIovs <= 0 || transferId == NULL) {
fprintf(stderr,
"[FlagCX P2P] ReadVector early exit: invalid args (conn=%p, "
"numIovs=%d, transferId=%p)\n",
conn, numIovs, (void *)transferId);
return -1;
}

if (dstVec.size() < static_cast<size_t>(numIovs) ||
sizeVec.size() < static_cast<size_t>(numIovs) ||
descs.size() < static_cast<size_t>(numIovs)) {
fprintf(stderr,
"[FlagCX P2P] ReadVector early exit: vector length mismatch "
"(numIovs=%d)\n",
numIovs);
return -1;
}

if (conn->isLocal && (conn->sameProcess || !ipcBufs.empty())) {
return startLocalTransfer(conn, dstVec, sizeVec, descs, numIovs, transferId,
ipcBufs, false);
fprintf(stderr,
"[FlagCX P2P] ReadVector taking local transfer path: numIovs=%d\n",
numIovs);
int rc = startLocalTransfer(conn, dstVec, sizeVec, descs, numIovs,
transferId, ipcBufs, false);
fprintf(stderr, "[FlagCX P2P] ReadVector local transfer returned: rc=%d\n",
rc);
return rc;
}

if (mrIds.size() < static_cast<size_t>(numIovs)) {
fprintf(stderr,
"[FlagCX P2P] ReadVector early exit: mrIds length mismatch "
"(numIovs=%d)\n",
numIovs);
return -1;
}

std::vector<FlagcxP2pMemRegEntry> localEntries(numIovs);
{
std::lock_guard<std::mutex> memLock(gMemMutex);
for (int i = 0; i < numIovs; i++) {
if (!findMemReg((uintptr_t)dstVec[i], &localEntries[i]))
FlagcxP2pMemRegEntry *entry = findMemRegByMr(mrIds[i]);
if (entry == NULL) {
fprintf(
stderr,
"[FlagCX P2P] ReadVector memReg lookup failed: iov=%d, mr=%lu\n", i,
(unsigned long)mrIds[i]);
return -1;
}

if (!memRegContains(*entry, reinterpret_cast<uintptr_t>(dstVec[i]),
sizeVec[i])) {
fprintf(stderr,
"[FlagCX P2P] ReadVector memReg bounds check failed: iov=%d, "
"mr=%lu, addr=%p, size=%zu\n",
i, (unsigned long)mrIds[i], dstVec[i], sizeVec[i]);
return -1;
}

localEntries[i] = *entry;
}
}
Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +730 to +734
flagcxResult_t rc =
flagcxP2pSliceBatch(sc, chosen->qp, (int)take, chunk.data());
if (rc != flagcxSuccess)
processed_.fetch_add(take, std::memory_order_release);
i += take;
struct flagcxP2pSendComm *comm;
FLAGCXCHECK(flagcxCalloc(&comm, 1));
int ready = 0;
auto connectStart = std::chrono::steady_clock::time_point();
Comment on lines +139 to +149
// Optional batch completion test — polls CQ once for multiple requests.
// If NULL, caller falls back to per-request test().
flagcxResult_t (*testBatch)(void **requests, int nRequests, int *doneFlags,
int *doneCount);
// Optional one-side batch READ. Success returns one logical request for the
// full batch.
flagcxResult_t (*igetBatch)(void *sendComm, int count,
const uint64_t *srcOffs, const uint64_t *dstOffs,
const size_t *sizes, int srcRank, int dstRank,
void *const *srcHandles, void *const *dstHandles,
void **request);
Comment on lines 814 to 854
static flagcxResult_t flagcxP2pTest(void *request, int *done, int *sizes) {
*done = 0;
struct flagcxP2pRequest *req = (struct flagcxP2pRequest *)request;
if (req == NULL || req->type == FLAGCX_P2P_REQ_UNUSED) {
if (sizes)
*sizes = 0;
if (request == NULL) {
*done = 1;
return flagcxSuccess;
}

int nCqe = 0;
struct ibv_wc wc;
FLAGCXCHECK(flagcxWrapIbvPollCq(req->cq, 1, &wc, &nCqe));

if (nCqe == 0)
return flagcxSuccess;

if (wc.status != IBV_WC_SUCCESS) {
WARN("NET/IB_P2P : CQ error: status=%d opcode=%d wr_id=%lu", wc.status,
wc.opcode, wc.wr_id);
return flagcxRemoteError;
}

// Map CQE back to the correct request via wr_id
uint32_t reqIdx = wc.wr_id;
if (reqIdx >= FLAGCX_P2P_MAX_REQUESTS) {
WARN("NET/IB_P2P : invalid wr_id %u in CQE", reqIdx);
return flagcxInternalError;
}
struct flagcxP2pRequest *completedReq = &req->reqs[reqIdx];

completedReq->events--;
if (completedReq->events == 0) {
completedReq->type = FLAGCX_P2P_REQ_UNUSED;
auto *req = static_cast<struct flagcxP2pSliceReq *>(request);
if (req->task.isAllDone()) {
*done = 1;
if (sizes) {
uint64_t total = 0;
for (auto *s : req->task.sliceList)
total += s->length;
*sizes = (int)std::min<uint64_t>(total, (uint64_t)INT32_MAX);
}
flagcxP2pFreeSliceReq(req);
}
return flagcxSuccess;
}

// Check if the originally requested op is done
if (req->type == FLAGCX_P2P_REQ_UNUSED) {
*done = 1;
if (sizes)
*sizes = 0;
static flagcxResult_t flagcxP2pTestBatch(void **requests, int nRequests,
int *doneFlags, int *doneCount) {
int completed = 0;
for (int i = 0; i < nRequests; i++) {
doneFlags[i] = 0;
auto *req = static_cast<struct flagcxP2pSliceReq *>(requests[i]);
if (req == NULL) {
doneFlags[i] = 1;
completed++;
continue;
}
if (req->task.isAllDone()) {
doneFlags[i] = 1;
completed++;
flagcxP2pFreeSliceReq(req);
}
}
*doneCount = completed;
return flagcxSuccess;
Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +307 to +335
template <typename Policy>
inline void flagcxBuildSlices(FlagcxTransferTask *task, uint64_t srcVa,
uint64_t dstVa, size_t totalLen, uint32_t lkey,
uint32_t rkey, uint8_t opcode,
const std::string &peerNicPath) {
if (!Policy::kFurtherCut) {
auto *s = new FlagcxSlice{srcVa, dstVa, (uint32_t)totalLen,
lkey, rkey, opcode,
peerNicPath, task, nullptr};
task->sliceList.push_back(s);
task->sliceCount.fetch_add(1, std::memory_order_release);
return;
}

static constexpr int kWindowSize = 64;
static constexpr int kBatchPollCqe = 32;
size_t off = 0;
while (off < totalLen) {
bool merge =
(totalLen - off) <= Policy::kBlockSize + Policy::kFragmentSize;
size_t len = merge ? (totalLen - off) : Policy::kBlockSize;
auto *s = new FlagcxSlice{srcVa + off, dstVa + off, (uint32_t)len,
lkey, rkey, opcode,
peerNicPath, task, nullptr};
task->sliceList.push_back(s);
task->sliceCount.fetch_add(1, std::memory_order_release);
off += len;
if (merge)
break;
}
}
Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +471 to +484
FlagcxWorkerPool::~FlagcxWorkerPool() {
running_.store(false, std::memory_order_release);
cv_.notify_all();
for (auto &t : transferThreads_) {
if (t.joinable())
t.join();
}
// notifThread_ is joined explicitly via stopNotif() in EngineDestroy;
// by the time ~pool runs at process exit it should already be joined.
if (notifThread_.joinable()) {
notifThread_.join();
}
// CQ destruction skipped: pool is process-lived; OS reclaims.
}
Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +2098 to 2117
if (!buildAndSubmitToPool(task.get(), dstVec, sizeVec, descs, localEntries,
numIovs, conn->sendComm, connIbDevN,
FLAGCX_SLICE_OP_READ)) {
// sentinel so isAllDone() converges (needs total>0)
auto *sentinel = new FlagcxSlice{0, 0, 0, 0, 0, FLAGCX_SLICE_OP_READ,
std::string(), &task->fx, nullptr};
task->fx.sliceList.push_back(sentinel);
task->fx.sliceCount.fetch_add(1, std::memory_order_release);
sentinel->markFailed();
task->postOk.store(false, std::memory_order_release);
}
pthread_cond_signal(&gAsyncWorker.cv);

uint64_t xferId;
{
std::lock_guard<std::mutex> lock(gPoolXferMutex);
xferId = gNextXferId++;
gPoolXferMap[xferId] = task;
}
*transferId = xferId;
return 0;
Comment on lines +115 to +137
struct FlagcxSlice {
// WRITE: local source VA; READ: local destination VA.
uint64_t srcVa = 0;
// WRITE: remote destination VA; READ: remote source VA.
uint64_t dstVa;
uint32_t length;
uint32_t lkey;
uint32_t rkey;
uint8_t opcode;
std::string peerNicPath;
FlagcxTransferTask *task;
volatile int *qpDepth;

inline void markSuccess() {
if (task)
task->doneSliceCount.fetch_add(1, std::memory_order_release);
}

inline void markFailed() {
if (task)
task->doneSliceCount.fetch_add(1, std::memory_order_release);
}
};
Comment thread flagcx/core/flagcx_p2p.cc
Comment on lines +53 to +70
FLAGCX_PARAM(P2pQpsPerConn, "P2P_QPS_PER_CONN", 4);
FLAGCX_PARAM(P2pWorkersPerPool, "P2P_WORKERS_PER_POOL", 4);
FLAGCX_PARAM(P2pShardCount, "P2P_SHARD_COUNT", 8);
FLAGCX_PARAM(P2pCqDepth, "P2P_CQ_DEPTH", 4096);
FLAGCX_PARAM(P2pMaxWrPerPost, "P2P_MAX_WR_PER_POST", 64);
FLAGCX_PARAM(P2pMaxRequests, "P2P_MAX_REQUESTS", 256);
FLAGCX_PARAM(P2pBatchPollSize, "P2P_BATCH_POLL_SIZE", 64);
FLAGCX_PARAM(P2pSliceSize, "P2P_SLICE_SIZE", 65536);
FLAGCX_PARAM(P2pFragmentLimit, "P2P_FRAGMENT_LIMIT", 4096);
FLAGCX_PARAM(P2pMaxSge, "P2P_MAX_SGE", 4);
FLAGCX_PARAM(P2pMaxInline, "P2P_MAX_INLINE", 64);
FLAGCX_PARAM(P2pIbPort, "P2P_IB_PORT", 1);
FLAGCX_PARAM(P2pGidIndex, "P2P_GID_INDEX", -1);
FLAGCX_PARAM(P2pMtu, "P2P_MTU", 4096);
FLAGCX_PARAM(P2pIbTc, "P2P_IB_TC", -1);
FLAGCX_PARAM(P2pRetryCnt, "P2P_RETRY_CNT", 7);
FLAGCX_PARAM(P2pNotifMaxPeers, "P2P_NOTIF_MAX_PEERS", 64);
FLAGCX_PARAM(P2pDestDevAffinity, "P2P_DEST_DEV_AFFINITY", 0);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants