[docs]classBaseStrategy(ABC):""" Abstract base class for the trading strategy in the new event-driven backtesting engine. Strategies process MarketEvents and generate SignalEvents. """def__init__(self,symbol:str,**kwargs:Any):ifnotsymbol:raiseValueError("Strategy must be initialized with a target symbol.")self.event_queue:Optional[EventQueue]=Noneself.symbol:str=symbolself.params:Dict[str,Any]=kwargslogging.info(f"{self.__class__.__name__} initialized for {symbol} with params: {kwargs}")
[docs]@abstractmethoddefon_market_event(self,event:MarketEvent):""" Called when a new MarketEvent (e.g. a new bar of data) is received. Strategies should implement their trading logic here. """pass
def_put_signal_event(self,timestamp:datetime,direction:Signal):ifself.event_queueisNone:raiseRuntimeError("Event queue not set for strategy. Call set_event_queue() first.")signal_event=SignalEvent(symbol=self.symbol,timestamp=timestamp,direction=direction)self.event_queue.put(signal_event)logging.debug(f"Strategy for {self.symbol} issued {direction} signal on {timestamp.date()}.")