Source code for alpheast.events.event_queue


import logging
import queue
from typing import Optional

from alpheast.events.event import Event


class EventQueue:
    """
    A synchronized queue for managing events in the event-driven backtesting system.

    Thin wrapper around an unbounded ``queue.Queue`` (FIFO) that exposes a
    non-blocking ``get``: instead of raising ``queue.Empty`` it returns
    ``None`` when nothing is pending.
    """

    def __init__(self) -> None:
        """Create an empty, unbounded FIFO queue and log the initialization."""
        # No maxsize is passed, so the underlying queue is unbounded and
        # put() never blocks waiting for a free slot.
        self._queue = queue.Queue()
        logging.info("EventQueue initialized.")

    def put(self, event: "Event"):
        """
        Append *event* to the tail of the queue.

        Args:
            event: The event to enqueue.
        """
        self._queue.put(event)

    def get(self) -> Optional["Event"]:
        """
        Remove and return the oldest queued event without blocking.

        Returns:
            The event at the head of the queue, or ``None`` if the queue is
            currently empty. Because ``None`` signals "empty", a ``None``
            value that was enqueued cannot be told apart from an empty queue.
        """
        try:
            return self._queue.get(block=False)
        except queue.Empty:
            return None

    def empty(self) -> bool:
        """
        Report whether the queue currently holds no events.

        Returns:
            ``True`` if no events are pending, ``False`` otherwise.
        """
        return self._queue.empty()