diff --git a/openhands/events/stream.py b/openhands/events/stream.py index 8cff229c051..1e4c3b9d539 100644 --- a/openhands/events/stream.py +++ b/openhands/events/stream.py @@ -1,5 +1,6 @@ import asyncio import threading +from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Callable, Iterable @@ -29,24 +30,17 @@ def session_exists(sid: str, file_store: FileStore) -> bool: return False +@dataclass class EventStream: sid: str file_store: FileStore # For each subscriber ID, there is a stack of callback functions - useful # when there are agent delegates - _subscribers: dict[str, list[Callable]] - _cur_id: int - _lock: threading.Lock - - def __init__(self, sid: str, file_store: FileStore): - self.sid = sid - self.file_store = file_store - self._subscribers = {} - self._cur_id = 0 - self._lock = threading.Lock() - self._reinitialize_from_file_store() + _subscribers: dict[str, list[Callable]] = field(default_factory=dict) + _cur_id: int = 0 + _lock: threading.Lock = field(default_factory=threading.Lock) - def _reinitialize_from_file_store(self) -> None: + def __post_init__(self) -> None: try: events = self.file_store.list(f'sessions/{self.sid}/events') except FileNotFoundError: @@ -173,4 +167,4 @@ def clear(self): self.file_store.delete(f'sessions/{self.sid}') self._cur_id = 0 # self._subscribers = {} - self._reinitialize_from_file_store() + self.__post_init__()