Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
[![Latest](https://img.shields.io/badge/Latest-v0.2.1-f28500)](./fluxon_release)
[![Interfaces](https://img.shields.io/badge/Interfaces-KV%2FRPC%20%7C%20MQ%20%7C%20FS-1f6feb)](#interface-capabilities)

<details>
<summary>
<img src="https://img.shields.io/badge/WeChat-Join%20Group-07C160?logo=wechat&logoColor=white" alt="WeChat group" />
</summary>
<br/>
<img src="./pics/wechat_group.png" width="260" alt="WeChat group QR code" />
</details>

[English](./README.md) | [中文](./README_CN.md) | [Docs](https://tele-ai.github.io/Fluxon/) | [中文文档](https://tele-ai.github.io/Fluxon/cn/) | <a href="https://github.com/Tele-AI/Fluxon" title="GitHub Repository"><img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="18" height="18" alt="GitHub repository" /></a>

</div>
Expand Down
29 changes: 19 additions & 10 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@

![](./pics/post_en.png)

<div align="center">

[![Linux Only](https://img.shields.io/badge/Linux-Only-2ea44f)](#运行要求)
[![Python](https://img.shields.io/badge/Python-%3E%3D3.10-3776AB)](#运行要求)
[![Rust](https://img.shields.io/badge/Rust-1.93.0-000000)](./fluxon_rs/rust-toolchain.toml)
[![Latest](https://img.shields.io/badge/Latest-v0.2.1-f28500)](./fluxon_release)
[![Interfaces](https://img.shields.io/badge/Interfaces-KV%2FRPC%20%7C%20MQ%20%7C%20FS-1f6feb)](#接口能力)

<details>
<summary>
<img src="https://img.shields.io/badge/WeChat-加入微信群-07C160?logo=wechat&logoColor=white" alt="WeChat group" />
</summary>
<br/>
<img src="./pics/wechat_group.png" width="260" alt="微信群二维码" />
</details>

[中文](./README_CN.md) | [English](./README.md) | [用户文档](https://tele-ai.github.io/Fluxon/cn/) | [English Docs](https://tele-ai.github.io/Fluxon/) | <a href="https://github.com/Tele-AI/Fluxon" title="GitHub 仓库"><img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="18" height="18" alt="GitHub repository" /></a>

</div>

当 GPU 算力持续提升,AI 系统的瓶颈正在从单点算子扩展到数据面。推理服务需要跨节点复用 `KV Cache`;训练流水线需要在异构资源池之间传递中间态;模型文件与 `Checkpoint` 需要在远端访问与本地缓存间稳定流动。

Expand All @@ -24,17 +43,7 @@ Fluxon 的设计正是围绕这些问题展开。它将数据面资源、对象
![](./pics/fluxon架构图20260423.png)


<div align="center">

[![Linux Only](https://img.shields.io/badge/Linux-Only-2ea44f)](#运行要求)
[![Python](https://img.shields.io/badge/Python-%3E%3D3.10-3776AB)](#运行要求)
[![Rust](https://img.shields.io/badge/Rust-1.93.0-000000)](./fluxon_rs/rust-toolchain.toml)
[![Latest](https://img.shields.io/badge/Latest-v0.2.1-f28500)](./fluxon_release)
[![Interfaces](https://img.shields.io/badge/Interfaces-KV%2FRPC%20%7C%20MQ%20%7C%20FS-1f6feb)](#接口能力)

[中文](./README_CN.md) | [English](./README.md) | [用户文档](https://tele-ai.github.io/Fluxon/cn/) | [English Docs](https://tele-ai.github.io/Fluxon/) | <a href="https://github.com/Tele-AI/Fluxon" title="GitHub 仓库"><img src="https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png" width="18" height="18" alt="GitHub repository" /></a>

</div>

<a id="当前目录"></a>

Expand Down
825 changes: 825 additions & 0 deletions fluxon_doc_cn/design/kv_5_SSD存储设计.md

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions fluxon_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fluxon_rs/fluxon_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ bytes = "1"
pprof = { version = "0.15", features = ["flamegraph"] }
hex = "0.4"
sha2 = "0.10"
io-uring = "0.7"
tokio-tungstenite = { version = "0.21", default-features = false, features = ["connect", "handshake"], optional = true }

sockudo-ws = { version = "^1.7.4", default-features = false, features = ["tokio-runtime", "fastrand"], optional = true }
Expand Down
117 changes: 107 additions & 10 deletions fluxon_rs/fluxon_kv/src/client_kv_api/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
cluster_manager::NodeID,
master_kv_router::msg_pack::{
GetAllocationMode, GetDoneReq, GetDoneResp, GetMetaReq, GetMetaResp, GetRevokeReq,
GetStartReq, GetStartResp,
GetSourceKind, GetStartReq, GetStartResp,
},
p2p::msg_pack::MsgPack,
rpcresp_kvresult_convert::msg_and_error::codes_api,
Expand All @@ -26,19 +26,27 @@ use std::sync::Arc;
/// Summary of a single Get: its id, payload length, source kind, the source
/// and target addresses, and the node the data came from.
pub struct RemoteGetInfo {
/// Identifier of the Get operation (the same id is passed to the done/revoke calls).
get_id: u64,
/// Payload length; the `Display` impl reports it in bytes.
data_len: usize,
/// Kind of source that served this Get (e.g. `GetSourceKind::Ssd`).
source_kind: GetSourceKind,
/// Source address of the data; formatted as hex by `Display`.
src_addr: u64,
/// Target address the data is delivered to; formatted as hex by `Display`.
target_addr: u64,
/// Node reported in the Get start response.
node_id: NodeID,
// NOTE(review): presumably the value shown as `remote_transfer` in the
// `Display` output; its exact meaning is not visible here, so confirm.
peer_is_src_or_target: bool,
}

impl RemoteGetInfo {
/// Returns the source kind recorded for this Get.
pub fn source_kind(&self) -> GetSourceKind {
self.source_kind
}
}

impl std::fmt::Display for RemoteGetInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"GetInfo{{ get_id: {}, data_len: {} bytes, src_addr: {:#x}, target_addr: {:#x}, node_id: {:?}, remote_transfer: {} }}",
"GetInfo{{ get_id: {}, data_len: {} bytes, source_kind: {:?}, src_addr: {:#x}, target_addr: {:#x}, node_id: {:?}, remote_transfer: {} }}",
self.get_id,
self.data_len,
self.source_kind,
self.src_addr,
self.target_addr,
self.node_id,
Expand Down Expand Up @@ -177,8 +185,80 @@ impl ClientKvApiInner {
);
}

let mut ssd_done_resp = None;
if resp.source_kind == GetSourceKind::Ssd {
let ssd_stage_len = resp.ssd_stage_len;
if ssd_stage_len < data_len as u64 {
#[cfg(test)]
{
self.test_record.remove_transfering_get(get_id);
}

self.get_revoke(get_id).await?;
return Err(KvError::Api(ApiError::InvalidArgument {
detail: format!(
"invalid ssd stage len for key={} get_id={} data_len={} ssd_stage_len={}",
key, get_id, data_len, ssd_stage_len
),
}));
}
let done_resp = match self
.stage_kv_from_ssd_source(
&resp.node_id,
key,
put_id,
get_id,
abs_src,
abs_target,
data_len as u64,
ssd_stage_len,
)
.await
{
Ok(done_resp) => done_resp,
Err(err) => {
tracing::warn!(
"kv get ssd stage failed: key={}, source_node={}, stage={:#x}, target={:#x}, len={}, ssd_stage_len={}, err={}",
key,
resp.node_id,
abs_src,
abs_target,
data_len,
ssd_stage_len,
err
);

#[cfg(test)]
{
self.test_record.remove_transfering_get(get_id);
}

obe_get_transfer_error(&metrics, &client_id, &node_role, key, data_len as u64);
self.get_revoke_ssd_source(get_id).await?;
return Err(err);
}
};
ssd_done_resp = Some(done_resp);
tracing::debug!(
"kv get ssd staged and pushed: key={}, source_node={}, stage={:#x}, target={:#x}, len={}, ssd_stage_len={}",
key,
resp.node_id,
abs_src,
abs_target,
data_len,
ssd_stage_len
);
}

// transfer data (skip if local and src==target to avoid redundant copy)
if peer_id.is_none() && abs_src == abs_target {
if resp.source_kind == GetSourceKind::Ssd {
tracing::debug!(
"kv get ssd owner push complete: key={}, target={:#x}, len={} (skip requester transfer)",
key,
abs_target,
data_len
);
} else if peer_id.is_none() && abs_src == abs_target {
tracing::debug!(
"kv get local no-op: src==target {:#x}, len={} (skip transfer)",
abs_target,
Expand Down Expand Up @@ -249,12 +329,17 @@ impl ClientKvApiInner {

// Removed post-transfer zero-header verification per request.

// Complete the get operation and get holder_id
let done_resp = match self.get_done(get_id).await {
Ok(resp) => resp,
Err(err) => {
obe_get_end_error_rpc(&metrics, &client_id, &node_role, key, data_len as u64);
return Err(err);
// Complete the get operation and get holder_id. SSD source already called
// get_done after pushing into the requester target.
let done_resp = if let Some(done_resp) = ssd_done_resp {
done_resp
} else {
match self.get_done(get_id).await {
Ok(resp) => resp,
Err(err) => {
obe_get_end_error_rpc(&metrics, &client_id, &node_role, key, data_len as u64);
return Err(err);
}
}
};
let end_handle_us = done_resp.server_process_us;
Expand Down Expand Up @@ -326,6 +411,7 @@ impl ClientKvApiInner {
let get_info = RemoteGetInfo {
get_id,
data_len,
source_kind: resp.source_kind,
src_addr: abs_src,
target_addr: abs_target,
node_id: resp.node_id.into(),
Expand Down Expand Up @@ -435,8 +521,19 @@ impl ClientKvApiInner {

/// Revokes a Get operation and releases the resources allocated for it.
///
/// Delegates to `get_revoke_inner` with `drop_ssd_source` set to `false`.
pub async fn get_revoke(&self, get_id: u64) -> KvResult<()> {
self.get_revoke_inner(get_id, false).await
}

/// Revokes a Get operation with `drop_ssd_source` set to `true` in the
/// revoke request.
///
/// NOTE(review): how the receiver treats `drop_ssd_source` is not visible
/// here; presumably it also discards the SSD source state. Confirm.
async fn get_revoke_ssd_source(&self, get_id: u64) -> KvResult<()> {
self.get_revoke_inner(get_id, true).await
}

async fn get_revoke_inner(&self, get_id: u64, drop_ssd_source: bool) -> KvResult<()> {
let req = MsgPack {
serialize_part: GetRevokeReq { get_id },
serialize_part: GetRevokeReq {
get_id,
drop_ssd_source,
},
raw_bytes: Vec::new(),
};

Expand Down
Loading
Loading