diff --git a/requirements.txt b/requirements.txt index 6bf89c2..87e4651 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,4 @@ requests>=2.31.0 pandas>=2.0.0 matplotlib>=3.7.0 fpdf2>=2.7.0 - -# Optional: Execution quality cross-referencing -# alpaca-py>=0.30.0 -# Note: uncomment above if you want Alpaca NBBO fill auditing +alpaca-py>=0.21.0 diff --git a/trading_system/execution/__init__.py b/trading_system/execution/__init__.py index e69de29..1a3f481 100644 --- a/trading_system/execution/__init__.py +++ b/trading_system/execution/__init__.py @@ -0,0 +1,22 @@ +""" +Execution Layer — Order Lifecycle Management + +Handles multi-source price discovery, fill monitoring, staggered repricing +within a slippage budget, and Slack escalation on crossover events. +""" + +from trading_system.execution.execution_manager import ExecutionManager, ExecutionContext +from trading_system.execution.price_discovery import PriceDiscovery +from trading_system.execution.fill_monitor import FillMonitor +from trading_system.execution.slippage_controller import SlippageController +from trading_system.execution.order_types import SmartOrderType, should_use_twap + +__all__ = [ + "ExecutionManager", + "ExecutionContext", + "PriceDiscovery", + "FillMonitor", + "SlippageController", + "SmartOrderType", + "should_use_twap", +] diff --git a/trading_system/execution/execution_manager.py b/trading_system/execution/execution_manager.py new file mode 100644 index 0000000..1c9738f --- /dev/null +++ b/trading_system/execution/execution_manager.py @@ -0,0 +1,263 @@ +""" +Execution Manager — orchestrates order lifecycle. + +Sits between strategy signals and the broker: + 1. Cross-checks price before placement + 2. Registers orders for fill monitoring + 3. Detects crossovers and reprices within slippage budget + 4. Escalates via Slack when budget is exhausted +""" + +from datetime import datetime +from typing import Dict, Optional + +from trading_system.utils.slack import send_crossover_alert + + +class ExecutionContext: + """Tracks one order through its lifecycle.""" + + def __init__(self, symbol: str, order_dict: dict, order_id: Optional[str] = None): + self.symbol = symbol + self.action = order_dict.get('action', '') + self.original_order = dict(order_dict) + self.order_id = order_id + + # Limit price tracking + limit = (order_dict.get('limit_price') + or order_dict.get('price') + or order_dict.get('current_price') + or 0.0) + self.original_limit_price = float(limit) + self.current_limit_price = float(limit) + + # Stop price (for stop-limit orders) + self.stop_price = float(order_dict.get('stop_price', 0)) + + # Repricing state + self.reprice_attempt = 0 + self.reprice_history = [] + + # Timestamps + self.submitted_at = datetime.now() + self.crossover_detected_at = None + + # Status: pending | filled | cancelled | escalated + self.status = 'pending' + + def to_dict(self) -> dict: + """Serialize for blob logging.""" + return { + 'symbol': self.symbol, + 'action': self.action, + 'order_id': self.order_id, + 'original_limit_price': self.original_limit_price, + 'current_limit_price': self.current_limit_price, + 'stop_price': self.stop_price, + 'reprice_attempt': self.reprice_attempt, + 'reprice_history': self.reprice_history, + 'submitted_at': self.submitted_at.isoformat() if self.submitted_at else None, + 'crossover_detected_at': ( + self.crossover_detected_at.isoformat() + if self.crossover_detected_at else None + ), + 'status': self.status, + } + + +class ExecutionManager: + """Orchestrates order placement, monitoring, repricing, and escalation.""" + + def __init__(self, trading_bot, price_discovery, slippage_controller, + fill_monitor, dry_run: bool = True): + """ + Args: + trading_bot: SafeCashBot instance. + price_discovery: PriceDiscovery instance. + slippage_controller: SlippageController instance. + fill_monitor: FillMonitor instance. + dry_run: If True, skip live repricing / cancellation. + """ + self.trading_bot = trading_bot + self.price_discovery = price_discovery + self.slippage_controller = slippage_controller + self.fill_monitor = fill_monitor + self.dry_run = dry_run + self._pending: Dict[str, ExecutionContext] = {} # order_id -> ctx + + def submit(self, symbol: str, order_dict: dict, paired_buy: dict = None): + """Register an order for lifecycle tracking. + + Call this *after* the order has been placed via the existing + _execute_* methods. The order_id is extracted from state_manager + or passed in the order_dict. + + Also performs a pre-flight price cross-check and logs divergence. + + Args: + symbol: Trading symbol. + order_dict: Order details dict (action, price, quantity, etc.). + paired_buy: Optional paired buy dict (tracked separately). + """ + # Pre-flight price cross-check + expected_price = (order_dict.get('limit_price') + or order_dict.get('price') + or order_dict.get('current_price')) + if expected_price: + check = self.price_discovery.cross_check(symbol, float(expected_price)) + if check['alert']: + print(f" [execution] Price divergence warning for {symbol}: " + f"{check['divergence_pct']:.2f}% from consensus " + f"(expected=${expected_price}, consensus=${check['consensus_price']:.2f})") + + # Register context — order_id may be set later by _execute_* via register_order_id + ctx = ExecutionContext(symbol, order_dict) + # Use a placeholder key until we have a real order_id + placeholder_key = f"{symbol}_{ctx.action}_{ctx.submitted_at.isoformat()}" + self._pending[placeholder_key] = ctx + + # Track paired buy separately + if paired_buy: + pb_ctx = ExecutionContext(symbol, paired_buy) + pb_key = f"{symbol}_{pb_ctx.action}_{pb_ctx.submitted_at.isoformat()}" + self._pending[pb_key] = pb_ctx + + def register_order_id(self, symbol: str, action: str, order_id: str): + """Attach a broker order_id to a pending context. + + Called after SafeCashBot returns the order_id from placement. + Finds the most recent unassigned context matching symbol+action. + """ + if not order_id: + return + for key, ctx in reversed(list(self._pending.items())): + if (ctx.symbol == symbol + and ctx.action == action + and ctx.order_id is None + and ctx.status == 'pending'): + ctx.order_id = order_id + # Re-key by order_id + del self._pending[key] + self._pending[order_id] = ctx + return + + def check_pending_orders(self, open_orders: list, recent_orders: list): + """Check all pending orders for fills, crossovers, and repricing. + + Called once per tick from run_once(), after order_book.update(). + + Args: + open_orders: List of currently open order dicts from broker. + recent_orders: List of recent order dicts (7-day history). + """ + if not self._pending: + return + + open_ids = {o.get('order_id') for o in (open_orders or []) if o.get('order_id')} + completed = [] + + for key, ctx in list(self._pending.items()): + if ctx.status != 'pending': + continue + + status = self.fill_monitor.check_order(ctx, open_ids, recent_orders) + + if status == 'filled': + ctx.status = 'filled' + completed.append(key) + print(f" [execution] Filled: {ctx.symbol} {ctx.action} " + f"(limit=${ctx.current_limit_price:.2f})") + + elif status == 'cancelled': + ctx.status = 'cancelled' + completed.append(key) + print(f" [execution] Cancelled: {ctx.symbol} {ctx.action}") + + elif status == 'crossover': + if ctx.crossover_detected_at is None: + ctx.crossover_detected_at = datetime.now() + + if self.slippage_controller.can_reprice(ctx): + self._reprice_order(ctx, open_orders) + else: + self._escalate(ctx) + completed.append(key) + + # Clean up completed + for key in completed: + if key in self._pending: + del self._pending[key] + + def _reprice_order(self, ctx: ExecutionContext, open_orders: list): + """Cancel and replace an order at a repriced limit.""" + price_data = self.price_discovery.get_price(ctx.symbol) + current_market = price_data['price'] + if current_market <= 0: + return + + new_price = self.slippage_controller.next_price(ctx, current_market) + old_price = ctx.current_limit_price + + print(f" [execution] Repricing {ctx.symbol} {ctx.action}: " + f"${old_price:.2f} -> ${new_price:.2f} " + f"(attempt {ctx.reprice_attempt + 1}/{self.slippage_controller.max_attempts})") + + ctx.reprice_history.append({ + 'from': old_price, + 'to': new_price, + 'market': current_market, + 'timestamp': datetime.now().isoformat(), + }) + self.slippage_controller.record_reprice(ctx, new_price) + + if not self.dry_run and ctx.order_id: + # Cancel old order + self.trading_bot.cancel_order_by_id(ctx.order_id) + + # Place replacement + result = None + if ctx.action in ('stop_limit_sell',): + new_stop = round(new_price * (ctx.stop_price / ctx.original_limit_price), 2) \ + if ctx.original_limit_price > 0 else new_price + result = self.trading_bot.place_stop_limit_sell_order( + ctx.symbol, ctx.original_order.get('quantity', 0), + new_stop, new_price, dry_run=False, + ) + elif ctx.action in ('sell', 'limit_sell'): + result = self.trading_bot.place_sell_order( + ctx.symbol, ctx.original_order.get('quantity', 0), + new_price, dry_run=False, + ) + elif ctx.action in ('buy', 'limit_buy'): + result = self.trading_bot.place_cash_buy_order( + ctx.symbol, ctx.original_order.get('quantity', 0), + new_price, dry_run=False, + ) + + if result and isinstance(result, dict): + new_id = result.get('id') + if new_id: + old_key = ctx.order_id + ctx.order_id = new_id + if old_key in self._pending: + del self._pending[old_key] + self._pending[new_id] = ctx + + def _escalate(self, ctx: ExecutionContext): + """Escalate an unrepriced crossover to Slack.""" + ctx.status = 'escalated' + price_data = self.price_discovery.get_price(ctx.symbol) + current_price = price_data['price'] + message = self.fill_monitor.build_crossover_alert(ctx, current_price) + print(f" [execution] ESCALATING: {message}") + send_crossover_alert(message) + + def get_pending_summary(self) -> dict: + """Summary dict for blob logging.""" + contexts = [] + for ctx in self._pending.values(): + contexts.append(ctx.to_dict()) + return { + 'pending_count': len(self._pending), + 'contexts': contexts, + } diff --git a/trading_system/execution/fill_monitor.py b/trading_system/execution/fill_monitor.py new file mode 100644 index 0000000..16060f4 --- /dev/null +++ b/trading_system/execution/fill_monitor.py @@ -0,0 +1,112 @@ +""" +Fill Monitor — detects fill status and crossover events. + +A crossover occurs when the market price moves through an order's limit price +without filling. This typically indicates the order missed the market. +""" + +from datetime import datetime + + +class FillMonitor: + """Detects fills, cancellations, and crossover events for pending orders.""" + + # Price must exceed limit by this fraction to count as a crossover. + # Avoids false alerts from bid-ask noise. + CROSSOVER_THRESHOLD_PCT = 0.001 # 0.1% + + def __init__(self, price_discovery): + """ + Args: + price_discovery: PriceDiscovery instance for current market prices. + """ + self.price_discovery = price_discovery + + def check_order(self, ctx, open_order_ids: set, recent_orders: list = None) -> str: + """Determine the current status of an order. + + Args: + ctx: ExecutionContext for the order. + open_order_ids: Set of order_id strings currently open on the broker. + recent_orders: Optional list of recent order dicts for fill lookup. + + Returns: + 'filled' | 'open' | 'crossover' | 'cancelled' + """ + order_id = ctx.order_id + if order_id is None: + return 'open' + + still_open = order_id in open_order_ids + + if not still_open: + # Disappeared from open orders — check recent for state + if recent_orders: + for ro in recent_orders: + if ro.get('order_id') == order_id: + state = ro.get('state', '') + if state == 'filled': + return 'filled' + if state in ('cancelled', 'failed', 'rejected'): + return 'cancelled' + # Disappeared but not in recent — assume filled + return 'filled' + + # Still open — check for crossover + price_data = self.price_discovery.get_price(ctx.symbol) + current_price = price_data['price'] + if current_price > 0 and self.detect_crossover(ctx, current_price): + return 'crossover' + + return 'open' + + def detect_crossover(self, ctx, current_price: float) -> bool: + """Detect whether the market has crossed through the order's limit. + + Sell limit at $30: crossover if market > $30 (buyers paying more than our ask). + Buy limit at $30: crossover if market < $30 (sellers accepting less than our bid). + Stop-limit sell (stop=$29, limit=$28.50): crossover if market < $28.50. + + Returns True if market has crossed through limit beyond the threshold. + """ + limit_price = ctx.current_limit_price + if limit_price is None or limit_price <= 0 or current_price <= 0: + return False + + action = ctx.action + threshold = limit_price * self.CROSSOVER_THRESHOLD_PCT + + if action in ('stop_limit_sell',): + # Stop-limit sell: crossover if market dropped below limit + return current_price < (limit_price - threshold) + elif action in ('sell', 'limit_sell'): + # Sell limit: crossover if market rose above limit + return current_price > (limit_price + threshold) + elif action in ('buy', 'limit_buy'): + # Buy limit: crossover if market dropped below limit + return current_price < (limit_price - threshold) + + return False + + def build_crossover_alert(self, ctx, current_price: float) -> str: + """Build a Slack crossover alert message. + + Includes @Jason Bian mention for escalation. + """ + elapsed = "" + if ctx.submitted_at: + delta = datetime.now() - ctx.submitted_at + minutes = int(delta.total_seconds() / 60) + elapsed = f", open {minutes}m" + + reprice_note = "" + if ctx.reprice_attempt > 0: + reprice_note = f", reprice attempt {ctx.reprice_attempt}" + + direction = "above" if ctx.action in ('sell', 'limit_sell') else "below" + + return ( + f"Crossover detected: {ctx.symbol} {ctx.action} " + f"limit=${ctx.current_limit_price:.2f} but market=${current_price:.2f} " + f"({direction} limit{elapsed}{reprice_note})" + ) diff --git a/trading_system/execution/order_types.py b/trading_system/execution/order_types.py new file mode 100644 index 0000000..2c5d720 --- /dev/null +++ b/trading_system/execution/order_types.py @@ -0,0 +1,15 @@ +""" +Smart Order Types — TWAP stub for future use. +""" + +from enum import Enum + + +class SmartOrderType(Enum): + IMMEDIATE = "immediate" # Default: single limit order + TWAP = "twap" # Future: time-split across ticks + + +def should_use_twap(quantity, lot_size=250) -> bool: + """True if quantity > 2x lot_size (future use, returns False for now).""" + return False diff --git a/trading_system/execution/price_discovery.py b/trading_system/execution/price_discovery.py new file mode 100644 index 0000000..78ff36a --- /dev/null +++ b/trading_system/execution/price_discovery.py @@ -0,0 +1,181 @@ +""" +Price Discovery — aggregates prices from multiple sources. + +Sources: + 1. TwelveDataProvider.get_quote() + 2. Gamma extract_gamma_prices() + 3. Alpaca latest quote (price feed only, not a broker) + 4. SafeCashBot broker quote +""" + +import os +import statistics +from typing import Dict, Optional + + +def _fetch_alpaca_quote(symbol: str) -> Optional[float]: + """Fetch latest mid-price from Alpaca market data. + + Requires ALPACA_API_KEY and ALPACA_SECRET_KEY env vars. + Returns None if not configured or on any error. + """ + api_key = os.getenv("ALPACA_API_KEY") + secret_key = os.getenv("ALPACA_SECRET_KEY") + if not api_key or not secret_key: + return None + + try: + from alpaca.data.requests import StockLatestQuoteRequest + from alpaca.data.historical import StockHistoricalDataClient + + client = StockHistoricalDataClient(api_key, secret_key) + request = StockLatestQuoteRequest(symbol_or_symbols=symbol) + quotes = client.get_stock_latest_quote(request) + quote = quotes.get(symbol) + if quote and quote.ask_price and quote.bid_price: + return round((quote.ask_price + quote.bid_price) / 2, 4) + return None + except Exception: + return None + + +class PriceDiscovery: + """Aggregates prices from Twelve Data, Gamma, Alpaca, and broker.""" + + def __init__(self, data_provider, trading_bot): + """ + Args: + data_provider: TwelveDataProvider instance + trading_bot: SafeCashBot instance + """ + self.data_provider = data_provider + self.trading_bot = trading_bot + self._gamma_prices: Dict[str, float] = {} + + def update_gamma_prices(self, gamma_data): + """Refresh cached gamma prices from a gamma snapshot. + + Called once per tick with the result of fetch_gamma_orders(). + """ + if gamma_data is None: + return + try: + from trading_system.state.gamma_client import extract_gamma_prices + self._gamma_prices = extract_gamma_prices(gamma_data) or {} + except Exception: + self._gamma_prices = {} + + def get_price(self, symbol: str) -> dict: + """Get consensus price from all available sources. + + Returns: + { + 'price': float, # median consensus + 'sources': { + 'twelve_data': float | None, + 'gamma': float | None, + 'alpaca': float | None, + 'broker': float | None, + }, + 'spread': float, # max - min of available sources + 'confidence': str, # 'high' (3+ sources) | 'medium' (2) | 'low' (1) | 'none' + 'outliers': list, # source names >1% from median + } + """ + sources: Dict[str, Optional[float]] = { + 'twelve_data': None, + 'gamma': None, + 'alpaca': None, + 'broker': None, + } + + # 1. Twelve Data + try: + quote = self.data_provider.get_quote(symbol) + if quote and quote.get('price'): + sources['twelve_data'] = float(quote['price']) + except Exception: + pass + + # 2. Gamma + gamma_price = self._gamma_prices.get(symbol) + if gamma_price: + sources['gamma'] = float(gamma_price) + + # 3. Alpaca + sources['alpaca'] = _fetch_alpaca_quote(symbol) + + # 4. Broker + try: + broker_quote = self.trading_bot.get_quote(symbol) + if broker_quote: + price = broker_quote.get('last_trade_price') or broker_quote.get('price') + if price: + sources['broker'] = float(price) + except Exception: + pass + + # Consensus: median of available + available = [v for v in sources.values() if v is not None] + + if not available: + return { + 'price': 0.0, + 'sources': sources, + 'spread': 0.0, + 'confidence': 'none', + 'outliers': [], + } + + median_price = statistics.median(available) + spread = max(available) - min(available) + + # Flag outliers: >1% from median + outliers = [] + for name, val in sources.items(): + if val is not None and median_price > 0: + divergence = abs(val - median_price) / median_price + if divergence > 0.01: + outliers.append(name) + + n = len(available) + if n >= 3: + confidence = 'high' + elif n == 2: + confidence = 'medium' + else: + confidence = 'low' + + return { + 'price': round(median_price, 4), + 'sources': sources, + 'spread': round(spread, 4), + 'confidence': confidence, + 'outliers': outliers, + } + + def cross_check(self, symbol: str, expected_price: float) -> dict: + """Cross-check an expected price against consensus. + + Returns: + { + 'divergence_pct': float, + 'consensus_price': float, + 'alert': bool, # True if >1% divergence + } + """ + result = self.get_price(symbol) + consensus = result['price'] + if consensus == 0 or expected_price == 0: + return { + 'divergence_pct': 0.0, + 'consensus_price': consensus, + 'alert': False, + } + + divergence_pct = abs(expected_price - consensus) / consensus * 100 + return { + 'divergence_pct': round(divergence_pct, 4), + 'consensus_price': consensus, + 'alert': divergence_pct > 1.0, + } diff --git a/trading_system/execution/slippage_controller.py b/trading_system/execution/slippage_controller.py new file mode 100644 index 0000000..e527c1c --- /dev/null +++ b/trading_system/execution/slippage_controller.py @@ -0,0 +1,80 @@ +""" +Slippage Controller — staggered repricing within a budget. + +Reprices unfilled orders closer to market in controlled steps: + Attempt 1: 0.05% worse + Attempt 2: 0.15% worse (cumulative) + Attempt 3: 0.30% worse (cumulative) + +After max_attempts, the order is escalated to Slack. +""" + + +class SlippageController: + """Manages repricing budget for unfilled orders.""" + + # Cumulative slippage steps as fraction of original limit price + DEFAULT_STEPS = [0.0005, 0.0015, 0.003] + + def __init__(self, max_attempts: int = 3, max_slippage_pct: float = 0.003): + """ + Args: + max_attempts: Maximum reprice attempts before escalation. + max_slippage_pct: Max cumulative slippage as decimal (0.003 = 0.3%). + """ + self.max_attempts = max_attempts + self.max_slippage_pct = max_slippage_pct + self.step_schedule = self.DEFAULT_STEPS[:max_attempts] + + def can_reprice(self, ctx) -> bool: + """True if the order has remaining reprice attempts.""" + return ctx.reprice_attempt < self.max_attempts + + def next_price(self, ctx, current_market_price: float) -> float: + """Compute the next repriced limit. + + Sells: lower the limit closer to market (accept worse fill). + Buys: raise the limit closer to market (pay more). + + The new price is bounded — it never crosses through the current + market price (we don't want to place a limit that would immediately + execute at a loss beyond our budget). + + Args: + ctx: ExecutionContext with original_limit_price and reprice_attempt. + current_market_price: Latest consensus price. + + Returns: + New limit price (float, rounded to 2 decimals). + """ + attempt = ctx.reprice_attempt # 0-indexed, next step + if attempt >= len(self.step_schedule): + step = self.step_schedule[-1] + else: + step = self.step_schedule[attempt] + + original = ctx.original_limit_price + action = ctx.action + + if action in ('sell', 'limit_sell', 'stop_limit_sell'): + # Sells: lower the limit (accept less) + new_price = original * (1 - step) + # Don't cross below market + new_price = max(new_price, current_market_price) + elif action in ('buy', 'limit_buy'): + # Buys: raise the limit (pay more) + new_price = original * (1 + step) + # Don't cross above market + new_price = min(new_price, current_market_price) + else: + new_price = original + + return round(new_price, 2) + + def record_reprice(self, ctx, new_price: float): + """Record a reprice attempt on the context. + + Increments the attempt counter and updates the current limit price. + """ + ctx.reprice_attempt += 1 + ctx.current_limit_price = new_price diff --git a/trading_system/main.py b/trading_system/main.py index 903f48a..dc14859 100644 --- a/trading_system/main.py +++ b/trading_system/main.py @@ -21,9 +21,15 @@ from trading_system.strategies.momentum_dca_strategy import MomentumDcaLongStrategy # noqa: E402 from trading_system.state.state_manager import StateManager # noqa: E402 from trading_system.state.blob_logger import log_state_to_blob # noqa: E402 +from trading_system.state.order_book import OrderBook # noqa: E402 +from trading_system.state.gamma_client import fetch_gamma_orders, extract_gamma_prices # noqa: E402 from trading_system.market_indicators import fetch_and_write_indicators # noqa: E402 from trading_system.utils.slack import send_slack_alert # noqa: E402 from trading_system.entities.OrderType import OrderSide # noqa: E402 +from trading_system.execution.price_discovery import PriceDiscovery # noqa: E402 +from trading_system.execution.slippage_controller import SlippageController # noqa: E402 +from trading_system.execution.fill_monitor import FillMonitor # noqa: E402 +from trading_system.execution.execution_manager import ExecutionManager # noqa: E402 from utils.safe_cash_bot import SafeCashBot # noqa: E402 @@ -61,6 +67,17 @@ def __init__(self, twelve_data_api_key: str, symbols: List[str], self.metrics_calculator = MetricsCalculator() self.state_manager = StateManager() self.trading_bot = SafeCashBot() + self.order_book = OrderBook() + + # Execution layer — price discovery, fill monitoring, repricing + self.price_discovery = PriceDiscovery(self.data_provider, self.trading_bot) + self.slippage_controller = SlippageController() + self.fill_monitor = FillMonitor(self.price_discovery) + self.execution_manager = ExecutionManager( + self.trading_bot, self.price_discovery, + self.slippage_controller, self.fill_monitor, + dry_run=self.dry_run, + ) # Initialize execution quality layer self.fill_logger = None @@ -214,8 +231,10 @@ def process_signal(self, symbol: str, signal: Dict, open_orders: list = None): if order['action'] == 'buy': self._execute_buy_order(symbol, order) + self.execution_manager.submit(symbol, order) elif order['action'] == 'sell': self._execute_sell_order(symbol, order) + self.execution_manager.submit(symbol, order) elif order['action'] == 'stop_limit_sell': has_paired_buy = signal.get('paired_buy') is not None @@ -250,17 +269,21 @@ def process_signal(self, symbol: str, signal: Dict, open_orders: list = None): else: self._execute_paired_limit_buy(symbol, paired_buy) self._execute_stop_limit_sell_order(symbol, order) + self.execution_manager.submit(symbol, order, paired_buy=paired_buy) else: self._execute_stop_limit_sell_order(symbol, order) + self.execution_manager.submit(symbol, order) elif order['action'] == 'limit_sell': # Cancel existing sells before resubmit self._cancel_orders_by_side(symbol, 'SELL', open_orders) self._execute_limit_sell_resubmit(symbol, order) + self.execution_manager.submit(symbol, order) def _cancel_orders_by_side(self, symbol: str, side: str, open_orders: list) -> tuple: """Cancel ALL open orders for symbol on the given side. Returns (cancelled_count, total_qty_cancelled). + Skips actual cancellation in dry-run mode. """ cancelled = 0 qty_cancelled = 0 @@ -268,10 +291,16 @@ def _cancel_orders_by_side(self, symbol: str, side: str, open_orders: list) -> t if (order.get('symbol') == symbol and order.get('side') == side): order_id = order.get('order_id') - if order_id and self.trading_bot.cancel_order_by_id(order_id): - cancelled += 1 - qty_cancelled += int(float(order.get('quantity', 0))) - print(f" Cancelled {side} {order_id} qty={int(float(order.get('quantity', 0)))}") + if order_id: + qty = int(float(order.get('quantity', 0))) + if self.dry_run: + cancelled += 1 + qty_cancelled += qty + print(f" [DRY RUN] Would cancel {side} {order_id} qty={qty}") + elif self.trading_bot.cancel_order_by_id(order_id): + cancelled += 1 + qty_cancelled += qty + print(f" Cancelled {side} {order_id} qty={qty}") return cancelled, qty_cancelled def _handle_order_replacement(self, symbol: str, signal: Dict, symbol_orders: list): @@ -594,6 +623,23 @@ def run_once(self): recent_orders = self.trading_bot.get_recent_orders(days=7) + # Seed order book from history on first tick, then diff each tick + self.order_book.seed_from_history(recent_orders) + tick_summary = self.order_book.update(open_orders, recent_orders) + + # Check pending execution contexts for fills, crossovers, repricing + self.execution_manager.check_pending_orders(open_orders, recent_orders) + + # Print fill/cancel events detected this tick + if tick_summary['fills']: + for f in tick_summary['fills']: + avg = f.get('average_price') + price_str = f" @ ${avg:.2f}" if avg else "" + print(f" [fill] {f['symbol']} {f['side']} x{f.get('quantity', '?')}{price_str}") + if tick_summary['cancellations']: + for c in tick_summary['cancellations']: + print(f" [cancel] {c['symbol']} {c['side']} x{c.get('quantity', '?')} ({c.get('state', '')})") + # Print order book before processing through state manager if open_orders: # Filter to --ticker symbols when not in verbose mode @@ -665,6 +711,24 @@ def run_once(self): # Compute drift metrics from cached daily bars (if available) drift_metrics = self._compute_drift_metrics() + # Fetch gamma runtime snapshot and feed into price discovery + gamma_data = fetch_gamma_orders() + self.price_discovery.update_gamma_prices(gamma_data) + + fill_rate = self.order_book.get_fill_rate() + execution_log = self.order_book.get_execution_log(limit=50) + execution_summary = self.execution_manager.get_pending_summary() + + if self.verbose and fill_rate: + print(f"\n Fill Rate: {fill_rate['fill_rate_pct']:.1f}% " + f"({fill_rate['total_filled']}/{fill_rate['total_submitted']} filled, " + f"{fill_rate['total_cancelled']} cancelled)") + + if self.verbose and gamma_data: + gamma_prices = extract_gamma_prices(gamma_data) + if gamma_prices: + print(f" Gamma prices: {gamma_prices}") + log_state_to_blob( self.state_manager, live=not self.dry_run, @@ -672,6 +736,10 @@ def run_once(self): portfolio=portfolio_data, drift_metrics=drift_metrics, recent_orders=recent_orders, + fill_rate=fill_rate, + execution_log=execution_log, + gamma_snapshot=gamma_data, + execution_summary=execution_summary, ) # Refresh dashboard market indicators @@ -1102,6 +1170,11 @@ def main(): '--recent', type=int, default=None, metavar='DAYS', help='Limit backtest to the last N daily bars (e.g. --recent 90)' ) + parser.add_argument( + '--audit', + action='store_true', + help='Run order coverage audit and exit' + ) args = parser.parse_args() @@ -1137,6 +1210,13 @@ def main(): recent_days=args.recent, ) + # Run audit if requested (standalone mode — exit after) + if args.audit: + from trading_system.audit import StopLossAuditor + auditor = StopLossAuditor() + exit_code = auditor.run_audit() + sys.exit(exit_code) + # Run backtest if requested (standalone mode — exit after) if args.backtest: system.run_backtest() diff --git a/trading_system/state/blob_logger.py b/trading_system/state/blob_logger.py index 2245fd6..9f6a760 100644 --- a/trading_system/state/blob_logger.py +++ b/trading_system/state/blob_logger.py @@ -29,7 +29,9 @@ def _get_config(): def _serialize_state(state_manager, order_book=None, portfolio=None, - drift_metrics=None, recent_orders=None) -> dict: + drift_metrics=None, recent_orders=None, + fill_rate=None, execution_log=None, + gamma_snapshot=None, execution_summary=None) -> dict: """Serialize StateManager state to a JSON-safe dictionary.""" snapshot = { "timestamp": datetime.now().isoformat(), @@ -51,6 +53,14 @@ def _serialize_state(state_manager, order_book=None, portfolio=None, snapshot["drift_metrics"] = drift_metrics if recent_orders: snapshot["recent_orders"] = recent_orders + if fill_rate is not None: + snapshot["fill_rate"] = fill_rate + if execution_log is not None: + snapshot["execution_log"] = execution_log + if gamma_snapshot is not None: + snapshot["gamma_snapshot"] = gamma_snapshot + if execution_summary is not None: + snapshot["execution_summary"] = execution_summary return snapshot @@ -62,12 +72,17 @@ def _serialize_value(obj): def _log_local(state_manager, order_book=None, portfolio=None, - drift_metrics=None, recent_orders=None): + drift_metrics=None, recent_orders=None, + fill_rate=None, execution_log=None, + gamma_snapshot=None, execution_summary=None): """Write state snapshot to a local JSON file under state_logs/.""" LOCAL_LOG_DIR.mkdir(exist_ok=True) snapshot = _serialize_state(state_manager, order_book=order_book, portfolio=portfolio, drift_metrics=drift_metrics, - recent_orders=recent_orders) + recent_orders=recent_orders, + fill_rate=fill_rate, execution_log=execution_log, + gamma_snapshot=gamma_snapshot, + execution_summary=execution_summary) blob_key = datetime.now().strftime("%Y-%m-%dT%H-%M-%S") payload = json.dumps(snapshot, default=_serialize_value, indent=2) @@ -78,7 +93,9 @@ def _log_local(state_manager, order_book=None, portfolio=None, def _log_remote(state_manager, order_book=None, portfolio=None, - drift_metrics=None, recent_orders=None): + drift_metrics=None, recent_orders=None, + fill_rate=None, execution_log=None, + gamma_snapshot=None, execution_summary=None): """Upload state snapshot to Netlify Blobs.""" config = _get_config() if not config: @@ -88,7 +105,10 @@ def _log_remote(state_manager, order_book=None, portfolio=None, snapshot = _serialize_state(state_manager, order_book=order_book, portfolio=portfolio, drift_metrics=drift_metrics, - recent_orders=recent_orders) + recent_orders=recent_orders, + fill_rate=fill_rate, execution_log=execution_log, + gamma_snapshot=gamma_snapshot, + execution_summary=execution_summary) blob_key = datetime.now().strftime("%Y-%m-%dT%H-%M-%S") payload = json.dumps(snapshot, default=_serialize_value) @@ -109,15 +129,23 @@ def _log_remote(state_manager, order_book=None, portfolio=None, def log_state_to_blob(state_manager, live=False, order_book=None, - portfolio=None, drift_metrics=None, recent_orders=None): + portfolio=None, drift_metrics=None, recent_orders=None, + fill_rate=None, execution_log=None, + gamma_snapshot=None, execution_summary=None): """Log StateManager state. Writes locally in dry-run, uploads to Netlify Blobs when live.""" if live: return _log_remote(state_manager, order_book=order_book, portfolio=portfolio, drift_metrics=drift_metrics, - recent_orders=recent_orders) + recent_orders=recent_orders, + fill_rate=fill_rate, execution_log=execution_log, + gamma_snapshot=gamma_snapshot, + execution_summary=execution_summary) return _log_local(state_manager, order_book=order_book, portfolio=portfolio, drift_metrics=drift_metrics, - recent_orders=recent_orders) + recent_orders=recent_orders, + fill_rate=fill_rate, execution_log=execution_log, + gamma_snapshot=gamma_snapshot, + execution_summary=execution_summary) def upload_blob(store_name, blob_key, data): diff --git a/trading_system/state/gamma_client.py b/trading_system/state/gamma_client.py new file mode 100644 index 0000000..78cca65 --- /dev/null +++ b/trading_system/state/gamma_client.py @@ -0,0 +1,56 @@ +""" +Gamma Runtime Service Client +Minimal HTTP client to fetch the desired order set from the gamma runtime +service for cross-engine visibility and as a price source. +""" + +import requests + +GAMMA_URL = "https://route-runtime-service.netlify.app/api/orders" + + +def fetch_gamma_orders(url=None): + """Fetch the /api/orders JSON snapshot from the gamma runtime service. + + Args: + url: override URL (defaults to GAMMA_URL) + + Returns: + dict with snapshot_key, stock_orders, options — or None on failure + """ + try: + resp = requests.get(url or GAMMA_URL, timeout=10) + resp.raise_for_status() + return resp.json() + except Exception as e: + print(f" [gamma] Could not fetch gamma orders: {e}") + return None + + +def extract_gamma_prices(data): + """Extract current prices from gamma order set. + + Args: + data: dict returned by fetch_gamma_orders() + + Returns: + dict mapping symbol to price, e.g. {"BTC": 29.47, "SPY": 680.7} + Returns empty dict on failure. + """ + if not data: + return {} + + prices = {} + try: + for order in data.get('stock_orders', []): + symbol = order.get('symbol') + price = order.get('price') or order.get('limit_price') + if symbol and price is not None: + try: + prices[symbol] = float(price) + except (ValueError, TypeError): + pass + except Exception as e: + print(f" [gamma] Could not extract prices: {e}") + + return prices diff --git a/trading_system/state/order_book.py b/trading_system/state/order_book.py new file mode 100644 index 0000000..1df0ec7 --- /dev/null +++ b/trading_system/state/order_book.py @@ -0,0 +1,204 @@ +""" +Order Book Tracker +Tracks order lifecycle (submitted → filled/cancelled) across ticks to compute +fill rates and maintain an execution log. +""" + +from datetime import datetime + + +class OrderBook: + """Tracks open order IDs across ticks, diffs to detect fills/cancellations.""" + + def __init__(self): + self._prev_open_ids = set() + self._seeded = False + self._total_submitted = 0 + self._total_filled = 0 + self._total_cancelled = 0 + self._by_symbol = {} # symbol -> {submitted, filled, cancelled} + self.execution_log = [] # chronological list of events + + def seed_from_history(self, recent_orders): + """Seed counters from 7-day order history so fill rate is immediately meaningful. + + Args: + recent_orders: list of order dicts from get_recent_orders(days=7). + Each has keys: order_id, symbol, state, average_price, filled_quantity, etc. + """ + if self._seeded: + return + + for order in (recent_orders or []): + state = order.get('state', '') + symbol = order.get('symbol', 'N/A') + + self._ensure_symbol(symbol) + self._total_submitted += 1 + self._by_symbol[symbol]['submitted'] += 1 + + if state == 'filled': + self._total_filled += 1 + self._by_symbol[symbol]['filled'] += 1 + self.execution_log.append({ + 'event': 'filled', + 'order_id': order.get('order_id'), + 'symbol': symbol, + 'side': order.get('side'), + 'quantity': order.get('quantity'), + 'average_price': order.get('average_price'), + 'filled_quantity': order.get('filled_quantity'), + 'timestamp': order.get('updated_at', order.get('created_at')), + 'source': 'seed', + }) + elif state in ('cancelled', 'failed', 'rejected'): + self._total_cancelled += 1 + self._by_symbol[symbol]['cancelled'] += 1 + self.execution_log.append({ + 'event': 'cancelled', + 'order_id': order.get('order_id'), + 'symbol': symbol, + 'side': order.get('side'), + 'quantity': order.get('quantity'), + 'timestamp': order.get('updated_at', order.get('created_at')), + 'source': 'seed', + }) + + self._seeded = True + + def update(self, open_orders, recent_orders): + """Diff current open orders against previous tick to detect fills/cancellations. + + Args: + open_orders: list of currently open order dicts (from get_open_orders()) + recent_orders: list of recent historical orders (from get_recent_orders()) + + Returns: + dict with 'new_orders', 'fills', 'cancellations' detected this tick + """ + current_ids = {o['order_id'] for o in (open_orders or []) if o.get('order_id')} + + # Build lookup of current open orders by ID + open_by_id = {o['order_id']: o for o in (open_orders or []) if o.get('order_id')} + + # Build lookup of recent orders by ID for resolving disappeared orders + recent_by_id = {o['order_id']: o for o in (recent_orders or []) if o.get('order_id')} + + # New orders = IDs in current but not in previous + new_ids = current_ids - self._prev_open_ids + + # Disappeared orders = IDs in previous but not in current + disappeared_ids = self._prev_open_ids - current_ids + + tick_summary = { + 'new_orders': [], + 'fills': [], + 'cancellations': [], + } + + now = datetime.now().isoformat() + + # Track new submissions + for oid in new_ids: + order = open_by_id.get(oid, {}) + symbol = order.get('symbol', 'N/A') + self._ensure_symbol(symbol) + self._total_submitted += 1 + self._by_symbol[symbol]['submitted'] += 1 + + entry = { + 'event': 'submitted', + 'order_id': oid, + 'symbol': symbol, + 'side': order.get('side'), + 'quantity': order.get('quantity'), + 'limit_price': order.get('limit_price'), + 'stop_price': order.get('stop_price'), + 'timestamp': now, + } + self.execution_log.append(entry) + tick_summary['new_orders'].append(entry) + + # Resolve disappeared orders: check recent_orders to distinguish fill vs cancel + for oid in disappeared_ids: + resolved = recent_by_id.get(oid, {}) + state = resolved.get('state', '') + symbol = resolved.get('symbol', 'N/A') + self._ensure_symbol(symbol) + + if state == 'filled': + self._total_filled += 1 + self._by_symbol[symbol]['filled'] += 1 + entry = { + 'event': 'filled', + 'order_id': oid, + 'symbol': symbol, + 'side': resolved.get('side'), + 'quantity': resolved.get('quantity'), + 'average_price': resolved.get('average_price'), + 'filled_quantity': resolved.get('filled_quantity'), + 'timestamp': now, + } + self.execution_log.append(entry) + tick_summary['fills'].append(entry) + else: + # Treat as cancelled (includes cancelled, failed, rejected, or unknown) + self._total_cancelled += 1 + self._by_symbol[symbol]['cancelled'] += 1 + entry = { + 'event': 'cancelled', + 'order_id': oid, + 'symbol': symbol, + 'side': resolved.get('side'), + 'quantity': resolved.get('quantity'), + 'state': state or 'disappeared', + 'timestamp': now, + } + self.execution_log.append(entry) + tick_summary['cancellations'].append(entry) + + self._prev_open_ids = current_ids + return tick_summary + + def get_fill_rate(self): + """Compute fill rate statistics. + + Returns: + dict with total_submitted, total_filled, total_cancelled, + fill_rate_pct, and by_symbol breakdown + """ + rate = 0.0 + if self._total_submitted > 0: + rate = (self._total_filled / self._total_submitted) * 100 + + return { + 'total_submitted': self._total_submitted, + 'total_filled': self._total_filled, + 'total_cancelled': self._total_cancelled, + 'fill_rate_pct': round(rate, 2), + 'by_symbol': dict(self._by_symbol), + } + + def get_execution_log(self, limit=50): + """Return the most recent execution log entries. + + Args: + limit: max number of entries to return (most recent first) + + Returns: + list of event dicts, newest first + """ + return list(reversed(self.execution_log[-limit:])) + + def to_dict(self): + """Serialize full state for blob storage.""" + return { + 'fill_rate': self.get_fill_rate(), + 'execution_log': self.execution_log, + 'prev_open_ids': list(self._prev_open_ids), + 'seeded': self._seeded, + } + + def _ensure_symbol(self, symbol): + if symbol not in self._by_symbol: + self._by_symbol[symbol] = {'submitted': 0, 'filled': 0, 'cancelled': 0} diff --git a/trading_system/utils/slack.py b/trading_system/utils/slack.py index 3adb908..b24dcc7 100644 --- a/trading_system/utils/slack.py +++ b/trading_system/utils/slack.py @@ -42,3 +42,9 @@ def send_slack_alert(message, emoji=":warning:"): except requests.RequestException as e: print(f"Failed to send Slack alert: {e}") return False + + +def send_crossover_alert(message): + """Send crossover alert mentioning @Jason Bian.""" + tagged = f"@Jason Bian {message}" + return send_slack_alert(tagged, emoji=":rotating_light:")