diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6b0898e..2aff98c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Check run: cargo check --all-targets --all-features @@ -28,6 +30,7 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: + toolchain: 1.89.0 components: rustfmt - name: Check formatting run: cargo fmt --all -- --check @@ -39,6 +42,7 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable with: + toolchain: 1.89.0 components: clippy - uses: Swatinem/rust-cache@v2 - name: Clippy @@ -50,6 +54,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Run tests run: cargo test --all-features @@ -60,6 +66,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Run demo simulation run: | @@ -77,6 +85,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Build docs run: cargo doc --no-deps --all-features @@ -89,6 +99,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Publish dry-run (sdec-bitstream) run: cargo publish -p sdec-bitstream --dry-run @@ -99,6 +111,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Install cargo-deny run: cargo install cargo-deny --locked @@ -111,6 +125,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Run 
simbench run: | diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 05a40f2..af7cc38 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -14,6 +14,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + toolchain: 1.89.0 - uses: Swatinem/rust-cache@v2 - name: Validate release tag matches Cargo.toml version @@ -73,6 +75,11 @@ jobs: env: CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + - name: Publish sdec-bevy + run: cargo publish -p sdec-bevy + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} + - name: Publish tools run: cargo publish -p sdec-tools env: diff --git a/Cargo.toml b/Cargo.toml index f8ada89..51e35a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ version = "0.7.1" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/kplane-dev/sdec" -rust-version = "1.75" +rust-version = "1.89" [workspace.lints.rust] unsafe_code = "forbid" diff --git a/README.md b/README.md index 4ebf6be..994f620 100644 --- a/README.md +++ b/README.md @@ -17,19 +17,67 @@ For interest management and per-client change lists, use the `sdec-repgraph` crate. It decides what gets encoded and feeds directly into `codec::encode_delta_from_changes`. +## What We Solve + +Real-time state replication needs predictable bandwidth and deterministic behavior, not just fast +serialization. `sdec` is built to encode *snapshots and deltas* efficiently and safely, explicitly +prioritizing network bytes over CPU when the two are in tension, with control over change +detection, quantization, and wire budgets. + +## Philosophy + +- **Determinism over heuristics** — Same inputs produce the same bytes across platforms. +- **Bandwidth first** — Bit-packed fields and delta semantics keep packets small. +- **Explicit control** — Schemas define codecs, change policies, and thresholds. 
+- **Composable layers** — Codec, schema, wire framing, and relevancy are separate concerns. + ## Status 🚧 **Work in Progress** — Core protocol is stable; sessions/compact headers and repgraph integration are active, and public APIs are still evolving. -Full snapshots are for **initial sync and recovery**. Compact deltas are for -**steady-state replication**. - ## Initial Results (Simbench) -- Global delta size (dense): 259B avg, 266B p95 (vs 268B/282B naive). -- Per-client visibility: ~21B avg per client (naive list ~17B, full bincode ~65B). -- Dirty-list encode (dense): ~2x faster (97us → 47us avg). +Dense, 16 players, 300 ticks (delta encoding): + +Commands: + +```bash +cargo run -p simbench --release +``` + +| Metric | SDEC delta | Bincode delta | +| --- | --- | --- | +| Avg bytes | 259B | 1114B | +| P95 bytes | 266B | 1159B | +| Encode avg | ~10us | ~2us | + +In this codec-only harness, SDEC produces significantly smaller deltas than a +generic bincode delta payload at the cost of higher CPU per encode. This matters +in realtime replication where bandwidth (not CPU) is often the limiting factor. + +## Initial Results (sdec-bevy-demo) + +Dense, 64 entities, 300 ticks, 100% dirty (end-to-end Bevy path): + +Commands: + +```bash +cargo run -p sdec-bevy-demo --release --bin sdec-bevy-demo -- --entities 64 --ticks 300 --dirty-pct 1.0 +cargo run -p sdec-bevy-demo --release --bin sdec-bevy-demo -- --mode naive --entities 64 --ticks 300 --dirty-pct 1.0 +``` + +| Metric | SDEC | Naive (bincode snapshot) | +| --- | --- | --- | +| Avg bytes | 649B | 1416B | +| P95 bytes | 649B | 1416B | +| Encode avg | ~26us | ~2us | +| Apply avg | ~13us | ~3us | + +This path includes change extraction, delta building, encoding, and apply logic. +The naive/bincode baseline is faster but more than 2x larger on the wire. SDEC +trades CPU for bandwidth, which is usually the binding constraint in realtime +replication. 
See [ARCHITECTURE.md](ARCHITECTURE.md) for design details and [WIRE_FORMAT.md](WIRE_FORMAT.md) for the binary protocol specification. diff --git a/codec/src/delta.rs b/codec/src/delta.rs index 043c9aa..2f47ede 100644 --- a/codec/src/delta.rs +++ b/codec/src/delta.rs @@ -511,7 +511,8 @@ fn encode_delta_payload_from_updates( ensure_entity_ids_sorted(destroys)?; ensure_entities_sorted(creates)?; - validate_updates_for_encoding(schema, updates, limits)?; + let lookup = build_component_lookup(schema); + validate_updates_for_encoding(schema, updates, limits, &lookup)?; let mut offset = 0; if !destroys.is_empty() { @@ -537,7 +538,11 @@ fn encode_delta_payload_from_updates( SectionTag::EntityUpdateSparsePacked, &mut out[offset..], limits, - |writer| encode_update_body_sparse_packed_from_updates(schema, updates, limits, writer), + |writer| { + encode_update_body_sparse_packed_from_updates( + schema, updates, limits, &lookup, writer, + ) + }, )?; offset += written; } @@ -887,6 +892,7 @@ fn validate_updates_for_encoding( schema: &schema::Schema, updates: &[DeltaUpdateEntity], limits: &CodecLimits, + lookup: &ComponentLookup, ) -> CodecResult<()> { let mut prev: Option = None; for entity_update in updates { @@ -908,16 +914,7 @@ fn validate_updates_for_encoding( }); } for component_update in &entity_update.components { - let component = schema - .components - .iter() - .find(|component| component.id == component_update.id) - .ok_or(CodecError::InvalidMask { - kind: MaskKind::ComponentMask, - reason: MaskReason::UnknownComponent { - component: component_update.id, - }, - })?; + let component = lookup.component(schema, component_update.id)?; if component_update.fields.is_empty() { return Err(CodecError::InvalidMask { kind: MaskKind::FieldMask { @@ -958,6 +955,7 @@ fn encode_update_body_sparse_packed_from_updates( schema: &schema::Schema, updates: &[DeltaUpdateEntity], limits: &CodecLimits, + lookup: &ComponentLookup, writer: &mut BitWriter<'_>, ) -> CodecResult<()> { if 
updates.len() > limits.max_entities_update { @@ -983,21 +981,12 @@ fn encode_update_body_sparse_packed_from_updates( for entity_update in updates { let entity_id = entity_update.id.raw(); for component_update in &entity_update.components { - let component = schema - .components - .iter() - .find(|component| component.id == component_update.id) - .ok_or(CodecError::InvalidMask { - kind: MaskKind::ComponentMask, - reason: MaskReason::UnknownComponent { - component: component_update.id, - }, - })?; + let component = lookup.component(schema, component_update.id)?; writer.align_to_byte()?; writer.write_varu32(entity_id)?; writer.write_varu32(component.id.get() as u32)?; writer.write_varu32(component_update.fields.len() as u32)?; - let index_bits = required_bits(component.fields.len().saturating_sub(1) as u64); + let index_bits = lookup.index_bits(component.id); for (field_idx, value) in &component_update.fields { if index_bits > 0 { writer.write_bits(*field_idx as u64, index_bits)?; @@ -1015,6 +1004,54 @@ fn encode_update_body_sparse_packed_from_updates( Ok(()) } +struct ComponentLookup { + index: Vec>, + index_bits: Vec, +} + +impl ComponentLookup { + fn component<'a>( + &self, + schema: &'a schema::Schema, + id: ComponentId, + ) -> CodecResult<&'a ComponentDef> { + let idx = id.get() as usize; + let Some(Some(component_index)) = self.index.get(idx) else { + return Err(CodecError::InvalidMask { + kind: MaskKind::ComponentMask, + reason: MaskReason::UnknownComponent { component: id }, + }); + }; + Ok(&schema.components[*component_index]) + } + + fn index_bits(&self, id: ComponentId) -> u8 { + let idx = id.get() as usize; + let Some(bits) = self.index_bits.get(idx).copied() else { + return 0; + }; + bits + } +} + +fn build_component_lookup(schema: &schema::Schema) -> ComponentLookup { + let max_id = schema + .components + .iter() + .map(|component| component.id.get() as usize) + .max() + .unwrap_or(0); + let mut index = vec![None; max_id + 1]; + let mut index_bits = 
vec![0u8; max_id + 1]; + for (component_index, component) in schema.components.iter().enumerate() { + let id = component.id.get() as usize; + index[id] = Some(component_index); + let bits = required_bits(component.fields.len().saturating_sub(1) as u64); + index_bits[id] = bits; + } + ComponentLookup { index, index_bits } +} + fn encode_destroy_body( baseline: &Snapshot, current: &Snapshot, diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..edc4de4 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "1.89.0" +components = ["rustfmt", "clippy"] +profile = "minimal" diff --git a/sdec-bevy-demo/Cargo.toml b/sdec-bevy-demo/Cargo.toml index 85e0da7..2ccd426 100644 --- a/sdec-bevy-demo/Cargo.toml +++ b/sdec-bevy-demo/Cargo.toml @@ -24,7 +24,7 @@ lightyear-full = ["dep:bevy", "dep:lightyear", "dep:crossbeam-channel"] [dependencies] anyhow = "1.0" -bevy_ecs = "0.13" +bevy_ecs = "0.18" bincode = "1.3" bitcode = "0.6.9" clap = { version = "4.5.2", features = ["derive"] } @@ -36,6 +36,6 @@ sdec-bevy = { path = "../sdec-bevy", version = "0.7.1" } serde = { workspace = true, features = ["derive"] } serde_json = "1.0" wire.workspace = true -bevy = { version = "0.13", default-features = false, features = ["multi-threaded"], optional = true } -lightyear = { version = "0.14.1", optional = true } +bevy = { version = "0.18", default-features = false, features = ["multi_threaded"], optional = true } +lightyear = { version = "0.26.4", features = ["crossbeam", "netcode"], optional = true } crossbeam-channel = { version = "0.5", optional = true } diff --git a/sdec-bevy-demo/src/bin/lightyear_full.rs b/sdec-bevy-demo/src/bin/lightyear_full.rs index 96dbb58..79773a9 100644 --- a/sdec-bevy-demo/src/bin/lightyear_full.rs +++ b/sdec-bevy-demo/src/bin/lightyear_full.rs @@ -7,9 +7,10 @@ use bevy::prelude::*; use bevy::time::TimeUpdateStrategy; use clap::Parser; use crossbeam_channel::{unbounded, Receiver, Sender}; -use 
lightyear::prelude::client::ClientCommands; -use lightyear::prelude::server::ServerCommands; -use lightyear::prelude::*; +use lightyear::crossbeam::CrossbeamIo; +use lightyear::link::SendPayload; +use lightyear::netcode::generate_key; +use lightyear::prelude::{client as ly_client, server as ly_server, *}; use serde::{Deserialize, Serialize}; #[derive(Parser)] @@ -39,7 +40,7 @@ struct Cli { validate: bool, } -#[derive(Component, Clone, Copy, Serialize, Deserialize, Reflect)] +#[derive(Component, Clone, Copy, Serialize, Deserialize, Reflect, PartialEq)] #[reflect(Component)] struct PositionYaw { x_q: i64, @@ -47,14 +48,13 @@ struct PositionYaw { yaw: u16, } -#[derive(Channel, Reflect)] struct ReplicationChannel; struct ProtocolPlugin; impl Plugin for ProtocolPlugin { fn build(&self, app: &mut App) { - app.register_component::(ChannelDirection::ServerToClient); + app.register_component::(); app.add_channel::(ChannelSettings { mode: ChannelMode::UnorderedUnreliable, ..default() @@ -104,10 +104,15 @@ struct Summary { } struct Forwarder { - s2c_raw_rx: Receiver>, - s2c_tx: Sender>, - c2s_raw_rx: Receiver>, - c2s_tx: Sender>, + s2c_raw_rx: Receiver, + s2c_tx: Sender, + c2s_raw_rx: Receiver, + c2s_tx: Sender, +} + +struct ClientHandle { + app: App, + entity: Entity, } fn main() -> Result<()> { @@ -118,80 +123,99 @@ fn main() -> Result<()> { let private_key = generate_key(); let server_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let mut channels = Vec::new(); - let mut client_ios = Vec::new(); - let mut forwarders = Vec::new(); - for idx in 0..cli.clients { - let (s2c_raw_tx, s2c_raw_rx) = unbounded::>(); - let (s2c_tx, s2c_rx) = unbounded::>(); - let (c2s_raw_tx, c2s_raw_rx) = unbounded::>(); - let (c2s_tx, c2s_rx) = unbounded::>(); - let client_addr: SocketAddr = format!("127.0.0.1:{}", 20000 + idx).parse().unwrap(); - channels.push((client_addr, c2s_rx, s2c_raw_tx)); - client_ios.push((s2c_rx, c2s_raw_tx)); - forwarders.push(Forwarder { - s2c_raw_rx, - s2c_tx, - 
c2s_raw_rx, - c2s_tx, - }); - } - - let shared = SharedConfig::default(); - let server_config = server::ServerConfig { - shared: shared.clone(), - net: vec![server::NetConfig::Netcode { - config: server::NetcodeConfig::default() - .with_protocol_id(protocol_id) - .with_key(private_key), - io: IoConfig { - transport: TransportConfig::Channels { channels }, - conditioner: None, - }, - }], - ..Default::default() - }; + let tick_duration = Duration::from_secs_f32(1.0 / 60.0); let mut server_app = App::new(); server_app.add_plugins(MinimalPlugins.build()); - server_app.add_plugins((server::ServerPlugin::new(server_config), ProtocolPlugin)); + server_app.add_plugins((ly_server::ServerPlugins { tick_duration }, ProtocolPlugin)); server_app.finish(); - server_app - .world - .run_system_once(|mut commands: Commands| commands.start_server()); - let now = bevy::utils::Instant::now(); + let server_config = ly_server::NetcodeConfig::default() + .with_protocol_id(protocol_id) + .with_key(private_key); + let server_entity = server_app + .world_mut() + .spawn(ly_server::NetcodeServer::new(server_config)) + .id(); + server_app + .world_mut() + .run_system_once(move |mut commands: Commands| { + commands.trigger(ly_server::Start { + entity: server_entity, + }); + }) + .map_err(|err| anyhow::anyhow!("{err:?}"))?; + + let now = Instant::now(); server_app.insert_resource(TimeUpdateStrategy::ManualInstant(now)); + let mut forwarders = Vec::new(); let mut client_apps = Vec::new(); - for (idx, (recv, send)) in client_ios.into_iter().enumerate() { - let net_config = client::NetConfig::Netcode { - auth: client::Authentication::Manual { - server_addr, - client_id: (idx as u64) + 1, - private_key, - protocol_id, - }, - config: Default::default(), - io: IoConfig { - transport: TransportConfig::LocalChannel { recv, send }, - conditioner: None, - }, - }; - let config = client::ClientConfig { - shared: shared.clone(), - net: net_config, - ..default() + for idx in 0..cli.clients { + let 
(s2c_raw_tx, s2c_raw_rx) = unbounded::(); + let (s2c_tx, s2c_rx) = unbounded::(); + let (c2s_raw_tx, c2s_raw_rx) = unbounded::(); + let (c2s_tx, c2s_rx) = unbounded::(); + + let server_link = server_app + .world_mut() + .spawn(( + CrossbeamIo::new(s2c_raw_tx, c2s_rx), + ly_server::LinkOf { + server: server_entity, + }, + )) + .id(); + server_app + .world_mut() + .run_system_once(move |mut commands: Commands| { + commands.trigger(LinkStart { + entity: server_link, + }); + }) + .map_err(|err| anyhow::anyhow!("{err:?}"))?; + + let auth = Authentication::Manual { + server_addr, + client_id: (idx as u64) + 1, + private_key, + protocol_id, }; + let netcode_client = + ly_client::NetcodeClient::new(auth, ly_client::NetcodeConfig::default())?; + let mut client_app = App::new(); client_app.add_plugins(MinimalPlugins.build()); - client_app.add_plugins((client::ClientPlugin::new(config), ProtocolPlugin)); + client_app.add_plugins((ly_client::ClientPlugins { tick_duration }, ProtocolPlugin)); client_app.finish(); + + let client_entity = client_app + .world_mut() + .spawn((netcode_client, CrossbeamIo::new(c2s_raw_tx, s2c_rx))) + .id(); client_app - .world - .run_system_once(|mut commands: Commands| commands.connect_client()); + .world_mut() + .run_system_once(move |mut commands: Commands| { + commands.trigger(LinkStart { + entity: client_entity, + }); + commands.trigger(ly_client::Connect { + entity: client_entity, + }); + }) + .map_err(|err| anyhow::anyhow!("{err:?}"))?; client_app.insert_resource(TimeUpdateStrategy::ManualInstant(now)); - client_apps.push(client_app); + client_apps.push(ClientHandle { + app: client_app, + entity: client_entity, + }); + + forwarders.push(Forwarder { + s2c_raw_rx, + s2c_tx, + c2s_raw_rx, + c2s_tx, + }); } // Spawn server entities. 
@@ -201,7 +225,9 @@ fn main() -> Result<()> { y_q: 0, yaw: 0, }; - server_app.world.spawn((position, Replicate::default())); + server_app + .world_mut() + .spawn((position, Replicate::default())); } let mut bytes = Vec::new(); @@ -210,7 +236,6 @@ fn main() -> Result<()> { let mut per_tick_bytes: Vec = vec![0; cli.clients as usize]; let mut current_time = now; - let tick_duration = shared.tick.tick_duration; // Let connections handshake and sync before we start measuring. let mut synced = false; @@ -218,23 +243,26 @@ fn main() -> Result<()> { current_time += tick_duration; server_app.insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); for client_app in &mut client_apps { - client_app.insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); + client_app + .app + .insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); } forward_client_to_server(&mut forwarders); server_app.update(); per_tick_bytes.fill(0); forward_server_to_client(&mut forwarders, &mut per_tick_bytes); for client_app in &mut client_apps { - client_app.update(); + client_app.app.update(); } - let all_synced = client_apps.iter().all(|client_app| { + let all_connected = client_apps.iter().all(|client_app| { client_app - .world - .get_resource::() - .is_some_and(|manager| manager.is_synced()) + .app + .world() + .get::(client_app.entity) + .is_some() }); - if all_synced { + if all_connected { synced = true; break; } @@ -250,13 +278,16 @@ fn main() -> Result<()> { current_time += tick_duration; server_app.insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); for client_app in &mut client_apps { - client_app.insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); + client_app + .app + .insert_resource(TimeUpdateStrategy::ManualInstant(current_time)); } // mutate a subset of entities { - let mut query = server_app.world.query::<&mut PositionYaw>(); - for mut pos in query.iter_mut(&mut server_app.world) { + let world = server_app.world_mut(); + let mut 
query = world.query::<&mut PositionYaw>(); + for mut pos in query.iter_mut(world) { if rng.chance() > cli.dirty_pct { continue; } @@ -271,7 +302,7 @@ fn main() -> Result<()> { server_app.update(); server_times.push(server_start.elapsed()); let server_digest = if cli.validate { - Some(state_digest_server(&mut server_app.world)) + Some(state_digest_server(server_app.world_mut())) } else { None }; @@ -281,10 +312,10 @@ fn main() -> Result<()> { let mut per_tick_client_time = Duration::default(); for client_app in &mut client_apps { let start = Instant::now(); - client_app.update(); + client_app.app.update(); per_tick_client_time += start.elapsed(); if let Some((server_count, server_hash)) = server_digest { - let (client_count, client_hash) = state_digest_client(&mut client_app.world); + let (client_count, client_hash) = state_digest_client(client_app.app.world_mut()); if client_count != server_count || client_hash != server_hash { validation_errors += 1; } diff --git a/sdec-bevy-demo/src/main.rs b/sdec-bevy-demo/src/main.rs index ab3abf7..5122e2b 100644 --- a/sdec-bevy-demo/src/main.rs +++ b/sdec-bevy-demo/src/main.rs @@ -181,6 +181,12 @@ struct Summary { bytes_p95: u64, encode_us_avg: u64, encode_us_p95: u64, + replication_us_avg: u64, + replication_us_p95: u64, + codec_us_avg: u64, + codec_us_p95: u64, + header_us_avg: u64, + header_us_p95: u64, apply_us_avg: u64, apply_us_p95: u64, errors: u64, @@ -206,6 +212,7 @@ struct ClientState<'a> { client_id: ClientId, world: World, entities: EntityMap, + visible: HashSet, session: Option, last_tick: SnapshotTick, last_applied_tick: SnapshotTick, @@ -234,6 +241,7 @@ impl<'a> ClientState<'a> { client_id, world: World::new(), entities: EntityMap::new(), + visible: HashSet::new(), session: None, last_tick: SnapshotTick::new(0), last_applied_tick: SnapshotTick::new(0), @@ -246,7 +254,6 @@ impl<'a> ClientState<'a> { } } } - fn main() -> Result<()> { let cli = Cli::parse(); fs::create_dir_all(&cli.out_dir) @@ -274,6 +281,9 @@ fn 
main() -> Result<()> { let mut bytes = Vec::new(); let mut enc_times = Vec::new(); + let mut replication_times = Vec::new(); + let mut codec_times = Vec::new(); + let mut header_times = Vec::new(); let mut apply_times = Vec::new(); let mut errors = 0u64; let mut resyncs = 0u64; @@ -403,44 +413,46 @@ fn main() -> Result<()> { graph.remove_entity(*destroy); } for (client_idx, client_state) in clients_state.iter_mut().enumerate() { - let visible_ids = if all_visible { - all_ids.clone() + if all_visible { + client_state.visible.clear(); + client_state.visible.extend(all_ids.iter().copied()); } else { - visible_entity_ids( + client_state.visible = visible_entity_ids( &positions, &mut server_entities, client_positions[client_idx], visibility_radius, - ) - }; + ); + } let start = Instant::now(); - let payload = match cli.mode { + let (payload, replication_elapsed, codec_elapsed, header_elapsed) = match cli.mode { Mode::Sdec => { + let replication_start = Instant::now(); let world_view = DemoWorldView { schema: &schema, world: &server_world, entities: &server_entities, }; let delta = graph.build_client_delta(client_state.client_id, &world_view); - if matches!(cli.mode, Mode::Sdec) { - total_create_entities += delta.creates.len() as u64; - total_destroy_entities += delta.destroys.len() as u64; - total_update_entities += delta.updates.len() as u64; - for create in &delta.creates { - total_create_components += create.components.len() as u64; - for component in &create.components { - total_create_fields += component.fields.len() as u64; - } + let replication_elapsed = replication_start.elapsed(); + total_create_entities += delta.creates.len() as u64; + total_destroy_entities += delta.destroys.len() as u64; + total_update_entities += delta.updates.len() as u64; + for create in &delta.creates { + total_create_components += create.components.len() as u64; + for component in &create.components { + total_create_fields += component.fields.len() as u64; } - for update in 
&delta.updates { - total_update_components += update.components.len() as u64; - for component in &update.components { - total_update_fields += component.fields.len() as u64; - } + } + for update in &delta.updates { + total_update_components += update.components.len() as u64; + for component in &update.components { + total_update_fields += component.fields.len() as u64; } } if tick == 1 { + let codec_start = Instant::now(); let len = encode_full_snapshot_retry( schema.schema(), SnapshotTick::new(tick), @@ -448,10 +460,13 @@ fn main() -> Result<()> { &sdec_limits, &mut client_state.send_buf, )?; + let codec_elapsed = codec_start.elapsed(); client_state.send_buf.truncate(len); client_state.last_tick = SnapshotTick::new(tick); - std::mem::take(&mut client_state.send_buf) + let payload = std::mem::take(&mut client_state.send_buf); + (payload, replication_elapsed, codec_elapsed, Duration::ZERO) } else { + let codec_start = Instant::now(); let len = encode_delta_retry( &mut client_state.encoder, SnapshotTick::new(tick), @@ -461,7 +476,9 @@ fn main() -> Result<()> { &delta.updates, &mut client_state.delta_buf, )?; + let codec_elapsed = codec_start.elapsed(); let payload = &client_state.delta_buf[wire::HEADER_SIZE..len]; + let header_start = Instant::now(); let compact_len = build_compact_packet( &mut client_state.send_buf, wire::SessionFlags::delta_snapshot(), @@ -470,29 +487,55 @@ fn main() -> Result<()> { SnapshotTick::new(tick.saturating_sub(1)), payload, )?; + let header_elapsed = header_start.elapsed(); client_state.send_buf.truncate(compact_len); client_state.last_tick = SnapshotTick::new(tick); - std::mem::take(&mut client_state.send_buf) + let packet = std::mem::take(&mut client_state.send_buf); + (packet, replication_elapsed, codec_elapsed, header_elapsed) } } Mode::Naive => { + let replication_start = Instant::now(); let snapshot = build_snapshot_for_ids( &mut server_world, &mut server_entities, - &visible_ids, + &client_state.visible, ); - 
bincode::serialize(&snapshot).context("serialize naive snapshot")? + let replication_elapsed = replication_start.elapsed(); + let codec_start = Instant::now(); + let payload = + bincode::serialize(&snapshot).context("serialize naive snapshot")?; + ( + payload, + replication_elapsed, + codec_start.elapsed(), + Duration::ZERO, + ) } Mode::Lightyear => { + let replication_start = Instant::now(); let snapshot = build_snapshot_for_ids( &mut server_world, &mut server_entities, - &visible_ids, + &client_state.visible, ); - bitcode::encode(&snapshot) + let replication_elapsed = replication_start.elapsed(); + let codec_start = Instant::now(); + let payload = bitcode::encode(&snapshot); + ( + payload, + replication_elapsed, + codec_start.elapsed(), + Duration::ZERO, + ) } }; enc_times.push(start.elapsed()); + replication_times.push(replication_elapsed); + codec_times.push(codec_elapsed); + if header_elapsed > Duration::ZERO { + header_times.push(header_elapsed); + } bytes.push(payload.len() as u64); if rng.chance() >= cli.drop_rate { @@ -506,6 +549,7 @@ fn main() -> Result<()> { } } else if matches!(cli.mode, Mode::Sdec) { client_state.send_buf = payload; + client_state.send_buf.clear(); } if let Some(delivered) = client_state.queue.pop_front() { @@ -558,9 +602,6 @@ fn main() -> Result<()> { } } } - if matches!(cli.mode, Mode::Sdec) { - client_state.send_buf = delivered; - } if apply_result.is_err() { errors += 1; client_state.errors += 1; @@ -580,7 +621,7 @@ fn main() -> Result<()> { &mut client_state.world, &mut client_state.entities, &mut client_state.session, - &visible_ids, + &client_state.visible, &sdec_limits, ) { if resynced { @@ -672,6 +713,10 @@ fn main() -> Result<()> { } } } + if matches!(cli.mode, Mode::Sdec) { + client_state.send_buf = delivered; + client_state.send_buf.clear(); + } } } @@ -698,6 +743,12 @@ fn main() -> Result<()> { bytes_p95: p95_u64(&mut bytes.clone()), encode_us_avg: avg_duration_us(&enc_times), encode_us_p95: p95_duration_us(&mut 
enc_times.clone()), + replication_us_avg: avg_duration_us(&replication_times), + replication_us_p95: p95_duration_us(&mut replication_times.clone()), + codec_us_avg: avg_duration_us(&codec_times), + codec_us_p95: p95_duration_us(&mut codec_times.clone()), + header_us_avg: avg_duration_us(&header_times), + header_us_p95: p95_duration_us(&mut header_times.clone()), apply_us_avg: avg_duration_us(&apply_times), apply_us_p95: p95_duration_us(&mut apply_times.clone()), errors, diff --git a/sdec-bevy/Cargo.toml b/sdec-bevy/Cargo.toml index dc71e4a..a18e73d 100644 --- a/sdec-bevy/Cargo.toml +++ b/sdec-bevy/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "sdec-bevy" description = "Bevy adapter for sdec schema + delta encoding" -readme = "../README.md" +readme = "README.md" keywords = ["bevy", "replication", "delta", "snapshot", "netcode"] categories = ["game-development", "network-programming", "encoding"] +documentation = "https://docs.rs/sdec-bevy" version.workspace = true edition.workspace = true license.workspace = true @@ -15,7 +16,7 @@ name = "sdec_bevy" [dependencies] anyhow = "1.0" -bevy_ecs = "0.13" +bevy_ecs = "0.18" codec.workspace = true schema.workspace = true wire.workspace = true diff --git a/sdec-bevy/README.md b/sdec-bevy/README.md new file mode 100644 index 0000000..1b77ed9 --- /dev/null +++ b/sdec-bevy/README.md @@ -0,0 +1,25 @@ +# sdec-bevy + +Bevy ECS integration for the SDEC snapshot + delta codec. + +## What it provides + +- Build a `schema::Schema` from Bevy components via `BevySchemaBuilder`. +- Extract per-tick creates/destroys/updates from a `World`. +- Apply decoded SDEC updates back into a Bevy `World`. +- Map Bevy `Entity` IDs to stable SDEC `EntityId` values. + +## Typical flow + +1. Define replicated components with `ReplicatedComponent`/`ReplicatedField`. +2. Build a schema with `BevySchemaBuilder`. +3. Call `extract_changes` (or `extract_changes_with_scratch`) each tick. +4. Encode deltas with `sdec-codec` and send over your transport. +5. 
Decode and apply on the client with `apply_changes`. + +## Notes + +This crate focuses on Bevy ECS integration only. Transport, packetization, +relevancy, and chunking are handled outside `sdec-bevy`. + +For a working example, see `sdec-bevy-demo`. diff --git a/sdec-bevy/src/extract.rs b/sdec-bevy/src/extract.rs index 3640eab..9b3689c 100644 --- a/sdec-bevy/src/extract.rs +++ b/sdec-bevy/src/extract.rs @@ -14,75 +14,91 @@ pub struct BevyChangeSet { pub updates: Vec, } +#[derive(Debug, Default)] +pub(crate) struct ExtractScratch { + create_entities: HashSet, + update_entities: HashMap>, + destroys: HashSet, +} + pub fn extract_changes( schema: &BevySchema, world: &mut World, entities: &mut EntityMap, ) -> BevyChangeSet { - let mut create_entities = HashSet::new(); - let mut update_entities: HashMap> = HashMap::new(); - let mut destroys: HashSet = HashSet::new(); + let mut scratch = ExtractScratch::default(); + let mut changes = BevyChangeSet::default(); + extract_changes_with_scratch(schema, world, entities, &mut scratch, &mut changes); + changes +} + +pub(crate) fn extract_changes_with_scratch( + schema: &BevySchema, + world: &mut World, + entities: &mut EntityMap, + scratch: &mut ExtractScratch, + out: &mut BevyChangeSet, +) { + scratch.create_entities.clear(); + scratch.update_entities.clear(); + scratch.destroys.clear(); + out.creates.clear(); + out.updates.clear(); + out.destroys.clear(); for adapter in schema.adapters() { for entity in adapter.added_entities(world) { - create_entities.insert(entity); + scratch.create_entities.insert(entity); } for entity in adapter.changed_entities(world) { - update_entities + scratch + .update_entities .entry(entity) .or_default() .push(adapter.component_id()); } for entity in adapter.removed_entities(world) { if let Some(id) = entities.entity_id_known(entity) { - destroys.insert(id); + scratch.destroys.insert(id); } } } - let mut creates = Vec::with_capacity(create_entities.len()); - for entity in 
create_entities.iter().copied() { + out.creates.reserve(scratch.create_entities.len()); + for entity in scratch.create_entities.iter().copied() { let id = entities.entity_id(entity); let components = schema.snapshot_entity(world, entity); if components.is_empty() { continue; } - creates.push(EntitySnapshot { id, components }); + out.creates.push(EntitySnapshot { id, components }); } - let mut updates = Vec::with_capacity(update_entities.len()); - for (entity, components) in update_entities { - if create_entities.contains(&entity) { + out.updates.reserve(scratch.update_entities.len()); + for (entity, components) in scratch.update_entities.iter() { + if scratch.create_entities.contains(entity) { continue; } - let id = entities.entity_id(entity); - let mut delta_components = Vec::with_capacity(components.len()); - for adapter in schema.adapters() { - if !components.contains(&adapter.component_id()) { - continue; - } - if let Some(component_update) = adapter.update_component(world, entity) { - delta_components.push(component_update); + let id = entities.entity_id(*entity); + let mut delta_components = Vec::new(); + for component_id in components { + if let Some(adapter) = schema.adapter_by_component(*component_id) { + if let Some(component_update) = adapter.update_component(world, *entity) { + delta_components.push(component_update); + } } } if !delta_components.is_empty() { - updates.push(DeltaUpdateEntity { + out.updates.push(DeltaUpdateEntity { id, components: delta_components, }); } } - let mut destroys_vec: Vec = Vec::with_capacity(destroys.len()); - destroys_vec.extend(destroys); - destroys_vec.sort_by_key(|id| id.raw()); - - creates.sort_by_key(|entity| entity.id.raw()); - updates.sort_by_key(|entity| entity.id.raw()); - - BevyChangeSet { - creates, - destroys: destroys_vec, - updates, - } + out.destroys.reserve(scratch.destroys.len()); + out.destroys.extend(scratch.destroys.iter().copied()); + out.destroys.sort_by_key(|id| id.raw()); + 
out.creates.sort_by_key(|entity| entity.id.raw()); + out.updates.sort_by_key(|entity| entity.id.raw()); } diff --git a/sdec-bevy/src/replicator.rs b/sdec-bevy/src/replicator.rs index 4bb4f18..957fcba 100644 --- a/sdec-bevy/src/replicator.rs +++ b/sdec-bevy/src/replicator.rs @@ -8,7 +8,7 @@ use codec::{ use wire::{decode_packet, Limits as WireLimits}; use crate::apply::apply_changes; -use crate::extract::extract_changes; +use crate::extract::{extract_changes_with_scratch, BevyChangeSet, ExtractScratch}; use crate::mapping::EntityMap; use crate::metrics::{EncodeMetrics, MetricsSink}; use crate::schema::BevySchema; @@ -20,6 +20,8 @@ pub struct BevyReplicator { entities: EntityMap, session: Option, metrics: Option>, + change_set: BevyChangeSet, + extract_scratch: ExtractScratch, } impl BevyReplicator { @@ -32,6 +34,8 @@ impl BevyReplicator { entities: EntityMap::new(), session: None, metrics: None, + change_set: BevyChangeSet::default(), + extract_scratch: ExtractScratch::default(), } } @@ -56,16 +60,22 @@ impl BevyReplicator { baseline_tick: codec::SnapshotTick, out: &mut [u8], ) -> Result { - let changes = extract_changes(&self.schema, world, &mut self.entities); + extract_changes_with_scratch( + &self.schema, + world, + &mut self.entities, + &mut self.extract_scratch, + &mut self.change_set, + ); let mut encoder = SessionEncoder::new(self.schema.schema(), &self.limits); let start = Instant::now(); let bytes = encode_delta_from_changes( &mut encoder, tick, baseline_tick, - &changes.creates, - &changes.destroys, - &changes.updates, + &self.change_set.creates, + &self.change_set.destroys, + &self.change_set.updates, out, )?; if let Some(metrics) = self.metrics.as_mut() { diff --git a/sdec-bevy/src/schema.rs b/sdec-bevy/src/schema.rs index a406e23..7fd1452 100644 --- a/sdec-bevy/src/schema.rs +++ b/sdec-bevy/src/schema.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::marker::PhantomData; use anyhow::{anyhow, Result}; +use bevy_ecs::component::Mutable; use 
bevy_ecs::prelude::{Component, Entity, World}; use codec::{ComponentSnapshot, DeltaUpdateComponent, DeltaUpdateEntity, FieldValue}; use schema::{ChangePolicy, ComponentDef, ComponentId, FieldCodec, FieldDef, FieldId, Schema}; @@ -14,7 +15,7 @@ pub struct ReplicatedField { pub change: Option, } -pub trait ReplicatedComponent: Component { +pub trait ReplicatedComponent: Component { const COMPONENT_ID: u16; fn fields() -> Vec; @@ -271,7 +272,7 @@ impl BevySchemaBuilder { } pub fn build(self) -> Result { - let mut components = Vec::new(); + let mut components = Vec::with_capacity(self.adapters.len()); for adapter in &self.adapters { components.push(adapter.schema_def()); } diff --git a/simbench/Cargo.toml b/simbench/Cargo.toml index bb95b3b..65a8b42 100644 --- a/simbench/Cargo.toml +++ b/simbench/Cargo.toml @@ -22,6 +22,10 @@ clap = { version = "4.5.2", features = ["derive"] } serde = { workspace = true, features = ["derive"] } serde_json = "1.0" bincode = "1.3" +bitcode = "0.6.9" +lightyear_core = "0.26.4" +lightyear_replication = "0.26.4" +lightyear_serde = "0.26.4" [dev-dependencies] criterion.workspace = true diff --git a/simbench/src/main.rs b/simbench/src/main.rs index 35f4a70..81ca719 100644 --- a/simbench/src/main.rs +++ b/simbench/src/main.rs @@ -9,8 +9,12 @@ use codec::{ encode_delta_from_changes, encode_delta_snapshot_with_scratch, encode_full_snapshot, CodecLimits, CodecScratch, DeltaUpdateComponent, DeltaUpdateEntity, SessionEncoder, }; +use lightyear_core::tick::Tick; +use lightyear_replication::delta::{DeltaType, Diffable}; +use lightyear_serde::registry::SerializeFns; +use lightyear_serde::writer::Writer; use repgraph::{ClientId, ClientView, ReplicationConfig, ReplicationGraph, Vec3, WorldView}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use wire::Limits as WireLimits; #[derive(Parser)] @@ -77,6 +81,9 @@ struct Cli { /// Fail if average delta packet size exceeds this value. 
#[arg(long)] max_avg_delta_bytes: Option, + /// Use full snapshots every tick (no deltas). + #[arg(long, default_value_t = false)] + full_only: bool, } #[derive(Clone, Copy, Debug, ValueEnum, Serialize, PartialEq, Eq)] @@ -116,7 +123,13 @@ fn main() -> Result<()> { let mut sdec = EncoderStats::default(); let mut sdec_dirty = EncoderStats::default(); let mut naive = EncoderStats::default(); - let mut full_bincode_bytes_total = 0u64; + let mut bincode_full = SizeTimeStats::default(); + let mut lightyear_bitcode = SizeTimeStats::default(); + let mut lightyear_delta = EncoderStats::default(); + let mut sdec_full_session = SizeTimeStats::default(); + let mut sdec_session_init_bytes = 0u64; + let mut sdec_session_state: Option = None; + let mut bincode_delta = EncoderStats::default(); let mut full_bytes_total = 0u64; let mut full_count = 0u32; @@ -133,13 +146,49 @@ fn main() -> Result<()> { }; let mut client_baselines: Vec = Vec::new(); + if cli.full_only { + let mut init_buf = vec![0u8; 128]; + let init_len = codec::encode_session_init_packet( + &schema, + codec::SnapshotTick::new(0), + Some(1), + codec::CompactHeaderMode::SessionV1, + &limits, + &mut init_buf, + )?; + sdec_session_init_bytes = init_len as u64; + let init_packet = wire::decode_packet(&init_buf[..init_len], &wire::Limits::default()) + .context("decode session init")?; + let session = codec::decode_session_init_packet(&schema, &init_packet, &limits) + .context("decode session init packet")?; + sdec_session_state = Some(session); + } + for tick in 1..=cli.ticks { step_states(&mut states, &mut rng, tick, &cli); let snapshot = build_snapshot(codec::SnapshotTick::new(tick), &states); - full_bincode_bytes_total += encode_bincode_snapshot(&states)? as u64; - - if tick == 1 { + let bincode_start = Instant::now(); + let bincode_bytes = encode_bincode_snapshot(&states)? 
as u64; + let bincode_us = bincode_start.elapsed().as_micros() as u64; + bincode_full.add(bincode_bytes, bincode_us); + let bitcode_start = Instant::now(); + let bitcode_bytes = encode_bitcode_snapshot(&states)? as u64; + let bitcode_us = bitcode_start.elapsed().as_micros() as u64; + lightyear_bitcode.add(bitcode_bytes, bitcode_us); + + if cli.full_only { + let full_start = Instant::now(); + let full_bytes = encode_full(&schema, &snapshot, &limits)?; + let full_elapsed = full_start.elapsed(); + full_bytes_total += full_bytes.len() as u64; + full_count += 1; + if let Some(session) = sdec_session_state.as_mut() { + let session_bytes = + encode_full_snapshot_session_header(&snapshot, session, full_bytes.len())?; + sdec_full_session.add(session_bytes as u64, full_elapsed.as_micros() as u64); + } + } else if tick == 1 { let full_bytes = encode_full(&schema, &snapshot, &limits)?; full_bytes_total += full_bytes.len() as u64; full_count += 1; @@ -172,6 +221,19 @@ fn main() -> Result<()> { let naive_elapsed = naive_start.elapsed(); let naive_us = naive_elapsed.as_micros() as u64; naive.add_with_tick(naive_bytes as u64, naive_us, tick); + let bincode_delta_start = Instant::now(); + let bincode_delta_bytes = encode_bincode_delta(&schema, &baseline_snapshot, &snapshot)?; + let bincode_delta_elapsed = bincode_delta_start.elapsed(); + let bincode_delta_us = bincode_delta_elapsed.as_micros() as u64; + bincode_delta.add_with_tick(bincode_delta_bytes as u64, bincode_delta_us, tick); + if let Some(prev_states) = last_states.as_deref() { + let lightyear_start = Instant::now(); + let lightyear_bytes = + encode_lightyear_delta(prev_states, &states, tick.saturating_sub(1))?; + let lightyear_elapsed = lightyear_start.elapsed(); + let lightyear_us = lightyear_elapsed.as_micros() as u64; + lightyear_delta.add_with_tick(lightyear_bytes as u64, lightyear_us, tick); + } if cli.debug_burst_ticks && burst_now { println!( "burst tick {}: sdec_us={} sdec_bytes={} naive_us={} naive_bytes={}", @@ 
-226,9 +288,9 @@ fn main() -> Result<()> { graph.clear_dirty(); graph.clear_removed(); } - last_states = Some(states.clone()); } + last_states = Some(states.clone()); baseline_snapshot = snapshot; } @@ -242,10 +304,15 @@ fn main() -> Result<()> { &cli, full_count, full_bytes_total, - full_bincode_bytes_total, sdec, sdec_dirty, naive, + bincode_delta, + bincode_full, + lightyear_bitcode, + lightyear_delta, + sdec_full_session, + sdec_session_init_bytes, per_client_stats, ); @@ -285,6 +352,36 @@ fn encode_full( Ok(buf) } +fn encode_full_snapshot_session_header( + snapshot: &codec::Snapshot, + session: &mut codec::SessionState, + full_bytes_len: usize, +) -> Result { + let payload_len = full_bytes_len.saturating_sub(wire::HEADER_SIZE); + let tick = snapshot.tick.raw(); + let last_tick = session.last_tick.raw(); + if tick <= last_tick { + anyhow::bail!( + "session header tick {} is not after last {}", + tick, + last_tick + ); + } + let tick_delta = tick - last_tick; + let baseline_delta = tick; + let mut header_buf = [0u8; wire::SESSION_MAX_HEADER_SIZE]; + let header_len = wire::encode_session_header( + &mut header_buf, + wire::SessionFlags::full_snapshot(), + tick_delta, + baseline_delta, + payload_len as u32, + ) + .context("encode session header")?; + session.last_tick = snapshot.tick; + Ok(header_len + payload_len) +} + fn encode_delta_with_scratch( schema: &schema::Schema, baseline: &codec::Snapshot, @@ -362,6 +459,66 @@ impl ClientBreakdown { } } +#[derive(Default)] +struct SizeTimeStats { + sizes: Vec, + encode_us: Vec, + total_bytes: u64, + count: u32, +} + +impl SizeTimeStats { + fn add(&mut self, bytes: u64, encode_us: u64) { + self.total_bytes += bytes; + self.count += 1; + self.sizes.push(bytes); + if encode_us > 0 { + self.encode_us.push(encode_us); + } + } + + fn finalize(&mut self) -> SizeTimeSummary { + let avg_bytes = if self.count > 0 { + self.total_bytes / self.count as u64 + } else { + 0 + }; + let p95_bytes = if self.sizes.is_empty() { + 0 + } else 
{ + p95(&mut self.sizes) + }; + let avg_encode = if self.encode_us.is_empty() { + 0 + } else { + self.encode_us.iter().sum::<u64>() / self.encode_us.len() as u64 + }; + let p95_encode = if self.encode_us.is_empty() { + 0 + } else { + p95(&mut self.encode_us) + }; + SizeTimeSummary { + count: self.count, + bytes_total: self.total_bytes, + bytes_avg: avg_bytes, + bytes_p95: p95_bytes, + encode_us_avg: avg_encode, + encode_us_p95: p95_encode, + } + } +} + +#[derive(Debug, Serialize)] +struct SizeTimeSummary { + count: u32, + bytes_total: u64, + bytes_avg: u64, + bytes_p95: u64, + encode_us_avg: u64, + encode_us_p95: u64, +} + fn encode_bincode_snapshot(states: &[DemoEntityState]) -> Result { let snapshot = SerdeSnapshot { entities: states @@ -379,6 +536,84 @@ fn encode_bincode_snapshot(states: &[DemoEntityState]) -> Result { Ok(bytes.len()) } +fn encode_bitcode_snapshot(states: &[DemoEntityState]) -> Result { + let snapshot = SerdeSnapshot { + entities: states + .iter() + .map(|state| SerdeEntity { + id: state.id.raw(), + pos_q: state.pos_q, + vel_q: state.vel_q, + yaw: state.yaw, + flags: state.flags, + }) + .collect(), + }; + let bytes = bitcode::encode(&snapshot); + Ok(bytes.len()) +} + +fn encode_bincode_delta( + schema: &schema::Schema, + baseline: &codec::Snapshot, + current: &codec::Snapshot, +) -> Result { + let mut entities = Vec::new(); + for entity in &current.entities { + let base = baseline.entities.iter().find(|e| e.id == entity.id); + if let Some(base) = base { + let changed_fields = diff_entity_fields(schema, base, entity)?; + if !changed_fields.is_empty() { + let fields = changed_fields + .into_iter() + .map(|(idx, value)| SerdeDeltaField { + idx: idx as u16, + value: serde_field_value(value), + }) + .collect(); + entities.push(SerdeDeltaEntity { + id: entity.id.raw(), + fields, + }); + } + } + } + let snapshot = SerdeDeltaSnapshot { entities }; + let bytes = bincode::serialize(&snapshot).context("bincode delta")?; + Ok(bytes.len()) +} + +fn 
encode_lightyear_delta( + prev_states: &[DemoEntityState], + states: &[DemoEntityState], + prev_tick: u32, +) -> Result { + if prev_states.len() != states.len() { + anyhow::bail!("lightyear delta requires matching state lengths"); + } + let mut entities = Vec::new(); + let previous_tick = Tick(prev_tick as u16); + for (prev, curr) in prev_states.iter().zip(states.iter()) { + let prev_state = LightyearState::from(prev); + let curr_state = LightyearState::from(curr); + let delta = prev_state.diff(&curr_state); + if delta.mask != 0 { + entities.push(LightyearDeltaEntity { + id: curr.id.raw(), + delta: LightyearDeltaMessage { + delta_type: DeltaType::Normal { previous_tick }, + delta, + }, + }); + } + } + let snapshot = LightyearDeltaSnapshot { entities }; + let mut writer = Writer::with_capacity(256); + let serializer = SerializeFns::::default(); + (serializer.serialize)(&snapshot, &mut writer).context("lightyear delta serialize")?; + Ok(writer.len()) +} + fn encode_naive_delta( schema: &schema::Schema, baseline: &codec::Snapshot, @@ -1138,7 +1373,12 @@ struct Summary { sdec: EncoderSummary, sdec_dirty: EncoderSummary, delta_naive: EncoderSummary, + delta_bincode: EncoderSummary, + lightyear_delta: EncoderSummary, + sdec_full_session: SizeTimeSummary, + sdec_session_init_bytes: u64, full_bincode: FullSummary, + lightyear_bitcode: SizeTimeSummary, per_client: Option, } @@ -1148,22 +1388,32 @@ impl Summary { cli: &Cli, full_count: u32, full_bytes_total: u64, - full_bincode_bytes_total: u64, mut sdec: EncoderStats, mut sdec_dirty: EncoderStats, mut naive: EncoderStats, + mut bincode_delta: EncoderStats, + mut bincode_full: SizeTimeStats, + mut lightyear_bitcode: SizeTimeStats, + mut lightyear_delta: EncoderStats, + mut sdec_full_session: SizeTimeStats, + sdec_session_init_bytes: u64, per_client_stats: Option, ) -> Self { let sdec_summary = sdec.finalize(); let sdec_dirty_summary = sdec_dirty.finalize(); let naive_summary = naive.finalize(); + let bincode_delta_summary = 
bincode_delta.finalize(); + let bincode_full_summary = bincode_full.finalize(); + let lightyear_bitcode_summary = lightyear_bitcode.finalize(); + let lightyear_delta_summary = lightyear_delta.finalize(); + let sdec_full_session_summary = sdec_full_session.finalize(); let avg_full = if cli.ticks > 0 { full_bytes_total / cli.ticks as u64 } else { 0 }; let avg_bincode = if cli.ticks > 0 { - full_bincode_bytes_total / cli.ticks as u64 + bincode_full_summary.bytes_total / cli.ticks as u64 } else { 0 }; @@ -1184,13 +1434,21 @@ impl Summary { sdec: sdec_summary, sdec_dirty: sdec_dirty_summary, delta_naive: naive_summary, + delta_bincode: bincode_delta_summary, + lightyear_delta: lightyear_delta_summary, + sdec_full_session: sdec_full_session_summary, + sdec_session_init_bytes, full_bincode: FullSummary { full_count, full_bytes_total, - full_bincode_bytes_total, + full_bincode_bytes_total: bincode_full_summary.bytes_total, avg_full_bytes: avg_full, avg_full_bincode_bytes: avg_bincode, + full_bincode_p95_bytes: bincode_full_summary.bytes_p95, + full_bincode_encode_us_avg: bincode_full_summary.encode_us_avg, + full_bincode_encode_us_p95: bincode_full_summary.encode_us_p95, }, + lightyear_bitcode: lightyear_bitcode_summary, per_client, } } @@ -1272,6 +1530,9 @@ struct FullSummary { full_bincode_bytes_total: u64, avg_full_bytes: u64, avg_full_bincode_bytes: u64, + full_bincode_p95_bytes: u64, + full_bincode_encode_us_avg: u64, + full_bincode_encode_us_p95: u64, } #[derive(Debug, Serialize)] @@ -1310,12 +1571,12 @@ impl Rng { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, bitcode::Encode, bitcode::Decode)] struct SerdeSnapshot { entities: Vec, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, bitcode::Encode, bitcode::Decode)] struct SerdeEntity { id: u32, pos_q: [i64; 3], @@ -1323,3 +1584,201 @@ struct SerdeEntity { yaw: u16, flags: [bool; 3], } + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct LightyearState { + pos_q: [i64; 3], + vel_q: [i64; 
3], + yaw: u16, + flags: [bool; 3], +} + +impl From<&DemoEntityState> for LightyearState { + fn from(state: &DemoEntityState) -> Self { + Self { + pos_q: state.pos_q, + vel_q: state.vel_q, + yaw: state.yaw, + flags: state.flags, + } + } +} + +#[derive(Serialize, Deserialize)] +struct LightyearDeltaSnapshot { + entities: Vec, +} + +#[derive(Serialize, Deserialize)] +struct LightyearDeltaEntity { + id: u32, + delta: LightyearDeltaMessage, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct LightyearDeltaMessage { + delta_type: DeltaType, + delta: LightyearDelta, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct LightyearDelta { + mask: u16, + values: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +enum LightyearValue { + FixedPoint(i64), + UInt(u16), + Bool(bool), +} + +impl Diffable for LightyearState { + fn base_value() -> Self { + Self { + pos_q: [0; 3], + vel_q: [0; 3], + yaw: 0, + flags: [false; 3], + } + } + + fn diff(&self, new: &Self) -> LightyearDelta { + let mut mask = 0u16; + let mut values = Vec::new(); + if self.pos_q[0] != new.pos_q[0] { + mask |= 1 << 0; + values.push(LightyearValue::FixedPoint(new.pos_q[0])); + } + if self.pos_q[1] != new.pos_q[1] { + mask |= 1 << 1; + values.push(LightyearValue::FixedPoint(new.pos_q[1])); + } + if self.pos_q[2] != new.pos_q[2] { + mask |= 1 << 2; + values.push(LightyearValue::FixedPoint(new.pos_q[2])); + } + if self.vel_q[0] != new.vel_q[0] { + mask |= 1 << 3; + values.push(LightyearValue::FixedPoint(new.vel_q[0])); + } + if self.vel_q[1] != new.vel_q[1] { + mask |= 1 << 4; + values.push(LightyearValue::FixedPoint(new.vel_q[1])); + } + if self.vel_q[2] != new.vel_q[2] { + mask |= 1 << 5; + values.push(LightyearValue::FixedPoint(new.vel_q[2])); + } + if self.yaw != new.yaw { + mask |= 1 << 6; + values.push(LightyearValue::UInt(new.yaw)); + } + if self.flags[0] != new.flags[0] { + mask |= 1 << 7; + values.push(LightyearValue::Bool(new.flags[0])); + } + if self.flags[1] != new.flags[1] 
{ + mask |= 1 << 8; + values.push(LightyearValue::Bool(new.flags[1])); + } + if self.flags[2] != new.flags[2] { + mask |= 1 << 9; + values.push(LightyearValue::Bool(new.flags[2])); + } + LightyearDelta { mask, values } + } + + fn apply_diff(&mut self, delta: &LightyearDelta) { + let mut values = delta.values.iter(); + if delta.mask & (1 << 0) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.pos_q[0] = *value; + } + } + if delta.mask & (1 << 1) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.pos_q[1] = *value; + } + } + if delta.mask & (1 << 2) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.pos_q[2] = *value; + } + } + if delta.mask & (1 << 3) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.vel_q[0] = *value; + } + } + if delta.mask & (1 << 4) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.vel_q[1] = *value; + } + } + if delta.mask & (1 << 5) != 0 { + if let Some(LightyearValue::FixedPoint(value)) = values.next() { + self.vel_q[2] = *value; + } + } + if delta.mask & (1 << 6) != 0 { + if let Some(LightyearValue::UInt(value)) = values.next() { + self.yaw = *value; + } + } + if delta.mask & (1 << 7) != 0 { + if let Some(LightyearValue::Bool(value)) = values.next() { + self.flags[0] = *value; + } + } + if delta.mask & (1 << 8) != 0 { + if let Some(LightyearValue::Bool(value)) = values.next() { + self.flags[1] = *value; + } + } + if delta.mask & (1 << 9) != 0 { + if let Some(LightyearValue::Bool(value)) = values.next() { + self.flags[2] = *value; + } + } + } +} + +#[derive(Debug, Serialize)] +struct SerdeDeltaSnapshot { + entities: Vec, +} + +#[derive(Debug, Serialize)] +struct SerdeDeltaEntity { + id: u32, + fields: Vec, +} + +#[derive(Debug, Serialize)] +struct SerdeDeltaField { + idx: u16, + value: SerdeFieldValue, +} + +#[derive(Debug, Serialize)] +enum SerdeFieldValue { + Bool(bool), + UInt(u64), + 
SInt(i64), + VarUInt(u64), + VarSInt(i64), + FixedPoint(i64), +} + +fn serde_field_value(value: codec::FieldValue) -> SerdeFieldValue { + match value { + codec::FieldValue::Bool(value) => SerdeFieldValue::Bool(value), + codec::FieldValue::UInt(value) => SerdeFieldValue::UInt(value), + codec::FieldValue::SInt(value) => SerdeFieldValue::SInt(value), + codec::FieldValue::VarUInt(value) => SerdeFieldValue::VarUInt(value), + codec::FieldValue::VarSInt(value) => SerdeFieldValue::VarSInt(value), + codec::FieldValue::FixedPoint(value) => SerdeFieldValue::FixedPoint(value), + } +}