refactor(comm): Bucket is the sole transfer unit; Chunk is pure data #106

Merged
junjzhang merged 1 commit into main from refactor/bucket-sole-transfer-unit on May 29, 2026

@junjzhang
Contributor

@junjzhang junjzhang commented May 29, 2026

What did you do

Implements #104: a cleanup of the tensor-transfer layer that collapses it to one transfer unit and one execution engine, and threads the mesh-to-mesh topology through as a single struct. No behavior change for the supported (disjoint-mesh) configs.

Execution layer

  • Chunk is now a pure shape-dependent descriptor — it no longer issues a wire op (dropped execute() and the work field). Bucket is the sole transfer unit: it owns buffer/work and issues the op in launch(). The Transferable base class and Bucket.execute are gone.
  • The two execution paths collapse into one. chunk_comm / execute_chunk_simple (per-chunk synchronous, with a cuda.synchronize() after every chunk) are deleted; a non-coalesced transfer is now the degenerate bucket_size=1 case of bucket_comm (one chunk → one single-entry bucket). execute_bucket_pipeline was inlined into bucket_comm and execution.py removed.
  • BucketEntry removed: Bucket holds chunks: list[Chunk] directly; byte offsets are the prefix sum precomputed once in __post_init__, and total_bytes is O(1).
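As a rough illustration of the "offsets are a prefix sum computed once" point, here is a minimal sketch using plain dataclasses. The field names `name` and `nbytes` are assumptions made for this example, and the real `Chunk` and `Bucket` in `comm/ir.py` carry more state (buffer, work handle, routing key) than is shown here.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Chunk:
    """Pure descriptor: says what to move, never issues a transfer itself."""
    name: str    # hypothetical field: which tensor slice this describes
    nbytes: int  # hypothetical field: payload size in bytes


@dataclass
class Bucket:
    """Transfer unit: holds its chunks directly and owns the packed layout."""
    chunks: list[Chunk]
    offsets: list[int] = field(init=False)
    total_bytes: int = field(init=False)

    def __post_init__(self) -> None:
        # Exclusive prefix sum: chunk i starts where chunk i-1 ends.
        offsets, running = [], 0
        for chunk in self.chunks:
            offsets.append(running)
            running += chunk.nbytes
        self.offsets = offsets
        # Stored once at build time, so later reads are O(1).
        self.total_bytes = running


bundled = Bucket([Chunk("w1", 16), Chunk("w2", 8), Chunk("w3", 32)])
single = Bucket([Chunk("w1", 16)])  # the degenerate single-entry case
```

Under these assumptions a single-entry bucket needs no special casing: it is the same structure with one offset of zero.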

Topology layer

  • get_m2m_map now returns an M2MMap (routes + slicers + partial reductions) instead of a loose 4-tuple; M2MMap was moved from tensor_bus/pair_state.py into comm/ir.py (it is pure comm-layer data). m2m_to_chunks (renamed from map_to_chunk_ops) takes the M2MMap directly rather than three unpacked args — the agent neither reassembles nor destructures it. The materialized list[Route] value is consistently named routes.
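The benefit of passing one struct instead of unpacked arguments can be sketched as follows. This is only an illustration: the `Route` fields, the `M2MMap` field names and the body of `m2m_to_chunks` below are invented stand-ins, not the definitions in `comm/ir.py` or `comm/get_chunks.py`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    """Hypothetical route: one source rank feeding one destination rank."""
    src_rank: int
    dst_rank: int


@dataclass(frozen=True)
class M2MMap:
    """Stand-in for the topology struct; all field names are assumptions."""
    routes: tuple[Route, ...]
    slicers: dict[Route, slice]
    partial_reductions: dict[Route, str]


def m2m_to_chunks(m2m: M2MMap, rank: int) -> list[tuple[str, Route, slice]]:
    """Return the (direction, route, slicer) descriptors that involve `rank`."""
    chunks = []
    for route in m2m.routes:
        if route.src_rank == rank:
            chunks.append(("send", route, m2m.slicers[route]))
        if route.dst_rank == rank:
            chunks.append(("recv", route, m2m.slicers[route]))
    return chunks


r01, r12 = Route(0, 1), Route(1, 2)
m2m = M2MMap(
    routes=(r01, r12),
    slicers={r01: slice(0, 4), r12: slice(4, 8)},
    partial_reductions={},
)
chunks_for_rank1 = m2m_to_chunks(m2m, rank=1)
```

Because the caller only forwards `m2m`, adding or renaming a field touches the struct and its consumer, not every call site in between.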

Agent / state

  • The agent always bucketizes (bucket_size or 1) and dispatches only bucket_comm. BatchState.send_chunks/recv_chunks are removed — chunks are a build-time intermediate; only buckets are persisted and executed, and the transfer guard checks buckets.
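A small sketch of why `bucket_size or 1` yields the non-coalesced behavior, assuming `bucket_size` is a byte threshold and that a bucket closes as soon as its accumulated size reaches it. The greedy policy, the `bucketize` name and the `(channel_key, nbytes)` chunk representation are assumptions for illustration; the real `chunk_to_bucket_ops` may group differently.

```python
def bucketize(chunks, bucket_size=None):
    """Group (channel_key, nbytes) pairs into buckets of >= coalesce_bytes."""
    coalesce_bytes = bucket_size or 1  # None and 0 both mean "do not coalesce"
    open_groups = {}                   # channel_key -> chunks collected so far
    buckets = []
    for key, nbytes in chunks:
        group = open_groups.setdefault(key, [])
        group.append((key, nbytes))
        if sum(n for _, n in group) >= coalesce_bytes:
            buckets.append(open_groups.pop(key))  # threshold reached, close it
    buckets.extend(open_groups.values())          # flush partially filled groups
    return buckets


chunks = [("a", 4), ("a", 4), ("b", 4)]
singles = bucketize(chunks)                 # threshold of 1 byte
bundled = bucketize(chunks, bucket_size=8)  # coalesce up to 8 bytes per channel
```

With a threshold of one byte every non-empty chunk closes its own bucket, so the "unset" configuration and the coalescing configuration share one code path.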

Net: Transferable + Chunk + Bucket + BucketEntry + 2 execution paths → Chunk (descriptor) + Bucket (transfer unit) + 1 engine (get_m2m_map → m2m_to_chunks → chunk_to_bucket_ops → bucket_comm). 17 files, −114 net lines (+297 / −411).

New test cases

tests/test_self_copy_bucket.py covers the two Bucket.prepare branches now that everything is a bucket: test_self_copy_single_entry (single-entry) and test_self_copy_bundled (two local chunks coalesced → multi-entry path; asserts both the bundling and that the writes land).

Test results

All three layers (per CLAUDE.md):

| Layer | Result |
| --- | --- |
| CPU suite (test_communication_*, test_partial_chunk_reduce, test_self_copy_bucket) | 57/57 |
| vLLM weight-sync (8 GPUs, end-to-end; exercises the always-bucketize agent path) | ✅ 3/3 rounds, weights propagated |
| Transfer benchmark: 2-node × 8-GPU NCCL, all 3 configs (no_partial / replicate_dp / partial_dp) | ✅ 24 charts; Bucket throughput 318.9–326.6 GB/s, unchanged vs the pre-refactor baseline (no regression from removing BucketEntry / build-once offsets) |

Reviewed by Codex (clean) and an independent scan agent (no missed renames, no incorrect changes). One regression was caught by the vLLM layer during validation — a NameError after the M2MMap-return change where the agent still referenced the removed partial_red_1/2 locals — and fixed (the CPU/unit layer didn't catch it since it doesn't drive the agent registration path).

Other comments

Deferred follow-ups, tracked separately:

@coderabbitai

coderabbitai Bot commented May 29, 2026

Warning

Review limit reached

@junjzhang, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 44 minutes and 24 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0774bac2-72b9-4e6a-b675-772a8590f41e

📥 Commits

Reviewing files that changed from the base of the PR and between 54ffcbe and 773133d.

📒 Files selected for processing (17)
  • bench/README.md
  • bench/transfer_benchmark.py
  • src/etha/comm/__init__.py
  • src/etha/comm/comm_methods.py
  • src/etha/comm/execution.py
  • src/etha/comm/get_buckets.py
  • src/etha/comm/get_chunks.py
  • src/etha/comm/get_m2m_map.py
  • src/etha/comm/ir.py
  • src/etha/comm/transfer.py
  • src/etha/tensor_bus/agent.py
  • src/etha/tensor_bus/batch_state.py
  • src/etha/tensor_bus/pair_state.py
  • tests/test_communication_cpu.py
  • tests/test_communication_replicate_shard.py
  • tests/test_communication_symmetric_mesh.py
  • tests/test_self_copy_bucket.py
📝 Walkthrough


This PR removes chunk/Transferable execution, introduces dataclass Chunk and Bucket, centralizes execution in a pipelined bucket_comm, changes get_m2m_map to return M2MMap, replaces map_to_chunk_ops with m2m_to_chunks, and updates benchmarks and tests to use bucketized transfers.

Changes

Bucket-Based Communication Migration

| Layer | File(s) | Summary |
| --- | --- | --- |
| Benchmark: precompute single-entry M2M buckets | `bench/transfer_benchmark.py`, `bench/README.md` | Precompute single-entry M2M buckets via chunk_to_bucket_ops(..., bucket_size=1) and invoke bucket_comm for warmup/profile/timed loops; profiling traces folder renamed to m2m. |
| Comm API & bucket pipeline executor | `src/etha/comm/__init__.py`, `src/etha/comm/comm_methods.py`, `src/etha/comm/execution.py` | Removed chunk-centric exports and sequential chunk executor; bucket_comm now implements an inline pipelined executor with max_in_flight throttling and a single post-pipeline torch.cuda.synchronize(). |
| IR: Chunk/Bucket dataclasses and transfer helpers | `src/etha/comm/ir.py`, `src/etha/comm/transfer.py` | Transferable removed. Chunk and Bucket are dataclasses with explicit descriptor/buffer/work fields; Bucket.prepare() builds a contiguous buffer and Bucket.launch() dispatches to internal transfer helpers. |
| Chunk/Bucket builders & M2M mapping | `src/etha/comm/get_chunks.py`, `src/etha/comm/get_buckets.py`, `src/etha/comm/get_m2m_map.py` | map_to_chunk_ops → m2m_to_chunks(m2m, ...); get_m2m_map returns an M2MMap; chunk_to_bucket_ops constructs Bucket directly from grouped Chunk objects. |
| TensorBus batch wiring | `src/etha/tensor_bus/agent.py`, `src/etha/tensor_bus/batch_state.py`, `src/etha/tensor_bus/pair_state.py` | BatchState drops per-pair chunk lists for optional bucket lists. TensorBusAgent generates chunks with m2m_to_chunks, always unified-bucketizes collected chunks (coalesce_bytes = bucket_size or 1), and executes transfers via bucket_comm. |
| Tests: switch to M2M→chunks→buckets | `tests/*` (`test_communication_cpu.py`, `test_communication_replicate_shard.py`, `test_communication_symmetric_mesh.py`, `test_self_copy_bucket.py`) | Tests replaced map_to_chunk_ops with m2m_to_chunks and chunk_comm with bucket_comm(chunk_to_bucket_ops(..., bucket_size=1)); self-copy tests now validate single-entry and bundled bucket write paths. |

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant get_m2m_map
  participant m2m_to_chunks
  participant chunk_to_bucket_ops
  participant bucket_comm
  Caller->>get_m2m_map: compute M2MMap
  get_m2m_map->>m2m_to_chunks: m2m, rank -> chunks
  m2m_to_chunks->>chunk_to_bucket_ops: chunks
  chunk_to_bucket_ops->>bucket_comm: buckets (bucket_size=1)
  bucket_comm->>bucket_comm: prepare → launch → finalize (pipelined)
  bucket_comm->>Caller: synchronization complete

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

Possibly related PRs

  • cmriat/Etha#98: Related routing/Partial reductions work that this refactor builds upon.
  • cmriat/Etha#105: Overlaps on IR/transport and bucketization changes used by this PR.
  • cmriat/Etha#101: Related changes to get_m2m_map and route→chunk plumbing.

Suggested reviewers

  • jing-4369

Poem

🐰 Once chunks hopped down the transfer trail,
Now buckets barrel, steady and hale.
Routes fold into chunks, then buckets align,
Pipelined hops keep the transfers in line—
A carrot-cheer for cleaner comms! 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 56.41% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: Bucket becomes the sole transfer unit while Chunk is reduced to a pure data descriptor, which is the core objective of the PR. |
| Description check | ✅ Passed | The description follows the template with clear sections: 'What did you do' explains the refactoring and issue #104; 'New test cases' describes test updates; 'Test results' provides validation results; 'Other comments' references follow-up work. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@github-actions

Failed to generate code suggestions for PR

@junjzhang
Contributor Author

@codex review

@junjzhang
Contributor Author

@coderabbitai full review

@coderabbitai

coderabbitai Bot commented May 29, 2026

✅ Actions performed

Full review triggered.

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Breezy!


@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
src/etha/comm/comm_methods.py (1)

81-119: Pipeline logic looks correct; one scalability note on the per-channel throttle.

FIFO advancement of prepared[0]/in_flight[0] preserves per-channel ordering, so cross-rank collective/P2P matching is maintained. The max_in_flight cap at Line 90 is enforced per channel (channels[bucket.key]), so the total number of simultaneously prepared/in-flight buckets — and their buffers and outstanding collectives — scales with the number of distinct channel keys. With bucket_size unset (single-entry buckets) and many distinct (src_rank, dst_ranks, transport) groups (e.g. a target rank receiving P2P from many sources), peak concurrent in-flight resources can be num_channels × max_in_flight. If channel counts grow large on big meshes, consider a global in-flight budget in addition to the per-channel cap to bound memory/descriptor pressure.
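To make the scaling concern concrete, the toy simulation below counts peak concurrent in-flight buckets with a per-channel cap only, and then with an additional global budget. It is a sketch under simplifying assumptions (every bucket takes one step to complete, channels are visited in a fixed order) and every name in it, including `peak_in_flight` and `global_max`, is hypothetical; it is not the loop in `comm_methods.py`.

```python
from collections import deque


def peak_in_flight(num_channels, buckets_per_channel, max_in_flight, global_max=None):
    """Simulate FIFO channels and return the peak number of in-flight buckets."""
    if max_in_flight < 1 or (global_max is not None and global_max < 1):
        raise ValueError("caps must be at least 1, or nothing can ever launch")
    pending = [deque(range(buckets_per_channel)) for _ in range(num_channels)]
    in_flight = [deque() for _ in range(num_channels)]
    total = peak = 0
    while any(pending) or any(in_flight):
        # Launch phase: each channel fills up to its own cap, and the optional
        # global budget bounds the sum across all channels.
        for ch in range(num_channels):
            while (
                pending[ch]
                and len(in_flight[ch]) < max_in_flight
                and (global_max is None or total < global_max)
            ):
                in_flight[ch].append(pending[ch].popleft())
                total += 1
        peak = max(peak, total)
        # Completion phase: retire the oldest bucket of each channel (FIFO).
        for ch in range(num_channels):
            if in_flight[ch]:
                in_flight[ch].popleft()
                total -= 1
    return peak


per_channel_only = peak_in_flight(8, 4, max_in_flight=2)
with_global_cap = peak_in_flight(8, 4, max_in_flight=2, global_max=6)
```

In this toy model the fixed visiting order lets early channels consume the global budget first, so a real implementation would likely also need a fairness policy across channels.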

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/etha/comm/comm_methods.py` around lines 81 - 119, The per-channel
max_in_flight cap allows total in-flight buckets to grow as num_channels ×
max_in_flight; add a global in-flight budget to bound overall concurrency:
introduce a shared counter (e.g., global_in_flight) and a config value (e.g.,
global_max_in_flight) and check/decrement it in the same places that manipulate
channel in-flight state — when moving a bucket from candidate -> prepared ensure
global_in_flight + total_prepared_in_flight < global_max_in_flight (or only when
moving prepared -> in_flight if you want to limit launched work), when
prepared[0].launch() succeeds increment global_in_flight, and when
in_flight[0].is_complete() and you finalize() decrement global_in_flight; update
the loops that reference channels, candidate, prepared, in_flight and
bucket.prepare()/launch()/finalize() to consult and maintain this global counter
atomically (or under the same lock) so the total memory/descriptor pressure is
bounded across all channels.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 03a04e00-695e-453d-b0d9-c8372b9a2b7a

📥 Commits

Reviewing files that changed from the base of the PR and between 016b52b and a3d3103.

📒 Files selected for processing (15)
  • bench/README.md
  • bench/transfer_benchmark.py
  • src/etha/comm/__init__.py
  • src/etha/comm/comm_methods.py
  • src/etha/comm/execution.py
  • src/etha/comm/get_buckets.py
  • src/etha/comm/get_chunks.py
  • src/etha/comm/ir.py
  • src/etha/comm/transfer.py
  • src/etha/tensor_bus/agent.py
  • src/etha/tensor_bus/batch_state.py
  • tests/test_communication_cpu.py
  • tests/test_communication_replicate_shard.py
  • tests/test_communication_symmetric_mesh.py
  • tests/test_self_copy_bucket.py
💤 Files with no reviewable changes (3)
  • src/etha/comm/get_buckets.py
  • src/etha/comm/transfer.py
  • src/etha/comm/execution.py

Collapse the tensor-transfer layer to one transfer unit and one execution
engine, and thread the mesh-to-mesh topology as a single struct.

Execution:
- Chunk becomes a pure shape-dependent descriptor (no execute()/work). Bucket is
  the sole transfer unit, owning buffer/work and issuing the wire op in launch()
  (the Transferable base and Bucket.execute are gone). The two execution paths
  collapse into one: chunk_comm/execute_chunk_simple are deleted and a
  non-coalesced transfer is the degenerate bucket_size=1 case of bucket_comm
  (execute_bucket_pipeline inlined; execution.py removed).
- BucketEntry removed: Bucket holds chunks: list[Chunk] directly, byte offsets
  precomputed once in __post_init__, total_bytes O(1).

Topology:
- get_m2m_map returns an M2MMap (routes + slicers + partial reductions), moved
  into comm.ir. m2m_to_chunks (renamed from map_to_chunk_ops) takes the M2MMap
  directly instead of three unpacked args; the agent neither reassembles nor
  destructures it. The materialized list[Route] value is named "routes".

Agent always bucketizes (bucket_size or 1) and dispatches only bucket_comm;
BatchState.send_chunks/recv_chunks removed (chunks are a build-time intermediate).

Verified: CPU suite 57/57; transfer benchmark on 2-node x 8-GPU NCCL across all
3 configs (bucket throughput unchanged, ~319 GB/s); vLLM weight-sync 3/3 rounds.

Closes #104.
@junjzhang junjzhang force-pushed the refactor/bucket-sole-transfer-unit branch from 54ffcbe to 773133d Compare May 29, 2026 14:08
@junjzhang junjzhang merged commit a40ce15 into main May 29, 2026
3 of 4 checks passed
@junjzhang junjzhang deleted the refactor/bucket-sole-transfer-unit branch May 29, 2026 14:14