Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ jobs:
with:
name: cartesi-rollups-prt-node-linux-${{ matrix.arch }}

- name: Change node binary file permissions
run: chmod 755 cartesi-rollups-prt-node

- name: Compress node binary
run: tar -czf "$FILEPATH" cartesi-rollups-prt-node
env:
Expand Down
22 changes: 21 additions & 1 deletion cartesi-rollups/node/blockchain-reader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,10 @@ async fn latest_finalized_block(
Ok(block_number)
}

fn should_retry_with_partition(err: &Error, long_block_range_error_codes: &Vec<String>) -> bool {
fn should_retry_with_partition(
err: &impl std::error::Error,
long_block_range_error_codes: &Vec<String>,
) -> bool {
for code in long_block_range_error_codes {
let s = format!("{:?}", err);
if s.contains(&code.to_string()) {
Expand Down Expand Up @@ -804,4 +807,21 @@ mod blockchain_reader_tests {

Ok(())
}

#[tokio::test]
async fn test_should_retry() -> Result<()> {
let s = r###"Error: HTTP error 400 with body: {"jsonrpc":"2.0","id":3,"error":{"code":-32600,"message":"You can make eth_getLogs requests with up to a 10000 block range. Based on your parameters, this block range should work: [0x1754746, 0x1756e55]"}}"###;

assert!(should_retry_with_partition(
&std::io::Error::other(s),
&vec![
"-32005".to_string(),
"-32600".to_string(),
"-32602".to_string(),
"-32616".to_string()
]
));

Ok(())
}
}
1 change: 1 addition & 0 deletions cartesi-rollups/node/cartesi-rollups-prt-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub fn create_epoch_manager_task(watch: Watch, parameters: &PRTConfig) -> thread
params.address_book.consensus,
state_manager,
params.sleep_duration,
params.long_block_range_error_codes.clone(),
);

epoch_manager.execution_loop(inner_watch, provider).await
Expand Down
4 changes: 4 additions & 0 deletions cartesi-rollups/node/epoch-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct EpochManager<AS: ArenaSender, SM: StateManager> {
arena_sender: Arc<Mutex<AS>>,
consensus: Address,
sleep_duration: Duration,
long_block_range_error_codes: Vec<String>,
state_manager: SM,
last_react_epoch: (Option<Player<AS>>, u64),
}
Expand All @@ -35,11 +36,13 @@ impl<AS: ArenaSender, SM: StateManager> EpochManager<AS, SM> {
consensus_address: Address,
state_manager: SM,
sleep_duration: Duration,
long_block_range_error_codes: Vec<String>,
) -> Self {
Self {
arena_sender,
consensus: consensus_address,
sleep_duration,
long_block_range_error_codes,
state_manager,
last_react_epoch: (None, 0),
}
Expand Down Expand Up @@ -190,6 +193,7 @@ impl<AS: ArenaSender, SM: StateManager> EpochManager<AS, SM> {
snapshot.to_string_lossy().to_string(),
last_sealed_epoch.root_tournament,
last_sealed_epoch.block_created_number,
self.long_block_range_error_codes.clone(),
self.state_manager
.epoch_directory(last_sealed_epoch.epoch_number)?,
)
Expand Down
7 changes: 6 additions & 1 deletion prt/client-rs/core/src/strategy/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ impl<AS: ArenaSender> Player<AS> {
machine_path: String,
root_tournament: Address,
block_created_number: u64,
long_block_range_error_codes: Vec<String>,
state_dir: PathBuf,
) -> Result<Self> {
let db = DisputeStateAccess::new(inputs, leafs, root_tournament.to_string(), state_dir)?;
let reader = StateReader::new(provider.clone(), block_created_number)?;
let reader = StateReader::new(
provider.clone(),
block_created_number,
long_block_range_error_codes,
)?;
let gc = GarbageCollector::new(arena_sender.clone(), root_tournament);
let commitment_builder = MachineCommitmentBuilder::new(machine_path.clone());
Ok(Self {
Expand Down
185 changes: 153 additions & 32 deletions prt/client-rs/core/src/tournament/reader.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! This module defines the struct [StateReader] that is responsible for the reading the states
//! of tournaments

use anyhow::Result;
use anyhow::{Result, anyhow};
use async_recursion::async_recursion;
use std::collections::HashMap;

use alloy::{
contract::{Error, Event},
eips::BlockNumberOrTag::Latest,
providers::{DynProvider, Provider},
rpc::types::{Log, Topic},
sol_types::SolEvent,
sol_types::private::{Address, B256},
};

Expand All @@ -22,34 +25,74 @@ use cartesi_prt_contracts::tournament;
pub struct StateReader {
client: DynProvider,
block_created_number: u64,
long_block_range_error_codes: Vec<String>,
}

impl StateReader {
pub fn new(client: DynProvider, block_created_number: u64) -> Result<Self> {
pub fn new(
client: DynProvider,
block_created_number: u64,
long_block_range_error_codes: Vec<String>,
) -> Result<Self> {
Ok(Self {
client,
block_created_number,
long_block_range_error_codes,
})
}

async fn latest_block_number(&self) -> Result<u64> {
let block_number = self
.client
.get_block(Latest.into())
.await?
.expect("cannot get last block")
.header
.number;

Ok(block_number)
}

async fn query_events<E: SolEvent + Send + Sync>(
&self,
topic1: Option<&Topic>,
read_from: &Address,
) -> Result<Vec<(E, Log)>> {
let latest_block = self.latest_block_number().await?;

if latest_block < self.block_created_number {
return Ok(vec![]);
}

get_events(
&self.client,
topic1,
read_from,
self.block_created_number,
latest_block,
&self.long_block_range_error_codes,
)
.await
.map_err(|errors| anyhow!("{errors:?}"))
}

async fn created_tournament(
&self,
tournament_address: Address,
match_id: MatchID,
) -> Result<Option<TournamentCreatedEvent>> {
let tournament = tournament::Tournament::new(tournament_address, &self.client);
let events = tournament
.NewInnerTournament_filter()
.address(tournament_address)
.topic1::<B256>(match_id.hash().into())
.from_block(self.block_created_number)
.to_block(Latest)
.query()
let topic1: Topic = B256::from(match_id.hash()).into();
let events = self
.query_events::<tournament::Tournament::NewInnerTournament>(
Some(&topic1),
&tournament_address,
)
.await?;
if let Some(event) = events.last() {

if let Some((event, _)) = events.last() {
Ok(Some(TournamentCreatedEvent {
parent_match_id_hash: match_id.hash(),
new_tournament_address: event.0.childTournament,
new_tournament_address: event.childTournament,
}))
} else {
Ok(None)
Expand Down Expand Up @@ -91,21 +134,16 @@ impl StateReader {
}

async fn created_matches(&self, tournament_address: Address) -> Result<Vec<MatchCreatedEvent>> {
let tournament = tournament::Tournament::new(tournament_address, &self.client);
let events: Vec<MatchCreatedEvent> = tournament
.MatchCreated_filter()
.address(tournament_address)
.from_block(self.block_created_number)
.to_block(Latest)
.query()
let events: Vec<MatchCreatedEvent> = self
.query_events::<tournament::Tournament::MatchCreated>(None, &tournament_address)
.await?
.iter()
.map(|event| MatchCreatedEvent {
.map(|(event, _)| MatchCreatedEvent {
id: MatchID {
commitment_one: event.0.one.into(),
commitment_two: event.0.two.into(),
commitment_one: event.one.into(),
commitment_two: event.two.into(),
},
left_hash: event.0.leftOfTwo.into(),
left_hash: event.leftOfTwo.into(),
})
.collect();
Ok(events)
Expand All @@ -115,17 +153,12 @@ impl StateReader {
&self,
tournament_address: Address,
) -> Result<Vec<CommitmentJoinedEvent>> {
let tournament = tournament::Tournament::new(tournament_address, &self.client);
let events = tournament
.CommitmentJoined_filter()
.address(tournament_address)
.from_block(self.block_created_number)
.to_block(Latest)
.query()
let events = self
.query_events::<tournament::Tournament::CommitmentJoined>(None, &tournament_address)
.await?
.iter()
.map(|c| CommitmentJoinedEvent {
root: c.0.commitment.into(),
.map(|(event, _)| CommitmentJoinedEvent {
root: event.commitment.into(),
})
.collect();
Ok(events)
Expand Down Expand Up @@ -332,3 +365,91 @@ pub struct MatchCreatedEvent {
pub id: MatchID,
pub left_hash: Digest,
}

// Below is a simplified version originated from https://github.com/cartesi/state-fold
// ParitionProvider will attempt to fetch events in smaller partition if the original request is too large
#[async_recursion]
async fn get_events<E: SolEvent + Send + Sync>(
provider: &impl Provider,
topic1: Option<&Topic>,
read_from: &Address,
start_block: u64,
end_block: u64,
long_block_range_error_codes: &Vec<String>,
) -> std::result::Result<Vec<(E, Log)>, Vec<Error>> {
let event: Event<_, _, _> = {
let mut e = Event::new_sol(provider, read_from)
.from_block(start_block)
.to_block(end_block)
.event(E::SIGNATURE);

if let Some(t) = topic1 {
e = e.topic1(t.clone());
}

e
};

match event.query().await {
Ok(l) => Ok(l),
Err(e) => {
if should_retry_with_partition(&e, long_block_range_error_codes) {
let middle = {
let blocks = 1 + end_block - start_block;
let half = blocks / 2;
start_block + half - 1
};

let first_res = get_events(
provider,
topic1,
read_from,
start_block,
middle,
long_block_range_error_codes,
)
.await;

let second_res = get_events(
provider,
topic1,
read_from,
middle + 1,
end_block,
long_block_range_error_codes,
)
.await;

match (first_res, second_res) {
(Ok(mut first), Ok(second)) => {
first.extend(second);
Ok(first)
}

(Err(mut first), Err(second)) => {
first.extend(second);
Err(first)
}

(Err(err), _) | (_, Err(err)) => Err(err),
}
} else {
Err(vec![e])
}
}
}
}

fn should_retry_with_partition(
err: &impl std::error::Error,
long_block_range_error_codes: &Vec<String>,
) -> bool {
for code in long_block_range_error_codes {
let s = format!("{:?}", err);
if s.contains(&code.to_string()) {
return true;
}
}

false
}
2 changes: 0 additions & 2 deletions prt/contracts/src/ITournament.sol
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,6 @@ interface ITournament {
view
returns (TournamentArguments memory);

/// @notice Returns non-root tournament arguments

/// @notice Check whether a match can be won by timeout.
/// @param matchId The match ID
function canWinMatchByTimeout(Match.Id calldata matchId)
Expand Down