From 1445d753d949b4aeb1e3f931ba049185ad889bf3 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Wed, 14 Jan 2026 10:23:12 -0800 Subject: [PATCH 1/3] increase sql retries --- eval_protocol/event_bus/sqlite_event_bus_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eval_protocol/event_bus/sqlite_event_bus_database.py b/eval_protocol/event_bus/sqlite_event_bus_database.py index 5086d6e3..59a026ed 100644 --- a/eval_protocol/event_bus/sqlite_event_bus_database.py +++ b/eval_protocol/event_bus/sqlite_event_bus_database.py @@ -11,8 +11,8 @@ # Retry configuration for database operations -SQLITE_RETRY_MAX_TRIES = 5 -SQLITE_RETRY_MAX_TIME = 30 # seconds +SQLITE_RETRY_MAX_TRIES = 10 +SQLITE_RETRY_MAX_TIME = 60 # seconds def _is_database_locked_error(e: Exception) -> bool: From d4a445b0ad425d4ddfb3c7ecde367c66768eb8f7 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Thu, 15 Jan 2026 16:02:27 -0800 Subject: [PATCH 2/3] make connection more robust --- eval_protocol/dataset_logger/sqlite_evaluation_row_store.py | 5 +++-- eval_protocol/event_bus/sqlite_event_bus_database.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py index f6a81e1e..2b9885c7 100644 --- a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py +++ b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py @@ -42,9 +42,10 @@ class EvaluationRow(BaseModel): # type: ignore self._EvaluationRow = EvaluationRow - self._db.connect() + # Wrap connect() in retry logic since setting pragmas can fail with "database is locked" + execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True)) # Use safe=True to avoid errors when tables/indexes already exist - self._db.create_tables([EvaluationRow], safe=True) + execute_with_sqlite_retry(lambda: self._db.create_tables([EvaluationRow], safe=True)) @property def db_path(self) -> str: diff --git a/eval_protocol/event_bus/sqlite_event_bus_database.py b/eval_protocol/event_bus/sqlite_event_bus_database.py index 59a026ed..122fbac9 100644 --- a/eval_protocol/event_bus/sqlite_event_bus_database.py +++ b/eval_protocol/event_bus/sqlite_event_bus_database.py @@ -181,9 +181,10 @@ class Event(BaseModel): # type: ignore processed = BooleanField(default=False) # Track if event has been processed self._Event = Event - self._db.connect() + # Wrap connect() in retry logic since setting pragmas can fail with "database is locked" + execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True)) # Use safe=True to avoid errors when tables already exist - self._db.create_tables([Event], safe=True) + execute_with_sqlite_retry(lambda: self._db.create_tables([Event], safe=True)) def publish_event(self, event_type: str, data: Any, process_id: str) -> None: """Publish an event to the database.""" From 0843129175ecb707dbcbaf83afb5b6926f618c59 Mon Sep 17 00:00:00 2001 From: Dylan Huang Date: Fri, 16 Jan 2026 10:21:45 -0800 Subject: [PATCH 3/3] Implement robust database connection with retry logic to handle pragma execution failures --- .../sqlite_evaluation_row_store.py | 5 +- .../event_bus/sqlite_event_bus_database.py | 47 ++++++++++++++++++- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py index 2b9885c7..4bd66a48 100644 --- a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py +++ b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py @@ -7,6 +7,7 @@ from eval_protocol.event_bus.sqlite_event_bus_database import ( SQLITE_HARDENED_PRAGMAS, check_and_repair_database, + connect_with_retry, execute_with_sqlite_retry, ) from eval_protocol.models import EvaluationRow @@ -42,8 +43,8 @@ class EvaluationRow(BaseModel): # type: ignore self._EvaluationRow = EvaluationRow - # Wrap connect() in retry logic since setting pragmas can fail with "database is locked" - execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True)) + # Connect with retry logic that properly handles pragma execution failures + connect_with_retry(self._db) # Use safe=True to avoid errors when tables/indexes already exist execute_with_sqlite_retry(lambda: self._db.create_tables([EvaluationRow], safe=True)) diff --git a/eval_protocol/event_bus/sqlite_event_bus_database.py b/eval_protocol/event_bus/sqlite_event_bus_database.py index 122fbac9..4e56a060 100644 --- a/eval_protocol/event_bus/sqlite_event_bus_database.py +++ b/eval_protocol/event_bus/sqlite_event_bus_database.py @@ -55,6 +55,49 @@ def _execute() -> T: return _execute() +def connect_with_retry(db: SqliteDatabase) -> None: + """ + Connect to the database with retry logic, ensuring pragmas are always applied. + + Peewee's connect() method sets the connection state *before* executing pragmas + (in _initialize_connection). If pragma execution fails with "database is locked", + the connection is marked as open but pragmas are not applied. Subsequent calls + to connect(reuse_if_open=True) would see the connection as already open and + skip pragma execution entirely. + + This function handles this edge case by: + 1. Closing the connection if a lock error occurs during connect + 2. Retrying with exponential backoff until pragmas are successfully applied + + Args: + db: The SqliteDatabase instance to connect + """ + + @backoff.on_exception( + backoff.expo, + OperationalError, + max_tries=SQLITE_RETRY_MAX_TRIES, + max_time=SQLITE_RETRY_MAX_TIME, + giveup=lambda e: not _is_database_locked_error(e), + jitter=backoff.full_jitter, + ) + def _connect() -> None: + try: + # Close any partially-open connection before retrying to ensure + # a fresh connection is opened and pragmas are executed + if not db.is_closed(): + db.close() + db.connect() + except OperationalError: + # If connect fails (e.g., during pragma execution), ensure the + # connection is closed so the next retry starts fresh + if not db.is_closed(): + db.close() + raise + + _connect() + + # SQLite pragmas for hardened concurrency safety SQLITE_HARDENED_PRAGMAS = { "journal_mode": "wal", # Write-Ahead Logging for concurrent reads/writes @@ -181,8 +224,8 @@ class Event(BaseModel): # type: ignore processed = BooleanField(default=False) # Track if event has been processed self._Event = Event - # Wrap connect() in retry logic since setting pragmas can fail with "database is locked" - execute_with_sqlite_retry(lambda: self._db.connect(reuse_if_open=True)) + # Connect with retry logic that properly handles pragma execution failures + connect_with_retry(self._db) # Use safe=True to avoid errors when tables already exist execute_with_sqlite_retry(lambda: self._db.create_tables([Event], safe=True))