diff --git a/tildes/tildes/lib/event_stream.py b/tildes/tildes/lib/event_stream.py index 8375841..cf99607 100644 --- a/tildes/tildes/lib/event_stream.py +++ b/tildes/tildes/lib/event_stream.py @@ -13,6 +13,7 @@ from redis import Redis, ResponseError from tildes.lib.database import get_session_from_config REDIS_KEY_PREFIX = "event_stream:" +MAX_RETRIES_PER_MESSAGE = 3 class Message: @@ -45,7 +46,11 @@ class EventStreamConsumer: """ def __init__( - self, consumer_group: str, source_streams: Sequence[str], uses_db: bool = True, + self, + consumer_group: str, + source_streams: Sequence[str], + uses_db: bool = True, + skip_pending: bool = False, ): """Initialize a new consumer, creating consumer groups and streams if needed.""" ini_file_path = os.environ["INI_FILE"] @@ -78,21 +83,23 @@ class EventStreamConsumer: else: self.db_session = None + # start by reading any already-pending messages by default + self.is_reading_pending = not skip_pending + def consume_streams(self) -> None: """Process messages from the streams indefinitely.""" while True: - # Get any messages from the source streams that haven't already been - # delivered to a consumer in this group - will fetch a maximum of one - # message from each stream, and block indefinitely if none are available - response = self.redis.xreadgroup( - self.consumer_group, - self.name, - {stream: ">" for stream in self.source_streams}, - count=1, - block=0, - ) + if self.is_reading_pending: + # clear out any persistently-failing messages first + self._clear_dead_messages() - messages = self._xreadgroup_response_to_messages(response) + messages = self._get_messages(pending=True) + + # when no pending messages are left, switch to waiting for new ones + if not messages: + self.is_reading_pending = False + else: + messages = self._get_messages(pending=False) for message in messages: self.process_message(message) @@ -103,6 +110,39 @@ class EventStreamConsumer: message.ack(self.consumer_group) + def _clear_dead_messages(self) -> 
None: + """Clear any pending messages that have failed too many times. + + If a message seems to be failing consistently, this will use XCLAIM to transfer + ownership to a fake "consumer" named {consumer_group}-dead. Pending + messages for {consumer_group}-dead consumers should be monitored and inspected manually to + determine why they were unable to be processed. + """ + # for each stream, use XPENDING to check for messages that have been delivered + # repeatedly (indicating that they're failing consistently) + for stream in self.source_streams: + response = self.redis.xpending_range( + stream, + self.consumer_group, + min="-", + max="+", + count=100, # there shouldn't ever be more than one, but won't hurt + consumername=self.name, + ) + + for entry in response: + # if it hasn't had many attempts yet, leave it pending to try again + if entry["times_delivered"] < MAX_RETRIES_PER_MESSAGE: + continue + + self.redis.xclaim( + stream, + self.consumer_group, + f"{self.consumer_group}-dead", + min_idle_time=0, # shouldn't have to worry about race condition + message_ids=[entry["message_id"]], + ) + def _xreadgroup_response_to_messages(self, response: Any) -> List[Message]: """Convert a response from XREADGROUP to a list of Messages.""" messages = [] @@ -126,6 +166,34 @@ class EventStreamConsumer: return messages + def _get_messages(self, pending: bool = False) -> List[Message]: + """Get any messages from the streams for this consumer. + + This method will return at most one message from each of the source streams per + call. + + If pending is True, the messages will be ones previously delivered to this + consumer but not acked. + + If pending is False, messages will be ones that haven't been delivered to any + consumer in this group, and this method will block indefinitely until there are + messages available. 
+ """ + if pending: + message_id = "0" + else: + message_id = ">" + + response = self.redis.xreadgroup( + self.consumer_group, + self.name, + {stream: message_id for stream in self.source_streams}, + count=1, + block=0, + ) + + return self._xreadgroup_response_to_messages(response) + @abstractmethod def process_message(self, message: Message) -> None: """Process a message from the stream (subclasses must implement)."""