[docs]classDataHandler:""" A concrete data handler that fetches price data from the database via DatabaseDataRepository for a specified interval. Also pushes DailyUpdateEvents. """def__init__(self,event_queue:EventQueue,symbols:List[str],start_date:date,end_date:date,interval:Interval,data_source:DataSource,):self.event_queue=event_queueself.symbols=symbolsself.start_date=start_dateself.end_date=end_dateself.interval=intervalself.data_source=data_sourceself._load_data_from_data_source()self._all_data_df:pd.DataFrame=pd.DataFrame()self._df_iterator=Noneself._current_row_data=Noneself._has_more_data:bool=Falseself._last_processed_date:date=Noneself._last_processed_timestamp:Optional[datetime]=Nonelogging.info(f"DataHandler initialized for symbols {symbols} from {start_date} to {end_date} with interval {interval.value}")self._preprocess_data()
[docs]defstream_next_market_event(self):""" Retrieves the next price bar(s), creates a MarketEvent and puts it onto the queue. Also pushes a DailyUpdateEvent when a new day begins. This function will now stream all events for a single timestamp in one go. """ifnotself.continue_backtest():logging.debug("No more data to stream.")returncurrent_timestamp=Nonewhileself._has_more_data:row=self._current_row_dataifcurrent_timestampisNone:current_timestamp=row.timestampelifrow.timestamp>current_timestamp:breakcurrent_date=current_timestamp.date()ifself._last_processed_dateisNone:self._last_processed_date=current_dateelifcurrent_date>self._last_processed_date:daily_update_event=DailyUpdateEvent(timestamp=datetime.combine(self._last_processed_date,datetime.min.time()))self.event_queue.put(daily_update_event)logging.debug(f"Pushed DailyUpdateEvent for {self._last_processed_date}")self._last_processed_date=current_datemarket_data={"open":row.open,"high":row.high,"low":row.low,"close":row.close,"volume":row.volume}market_event=MarketEvent(symbol=row.symbol,timestamp=row.timestamp,data=market_data)self.event_queue.put(market_event)logging.debug(f"Pushed MarketEvent for {row.symbol} on {row.timestamp}")self._load_next_row()ifnotself.continue_backtest()andself._last_processed_dateisnotNone:daily_update_event=DailyUpdateEvent(timestamp=datetime.combine(self._last_processed_date,datetime.min.time()))self.event_queue.put(daily_update_event)logging.debug(f"Pushed final DailyUpdateEvent for {self._last_processed_date}")self._last_processed_date=None
[docs]defreset(self):""" Resets the DataHandler to its initial state, ready to stream data from the beginning. """self._preprocess_data()self._last_processed_date=Noneself._last_processed_timestamp=Nonelogging.info(f"DataHandler RESET complete. Ready to stream from {self.start_date}.")
def_preprocess_data(self):""" Loads data for all specified symbols and interval, sorts it, and prepares a direct iterator over the DataFrame's rows. """all_rows_data=[]forsymbolinself.symbols:price_bars=self.price_bar_data[symbol]forpbinprice_bars:all_rows_data.append({"timestamp":pb.timestamp,"symbol":pb.symbol,"open":float(pb.open),"high":float(pb.high),"low":float(pb.low),"close":float(pb.close),"volume":float(pb.volume)})ifnotall_rows_data:logging.warning(f"No price data found for any of the symbols {self.symbols} at interval {self.interval.value}")self._has_more_data=Falsereturnself._all_data_df=pd.DataFrame(all_rows_data)self._all_data_df=self._all_data_df.sort_values(by=["timestamp","symbol"]).reset_index(drop=True)self._df_iterator=self._all_data_df.itertuples(index=False)self._load_next_row()logging.info(f"Loaded data for {len(self.symbols)} symbols across {len(self._all_data_df['timestamp'].unique())} unique timestamps.")def_load_next_row(self):""" Loads the next row of data from the DataFrame iterator. Sets _has_more_data to False if no more rows. """try:self._current_row_data=next(self._df_iterator)self._has_more_data=TrueexceptStopIteration:self._current_row_data=Noneself._has_more_data=Falselogging.debug("No more rows available from data handler.")def_load_data_from_data_source(self):price_bar_data:Dict[str,List[PriceBar]]={}type=self.data_source.typeiftype==DataSourceType.DIRECT:price_bar_data=self.data_source.price_bar_dataifprice_bar_dataisNone:raiseValueError("The provided price bar data is None, stopping backtest.")eliftype==DataSourceType.CUSTOM_CLIENT:ifself.data_source.custom_clientisNone:raiseValueError("The provided Custom Data Client is None, stopping backtest.")price_bar_data=self._load_all_symbols(self.data_source.custom_client)eliftype==DataSourceType.STD_CLIENT:ifself.data_source.api_keyisNone:raiseValueError("The provided API Key is None, stopping backtest.")ifself.data_source.providerisNone:raiseValueError("The provided Data Provider is None, stopping backtest.")matchself.data_source.provider:caseSupportedProvider.ALPHA_VANTAGE:data_client=AlphaVantageStdPriceBarClient(self.data_source.api_key)case_:raiseValueError("The provided Data Provider is not yet supported, stopping backtest.")price_bar_data=self._load_all_symbols(data_client)self.price_bar_data=price_bar_datadef_load_all_symbols(self,client:PriceBarClient):price_bar_data:Dict[str,List[PriceBar]]={}forsymbolinself.symbols:symbol_data=client.get_price_bar_data(symbol,self.start_date,self.end_date,self.interval)price_bar_data[symbol]=symbol_datareturnprice_bar_data