diff --git a/README.md b/README.md index 4882510..afc1eea 100644 --- a/README.md +++ b/README.md @@ -61,12 +61,12 @@ Backtesting uses the same event model as live trading. High-level flow: -- CSV or historical provider feeds market data -- Backtest runner drives event loop -- Strategy executes normally +- Dukascopy cache data is loaded via `BacktestClient.from_dukascopy_cache(...)` +- `run_backtest(...)` drives the event loop and recording pipeline +- Strategy code executes unchanged - Portfolio and recording operate identically to live mode -See `docs/backtesting_guide.md` for detailed usage. +See `docs/backtesting_guide.md` for the current cache-backed workflow. ## Live Trading (IG) @@ -116,7 +116,19 @@ Users can subscribe to recording events for custom reporting pipelines. ## Installation -Install using your preferred environment manager. Ensure Python 3.11+. +Python 3.11+ is required. + +Install the published package: + +```bash +pip install tradedesk +``` + +For local development: + +```bash +pip install -e '.[dev]' +``` ## Documentation @@ -131,6 +143,15 @@ See the `docs/` directory for: - Risk management guide - Metrics guide +Public package entry points are grouped under: + +- `tradedesk.marketdata` +- `tradedesk.execution` +- `tradedesk.execution.backtest` +- `tradedesk.portfolio` +- `tradedesk.recording` +- `tradedesk.strategy` + tradedesk is designed for clarity, determinism, and event-level transparency. @@ -142,4 +163,3 @@ Licensed under the Apache License, Version 2.0. See: [https://www.apache.org/licenses/LICENSE-2.0](https://www.apache.org/licenses/LICENSE-2.0) Copyright 2026 [Radius Red Ltd.](https://github.com/radiusred) - diff --git a/docs/aggregation_guide.md b/docs/aggregation_guide.md index f169631..44a3270 100644 --- a/docs/aggregation_guide.md +++ b/docs/aggregation_guide.md @@ -1,6 +1,6 @@ # Candle Aggregation Guide -The `tradedesk.aggregation` module provides time-bucketing candle aggregation for converting base-period candles into higher timeframes. +The `tradedesk.marketdata` module provides time-bucketing candle aggregation for converting base-period candles into higher timeframes. ## Overview @@ -14,7 +14,7 @@ The `tradedesk.aggregation` module provides time-bucketing candle aggregation fo ## Basic Usage ```python -from tradedesk.aggregation import CandleAggregator +from tradedesk.marketdata import CandleAggregator from tradedesk import Candle # Create aggregator for 15-minute candles from 5-minute base period @@ -49,7 +49,7 @@ assert result.close == 1.125 # Last close Use `choose_base_period()` to automatically select an appropriate base period for your broker: ```python -from tradedesk.aggregation import choose_base_period +from tradedesk.marketdata import choose_base_period # Default: Uses common broker periods (SECOND, 1MINUTE, 5MINUTE, HOUR) base = choose_base_period("15MINUTE") # Returns "1MINUTE" @@ -157,7 +157,7 @@ The aggregator is gap-tolerant: ## Complete Example: Live Aggregation ```python -from tradedesk.aggregation import CandleAggregator +from tradedesk.marketdata import CandleAggregator class LiveAggregationStrategy: def __init__(self, target_period: str): diff --git a/docs/backtesting_guide.md b/docs/backtesting_guide.md index 52981b2..edb14a3 100644 --- a/docs/backtesting_guide.md +++ b/docs/backtesting_guide.md @@ -1,12 +1,12 @@ # Backtesting Guide This guide covers running backtests with tradedesk using candle data loaded from -CSV files or supplied in-memory. +the Dukascopy cache or supplied in-memory. By the end you will have: - A working strategy -- A CSV-driven backtest via `run_portfolio` +- A cache-backed backtest via `run_portfolio` - A full recorded backtest via `run_backtest` (with metrics and trade output) The same strategy runs live against a broker without modification. @@ -19,21 +19,30 @@ The same strategy runs live against a broker without modification. my_backtest/ strategy.py run_backtest.py - candles.csv ``` --- -## 2. Candle CSV Format +## 2. Dukascopy Cache Input -``` -timestamp,open,high,low,close,volume -2025-01-01T00:00:00Z,1.2500,1.2510,1.2490,1.2505,1000 -2025-01-01T00:05:00Z,1.2505,1.2520,1.2500,1.2515,800 -``` +`BacktestClient.from_dukascopy_cache(...)` reads 1-minute candle files from a +Dukascopy cache directory and aggregates them to the period your strategy +subscribes to. + +Required inputs: + +- `cache_dir`: root cache directory +- `symbol`: cache symbol folder, for example `GBPUSD` +- `instrument`: instrument identifier used by your strategy +- `period`: target tradedesk period such as `5MINUTE` or `HOUR` +- `date_from` / `date_to`: inclusive date range +- `price_side`: `"bid"` or `"ask"` (`"bid"` is the default) + +Example shared cache location used at Radius Red: -Fields `volume` and `tick_count` are optional. -Timestamps may use `-` or `/` as date separators, with or without a trailing `Z`. +```text +/paperclip/tradedesk/marketdata/GBPUSD/2026/00/01_bid.csv.zst +``` --- @@ -82,6 +91,8 @@ code path as live trading. ```python # run_backtest.py +from datetime import date + from tradedesk import SimplePortfolio, run_portfolio from tradedesk.execution.backtest.client import BacktestClient @@ -89,10 +100,14 @@ from strategy import SimpleMomentumStrategy def client_factory(): - return BacktestClient.from_csv( - "candles.csv", + return BacktestClient.from_dukascopy_cache( + "/paperclip/tradedesk/marketdata", + symbol="GBPUSD", instrument="CS.D.GBPUSD.TODAY.IP", period="5MINUTE", + date_from=date(2025, 1, 1), + date_to=date(2025, 1, 31), + price_side="bid", ) @@ -112,7 +127,14 @@ To capture the client for inspection, use a closure: created = {} def client_factory(): - c = BacktestClient.from_csv("candles.csv", instrument="CS.D.GBPUSD.TODAY.IP", period="5MINUTE") + c = BacktestClient.from_dukascopy_cache( + "/paperclip/tradedesk/marketdata", + symbol="GBPUSD", + instrument="CS.D.GBPUSD.TODAY.IP", + period="5MINUTE", + date_from=date(2025, 1, 1), + date_to=date(2025, 1, 31), + ) created["client"] = c return c @@ -169,6 +191,7 @@ run_portfolio( ```python import asyncio +from datetime import date from pathlib import Path from tradedesk import SimplePortfolio @@ -181,7 +204,11 @@ async def main(): spec = BacktestSpec( instrument="CS.D.GBPUSD.TODAY.IP", period="5MINUTE", - candle_csv=Path("candles.csv"), + cache_dir=Path("/paperclip/tradedesk/marketdata"), + symbol="GBPUSD", + date_from=date(2025, 1, 1), + date_to=date(2025, 1, 31), + price_side="bid", half_spread_adjustment=0.5, # add half the spread to BID-sourced candles ) diff --git a/docs/metrics_guide.md b/docs/metrics_guide.md index 44eb058..9d7e808 100644 --- a/docs/metrics_guide.md +++ b/docs/metrics_guide.md @@ -1,6 +1,6 @@ # Performance Metrics Guide -The `tradedesk.metrics` module provides tools for analyzing trading strategy performance. +The `tradedesk.recording` module provides tools for analyzing trading strategy performance. ## Overview @@ -13,7 +13,7 @@ The metrics module helps you: ## Quick Start ```python -from tradedesk.metrics import compute_metrics +from tradedesk.recording import compute_metrics # Your trade fills trade_rows = [ @@ -44,7 +44,7 @@ print(f"Max Drawdown: {metrics.max_drawdown:.2f}") Convert fill history into complete round-trip trades: ```python -from tradedesk.metrics import round_trips_from_fills +from tradedesk.recording import round_trips_from_fills fills = [ {"instrument": "EURUSD", "direction": "BUY", "timestamp": "2025-01-01T10:00:00Z", "price": "1.1000", "size": "2"}, @@ -86,7 +86,7 @@ trips = round_trips_from_fills(fills) The `Metrics` dataclass contains comprehensive performance statistics: ```python -from tradedesk.metrics import Metrics, compute_metrics +from tradedesk.recording import Metrics, compute_metrics metrics = compute_metrics(equity_rows, trade_rows) @@ -157,7 +157,7 @@ assert metrics.final_equity == 0.0 Build an equity curve from round-trip P&L: ```python -from tradedesk.metrics import round_trips_from_fills, equity_rows_from_round_trips +from tradedesk.recording import round_trips_from_fills, equity_rows_from_round_trips fills = [ {"instrument": "EURUSD", "direction": "BUY", "timestamp": "2025-01-01T00:00:00Z", "price": "100", "size": "1"}, @@ -216,7 +216,7 @@ print(f"Avg win: ${metrics_usd.avg_win}") Calculate maximum drawdown from an equity curve: ```python -from tradedesk.metrics import max_drawdown +from tradedesk.recording.metrics import max_drawdown equity = [100, 110, 105, 95, 120, 115] dd = max_drawdown(equity) # -15.0 (peak 110, trough 95) @@ -229,7 +229,7 @@ assert max_drawdown([100, 101, 102]) == 0.0 # Monotonic up ## Complete Example: Strategy Analysis ```python -from tradedesk.metrics import compute_metrics, round_trips_from_fills +from tradedesk.recording import compute_metrics, round_trips_from_fills class StrategyAnalyzer: def __init__(self): diff --git a/docs/risk_management.md b/docs/risk_management.md index 4ad7ced..b35844e 100644 --- a/docs/risk_management.md +++ b/docs/risk_management.md @@ -1,6 +1,6 @@ # Risk Management Guide -The `tradedesk.risk` and `tradedesk.position` modules provide utilities for position sizing and position state tracking. +The `tradedesk.portfolio` and `tradedesk.execution` modules provide utilities for position sizing and position state tracking. ## Position Sizing @@ -9,7 +9,7 @@ The `tradedesk.risk` and `tradedesk.position` modules provide utilities for posi The `atr_normalised_size()` function calculates position size based on Average True Range (ATR): ```python -from tradedesk.risk import atr_normalised_size +from tradedesk.portfolio import atr_normalised_size # Calculate position size size = atr_normalised_size( @@ -61,7 +61,7 @@ size = atr_normalised_size( ```python # Adjust position size based on market volatility -from tradedesk.indicators import ATR +from tradedesk.marketdata.indicators import ATR atr_indicator = ATR(period=14) @@ -86,7 +86,7 @@ if current_atr: The `PositionTracker` class maintains state for an open position: ```python -from tradedesk.position import PositionTracker +from tradedesk.execution import PositionTracker from tradedesk.types import Direction # Create tracker @@ -155,7 +155,7 @@ assert position.mfe_points == 7.0 # 107 - 100 (updated to new max) ### Complete Position Lifecycle ```python -from tradedesk.position import PositionTracker +from tradedesk.execution import PositionTracker from tradedesk.types import Direction class MyStrategy: diff --git a/docs/strategy_guide.md b/docs/strategy_guide.md index 80f1e2e..07da5e7 100644 --- a/docs/strategy_guide.md +++ b/docs/strategy_guide.md @@ -296,8 +296,7 @@ The implementation uses two cooperating components: ```python from dataclasses import dataclass -from tradedesk.indicators.ema import EMA -from tradedesk.indicators.atr import ATR +from tradedesk.marketdata.indicators import ATR, EMA @dataclass diff --git a/tests/portfolio/test_crash_recovery.py b/tests/portfolio/test_crash_recovery.py index d65fbf9..7ec187c 100644 --- a/tests/portfolio/test_crash_recovery.py +++ b/tests/portfolio/test_crash_recovery.py @@ -20,7 +20,13 @@ from tradedesk import Direction from tradedesk.execution import BrokerPosition, PositionTracker -from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal, ReconciliationManager +from tradedesk.portfolio import ( + Instrument, + JournalEntry, + PositionJournal, + ReconciliationManager, + SleeveId, +) from tradedesk.types import Candle # --------------------------------------------------------------------------- @@ -61,6 +67,7 @@ class _FakeStrategy: def __init__(self, epic: str = "") -> None: self.epic = epic + self.instrument = Instrument(epic) self.position = PositionTracker() self.entry_atr: float = 0.0 self._on_position_change: Callable[[str], None] | None = None @@ -101,7 +108,7 @@ def _build_manager( ) -> ReconciliationManager: if client is None: client = AsyncMock() - strategies = {Instrument(e): _FakeStrategy(e) for e in epics} + strategies = {SleeveId(e): _FakeStrategy(e) for e in epics} runner = MagicMock() runner.strategies = strategies mgr = ReconciliationManager( @@ -111,13 +118,13 @@ def _build_manager( target_period="HOUR", enable_event_subscription=False, ) - for inst, strat in strategies.items(): + for strat in strategies.values(): strat._on_position_change = mgr.persist_positions return mgr def _strat(mgr: ReconciliationManager, epic: str) -> _FakeStrategy: - return mgr._runner.strategies[Instrument(epic)] # type: ignore[return-value] + return mgr._runner.strategies[SleeveId(epic)] # type: ignore[return-value] @pytest.fixture diff --git a/tests/portfolio/test_orchestrator_reconciliation.py b/tests/portfolio/test_orchestrator_reconciliation.py index d2e3b9d..04745cc 100644 --- a/tests/portfolio/test_orchestrator_reconciliation.py +++ b/tests/portfolio/test_orchestrator_reconciliation.py @@ -10,7 +10,7 @@ from tradedesk import Direction from tradedesk.execution import BrokerPosition from tradedesk.execution.position import PositionTracker -from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal +from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal, SleeveId from tradedesk.portfolio.reconciliation import ReconciliationManager from tradedesk.types import Candle @@ -68,6 +68,7 @@ class _FakeStrategy: def __init__(self, client=None, epic="", period="", **kwargs): self.client = client self.epic = epic + self.instrument = Instrument(epic) self.position = PositionTracker() self._on_position_change = None self.entry_atr = 0.0 @@ -126,7 +127,7 @@ def _build_manager(epics, *, journal, client=None): strategies = {} for epic in epics: strat = _FakeStrategy(client=client, epic=epic) - strategies[Instrument(epic)] = strat + strategies[SleeveId(epic)] = strat runner = MagicMock() runner.strategies = strategies @@ -139,7 +140,7 @@ def _build_manager(epics, *, journal, client=None): ) # Register position-change callbacks (mirrors real orchestrator wiring) - for inst, strat in strategies.items(): + for strat in strategies.values(): strat._on_position_change = mgr.persist_positions return mgr @@ -147,7 +148,7 @@ def _build_manager(epics, *, journal, client=None): def _strat(mgr, epic): """Get strategy by epic string.""" - return mgr._runner.strategies[Instrument(epic)] + return mgr._runner.strategies[SleeveId(epic)] @pytest.fixture diff --git a/tests/portfolio/test_policy.py b/tests/portfolio/test_policy.py index 567b1ed..b3f4366 100644 --- a/tests/portfolio/test_policy.py +++ b/tests/portfolio/test_policy.py @@ -1,21 +1,32 @@ """Tests for portfolio risk allocation policies.""" from tradedesk.portfolio.risk import EqualSplitRiskPolicy -from tradedesk.portfolio.types import Instrument +from tradedesk.portfolio.types import Instrument, SleeveId -def test_equal_split_allocates_per_active_instrument(): - """Test that EqualSplitRiskPolicy divides budget equally across active instruments.""" +def test_equal_split_allocates_per_active_sleeve(): + """Test that EqualSplitRiskPolicy divides budget equally across active sleeves.""" p = EqualSplitRiskPolicy(portfolio_risk_budget=10.0) - # No active instruments -> empty allocation - assert p.allocate([]) == {} + # No active sleeves -> empty allocation + assert p.allocate({}) == {} - # One active instrument -> gets full budget - a = p.allocate([Instrument("EURUSD")]) - assert a[Instrument("EURUSD")] == 10.0 + # One active sleeve -> gets full budget + a = p.allocate({SleeveId("AdaptiveFade_EURUSD"): Instrument("CS.D.EURUSD.TODAY.IP")}) + assert a[SleeveId("AdaptiveFade_EURUSD")] == 10.0 - # Two active instruments -> split equally - ab = p.allocate([Instrument("EURUSD"), Instrument("GBPUSD")]) - assert ab[Instrument("EURUSD")] == 5.0 - assert ab[Instrument("GBPUSD")] == 5.0 + # Two active sleeves -> split equally + ab = p.allocate({ + SleeveId("AdaptiveFade_EURUSD"): Instrument("CS.D.EURUSD.TODAY.IP"), + SleeveId("BollingerReversion_GBPUSD"): Instrument("CS.D.GBPUSD.TODAY.IP"), + }) + assert ab[SleeveId("AdaptiveFade_EURUSD")] == 5.0 + assert ab[SleeveId("BollingerReversion_GBPUSD")] == 5.0 + + # Two active sleeves on the same instrument -> each gets 5.0 independently + dual = p.allocate({ + SleeveId("AdaptiveFade_AUDCAD"): Instrument("CS.D.AUDCAD.TODAY.IP"), + SleeveId("BollingerReversion_AUDCAD"): Instrument("CS.D.AUDCAD.TODAY.IP"), + }) + assert dual[SleeveId("AdaptiveFade_AUDCAD")] == 5.0 + assert dual[SleeveId("BollingerReversion_AUDCAD")] == 5.0 diff --git a/tests/portfolio/test_portfolio_runner.py b/tests/portfolio/test_portfolio_runner.py index dcb7764..930bde0 100644 --- a/tests/portfolio/test_portfolio_runner.py +++ b/tests/portfolio/test_portfolio_runner.py @@ -5,7 +5,7 @@ from tradedesk.marketdata.events import CandleClosedEvent from tradedesk.portfolio.risk import EqualSplitRiskPolicy from tradedesk.portfolio.runner import PortfolioRunner -from tradedesk.portfolio.types import Instrument +from tradedesk.portfolio.types import Instrument, SleeveId class FakeStrategy: @@ -33,16 +33,16 @@ async def evaluate_signals(self) -> None: @pytest.mark.asyncio async def test_runner_splits_risk_across_active_strategies(): - """Test that PortfolioRunner splits risk budget across active strategies.""" + """Test that PortfolioRunner splits risk budget across active sleeves.""" s1 = FakeStrategy("EURUSD", active=True) s2 = FakeStrategy("GBPUSD", active=True) s3 = FakeStrategy("USDJPY", active=False) r = PortfolioRunner( strategies={ - Instrument("EURUSD"): s1, - Instrument("GBPUSD"): s2, - Instrument("USDJPY"): s3, + SleeveId("s1"): s1, + SleeveId("s2"): s2, + SleeveId("s3"): s3, }, policy=EqualSplitRiskPolicy(portfolio_risk_budget=10.0), default_risk_per_trade=10.0, @@ -62,3 +62,39 @@ async def test_runner_splits_risk_across_active_strategies(): assert s1.evaluate_signals_calls == 1 assert s2.update_state_calls == 0 assert s2.evaluate_signals_calls == 0 + + +@pytest.mark.asyncio +async def test_runner_fans_out_to_dual_sleeve_same_instrument(): + """Both sleeves on the same instrument receive the candle and share risk budget.""" + fade = FakeStrategy("AUDCAD", active=True) + bollinger = FakeStrategy("AUDCAD", active=True) + other = FakeStrategy("EURUSD", active=False) + + r = PortfolioRunner( + strategies={ + SleeveId("AdaptiveFade_AUDCAD"): fade, + SleeveId("BollingerReversion_AUDCAD"): bollinger, + SleeveId("BollingerReversion_EURUSD"): other, + }, + policy=EqualSplitRiskPolicy(portfolio_risk_budget=10.0), + default_risk_per_trade=10.0, + ) + + await r.on_candle_close( + CandleClosedEvent(instrument=Instrument("AUDCAD"), timeframe="15MINUTE", candle=None) + ) + + # Both AUDCAD sleeves active -> 5.0 each (2 active out of 3 sleeves) + assert fade._rpt == 5.0 + assert bollinger._rpt == 5.0 + # Inactive EURUSD sleeve gets default + assert other._rpt == 10.0 + # Both AUDCAD sleeves should have processed the candle + assert fade.update_state_calls == 1 + assert fade.evaluate_signals_calls == 1 + assert bollinger.update_state_calls == 1 + assert bollinger.evaluate_signals_calls == 1 + # EURUSD sleeve should not have processed the AUDCAD candle + assert other.update_state_calls == 0 + assert other.evaluate_signals_calls == 0 diff --git a/tests/portfolio/test_reconciliation.py b/tests/portfolio/test_reconciliation.py index 174831c..5dbab1d 100644 --- a/tests/portfolio/test_reconciliation.py +++ b/tests/portfolio/test_reconciliation.py @@ -14,7 +14,7 @@ _direction_matches, reconcile, ) -from tradedesk.portfolio.types import Instrument +from tradedesk.portfolio.types import Instrument, SleeveId def _journal_entry(instrument="USDJPY", direction="long", size=1.0): @@ -194,6 +194,7 @@ class TestReconciliationManager: @pytest.fixture def mock_strategy(self): strat = Mock() + strat.instrument = Instrument("USDJPY") strat.position = Mock() strat.position.is_flat.return_value = True strat.position.direction = None @@ -209,8 +210,8 @@ def mock_strategy(self): @pytest.fixture def mock_runner(self, mock_strategy): runner = Mock() - # Populate with one default strategy - runner.strategies = {Instrument("USDJPY"): mock_strategy} + # Populate with one default strategy, keyed by SleeveId + runner.strategies = {SleeveId("USDJPY"): mock_strategy} return runner @pytest.fixture @@ -260,7 +261,7 @@ async def test_startup_orphan_adoption(self, manager, mock_client, mock_runner): restored = await manager.reconcile_on_startup() assert "USDJPY" in restored - strat = mock_runner.strategies[Instrument("USDJPY")] + strat = mock_runner.strategies[SleeveId("USDJPY")] # Verify open called strat.position.open.assert_called_with(Direction.LONG, 1.0, 150.0) # Should save to persist the adoption @@ -275,7 +276,7 @@ async def test_startup_broker_failure(self, manager, mock_client, mock_journal, restored = await manager.reconcile_on_startup() assert "USDJPY" in restored - strat = mock_runner.strategies[Instrument("USDJPY")] + strat = mock_runner.strategies[SleeveId("USDJPY")] strat.restore_from_journal.assert_called_once() @pytest.mark.asyncio @@ -289,7 +290,7 @@ async def test_periodic_reconcile_correction(self, manager, mock_client, mock_ru # Force reconcile await manager.periodic_reconcile() - strat = mock_runner.strategies[Instrument("USDJPY")] + strat = mock_runner.strategies[SleeveId("USDJPY")] strat.position.open.assert_called_with(Direction.LONG, 1.0, 150.0) manager._journal.save.assert_called_once() @@ -297,7 +298,7 @@ async def test_periodic_reconcile_correction(self, manager, mock_client, mock_ru async def test_post_warmup_check(self, manager, mock_client, mock_runner): """Verifies exit check on restored positions.""" # Setup strategy to look like it has a position - strat = mock_runner.strategies[Instrument("USDJPY")] + strat = mock_runner.strategies[SleeveId("USDJPY")] strat.position.is_flat.return_value = False mock_client.get_historical_candles.return_value = [Mock(close=155.0)] diff --git a/tradedesk/portfolio/__init__.py b/tradedesk/portfolio/__init__.py index ea30802..4fff9aa 100644 --- a/tradedesk/portfolio/__init__.py +++ b/tradedesk/portfolio/__init__.py @@ -19,6 +19,7 @@ Instrument, PortfolioStrategy, ReconcilableStrategy, + SleeveId, StrategySpec, ) @@ -42,6 +43,7 @@ "ReconciliationResult", "RiskAllocationPolicy", "SimplePortfolio", + "SleeveId", "StrategySpec", "WeightedRollingTracker", "atr_normalised_size", diff --git a/tradedesk/portfolio/reconciliation.py b/tradedesk/portfolio/reconciliation.py index eee0dc2..db5275d 100644 --- a/tradedesk/portfolio/reconciliation.py +++ b/tradedesk/portfolio/reconciliation.py @@ -11,7 +11,7 @@ from ..types import Direction from .journal import JournalEntry, PositionJournal from .runner import PortfolioRunner -from .types import Instrument, ReconcilableStrategy +from .types import ReconcilableStrategy log = logging.getLogger(__name__) @@ -324,7 +324,7 @@ async def reconcile_on_startup(self) -> set[str]: restored_instruments = self._restore_from_journal(journal_positions) return restored_instruments - managed_instruments = {str(inst) for inst in self._runner.strategies.keys()} + managed_instruments = {str(s.instrument) for s in self._runner.strategies.values()} result = reconcile( journal_positions=journal_positions, @@ -398,9 +398,9 @@ async def reconcile_on_startup(self) -> set[str]: def _restore_from_journal(self, journal_positions: dict[str, JournalEntry]) -> set[str]: """Restore positions from journal only (broker unreachable).""" restored: set[str] = set() - for inst, s in self._runner.strategies.items(): + for s in self._runner.strategies.values(): strat = cast(ReconcilableStrategy, s) - epic = str(inst) + epic = str(s.instrument) entry = journal_positions.get(epic) if entry is None or entry.direction is None: continue @@ -428,9 +428,9 @@ def _apply_reconciliation( broker_by_instrument = {bp.instrument: bp for bp in broker_positions} restored: set[str] = set() - for inst, s in self._runner.strategies.items(): + for s in self._runner.strategies.values(): strat = cast(ReconcilableStrategy, s) - epic = str(inst) + epic = str(s.instrument) entry = next((e for e in result.entries if e.instrument == epic), None) if entry is None: continue @@ -511,9 +511,9 @@ async def post_warmup_check(self, restored_instruments: set[str]) -> None: or adopted might be stale (e.g. stop-loss breached, regime deactivated). This method checks each one and exits immediately if warranted. """ - for inst, s in self._runner.strategies.items(): + for s in self._runner.strategies.values(): strat = cast(ReconcilableStrategy, s) - epic = str(inst) + epic = str(s.instrument) if epic not in restored_instruments: continue if strat.position.is_flat(): @@ -558,9 +558,9 @@ def persist_positions(self, changed_epic: str = "") -> None: return entries = [] - for inst, s in self._runner.strategies.items(): + for s in self._runner.strategies.values(): strat = cast(ReconcilableStrategy, s) - entries.append(strat.to_journal_entry(str(inst))) + entries.append(strat.to_journal_entry(str(s.instrument))) self._journal.save(entries) @@ -578,12 +578,12 @@ async def periodic_reconcile(self) -> None: # Build current local state journal_positions: dict[str, JournalEntry] = {} - for inst, s in self._runner.strategies.items(): + for s in self._runner.strategies.values(): strat = cast(ReconcilableStrategy, s) - epic = str(inst) + epic = str(s.instrument) journal_positions[epic] = strat.to_journal_entry(epic) - managed_instruments = {str(inst) for inst in self._runner.strategies.keys()} + managed_instruments = {str(s.instrument) for s in self._runner.strategies.values()} # Exclude instruments with recent position changes to avoid settlement race skipped = self._recently_changed_instruments.copy() @@ -613,7 +613,11 @@ async def periodic_reconcile(self) -> None: if entry.discrepancy == DiscrepancyType.MATCHED: continue - maybe_strat = self._runner.strategies.get(Instrument(entry.instrument)) + maybe_strat = next( + (s for s in self._runner.strategies.values() + if str(s.instrument) == entry.instrument), + None, + ) if maybe_strat is None: continue strat = cast(ReconcilableStrategy, maybe_strat) diff --git a/tradedesk/portfolio/risk.py b/tradedesk/portfolio/risk.py index 7f9f74b..398ab09 100644 --- a/tradedesk/portfolio/risk.py +++ b/tradedesk/portfolio/risk.py @@ -4,7 +4,7 @@ from dataclasses import dataclass from typing import Mapping -from .types import Instrument +from .types import Instrument, SleeveId def atr_normalised_size( @@ -43,19 +43,26 @@ class RiskAllocationPolicy(ABC): Base class for portfolio risk allocation policies. Risk allocation policies determine how to distribute a portfolio's risk budget - across multiple instruments based on regime activity or other criteria. + across multiple strategy sleeves based on regime activity or other criteria. + + Allocation is keyed by ``SleeveId`` so that two strategies on the same + instrument (e.g. ``AdaptiveFade_AUDCAD`` and ``BollingerReversion_AUDCAD``) + receive independent risk budgets. """ @abstractmethod - def allocate(self, active_instruments: list[Instrument]) -> Mapping[Instrument, float]: + def allocate(self, active_sleeves: Mapping[SleeveId, Instrument]) -> Mapping[SleeveId, float]: """ - Allocate risk budget across active instruments. + Allocate risk budget across active strategy sleeves. Args: - active_instruments: List of instruments to allocate risk to + active_sleeves: Mapping of SleeveId to its underlying Instrument for + strategies whose regime is currently active. Passing the instrument + allows policies to look up per-instrument history even when sleeve + names differ from raw instrument symbols. Returns: - Mapping of instrument to allocated risk amount (typically used as risk_per_trade) + Mapping of SleeveId to allocated risk amount (used as risk_per_trade). """ pass @@ -63,27 +70,27 @@ def allocate(self, active_instruments: list[Instrument]) -> Mapping[Instrument, @dataclass(frozen=True) class EqualSplitRiskPolicy(RiskAllocationPolicy): """ - Split a fixed portfolio risk budget across concurrently active regimes. + Split a fixed portfolio risk budget equally across concurrently active sleeves. Semantics: - - If k active regimes: allocate budget/k to each active instrument. - - If k == 0: allocate nothing (caller decides what to do when no regimes active). + - If k active sleeves: allocate budget/k to each active sleeve. + - If k == 0: allocate nothing (caller falls back to default_risk_per_trade). """ portfolio_risk_budget: float - def allocate(self, active_instruments: list[Instrument]) -> Mapping[Instrument, float]: + def allocate(self, active_sleeves: Mapping[SleeveId, Instrument]) -> Mapping[SleeveId, float]: """ - Allocate risk budget across active instruments. + Allocate risk budget across active strategy sleeves. Args: - active_instruments: List of instruments with active regimes + active_sleeves: Mapping of SleeveId to its underlying Instrument. Returns: - Mapping of instrument to allocated risk amount + Mapping of SleeveId to allocated risk amount. """ - if not active_instruments: + if not active_sleeves: return {} - k = len(active_instruments) + k = len(active_sleeves) per = float(self.portfolio_risk_budget) / float(k) - return {inst: per for inst in active_instruments} + return {sleeve: per for sleeve in active_sleeves} diff --git a/tradedesk/portfolio/runner.py b/tradedesk/portfolio/runner.py index f0f2c6f..47755fd 100644 --- a/tradedesk/portfolio/runner.py +++ b/tradedesk/portfolio/runner.py @@ -5,7 +5,7 @@ from tradedesk.marketdata import CandleClosedEvent from .risk import RiskAllocationPolicy -from .types import Instrument, PortfolioStrategy +from .types import Instrument, PortfolioStrategy, SleeveId @dataclass @@ -14,28 +14,36 @@ class PortfolioRunner: Client-agnostic portfolio orchestrator. Responsibilities: - - Maintain a set of per-instrument strategies + - Maintain a set of per-sleeve strategies, keyed by SleeveId - Compute active set k from strategy state (previous close) - Apply risk policy before processing the next candle close - - Forward candle events to the relevant strategy + - Forward candle events to all strategies on the relevant instrument Does NOT: - Place orders (strategies + their clients do that) - Perform portfolio rebalancing - Attempt to increase utilisation + + Two strategies on the same instrument (e.g. AdaptiveFade_AUDCAD and + BollingerReversion_AUDCAD) are stored under distinct SleeveIds. When a + candle arrives for AUDCAD, both sleeves receive the event. """ - strategies: dict[Instrument, PortfolioStrategy] + strategies: dict[SleeveId, PortfolioStrategy] policy: RiskAllocationPolicy default_risk_per_trade: float - def _active_instruments(self) -> list[Instrument]: - """Get list of instruments with active regimes.""" - return [inst for inst, s in self.strategies.items() if s.is_regime_active()] + def _active_sleeve_instruments(self) -> dict[SleeveId, Instrument]: + """Return mapping of active SleeveId -> underlying Instrument.""" + return { + sid: s.instrument + for sid, s in self.strategies.items() + if s.is_regime_active() + } def _apply_risk_budgets(self) -> None: """Apply risk allocation policy to all strategies.""" - active = self._active_instruments() + active = self._active_sleeve_instruments() alloc = self.policy.allocate(active) # If no regimes active, revert to default risk for all strategies. @@ -44,11 +52,10 @@ def _apply_risk_budgets(self) -> None: s.set_risk_per_trade(float(self.default_risk_per_trade)) return - # If some active, set active strategies to allocated risk, - # inactive strategies to default (so if they activate this bar, they use default for now). - for inst, s in self.strategies.items(): - if inst in alloc: - s.set_risk_per_trade(float(alloc[inst])) + # Active sleeves get allocated risk; inactive sleeves get default. + for sid, s in self.strategies.items(): + if sid in alloc: + s.set_risk_per_trade(float(alloc[sid])) else: s.set_risk_per_trade(float(self.default_risk_per_trade)) @@ -56,27 +63,31 @@ async def on_candle_close(self, event: CandleClosedEvent) -> None: """ Process a candle close event using two-phase lifecycle. - Phase 1: Update state (indicators, regime, position tracking) - Phase 2: Apply risk budgets based on updated regime state - Phase 3: Evaluate signals and execute trades with correct allocations + All strategy sleeves whose instrument matches the event receive the + event. This supports multiple sleeves on the same instrument (e.g. + dual-strategy AUDCAD trading). + + Phase 1: Update state for all matching sleeves + Phase 2: Apply risk budgets across the whole portfolio + Phase 3: Evaluate signals for all matching sleeves Args: event: Candle close event with instrument, period, and candle data """ - from .types import Instrument - - strat = self.strategies.get(Instrument(event.instrument)) - if strat is None: + matching = [ + s for s in self.strategies.values() + if s.instrument == Instrument(event.instrument) + ] + if not matching: return - # Phase 1: Update strategy state (indicators, regime, etc.) - # Regime state may change during this phase - await strat.update_state(event) + # Phase 1: Update state for all matching sleeves + for strat in matching: + await strat.update_state(event) - # Phase 2: Apply risk budgets based on current (updated) regime state - # This ensures allocations reflect any regime changes from phase 1 + # Phase 2: Apply risk budgets based on updated regime state (portfolio-wide) self._apply_risk_budgets() - # Phase 3: Evaluate signals and execute trades - # Strategies now have correct risk allocations for entries/exits - await strat.evaluate_signals() + # Phase 3: Evaluate signals for all matching sleeves + for strat in matching: + await strat.evaluate_signals() diff --git a/tradedesk/portfolio/types.py b/tradedesk/portfolio/types.py index 96cff4c..a896ccf 100644 --- a/tradedesk/portfolio/types.py +++ b/tradedesk/portfolio/types.py @@ -5,7 +5,7 @@ instruments. """ -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, Callable, NewType, Protocol if TYPE_CHECKING: @@ -15,6 +15,7 @@ from .journal import JournalEntry Instrument = NewType("Instrument", str) +SleeveId = NewType("SleeveId", str) @dataclass(frozen=True) @@ -31,12 +32,17 @@ class StrategySpec: strategy_cls: The strategy class to instantiate. kwargs: A dictionary of keyword arguments to pass to the strategy's `__init__` method during instantiation. + sleeve_name: Optional unique name for this strategy sleeve. If not + provided, defaults to ``"{ClassName}_{instrument}"``. Must be + unique across all sleeves in a portfolio — two sleeves on the + same instrument require explicit distinct names. """ instrument: str period: str strategy_cls: Callable[..., Any] kwargs: dict[str, Any] + sleeve_name: str | None = field(default=None) class PortfolioStrategy(Protocol):