[CRL] Replace all CPU-blocking stream syncs in C2C workflow with asynchronous eventRecord + streamWaitEvent pairs#434
Pull request overview
This PR updates the C2C execution path to avoid CPU-blocking streamSynchronize() calls by switching to GPU-side synchronization using eventRecord + streamWaitEvent between the main stream and a temporary hetStream.
Changes:
- Introduce a dedicated hetero stream (hetStream) plus two events for cross-stream ordering.
- Replace multiple streamSynchronize() calls with eventRecord()/streamWaitEvent() pairs.
- Add teardown for created events and the temporary stream.
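The record/wait pairing described above can be illustrated with a small host-side model. This is only a sketch under stated assumptions: Stream, Event, launch, drain and recordWaitDemo are hypothetical names invented for the illustration, the eventRecord and streamWaitEvent functions here are toy stand-ins rather than the FlagCX adaptor calls, and the model assumes an event captures all work queued on its stream at record time.

```cpp
#include <cstddef>
#include <string>
#include <vector>

struct Stream;

// Hypothetical model: an event captures the first `upTo` ops of stream `src`.
struct Event {
  const Stream* src;
  std::size_t upTo;
};

struct Op {
  std::string name;  // empty name marks a wait op
  Event dep;         // only meaningful for wait ops
};

// An in-order queue of ops; `done` counts how many have completed.
struct Stream {
  std::vector<Op> ops;
  std::size_t done = 0;
};

// Enqueue a named piece of work on a stream.
void launch(Stream& s, const std::string& name) {
  Op op;
  op.name = name;
  op.dep = Event{nullptr, 0};
  s.ops.push_back(op);
}

// Capture everything queued on `s` so far (toy stand-in, not the real API).
void eventRecord(Event& e, const Stream& s) {
  e.src = &s;
  e.upTo = s.ops.size();
}

// Make later work on `s` wait for what `e` captured; the host never blocks.
void streamWaitEvent(Stream& s, const Event& e) {
  Op op;
  op.dep = e;  // copy of the event state at issue time
  s.ops.push_back(op);
}

// Poll the streams round-robin until none can progress; return execution order.
std::vector<std::string> drain(const std::vector<Stream*>& streams) {
  std::vector<std::string> log;
  bool progressed = true;
  while (progressed) {
    progressed = false;
    for (Stream* s : streams) {
      if (s->done == s->ops.size()) continue;
      const Op& op = s->ops[s->done];
      const bool isWait = op.name.empty();
      if (isWait && op.dep.src != nullptr && op.dep.src->done < op.dep.upTo) {
        continue;  // dependency not yet complete: this stream stalls
      }
      if (!isWait) log.push_back(op.name);
      ++s->done;
      progressed = true;
    }
  }
  return log;
}

// The main stream runs preHomoFunc; hetStream must not start heteroFunc early.
std::vector<std::string> recordWaitDemo() {
  Stream mainStream, hetStream;
  Event syncEvent{nullptr, 0};
  launch(mainStream, "preHomoFunc");
  eventRecord(syncEvent, mainStream);
  streamWaitEvent(hetStream, syncEvent);
  launch(hetStream, "heteroFunc");
  return drain({&hetStream, &mainStream});  // hetStream is polled first on purpose
}
```

In this model hetStream is polled before the main stream, yet heteroFunc still runs after preHomoFunc, because the wait op stalls only hetStream and the host thread is never blocked.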
| // Record event on stream so hetStream can wait on preHomoFunc completion | ||
| FLAGCXCHECK(deviceAdaptor->eventRecord(syncEvent, stream)); |
This eventRecord(syncEvent, stream) has no corresponding streamWaitEvent() before syncEvent is recorded again at the end of refreshFunc_ (so this record is effectively overwritten/unused). Removing it would reduce overhead and avoid confusion about which dependency is actually enforced.
| // Record hetStream progress before homoInterFunc | ||
| FLAGCXCHECK(deviceAdaptor->eventRecord(hetSyncEvent, hetStream)); | ||
hetSyncEvent is recorded here but nothing waits on it (the actual wait is set up after the later record at line 2438). This first record appears redundant and adds event traffic inside the inner loop; consider removing it or clarifying which consumer uses this record.
| // Record event on stream so heteroFunc sees refreshed data | ||
| FLAGCXCHECK(deviceAdaptor->eventRecord(syncEvent, stream)); | ||
In this sequential-inter section the hetero funcs run on the same stream, so recording syncEvent does not enforce any additional ordering (there is no streamWaitEvent here) and the stream already orders refreshFunc_ before heteroFuncSteps_. This looks like leftover sync logic; consider removing these eventRecord calls (or, if cross-stream ordering was intended, add the corresponding streamWaitEvent on the other stream).
| flagcxEvent_t syncEvent, hetSyncEvent; | ||
| FLAGCXCHECK(deviceAdaptor->eventCreate(&syncEvent, flagcxEventDisableTiming)); | ||
| FLAGCXCHECK( | ||
| deviceAdaptor->eventCreate(&hetSyncEvent, flagcxEventDisableTiming)); |
syncEvent/hetSyncEvent are reused and re-recorded many times across sync points. With CUDA semantics (this adaptor calls cudaEventRecord / cudaStreamWaitEvent), a wait depends on the work captured by the event's most recent record at the time the wait is issued. A later re-record does not redirect a wait that has already been enqueued, but any record issued between an intended record and its wait silently changes what that wait covers. Sharing one event across many sync points therefore makes the enforced dependencies depend on exact host-side issue order and can introduce unintended extra synchronization. Consider using distinct events per sync point (e.g., per pipeline iteration, or a small ping-pong/event pool sized to the max in-flight waits) so that each record/wait pair is unambiguous.
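A ping-pong/event pool of the kind suggested here could look roughly like the sketch below. FakeEvent and EventRing are hypothetical names introduced only for illustration; a real version would hold flagcxEvent_t handles created and destroyed through the device adaptor, which is not shown.

```cpp
#include <array>
#include <cstddef>

// Hypothetical stand-in for an event handle; `id` only makes slots visible.
struct FakeEvent {
  int id;
};

// Fixed-size ring of events: each sync point takes the next slot, so a slot
// is reused only after N - 1 other sync points have been issued.
template <std::size_t N>
class EventRing {
 public:
  EventRing() {
    for (std::size_t i = 0; i < N; ++i) {
      events_[i].id = static_cast<int>(i);
    }
  }

  // Returns the event for the next sync point and advances the cursor.
  FakeEvent& next() {
    FakeEvent& e = events_[cursor_];
    cursor_ = (cursor_ + 1) % N;
    return e;
  }

 private:
  std::array<FakeEvent, N> events_{};
  std::size_t cursor_ = 0;
};
```

With N = 2 this is the ping-pong case: consecutive sync points alternate between two events. N would need to be at least the maximum number of record/wait pairs that can be in flight at once, which depends on the pipeline depth and is not determined here.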
| FLAGCXCHECK(deviceAdaptor->streamCreate(&hetStream)); | ||
| // Create events for async stream synchronization (avoid CPU-blocking | ||
| // streamSync) | ||
| flagcxEvent_t syncEvent, hetSyncEvent; | ||
| FLAGCXCHECK(deviceAdaptor->eventCreate(&syncEvent, flagcxEventDisableTiming)); | ||
| FLAGCXCHECK( | ||
| deviceAdaptor->eventCreate(&hetSyncEvent, flagcxEventDisableTiming)); |
New FLAGCXCHECK(...) calls after allocating scratchBuffer_ mean this function can now return early (on stream/event create/record/wait errors) without freeing scratchBuffer_ and without destroying hetStream / events. Please add a cleanup path (e.g., FLAGCXCHECKGOTO + goto cleanup that destroys any successfully-created resources and frees scratchBuffer_) so error propagation doesn’t leak resources.
| FLAGCXCHECKGOTO(deviceAdaptor->streamCreate(&hetStream), cleanup); | |
| // Create events for async stream synchronization (avoid CPU-blocking | |
| // streamSync) | |
| flagcxEvent_t syncEvent, hetSyncEvent; | |
| FLAGCXCHECKGOTO(deviceAdaptor->eventCreate(&syncEvent, flagcxEventDisableTiming), cleanup); | |
| FLAGCXCHECKGOTO( | |
| deviceAdaptor->eventCreate(&hetSyncEvent, flagcxEventDisableTiming), cleanup); |
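The cleanup path requested in this comment can be sketched in isolation as follows. Everything in the block is a hypothetical stand-in (Result, Handle, createHandle, destroyHandle, CHECK_GOTO, runWorkflow and the failAt parameter are invented for the example); it does not reproduce the FlagCX macros or adaptor signatures. One detail worth noting: every handle is declared and set to null before the first goto, so the cleanup label never sees an uninitialized value.

```cpp
#include <cstdlib>

// Hypothetical stand-ins; the real FlagCX types and macros are not reproduced.
enum Result { kSuccess = 0, kError = 1 };
struct Handle {
  int unused;
};

static int g_liveHandles = 0;  // counts live handles so a leak is observable

// Creates a handle, or fails on request to simulate an adaptor error.
Result createHandle(Handle** out, bool fail) {
  if (fail) return kError;
  *out = new Handle{0};
  ++g_liveHandles;
  return kSuccess;
}

// Safe to call with nullptr, which keeps the cleanup path unconditional.
void destroyHandle(Handle* h) {
  if (h == nullptr) return;
  delete h;
  --g_liveHandles;
}

// Store the result and jump to the label on failure.
#define CHECK_GOTO(call, res, label) \
  do {                               \
    res = (call);                    \
    if (res != kSuccess) goto label; \
  } while (0)

// failAt selects which creation step fails (negative: none), for testing.
Result runWorkflow(int failAt) {
  Result res = kSuccess;
  // All resources are declared and nulled before any goto can be taken.
  void* scratchBuffer = nullptr;
  Handle* hetStream = nullptr;
  Handle* syncEvent = nullptr;
  Handle* hetSyncEvent = nullptr;

  scratchBuffer = std::malloc(64);
  if (scratchBuffer == nullptr) {
    res = kError;
    goto cleanup;
  }
  CHECK_GOTO(createHandle(&hetStream, failAt == 0), res, cleanup);
  CHECK_GOTO(createHandle(&syncEvent, failAt == 1), res, cleanup);
  CHECK_GOTO(createHandle(&hetSyncEvent, failAt == 2), res, cleanup);
  // ... work would be enqueued here, with each call checked the same way ...

cleanup:
  // Release in reverse order of creation; null handles are skipped.
  destroyHandle(hetSyncEvent);
  destroyHandle(syncEvent);
  destroyHandle(hetStream);
  std::free(scratchBuffer);
  return res;
}
```

Because the success path falls through the same label, teardown is written once and runs on every exit, whichever step fails.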
PR Category
CRL
PR Types
Optimizations
PR Description