Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
75 changes: 75 additions & 0 deletions benchmark/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,78 @@ def get_supported_modalities(self):
}


class MingFlashOmni(Model):
    """Benchmark adapter for inclusionAI's Ming-flash-omni-2.0.

    The model is the Ling-2.0 sparse-MoE omni model (100B total / 6B active
    params), released 2026-02-11.

    How to reach it:
        Today it is served through the vllm-omni server, using either
        ``vllm_omni/deploy/ming_flash_omni.yaml`` (thinker+talker) or
        ``ming_flash_omni_thinker_only.yaml`` (text-only). Run the benchmark
        with ``--inference-system vllm_omni`` against such an instance. The
        native ``ours`` / ``ours_openai`` backends become usable once the
        mstar-side port under ``mstar/model/ming_omni_flash/`` is finished.

    Request shape:
        Same as :class:`Qwen3Omni` — the standard OpenAI
        ``/v1/chat/completions`` payload with multimodal content parts. The
        benchmark sends OpenAI roles (``user``/``assistant``/``system``)
        unchanged; the translation to Ming's internal
        ``HUMAN``/``ASSISTANT``/``SYSTEM`` roles is done by the jinja
        chat_template shipped in ``tokenizer_config.json``, which vllm-omni
        applies through ``tokenizer.apply_chat_template``.

    Caveat:
        The Ming source repo also ships a Python-side
        ``BailingMM2Processor.apply_chat_template`` that insists on uppercase
        roles and would raise AssertionError on ``user``/``assistant``.
        mstar's native port relies on that processor for full multimodal
        preprocessing (vision/audio feature extraction) and therefore remaps
        roles in ``process_prompt`` — see ``mstar/model/ming_omni_flash/``
        and its tokenizer tests.
    """

    def get_hf_url(self):
        """Return the Hugging Face repo id of the released checkpoint."""
        return "inclusionAI/Ming-flash-omni-2.0"

    def get_openai_system_message(self) -> Optional[dict]:
        """Return the system message to prepend, which for Ming is none.

        Ming-flash-omni-2.0's cookbook defaults to ``sys_prompt_exp=None``
        and ``use_cot_system_prompt=False``, i.e. there is no mandatory
        "You are Ming…"-style preamble like Qwen3-Omni's. Any internal system
        text is supplied by the HF processor's chat_template, which
        vllm-omni's serving layer goes through via ``trust_remote_code``. An
        explicit system message would only risk overriding those defaults.
        """
        return None

    def get_model_kwargs(self, request_type: RequestType):
        """Return per-request generation kwargs (a fixed decode budget).

        The thinker output is capped at 256 tokens for cross-system
        fairness, the same rationale as Qwen3Omni: comparable runs need a
        fixed decode budget. vllm-omni's released stage default is
        ``max_tokens: 2048`` (``vllm_omni/deploy/ming_flash_omni.yaml``
        stage 0), lowered here for benchmark parity.

        Sampling is not configured here. The thinker is forced greedy
        (``temperature=0.0`` at payload top-level) in
        ``VLLMOmni.send_request``, and the talker's sampling defaults live
        server-side in the deploy yaml (``stage_id: 1`` →
        ``temperature: 0.0`` per the released config).
        """
        decode_budget = 256
        # Both spellings are sent so the cap survives whichever
        # ``--inference-system`` is in use: ``max_tokens`` is the OpenAI
        # convention, ``max_output_tokens`` is mstar's native kwarg.
        return dict.fromkeys(("max_tokens", "max_output_tokens"), decode_budget)

    def get_supported_modalities(self):
        """Return the request types this model can be benchmarked on.

        Text, image, audio and video inputs are each supported with both
        text and speech outputs.
        """
        text_out = {
            RequestType.T2T,
            RequestType.I2T,
            RequestType.A2T,
            RequestType.V2T,
        }
        speech_out = {
            RequestType.T2S,
            RequestType.I2S,
            RequestType.A2S,
            RequestType.V2S,
        }
        return text_out | speech_out


class Pi05(Model):
"""Physical Intelligence Pi0.5 VLA model.

Expand Down Expand Up @@ -286,6 +358,7 @@ class ModelType(Enum):
BAGEL = "bagel"
ORPHEUS = "orpheus"
QWEN3OMNI = "qwen3omni"
MING_FLASH_OMNI = "ming_flash_omni"
PI05 = "pi05"
VJEPA2AC = "vjepa2ac"

Expand All @@ -296,6 +369,8 @@ def inst(self, **kwargs) -> Model:
return Orpheus(**kwargs)
if self == ModelType.QWEN3OMNI:
return Qwen3Omni(**kwargs)
if self == ModelType.MING_FLASH_OMNI:
return MingFlashOmni(**kwargs)
if self == ModelType.PI05:
return Pi05(**kwargs)
if self == ModelType.VJEPA2AC:
Expand Down
7 changes: 6 additions & 1 deletion benchmark/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,12 @@ async def run(self) -> tuple[list[RequestMetrics], AggregateMetrics]:
# max_concurrency don't bottleneck on aiohttp's default 100/host limit.
connector_limit = max(100, (self.config.max_concurrency or 1) + 10)
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=300),
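# Per-request inactivity budget instead of a wall-clock cap. aiohttp
# applies the session's ``ClientTimeout`` to each request independently
# (``total`` bounds one request from connect to last byte; it is not a
# whole-session deadline), so ``total=300`` would abort any single
# request that legitimately runs longer than 5 minutes. ``total=None``
# lifts that cap; ``sock_read`` resets on each streamed chunk, so a
# healthy request never trips it while a stalled one fails after 120 s.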
timeout=aiohttp.ClientTimeout(total=None, sock_read=120),
connector=aiohttp.TCPConnector(limit=connector_limit),
read_bufsize=5 * 2**20, # 1MB read buffer
) as session:
Expand Down
91 changes: 90 additions & 1 deletion benchmark/vllm_omni_instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,93 @@ CUDA_VISIBLE_DEVICES=3 vllm serve ByteDance-Seed/BAGEL-7B-MoT --omni --port 8000
### for qwen3-omni:
```
vllm serve Qwen/Qwen3-Omni-30B-A3B-Instruct --omni --port 8091 --stage-configs-path vllm_omni/model_executor/stage_configs/qwen3_omni_moe_async_chunk.yaml
```
```

### for ming-flash-omni-2.0:

The released `inclusionAI/Ming-flash-omni-2.0` ckpt (~238 GB / 42 shards)
does NOT load cleanly into vllm-omni's `MingFlashOmniForConditionalGeneration`
class as-is. Two patches are needed (one-time setup):

1. **Replace metadata files.** vllm-omni's model class uses
`Qwen2VLImageProcessor` + `MingWhisperFeatureExtractor` (its own
registered classes), while the inclusionAI snapshot declares the
`BailingMM2*` processor variants via `auto_map` and `trust_remote_code`.
Use `Jonathan1909/Ming-flash-omni-2.0`'s `preprocessor_config.json`,
`config.json` (auto_map stripped), and `tokenizer*.json` instead.

2. **Replace the talker weights.** vllm-omni's `MingFlashOmniTalker` expects
weights under the `audio_vae.*` prefix, but the inclusionAI talker safetensors
file uses the `audio.*` prefix. Jonathan1909 re-shipped the talker with the
weights renamed (~1.5 GB).

Building a hybrid snapshot avoids re-downloading the 200+ GB thinker weights:

```bash
# 1. Make sure the inclusionAI thinker shards are cached
huggingface-cli download inclusionAI/Ming-flash-omni-2.0 \
--include="model-*.safetensors" --include="model.safetensors.index.json"

# 2. Pull only Jonathan1909's metadata + talker (no thinker weights)
huggingface-cli download Jonathan1909/Ming-flash-omni-2.0 \
--include="*.json" --include="*.py" --include="*.txt" --include="*.mvn" \
--include="talker/**" \
--cache-dir /dev/shm/hf-cache # or any path with ~3 GB free

# 3. Stitch the two together
INCL=$(huggingface-cli scan-cache | grep inclusionAI/Ming-flash-omni-2.0 \
| awk '{print $NF}')/snapshots/$(ls ~/.cache/huggingface/hub/models--inclusionAI--Ming-flash-omni-2.0/snapshots | head -1)
JONA=/dev/shm/hf-cache/models--Jonathan1909--Ming-flash-omni-2.0/snapshots/*
HYBRID=/dev/shm/ming-hybrid
mkdir -p $HYBRID
for f in $INCL/model-*.safetensors; do ln -s "$f" "$HYBRID/$(basename $f)"; done
for f in $JONA/*; do
base=$(basename "$f")
[ -L "$HYBRID/$base" ] && rm "$HYBRID/$base"
ln -s "$f" "$HYBRID/$base"
done
```

Then serve and benchmark:

```bash
CUDA_VISIBLE_DEVICES=0,1,2,3 vllm serve /dev/shm/ming-hybrid \
--omni --port 8091 --host 0.0.0.0 --trust-remote-code \
--stage-configs-path /tmp/vllm-omni/vllm_omni/model_executor/stage_configs/ming_flash_omni.yaml

# Wait for "Application startup complete" then:
MODEL=ming_flash_omni INF_SYS=vllm_omni TASK=text_to_text \
URL=http://0.0.0.0:8091 ./benchmark/run_benchmark.sh
```

NOTE: vllm-omni's `/v1/chat/completions` rejects unknown model ids, so the
client must send `"model": "/dev/shm/ming-hybrid"` (the served path), not
`"inclusionAI/Ming-flash-omni-2.0"`. Easiest is to monkey-patch
`MingFlashOmni.get_hf_url` before calling the benchmark runner:

```python
from benchmark.base import MingFlashOmni
MingFlashOmni.get_hf_url = lambda self: "/dev/shm/ming-hybrid"
```

Or pass `--served-model-name inclusionAI/Ming-flash-omni-2.0` to `vllm serve`
(untested; would also work in principle).

#### Modalities exercised on a local 4×H100 run (2026-06-06)

| Task | Status | Notes |
|---|---|---|
| T2T (text → text) | ✅ | offline B=1: 110 tok/s, closed-loop C=32: **1060 tok/s** (full scaling sweep in [`results/ming_t2t_sweep/SUMMARY.md`](../results/ming_t2t_sweep/SUMMARY.md)) |
| I2T (image → text) | ✅ | TTFT 87 ms, ~100 tok/s on Food101 |
| A2T (audio → text) | ✅ | English transcription + Chinese audio QA both work |
| T2S (text → speech) | ✅ | RTF 0.14, 24 kHz mono PCM via harness; 44.1 kHz via direct OpenAI path |
| V2T (video → text) | ✅ | Local Ming demo mp4s; coherent descriptions (`yoga.mp4` → yoga pose narration, `cup_change.mp4` → "shell game") |
| V2S (video → speech) | ✅ | Local Ming demo mp4s; 2-3 MB WAV/clip @ 44.1 kHz |
| I2S (image → speech) | ✅ | Food101 in, ~7 s/req for ~48 s of audio |
| A2S (audio → speech) | ✅ | Ming sample wavs; 0.5-3 MB WAV/clip @ 44.1 kHz |
| T2I / I2I (image gen) | not wired | requires `ming_flash_omni_image.yaml` + a benchmark wrapper similar to BAGEL's `/v1/images/generations` path |

The V2T/V2S/A2S runs sidestep the bench harness's `UCF101Dataset` and
`LibriSpeechDataset` (both want fresh HF-Hub downloads) by hitting
`/v1/chat/completions` directly with base64-inlined media from local files
(Ming repo's `figures/cases/*.mp4` and `data/wavs/*.wav`).
35 changes: 35 additions & 0 deletions configs/ming_flash_omni.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Ming-flash-omni-2.0 — full omni deploy (text/image/audio/video in, text + speech out).
#
# Node→rank mapping for the native mstar port
# (mstar/model/ming_omni_flash/). The model registers these nodes
# (see MingFlashOmniModel.get_node_engine_types):
#
# * Thinker (KV_CACHE, TP) — Ling-2.0 sparse MoE LLM, the
# multimodal understanding core.
# * vision_encoder (STATELESS) — Qwen3-MoE ViT + projector.
# * audio_encoder (STATELESS) — Whisper encoder + projector.
# * Talker (STATELESS) — CFM talker; the AudioVAE is wrapped
# INSIDE the Talker submodule (it is
# NOT a separate graph node).
#
# Thinker runs TP=8 across all 8 H100s: TP=4 OOMs at ~78.5/80 GB per rank
# during ckpt streaming (re-verified 2026-06-08), TP=8 halves the
# footprint to ~40 GB/rank with plenty of headroom. The stateless
# encoders + the talker are small (~1.5 GB each) and colocate on rank 0.
#
# The Thinker→Talker bridge passes DETOKENIZED TEXT (re-tokenized with
# the talker's own talker/llm tokenizer), so the talker is a near-
# standalone TTS partition fed by a streaming connection — see
# MingFlashOmniModel.get_partition_topology.

model: "ming_flash_omni"
max_seq_len: 32768
node_groups:
# Stateless encoders + the talker colocate on rank 0.
- node_names: [vision_encoder, audio_encoder, Talker]
ranks: [0]

# Thinker sharded across all 8 GPUs.
- node_names: [Thinker]
ranks: [0, 1, 2, 3, 4, 5, 6, 7]
tp_size: 8
21 changes: 21 additions & 0 deletions configs/ming_flash_omni_thinker_only.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Ming-flash-omni-2.0 — thinker-only deploy (text out, no talker).
#
# TP=8 across 8 H100s. Per-rank shard_inter = 1024/8 = 128;
# experts.gate_up_proj is (256, 2*128, 4096) per rank, ~33 GB across
# 31 MoE layers. With embed + lm_head + attention + dense layer 0 +
# KV cache, ~40 GB per rank fits the 80 GB H100s comfortably.
#
# TP=4 OOMs at ~78.5 / 80 GB per rank even with
# PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True (re-verified
# 2026-06-08; loader streaming overhead pushes past the 80 GB limit).
# TP=8 halves the model footprint with plenty of headroom.
#
# Audio / vision / talker / image-gen are step 4+; this config is for
# text-only T2T benchmarking and the first mstar-served Ming forward.

model: "ming_flash_omni"
max_seq_len: 32768
node_groups:
- node_names: [Thinker]
ranks: [0, 1, 2, 3, 4, 5, 6, 7]
tp_size: 8
76 changes: 70 additions & 6 deletions mstar/communication/communicator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
from abc import ABC, abstractmethod
from collections import deque
from enum import Enum

import zmq
Expand All @@ -9,6 +10,12 @@

logger = logging.getLogger(__name__)

# PUSH socket send high-water-mark. ZMQ's default (1000) is small for the
# bursty large-payload traffic between worker/conductor/api_server; raising
# it gives the kernel/zmq more room to buffer before we have to queue
# in-process. Overridable for tuning.
_SNDHWM = int(os.getenv("MSTAR_ZMQ_SNDHWM", "100000"))


class BaseCommunicator(ABC):
@abstractmethod
Expand Down Expand Up @@ -53,6 +60,15 @@ def __init__(
# TODO: maybe only open sockets as we need them, and close sockets
# when we no longer need them
self.push_sockets: dict[str, zmq.SyncSocket] = {}
# Per-peer in-process backlog of messages that couldn't be sent
# immediately (peer's receive buffer full). Drained opportunistically
# by ``_flush_outbound`` before each send and on every poll. This
# makes ``send`` non-blocking, which is what breaks the worker<->
# conductor PUSH/PULL deadlock: a blocked send used to stall the
# caller's own drain loop, so two peers could each block sending to
# the other while neither drained. With local queueing, a peer that
# is momentarily full never prevents us from servicing our PULL.
self.outbound: dict[str, deque] = {}
self.my_id = my_id
self.ipc_socket_path_prefix = ipc_socket_path_prefix

Expand All @@ -69,6 +85,7 @@ def __init__(
if id == my_id:
continue
self.push_sockets[id] = self.context.socket(zmq.PUSH)
self.push_sockets[id].setsockopt(zmq.SNDHWM, _SNDHWM)
self.push_sockets[id].connect(self._endpoint(id))
self.push_sockets[id].setsockopt(zmq.LINGER, 0)
self.poller = zmq.Poller()
Expand All @@ -80,6 +97,9 @@ def register_event_for_poll(self, event: EventWakeup):
self.event = event

def wait_for_work(self, timeout_ms=50):
# Flush any deferred sends while we're here (idle poll point), so a
# backlog drains even when no new send() calls are happening.
self._flush_outbound()
events = dict(self.poller.poll(timeout=timeout_ms))
if self.event.fd in events:
self.event.drain()
Expand Down Expand Up @@ -110,20 +130,64 @@ def _tcp_port(entity_id: str) -> int:
# def get_session_id(self) -> str:
# return self.session_id

def _socket_for(self, entity_id: str) -> "zmq.SyncSocket":
    """Return the PUSH socket for ``entity_id``, creating it on first use.

    A newly created socket gets the module-wide send high-water-mark, is
    connected to the peer's endpoint, has LINGER set to 0, and is cached in
    ``self.push_sockets`` so later calls reuse it.
    """
    cached = self.push_sockets.get(entity_id)
    if cached is not None:
        return cached

    # SNDHWM is set before connect(), matching the eager setup in __init__.
    created = self.context.socket(zmq.PUSH)
    created.setsockopt(zmq.SNDHWM, _SNDHWM)
    created.connect(self._endpoint(entity_id))
    created.setsockopt(zmq.LINGER, 0)
    self.push_sockets[entity_id] = created
    return created

def _flush_outbound(self, entity_id: str | None = None) -> None:
    """Push out as much of the deferred-send backlog as the peers accept.

    Args:
        entity_id: Flush only this peer's backlog. When ``None``, every peer
            that currently has an entry in ``self.outbound`` is flushed.

    Never blocks: for each peer, messages are sent oldest-first with
    ``zmq.NOBLOCK`` and the loop stops at the first one that raises
    ``zmq.Again``. That message and everything behind it stay queued, so
    FIFO order per peer is preserved.
    """
    if entity_id is None:
        # Snapshot the keys so the dict is not iterated live.
        targets = list(self.outbound.keys())
    else:
        targets = [entity_id]

    for peer in targets:
        backlog = self.outbound.get(peer)
        if not backlog:
            continue
        push = self._socket_for(peer)
        while backlog:
            # Peek, and only pop after a successful send, so a message
            # that would block remains at the head of the queue.
            try:
                push.send_pyobj(backlog[0], flags=zmq.NOBLOCK)
            except zmq.Again:
                break  # peer still full; retry on a later flush
            backlog.popleft()

def send(self, entity_id: str, msg):
    """Send ``msg`` to ``entity_id`` without ever blocking the caller.

    Args:
        entity_id: Identifier of the destination peer; its PUSH socket is
            created on demand.
        msg: Any object accepted by ``send_pyobj``.

    Any backlog for the peer is flushed first. If some backlog remains, or
    the non-blocking send raises ``zmq.Again``, the message is appended to
    the peer's queue in ``self.outbound`` and delivered by a later
    ``_flush_outbound`` call. Per-peer FIFO order is preserved either way.
    """
    # TODO: maybe serialize to JSON instead if more efficient
    # Pass ``msg`` itself rather than ``str(msg)``: logging applies ``%s``
    # lazily, so large payloads are only stringified when DEBUG is enabled.
    logger.debug(
        "%s to send a message %s to entity %s",
        self.my_id, msg, entity_id
    )
    sock = self._socket_for(entity_id)
    # Drain any prior backlog first so ordering is preserved.
    self._flush_outbound(entity_id)
    q = self.outbound.get(entity_id)
    if q:
        # Backlog still non-empty -> peer is full; queue this one too
        # rather than block (blocking here is what deadlocks the cycle).
        q.append(msg)
        return
    try:
        sock.send_pyobj(msg, flags=zmq.NOBLOCK)
    except zmq.Again:
        # Peer's receive buffer is full. Queue locally and move on; the
        # message is delivered on a later flush once the peer drains.
        self.outbound.setdefault(entity_id, deque()).append(msg)
        logger.debug(
            "%s deferring send to %s (peer buffer full, %d queued)",
            self.my_id, entity_id, len(self.outbound[entity_id]),
        )

def get_all_new_messages(self, blocking=False) -> list:
# Opportunistically push out anything we previously had to queue.
self._flush_outbound()
messages = []
while True:
try:
Expand Down
Loading
Loading