Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 62 additions & 30 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Run Container and Execute Tests
name: Perf Tests in Container

on:
push:
Expand All @@ -8,8 +8,12 @@ on:
branches:
- main

env:
MPI_HOME: /usr/local/mpi
PERF_BIN: /__w/FlagCX/FlagCX/test/perf/host_api/build/bin

Comment thread
MC952-arch marked this conversation as resolved.
jobs:
test-in-container:
perf-test:
runs-on: [self-hosted, cx-build]
container:
image: localhost:5000/flagscale:cuda12.8.1-cudnn9.7.1-python3.12-torch2.7.0-time2507111538
Expand All @@ -32,42 +36,70 @@ jobs:
submodules: true
set-safe-directory: true

- name: Set up Python and Install Dependencies
- name: Build FlagCX
run: |
apt update -y
apt-get install -y python3 python3-pip python3-venv git
python3 -m venv venv
. venv/bin/activate
cd /__w/FlagCX/FlagCX
git config --global --add safe.directory /__w/FlagCX/FlagCX
pip install setuptools pre-commit
pre-commit install
make -j$(nproc) USE_NVIDIA=1

- name: Build perf tests
run: |
cd /__w/FlagCX/FlagCX/test/perf
make -j$(nproc) USE_NVIDIA=1

- name: Run Code Format Check with pre-commit
- name: Wait for GPU
shell: bash
run: |
cd /__w/FlagCX/FlagCX
. venv/bin/activate
apt update -y
apt-get install clang-format -y
git fetch --all
if [ -n "$GITHUB_HEAD_REF" ] && [ -n "$GITHUB_BASE_REF" ]; then
from_ref="origin/$GITHUB_HEAD_REF"
to_ref="origin/$GITHUB_BASE_REF"
echo "From reference: $from_ref; To reference: $to_ref"
pre-commit run --from-ref "$from_ref" --to-ref "$to_ref"
fi
continue-on-error: false
source test/script/_gpu_check.sh
wait_for_gpu

- name: "Perf tests (homoRunner)"
run: |
export PATH=$MPI_HOME/bin:$PATH
export LD_LIBRARY_PATH=/__w/FlagCX/FlagCX/build/lib:$LD_LIBRARY_PATH
set -e
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_alltoall -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_alltoallv -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_sendrecv -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_allreduce -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_allgather -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_reducescatter -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_broadcast -b 128M -e 1G -f 2 -r 0 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_gather -b 128M -e 1G -f 2 -r 0 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_scatter -b 128M -e 1G -f 2 -r 0 -p 1
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_reduce -b 128M -e 1G -f 2 -r 0 -p 1

- name: Check the current working directory
- name: "Perf tests (uniRunner)"
run: |
echo "Current directory: $(pwd)"
ls -l ./test/script
export PATH=$MPI_HOME/bin:$PATH
export LD_LIBRARY_PATH=/__w/FlagCX/FlagCX/build/lib:$LD_LIBRARY_PATH
set -e
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_alltoall -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_alltoallv -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_sendrecv -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_allgather -b 128M -e 1G -f 2 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_broadcast -b 128M -e 1G -f 2 -r 0 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_gather -b 128M -e 1G -f 2 -r 0 -p 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_scatter -b 128M -e 1G -f 2 -r 0 -p 1

- name: Ensure script has execute permissions
run: chmod +x /__w/FlagCX/FlagCX/test/script/auto_script.sh
- name: "Registration -R 1 (homoRunner)"
run: |
export PATH=$MPI_HOME/bin:$PATH
export LD_LIBRARY_PATH=/__w/FlagCX/FlagCX/build/lib:$LD_LIBRARY_PATH
mpirun -np 8 --allow-run-as-root $PERF_BIN/perf_allreduce -b 128M -e 1G -f 2 -p 1 -R 1

- name: Run Auto Test Script in Container
- name: "Registration -R 1 (uniRunner P2P)"
run: |
cd /__w/FlagCX/FlagCX
./test/script/auto_script.sh
export PATH=$MPI_HOME/bin:$PATH
export LD_LIBRARY_PATH=/__w/FlagCX/FlagCX/build/lib:$LD_LIBRARY_PATH
set -e
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_VMM_ENABLE=0 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_sendrecv -b 128M -e 1G -f 2 -p 1 -R 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_VMM_ENABLE=0 -x FLAGCX_USE_HETERO_COMM=1 $PERF_BIN/perf_alltoall -b 128M -e 1G -f 2 -p 1 -R 1

- name: "Registration -R 1 (uniRunner NET)"
run: |
export PATH=$MPI_HOME/bin:$PATH
export LD_LIBRARY_PATH=/__w/FlagCX/FlagCX/build/lib:$LD_LIBRARY_PATH
set -e
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_VMM_ENABLE=0 -x FLAGCX_USE_HETERO_COMM=1 -x FLAGCX_P2P_DISABLE=1 $PERF_BIN/perf_sendrecv -b 128M -e 1G -f 2 -p 1 -R 1
mpirun -np 8 --allow-run-as-root -x FLAGCX_MEM_ENABLE=1 -x FLAGCX_VMM_ENABLE=0 -x FLAGCX_USE_HETERO_COMM=1 -x FLAGCX_P2P_DISABLE=1 $PERF_BIN/perf_alltoall -b 128M -e 1G -f 2 -p 1 -R 1
17 changes: 12 additions & 5 deletions flagcx/core/include/reg_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
#include "flagcx.h"
#include "net.h"
#include "register.h"
#include <map>
#include <memory>
#include <unistd.h>
#include <unordered_map>

class flagcxRegPool {
public:
static constexpr uintptr_t GLOBAL_POOL_KEY = 0; // nullptr comm maps here

flagcxRegPool();
~flagcxRegPool();

Expand All @@ -23,18 +26,22 @@ class flagcxRegPool {
struct flagcxProxyConnector *proxyConn);
flagcxResult_t removeRegItemP2pHandles(void *comm, flagcxRegItem *reg);
flagcxResult_t removeAllP2pHandles(void *comm);
flagcxResult_t removeAllNetHandles(void *comm);
flagcxResult_t registerBuffer(void *comm, void *data, size_t length);
flagcxResult_t deregisterBuffer(void *comm, void *handle);
std::map<uintptr_t, std::map<uintptr_t, flagcxRegItem *>> &getGlobalMap();
std::unordered_map<uintptr_t, std::unordered_map<uintptr_t, flagcxRegItem *>>
&getGlobalMap();
flagcxRegItem *getItem(const void *comm, void *data);
void dump();

private:
void mapRegItemPages(uintptr_t commKey, flagcxRegItem *reg);
std::map<uintptr_t, std::map<uintptr_t, flagcxRegItem *>>
std::unordered_map<uintptr_t, std::unordered_map<uintptr_t, flagcxRegItem *>>
regMap; // <commPtr, <pageBasePtr, regItemPtr>>
std::map<uintptr_t, std::list<flagcxRegItem>>
regPool; // <commPtr, regItemList>
std::unordered_map<
uintptr_t, std::unordered_map<uintptr_t, std::unique_ptr<flagcxRegItem>>>
regPool; // <commPtr, <beginAddr, regItem>> (only GLOBAL_POOL_KEY owns
// data)
Comment thread
MC952-arch marked this conversation as resolved.
uintptr_t pageSize;
};

Expand Down
13 changes: 9 additions & 4 deletions flagcx/core/include/register.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

#include "core.h"
#include "device.h"
#include <list>
#include <unordered_map>
#include <vector>
Comment thread
MC952-arch marked this conversation as resolved.

#define FLAGCX_IPC_HANDLE_SIZE 64

Expand All @@ -28,11 +29,13 @@ struct netRegInfo {
struct flagcxRegNetHandle {
void *handle = NULL;
struct flagcxProxyConnector *proxyConn = NULL;
void *ownerComm = NULL; // comm that registered this handle
};

struct flagcxRegP2pHandle {
void *handle = NULL;
struct flagcxProxyConnector *proxyConn = NULL;
void *ownerComm = NULL; // comm that registered this handle
};

struct flagcxIpcImpInfo {
Expand Down Expand Up @@ -60,9 +63,11 @@ struct flagcxRegItem {
uintptr_t beginAddr = 0;
uintptr_t endAddr = 0;
int refCount = 1;
std::list<std::pair<flagcxRegNetHandle, flagcxRegP2pHandle>> handles;
void *homoRegHandle = nullptr; // backend CCL handle (homo path only)
flagcxIpcHandleData ipcHandleData = {}; // IPC handle bytes (both paths)
std::vector<std::pair<flagcxRegNetHandle, flagcxRegP2pHandle>> handles;
flagcxIpcHandleData localIpcHandleData =
{}; // sender's IPC handle bytes (hetero path)
std::unordered_map<uintptr_t, void *>
homoRegHandles; // commKey → backend CCL handle
};

struct flagcxReg {
Expand Down
3 changes: 2 additions & 1 deletion flagcx/core/init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,9 @@ flagcxResult_t flagcxHeteroCommUserRank(const flagcxHeteroComm_t comm,

flagcxResult_t flagcxHeteroCommDestroy(flagcxHeteroComm_t comm) {
FLAGCXCHECK(flagcxHeteroRmaProxyStop(comm));
// Clean up P2P IPC handles while proxy is still alive and peerSocks valid
// Clean up P2P/Net handles while proxy is still alive and peerSocks valid
FLAGCXCHECK(globalRegPool.removeAllP2pHandles(comm));
FLAGCXCHECK(globalRegPool.removeAllNetHandles(comm));
// Stop: send stop + close peerSocks
FLAGCXCHECK(flagcxProxyStop(comm));
// Destroy: join thread, free proxy resources
Expand Down
3 changes: 2 additions & 1 deletion flagcx/core/net.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,8 @@ static flagcxResult_t netRegisterBuffer(flagcxHeteroComm *comm,
peerProxyConn = &peerConn->proxyConn;
for (auto it = regRecord->handles.begin(); it != regRecord->handles.end();
it++) {
if (it->first.proxyConn == peerProxyConn && it->first.handle) {
if (it->first.proxyConn == peerProxyConn && it->first.handle &&
it->first.ownerComm == comm) {
found = true;
outHandle[p] = it->first.handle;
*outRegBufFlag = 1;
Expand Down
11 changes: 6 additions & 5 deletions flagcx/core/p2p.cc
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,11 @@ static flagcxResult_t p2pRegisterBuffer(flagcxHeteroComm *comm,
for (int p = 0; p < nPeers; p++) {
int peerRank = peerRanks[p];

// Check cache: existing info with handleReady for this peer
// Check cache: existing info with handleReady for this peer (this comm
// only)
flagcxIpcRegInfo *existingInfo = NULL;
for (auto &handlePair : regItem->handles) {
if (handlePair.second.handle) {
if (handlePair.second.handle && handlePair.second.ownerComm == comm) {
flagcxIpcRegInfo *info = (flagcxIpcRegInfo *)handlePair.second.handle;
if (info->peerRank == peerRank) {
existingInfo = info;
Expand Down Expand Up @@ -817,9 +818,9 @@ static flagcxResult_t p2pRegisterBuffer(flagcxHeteroComm *comm,
} else if (legacyIpcCap) {
// Different process: get IPC handle for our own buffer
char zeros[sizeof(flagcxIpcHandleData)] = {};
if (memcmp(&regItem->ipcHandleData, zeros,
if (memcmp(&regItem->localIpcHandleData, zeros,
sizeof(flagcxIpcHandleData)) != 0) {
memcpy(&handleData, &regItem->ipcHandleData,
memcpy(&handleData, &regItem->localIpcHandleData,
sizeof(flagcxIpcHandleData));
Comment thread
MC952-arch marked this conversation as resolved.
} else {
flagcxIpcMemHandle_t ipcHandle = NULL;
Expand All @@ -832,7 +833,7 @@ static flagcxResult_t p2pRegisterBuffer(flagcxHeteroComm *comm,
fail);
if (handleSize <= sizeof(flagcxIpcHandleData)) {
memcpy(&handleData, ipcHandle, handleSize);
memcpy(&regItem->ipcHandleData, ipcHandle, handleSize);
memcpy(&regItem->localIpcHandleData, ipcHandle, handleSize);
}
deviceAdaptor->ipcMemHandleFree(ipcHandle);
}
Expand Down
Loading