diff --git a/snuba/subscriptions/executor_consumer.py b/snuba/subscriptions/executor_consumer.py index 500725e1fe0..0d74b7e2762 100644 --- a/snuba/subscriptions/executor_consumer.py +++ b/snuba/subscriptions/executor_consumer.py @@ -5,6 +5,7 @@ import time from collections import deque from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import TimeoutError as FutureTimeoutError from datetime import datetime from typing import Deque, Mapping, Optional, Sequence, Tuple @@ -370,8 +371,16 @@ def join(self, timeout: Optional[float] = None) -> None: message, result_future = self.__queue.popleft() + try: + result = result_future.future.result(remaining) + except FutureTimeoutError: + logger.warning( + f"Timed out waiting for future, {len(self.__queue)} futures remaining in queue" + ) + break + transformed_message = self.__result_encoder.encode( - SubscriptionTaskResult(result_future.task, result_future.future.result(remaining)) + SubscriptionTaskResult(result_future.task, result) ) self.__next_step.submit(message.replace(transformed_message))