Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "3.0.6"
version = "3.0.7"
edition = "2024"

[[bin]]
Expand Down
12 changes: 11 additions & 1 deletion src/agent/services/lazer_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ mod lazer_exporter {
// consume immediate tick
publish_interval.tick().await;

let mut last_sent_timestamps: HashMap<pyth_sdk::Identifier, chrono::NaiveDateTime> =
HashMap::new();

loop {
tokio::select! {
_ = publish_interval.tick() => {
Expand All @@ -434,6 +437,12 @@ mod lazer_exporter {
// TODO: This read locks and clones local::Store::prices, which may not meet performance needs.
for (identifier, price_info) in state.get_all_price_infos().await {
if let Some(symbol) = lazer_symbols.get(&identifier) {
if let Some(last_timestamp) = last_sent_timestamps.get(&identifier) {
if price_info.timestamp <= *last_timestamp {
continue;
}
}

let source_timestamp_micros = price_info.timestamp.and_utc().timestamp_micros();
let source_timestamp = MessageField::some(Timestamp {
seconds: source_timestamp_micros / 1_000_000,
Expand All @@ -449,7 +458,8 @@ mod lazer_exporter {
..PriceUpdate::default()
})),
special_fields: Default::default(),
})
});
last_sent_timestamps.insert(identifier, price_info.timestamp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 last_sent_timestamps updated before transaction is serialized and sent, causing price updates to be silently dropped on failure

The last_sent_timestamps map is updated eagerly at line 462, inside the per-price loop, before the batch transaction is serialized (line 475) and broadcast to relayers (line 496). If serialization fails (write_to_bytes error at line 475-479) or the broadcast send fails (no active receivers at line 496-500), the code continues or logs an error, but the timestamps have already been recorded as "sent". On the next publish tick, these price updates will be skipped by the dedup check at line 440-443 (price_info.timestamp <= *last_timestamp), even though they were never actually delivered.

Root Cause and Impact

The root cause is that last_sent_timestamps.insert(identifier, price_info.timestamp) at line 462 is executed during the collection phase, not after successful delivery. The timestamps should only be committed after the transaction has been successfully sent.

For example, if write_to_bytes() fails at line 475:

Err(e) => {
    tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e);
    continue; // timestamps already recorded!
}

Or if relayer_sender.send() fails at line 496 (e.g., no active receivers):

Err(e) => {
    tracing::error!("Error sending transaction to relayer receivers: {e}");
    // timestamps already recorded, these updates are lost
}

Impact: Price updates are permanently lost until a newer-timestamped update arrives for each affected feed. In a scenario where the broadcast channel has no receivers (all relayer connections are down), every price update will be marked as sent but never delivered, causing a complete data gap.

Prompt for agents
In src/agent/services/lazer_exporter.rs, the last_sent_timestamps.insert() call at line 462 should be moved to after the transaction is successfully sent. One approach: collect the timestamps to update into a temporary HashMap or Vec during the for loop (lines 438-464), and only apply them to last_sent_timestamps after the relayer_sender.send() succeeds at line 496. For example, replace line 462 with collecting into a local variable like `pending_timestamps.insert(identifier, price_info.timestamp)`, and then after the successful send at line 497, iterate over pending_timestamps and insert them into last_sent_timestamps. Make sure the pending_timestamps are NOT applied when serialization fails (line 475-479 continue) or when the broadcast send fails (line 498-500).
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}
}

Expand Down
Loading