from collections import deque
from decimal import Decimal
import logging
from typing import Any, Deque, Dict
from alpheast.models.signal import Signal
from alpheast.events.event_enums import OrderType
from alpheast.events.event_queue import EventQueue
from alpheast.events.event import FillEvent, MarketEvent, OrderEvent
from alpheast.handlers.execution_handler import ExecutionHandler
[docs]
class SimulatedExecutionHandler(ExecutionHandler):
"""
A concrete execution handler that simulates order execution.
It simulates slippage for market orders and considers high/low for limit orders.
"""
def __init__(
self,
event_queue: EventQueue,
transaction_cost_percent: Decimal = Decimal("0.001"),
slippage_percent: Decimal = Decimal("0.0005")
):
self.event_queue = event_queue
# Cache latest known market prices to simulate fills
self._latest_market_prices: Dict[str, Dict[str, Any]] = {}
self.transaction_cost_percent = transaction_cost_percent
self.slippage_percent = slippage_percent
self._open_orders: Deque[str, OrderEvent] = deque()
self._open_orders_by_id: Dict[str, OrderEvent] = {}
logging.info("SimulatedExecutionHandler initialized.")
[docs]
def on_market_event(self, event: MarketEvent):
"""
Updates the internal cache of the latest market prices based on incoming MarketEvents.
"""
self._latest_market_prices[event.symbol] = {
"price": Decimal(str(event.data["close"])),
"timestamp": event.timestamp,
"open": Decimal(str(event.data["open"])),
"high": Decimal(str(event.data["high"])),
"low": Decimal(str(event.data["low"]))
}
logging.debug(f"ExecutionHandler updated latest price for {event.symbol} to {self._latest_market_prices[event.symbol]['price']:.2f} on {event.timestamp.date()}")
orders_to_requeue = deque()
while self._open_orders:
order = self._open_orders.popleft()
if order.symbol != event.symbol:
orders_to_requeue.append(order)
continue
filled = False
if order.order_type == OrderType.MARKET:
self._attempt_fill_market_order(order)
filled = True
elif order.order_type == OrderType.LIMIT:
filled = self._attempt_fill_limit_order(order)
if not filled:
orders_to_requeue.append(order)
self._open_orders.extend(orders_to_requeue)
[docs]
def on_order_event(self, event: OrderEvent):
self._open_orders.append(event)
self._open_orders_by_id[event.order_id] = event
logging.info(f"ExecutionHandler received and opened order {event.order_id} for {event.symbol} ({event.direction} {event.quantity}) at {event.timestamp.date()}")
[docs]
def reset(self):
"""
Resets current open orders
"""
self._open_orders.clear()
self._open_orders_by_id.clear()
logging.info("SimulatedExecutionHandler reset open orders.")
def _attempt_fill_market_order(self, order: OrderEvent):
try:
fill_price_data = self._latest_market_prices.get(order.symbol)
if not fill_price_data:
logging.warning(f"No market data available for {order.symbol} to fill order on {order.timestamp.date()}. Skipping fill.")
self.push_failed_fill_event(order)
return
base_price = fill_price_data["price"]
if order.direction == Signal.BUY:
fill_price = base_price * (Decimal("1") + self.slippage_percent)
elif order.direction == Signal.SELL:
fill_price = base_price * (Decimal("1") - self.slippage_percent)
else:
fill_price = base_price
fill_price = max(Decimal("0.01"), fill_price) # Prevent zero or negative prices
commission = (order.quantity * fill_price) * self.transaction_cost_percent
self._create_and_push_fill_event(order, fill_price, successful=True, commission=commission)
self._remove_order_from_open_orders(order.order_id)
except Exception as e:
logging.error(f"Error simulating order fill for {order.symbol} on {order.timestamp.date()}: {e}", exc_info=True)
self.push_failed_fill_event(order)
def _attempt_fill_limit_order(self, order: OrderEvent):
"""
Attempts to fill a limit order.
Returns True if the order was filled, False otherwise.
"""
try:
fill_price_data = self._latest_market_prices.get(order.symbol)
if not fill_price_data:
logging.warning(f"No market data available for {order.symbol} to fill order on {order.timestamp.date()}. Skipping fill.")
return False
can_fill = False
fill_price = order.price
if order.direction == Signal.BUY and fill_price_data["low"] <= order.price:
can_fill = True
fill_price = min(order.price, fill_price_data["close"])
elif order.direction == Signal.SELL and fill_price_data["high"] >= order.price:
can_fill = True
fill_price = max(order.price, fill_price_data["close"])
if can_fill:
fill_price = max(Decimal("0.01"), fill_price)
commission = (order.quantity * fill_price) * self.transaction_cost_percent
self._create_and_push_fill_event(order, fill_price, successful=True, commission=commission)
self._remove_order_from_open_orders(order.order_id)
return True
else:
logging.debug(f"Limit order {order.order_id} for {order.symbol} ({order.direction} at {order.price:.2f}) not filled on {order.timestamp.date()}. Low: {fill_price_data['low']:.2f}, High: {fill_price_data['high']:.2f}")
return False
except Exception as e:
logging.error(f"Error simulating limit order fill for {order.symbol} on {order.timestamp.date()} (Order ID: {order.order_id}): {e}", exc_info=True)
self.push_failed_fill_event(order)
return True
[docs]
def push_failed_fill_event(self, order: OrderEvent):
self._create_and_push_fill_event(order, Decimal("0.0"), successful=False, commission=Decimal("0.0"))
self._remove_order_from_open_orders(order.order_id)
# HELPER METHOD 1: Handles creating, sending, and logging FillEvents
def _create_and_push_fill_event(
self,
order: OrderEvent,
fill_price: Decimal,
successful: bool,
commission: Decimal = Decimal("0.0")
):
"""
Helper to create and put a FillEvent onto the queue, and log the outcome.
"""
fill_event = FillEvent(
order_id=order.order_id,
symbol=order.symbol,
timestamp=order.timestamp,
direction=order.direction,
quantity=order.quantity,
fill_price=fill_price,
commission=commission,
successful=successful
)
self.event_queue.put(fill_event)
# Log based on success and order type
if successful:
log_message = (
f"Filled {order.order_type.name} order {order.order_id}: "
f"{order.direction.name} {order.quantity} of {order.symbol} at {fill_price:.2f} "
f"(Commission: {commission:.2f})"
)
if order.order_type == OrderType.LIMIT:
log_message += f" (Limit: {order.price:.2f})" # Add limit price for context
logging.info(f"{log_message} on {order.timestamp.date()}")
else:
logging.warning(f"Failed to fill order {order.order_id} for {order.symbol} on {order.timestamp.date()}.")
# HELPER METHOD 2: Handles removing orders from the internal tracking dictionary
def _remove_order_from_open_orders(self, order_id: str):
"""
Helper to safely remove an order from the _open_orders dictionary.
"""
if order_id in self._open_orders_by_id:
del self._open_orders_by_id[order_id]
else:
logging.warning(f"Attempted to remove order {order_id} from _open_orders but it was not found. It might have been removed already.")