Skip to content

feat: use async queue to enable the ovelaping of weight loading and RDMA transferring#16

Merged
JD-ETH merged 1 commit into
JD-ETH:jd/rdma-integrationfrom
JensenFire:jsf/async_0118
Jan 22, 2026
Merged

feat: use async queue to enable the ovelaping of weight loading and RDMA transferring#16
JD-ETH merged 1 commit into
JD-ETH:jd/rdma-integrationfrom
JensenFire:jsf/async_0118

Conversation

@JensenFire
Copy link
Copy Markdown
Collaborator

Description

Before this PR, the update_weights in RDMA mode is sth like :

Sequential Execution:
load_weights() -> execute_each() -> load_weights() -> execute_each() -> ...
[===CPU===]     [==WAIT==]      [===CPU===]     [==WAIT==]

. while execute_each() is actually an asynchronous operation. After the load_weights of the model_replica is done, the related updated weights could be transferred, and another round load_weights could be started. That's what this pr does: build a queue to execute the RDMA transferring in a asynchronous way, and reduce the latency of the single _update_bucket_weights_from_remote() from 70+ ms ->16ms , with a 10%-20% e2e time cost saving of updating weights

Before :
image

After:
image

@JensenFire JensenFire requested review from JD-ETH and Risc-lt and removed request for JD-ETH January 18, 2026 11:08
@JensenFire JensenFire closed this Jan 18, 2026
@JensenFire JensenFire reopened this Jan 18, 2026
@JD-ETH
Copy link
Copy Markdown
Owner

JD-ETH commented Jan 18, 2026

I just have one concern about the logic:

for a q_proj, k_proj, v_proj this will call the async task 3 times and there is no guarantee that the right weight will be transferred in order. We need to only submit those parameter once all the shards are updated. Let's combine the two PRs to achieve this.

Otherwise, if this async queue impl is clearly better, we should remove the old execute_each.

What was blocking by the way? is it about the async execute not having it's own async loop?

Copy link
Copy Markdown
Owner

@JD-ETH JD-ETH left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss tomorrow and try to merge them together, otherwise the results will be wrong.

if self.pipelined_transfer:
transfer_bundle.execute_each(updated_name)
# Use executable queue for async transfer operations
transfer_bundle.execute_each(updated_name, self.executable_queue)
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's default to execute with the queue if it's clearly better

@JensenFire
Copy link
Copy Markdown
Collaborator Author

JensenFire commented Jan 19, 2026

I just have one concern about the logic:

for a q_proj, k_proj, v_proj this will call the async task 3 times and there is no guarantee that the right weight will be transferred in order. We need to only submit those parameter once all the shards are updated. Let's combine the two PRs to achieve this.

Agree. The updating order does matter. But it should not a problem here according to the reply from letian

What was blocking by the way? is it about the async execute not having it's own async loop?

I think it is. Not sure why the self.engine.batch_transfer_async_write()''s behavior is actually executed in a sequential way.

Copy link
Copy Markdown
Owner

@JD-ETH JD-ETH left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good for me to merge right now to get better profiling numbers

@JD-ETH JD-ETH merged commit 26173a7 into JD-ETH:jd/rdma-integration Jan 22, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants