Replace the conductor_new_token path with a token-count path #124

@NSagan271

Difficulty: 🟢 Good first issue (small, well-isolated)

Scope: Small; a few dozen LoC across 3 files; mostly deletion.

Subsystems: graph · worker · conductor · node_manager_utils

Prerequisites: Read how new-token signals flow from worker to conductor. No GPU work.

Problem

The conductor_new_token field on GraphEdge
(graph/base.py) is half-deprecated. It used to drive
check_stop, but stop-checking now happens on the worker (in
_postprocess_batch's check_stop call, worker.py).
The only remaining consumer of the materialized tokens is a count, in the
conductor's handler for the worker→conductor "graphs done" message
(conductor.py):

if body.new_tokens:
    for name, tokens in body.new_tokens.items():
        pstate.num_output_tokens += len(tokens)
        for conn in request_data.streaming_connections.values():
            if conn.from_partition == partition_name and conn.edge_name == name:
                conn.token_count += len(tokens)

Yet the worker pays to materialize the actual token values on the host to
produce that list: the D→H .cpu().numpy().tolist() in _send_outputs
(worker.py), plus the buffer/flush bookkeeping
(buffer_new_tokens / flush_new_tokens in
node_manager_utils.py). We move integers to
the host only to call len() on them.
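
Because only lengths are consumed, the same accounting can be written over integer counts. The sketch below is a minimal illustration, not the project's code: `PartitionState`, `StreamingConnection`, and `apply_new_token_counts` are hypothetical stand-ins that mirror the fields used in the handler above, and the payload is assumed to be a `new_token_counts: dict[str, int]` mapping from edge name to count.

```python
from dataclasses import dataclass
from typing import Iterable, Mapping


@dataclass
class PartitionState:
    """Hypothetical stand-in for the conductor's per-partition state."""
    num_output_tokens: int = 0


@dataclass
class StreamingConnection:
    """Hypothetical stand-in for one streaming connection."""
    from_partition: str
    edge_name: str
    token_count: int = 0


def apply_new_token_counts(
    pstate: PartitionState,
    connections: Iterable[StreamingConnection],
    partition_name: str,
    new_token_counts: Mapping[str, int],
) -> None:
    """Same bookkeeping as the len(tokens) version, driven by integer counts."""
    connections = list(connections)  # allow re-iteration for every edge
    for name, count in new_token_counts.items():
        pstate.num_output_tokens += count
        for conn in connections:
            # Only connections fed by this partition's edge are credited.
            if conn.from_partition == partition_name and conn.edge_name == name:
                conn.token_count += count
```

An empty mapping leaves all counters untouched, which matches the `if body.new_tokens:` guard in the existing handler.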

Suggested tasks

  • Replace the new_tokens: dict[str, list[int]] payload on the
    worker→conductor "graphs done" message with new_token_counts: dict[str, int].
  • On the worker, count tokens from tensor shapes (the new-token tensors'
    sizes are already known — see the new_token_outputs edge collection in
    node_manager_utils.py and
    _d2h_new_tokens in worker.py) instead of doing
    the D→H copy and .tolist().
  • Update the conductor to consume counts directly.
  • Remove the now-dead buffer/flush-new-tokens machinery if nothing else uses
    it. Consider renaming conductor_new_token → conductor_token_count (or
    similar) for clarity.
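
The worker-side task can be sketched as follows. This is a hedged illustration under stated assumptions: `count_new_tokens`, `HasShape`, and `FakeTensor` are hypothetical names, and the edge collection is assumed to map each edge name to a list of tensor-like objects, which may not match the real layout in node_manager_utils.py. The point is only that an element count can be read from shape metadata without copying values to the host.

```python
from math import prod
from typing import Iterable, Mapping, NamedTuple, Protocol, Sequence


class HasShape(Protocol):
    """Anything tensor-like that exposes a shape (e.g. a torch.Tensor)."""

    @property
    def shape(self) -> Sequence[int]: ...


def count_new_tokens(
    new_token_outputs: Mapping[str, Iterable[HasShape]],
) -> dict[str, int]:
    """Count elements per edge from shapes only; no token values are read."""
    counts: dict[str, int] = {}
    for edge_name, tensors in new_token_outputs.items():
        # prod(()) == 1, so a 0-dim tensor counts as a single token.
        counts[edge_name] = sum(prod(t.shape) for t in tensors)
    return counts


class FakeTensor(NamedTuple):
    """Demo-only stand-in so the sketch runs without a tensor library."""
    shape: tuple


demo_counts = count_new_tokens(
    {"audio": [FakeTensor((3,)), FakeTensor((2,))], "text": [FakeTensor(())]}
)
```

With real PyTorch tensors, `tensor.numel()` returns the same element count and is also metadata-only. Whether the flattened element count equals the token count for every edge (for example, with batched or multi-codebook outputs) should be checked against what `len(tokens)` produced before the change.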

Acceptance criteria

  • num_output_tokens and streaming token_count are unchanged for all existing
    models (verify against an autoregressive model like orpheus / qwen3_omni).
  • No host materialization of token values occurs solely for counting — i.e.
    the .cpu() in the _send_outputs new-token path is gone. (The separate D→H
    in _prematerialize_for_check_stop is unrelated and stays; see Gotchas.)

Gotchas

  • This change only touches the new-token counting path. check_stop does its own
    D→H copy in _prematerialize_for_check_stop
    (worker.py) — that's a separate part of the code that
    legitimately needs the token values on the host, so leave it alone. The goal is
    to remove the redundant materialization in _send_outputs, not all D→H
    copies of tokens.
  • Confirm no other consumer reads the token values off this path (grep
    new_token, pending_new_tokens, flush_new_tokens).

New to M*? Skim How it works and the Contributing guide first.
