Skip to content

Commit 8c0569e

Browse files
Cody (Radius Red)Paperclip-Paperclip
authored andcommitted
feat(portfolio): introduce SleeveId as allocation key with instrument-aware policy lookup (RAD-67)
- Added SleeveId NewType and sleeve_name field on StrategySpec - RiskAllocationPolicy.allocate() signature changed from list[SleeveId] to Mapping[SleeveId, Instrument] so policies resolve actual instrument history - PortfolioRunner._apply_risk_budgets() builds sleeve->instrument mapping - PortfolioRunner.strategies keyed by SleeveId; on_candle_close fans out to all sleeves matching the instrument - Reconciliation derives epic from str(instrument), not sleeve key - All portfolio tests updated Co-Authored-By: Paperclip <[email protected]>
1 parent 4014715 commit 8c0569e

10 files changed

Lines changed: 177 additions & 91 deletions

File tree

tests/portfolio/test_crash_recovery.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
from tradedesk import Direction
2222
from tradedesk.execution import BrokerPosition, PositionTracker
23-
from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal, ReconciliationManager
23+
from tradedesk.portfolio import (
24+
Instrument,
25+
JournalEntry,
26+
PositionJournal,
27+
ReconciliationManager,
28+
SleeveId,
29+
)
2430
from tradedesk.types import Candle
2531

2632
# ---------------------------------------------------------------------------
@@ -61,6 +67,7 @@ class _FakeStrategy:
6167

6268
def __init__(self, epic: str = "") -> None:
6369
self.epic = epic
70+
self.instrument = Instrument(epic)
6471
self.position = PositionTracker()
6572
self.entry_atr: float = 0.0
6673
self._on_position_change: Callable[[str], None] | None = None
@@ -101,7 +108,7 @@ def _build_manager(
101108
) -> ReconciliationManager:
102109
if client is None:
103110
client = AsyncMock()
104-
strategies = {Instrument(e): _FakeStrategy(e) for e in epics}
111+
strategies = {SleeveId(e): _FakeStrategy(e) for e in epics}
105112
runner = MagicMock()
106113
runner.strategies = strategies
107114
mgr = ReconciliationManager(
@@ -111,13 +118,13 @@ def _build_manager(
111118
target_period="HOUR",
112119
enable_event_subscription=False,
113120
)
114-
for inst, strat in strategies.items():
121+
for strat in strategies.values():
115122
strat._on_position_change = mgr.persist_positions
116123
return mgr
117124

118125

119126
def _strat(mgr: ReconciliationManager, epic: str) -> _FakeStrategy:
120-
return mgr._runner.strategies[Instrument(epic)] # type: ignore[return-value]
127+
return mgr._runner.strategies[SleeveId(epic)] # type: ignore[return-value]
121128

122129

123130
@pytest.fixture

tests/portfolio/test_orchestrator_reconciliation.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from tradedesk import Direction
1111
from tradedesk.execution import BrokerPosition
1212
from tradedesk.execution.position import PositionTracker
13-
from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal
13+
from tradedesk.portfolio import Instrument, JournalEntry, PositionJournal, SleeveId
1414
from tradedesk.portfolio.reconciliation import ReconciliationManager
1515
from tradedesk.types import Candle
1616

@@ -68,6 +68,7 @@ class _FakeStrategy:
6868
def __init__(self, client=None, epic="", period="", **kwargs):
6969
self.client = client
7070
self.epic = epic
71+
self.instrument = Instrument(epic)
7172
self.position = PositionTracker()
7273
self._on_position_change = None
7374
self.entry_atr = 0.0
@@ -126,7 +127,7 @@ def _build_manager(epics, *, journal, client=None):
126127
strategies = {}
127128
for epic in epics:
128129
strat = _FakeStrategy(client=client, epic=epic)
129-
strategies[Instrument(epic)] = strat
130+
strategies[SleeveId(epic)] = strat
130131

131132
runner = MagicMock()
132133
runner.strategies = strategies
@@ -139,15 +140,15 @@ def _build_manager(epics, *, journal, client=None):
139140
)
140141

141142
# Register position-change callbacks (mirrors real orchestrator wiring)
142-
for inst, strat in strategies.items():
143+
for strat in strategies.values():
143144
strat._on_position_change = mgr.persist_positions
144145

145146
return mgr
146147

147148

148149
def _strat(mgr, epic):
149150
"""Get strategy by epic string."""
150-
return mgr._runner.strategies[Instrument(epic)]
151+
return mgr._runner.strategies[SleeveId(epic)]
151152

152153

153154
@pytest.fixture

tests/portfolio/test_policy.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,32 @@
11
"""Tests for portfolio risk allocation policies."""
22

33
from tradedesk.portfolio.risk import EqualSplitRiskPolicy
4-
from tradedesk.portfolio.types import Instrument
4+
from tradedesk.portfolio.types import Instrument, SleeveId
55

66

7-
def test_equal_split_allocates_per_active_instrument():
8-
"""Test that EqualSplitRiskPolicy divides budget equally across active instruments."""
7+
def test_equal_split_allocates_per_active_sleeve():
8+
"""Test that EqualSplitRiskPolicy divides budget equally across active sleeves."""
99
p = EqualSplitRiskPolicy(portfolio_risk_budget=10.0)
1010

11-
# No active instruments -> empty allocation
12-
assert p.allocate([]) == {}
11+
# No active sleeves -> empty allocation
12+
assert p.allocate({}) == {}
1313

14-
# One active instrument -> gets full budget
15-
a = p.allocate([Instrument("EURUSD")])
16-
assert a[Instrument("EURUSD")] == 10.0
14+
# One active sleeve -> gets full budget
15+
a = p.allocate({SleeveId("AdaptiveFade_EURUSD"): Instrument("CS.D.EURUSD.TODAY.IP")})
16+
assert a[SleeveId("AdaptiveFade_EURUSD")] == 10.0
1717

18-
# Two active instruments -> split equally
19-
ab = p.allocate([Instrument("EURUSD"), Instrument("GBPUSD")])
20-
assert ab[Instrument("EURUSD")] == 5.0
21-
assert ab[Instrument("GBPUSD")] == 5.0
18+
# Two active sleeves -> split equally
19+
ab = p.allocate({
20+
SleeveId("AdaptiveFade_EURUSD"): Instrument("CS.D.EURUSD.TODAY.IP"),
21+
SleeveId("BollingerReversion_GBPUSD"): Instrument("CS.D.GBPUSD.TODAY.IP"),
22+
})
23+
assert ab[SleeveId("AdaptiveFade_EURUSD")] == 5.0
24+
assert ab[SleeveId("BollingerReversion_GBPUSD")] == 5.0
25+
26+
# Two active sleeves on the same instrument -> each gets 5.0 independently
27+
dual = p.allocate({
28+
SleeveId("AdaptiveFade_AUDCAD"): Instrument("CS.D.AUDCAD.TODAY.IP"),
29+
SleeveId("BollingerReversion_AUDCAD"): Instrument("CS.D.AUDCAD.TODAY.IP"),
30+
})
31+
assert dual[SleeveId("AdaptiveFade_AUDCAD")] == 5.0
32+
assert dual[SleeveId("BollingerReversion_AUDCAD")] == 5.0

tests/portfolio/test_portfolio_runner.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from tradedesk.marketdata.events import CandleClosedEvent
66
from tradedesk.portfolio.risk import EqualSplitRiskPolicy
77
from tradedesk.portfolio.runner import PortfolioRunner
8-
from tradedesk.portfolio.types import Instrument
8+
from tradedesk.portfolio.types import Instrument, SleeveId
99

1010

1111
class FakeStrategy:
@@ -33,16 +33,16 @@ async def evaluate_signals(self) -> None:
3333

3434
@pytest.mark.asyncio
3535
async def test_runner_splits_risk_across_active_strategies():
36-
"""Test that PortfolioRunner splits risk budget across active strategies."""
36+
"""Test that PortfolioRunner splits risk budget across active sleeves."""
3737
s1 = FakeStrategy("EURUSD", active=True)
3838
s2 = FakeStrategy("GBPUSD", active=True)
3939
s3 = FakeStrategy("USDJPY", active=False)
4040

4141
r = PortfolioRunner(
4242
strategies={
43-
Instrument("EURUSD"): s1,
44-
Instrument("GBPUSD"): s2,
45-
Instrument("USDJPY"): s3,
43+
SleeveId("s1"): s1,
44+
SleeveId("s2"): s2,
45+
SleeveId("s3"): s3,
4646
},
4747
policy=EqualSplitRiskPolicy(portfolio_risk_budget=10.0),
4848
default_risk_per_trade=10.0,
@@ -62,3 +62,39 @@ async def test_runner_splits_risk_across_active_strategies():
6262
assert s1.evaluate_signals_calls == 1
6363
assert s2.update_state_calls == 0
6464
assert s2.evaluate_signals_calls == 0
65+
66+
67+
@pytest.mark.asyncio
68+
async def test_runner_fans_out_to_dual_sleeve_same_instrument():
69+
"""Both sleeves on the same instrument receive the candle and share risk budget."""
70+
fade = FakeStrategy("AUDCAD", active=True)
71+
bollinger = FakeStrategy("AUDCAD", active=True)
72+
other = FakeStrategy("EURUSD", active=False)
73+
74+
r = PortfolioRunner(
75+
strategies={
76+
SleeveId("AdaptiveFade_AUDCAD"): fade,
77+
SleeveId("BollingerReversion_AUDCAD"): bollinger,
78+
SleeveId("BollingerReversion_EURUSD"): other,
79+
},
80+
policy=EqualSplitRiskPolicy(portfolio_risk_budget=10.0),
81+
default_risk_per_trade=10.0,
82+
)
83+
84+
await r.on_candle_close(
85+
CandleClosedEvent(instrument=Instrument("AUDCAD"), timeframe="15MINUTE", candle=None)
86+
)
87+
88+
# Both AUDCAD sleeves active -> 5.0 each (2 active out of 3 sleeves)
89+
assert fade._rpt == 5.0
90+
assert bollinger._rpt == 5.0
91+
# Inactive EURUSD sleeve gets default
92+
assert other._rpt == 10.0
93+
# Both AUDCAD sleeves should have processed the candle
94+
assert fade.update_state_calls == 1
95+
assert fade.evaluate_signals_calls == 1
96+
assert bollinger.update_state_calls == 1
97+
assert bollinger.evaluate_signals_calls == 1
98+
# EURUSD sleeve should not have processed the AUDCAD candle
99+
assert other.update_state_calls == 0
100+
assert other.evaluate_signals_calls == 0

tests/portfolio/test_reconciliation.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
_direction_matches,
1515
reconcile,
1616
)
17-
from tradedesk.portfolio.types import Instrument
17+
from tradedesk.portfolio.types import Instrument, SleeveId
1818

1919

2020
def _journal_entry(instrument="USDJPY", direction="long", size=1.0):
@@ -194,6 +194,7 @@ class TestReconciliationManager:
194194
@pytest.fixture
195195
def mock_strategy(self):
196196
strat = Mock()
197+
strat.instrument = Instrument("USDJPY")
197198
strat.position = Mock()
198199
strat.position.is_flat.return_value = True
199200
strat.position.direction = None
@@ -209,8 +210,8 @@ def mock_strategy(self):
209210
@pytest.fixture
210211
def mock_runner(self, mock_strategy):
211212
runner = Mock()
212-
# Populate with one default strategy
213-
runner.strategies = {Instrument("USDJPY"): mock_strategy}
213+
# Populate with one default strategy, keyed by SleeveId
214+
runner.strategies = {SleeveId("USDJPY"): mock_strategy}
214215
return runner
215216

216217
@pytest.fixture
@@ -260,7 +261,7 @@ async def test_startup_orphan_adoption(self, manager, mock_client, mock_runner):
260261
restored = await manager.reconcile_on_startup()
261262

262263
assert "USDJPY" in restored
263-
strat = mock_runner.strategies[Instrument("USDJPY")]
264+
strat = mock_runner.strategies[SleeveId("USDJPY")]
264265
# Verify open called
265266
strat.position.open.assert_called_with(Direction.LONG, 1.0, 150.0)
266267
# Should save to persist the adoption
@@ -275,7 +276,7 @@ async def test_startup_broker_failure(self, manager, mock_client, mock_journal,
275276
restored = await manager.reconcile_on_startup()
276277

277278
assert "USDJPY" in restored
278-
strat = mock_runner.strategies[Instrument("USDJPY")]
279+
strat = mock_runner.strategies[SleeveId("USDJPY")]
279280
strat.restore_from_journal.assert_called_once()
280281

281282
@pytest.mark.asyncio
@@ -289,15 +290,15 @@ async def test_periodic_reconcile_correction(self, manager, mock_client, mock_ru
289290
# Force reconcile
290291
await manager.periodic_reconcile()
291292

292-
strat = mock_runner.strategies[Instrument("USDJPY")]
293+
strat = mock_runner.strategies[SleeveId("USDJPY")]
293294
strat.position.open.assert_called_with(Direction.LONG, 1.0, 150.0)
294295
manager._journal.save.assert_called_once()
295296

296297
@pytest.mark.asyncio
297298
async def test_post_warmup_check(self, manager, mock_client, mock_runner):
298299
"""Verifies exit check on restored positions."""
299300
# Setup strategy to look like it has a position
300-
strat = mock_runner.strategies[Instrument("USDJPY")]
301+
strat = mock_runner.strategies[SleeveId("USDJPY")]
301302
strat.position.is_flat.return_value = False
302303

303304
mock_client.get_historical_candles.return_value = [Mock(close=155.0)]

tradedesk/portfolio/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
Instrument,
2020
PortfolioStrategy,
2121
ReconcilableStrategy,
22+
SleeveId,
2223
StrategySpec,
2324
)
2425

@@ -42,6 +43,7 @@
4243
"ReconciliationResult",
4344
"RiskAllocationPolicy",
4445
"SimplePortfolio",
46+
"SleeveId",
4547
"StrategySpec",
4648
"WeightedRollingTracker",
4749
"atr_normalised_size",

tradedesk/portfolio/reconciliation.py

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from ..types import Direction
1212
from .journal import JournalEntry, PositionJournal
1313
from .runner import PortfolioRunner
14-
from .types import Instrument, ReconcilableStrategy
14+
from .types import ReconcilableStrategy
1515

1616
log = logging.getLogger(__name__)
1717

@@ -324,7 +324,7 @@ async def reconcile_on_startup(self) -> set[str]:
324324
restored_instruments = self._restore_from_journal(journal_positions)
325325
return restored_instruments
326326

327-
managed_instruments = {str(inst) for inst in self._runner.strategies.keys()}
327+
managed_instruments = {str(s.instrument) for s in self._runner.strategies.values()}
328328

329329
result = reconcile(
330330
journal_positions=journal_positions,
@@ -398,9 +398,9 @@ async def reconcile_on_startup(self) -> set[str]:
398398
def _restore_from_journal(self, journal_positions: dict[str, JournalEntry]) -> set[str]:
399399
"""Restore positions from journal only (broker unreachable)."""
400400
restored: set[str] = set()
401-
for inst, s in self._runner.strategies.items():
401+
for s in self._runner.strategies.values():
402402
strat = cast(ReconcilableStrategy, s)
403-
epic = str(inst)
403+
epic = str(s.instrument)
404404
entry = journal_positions.get(epic)
405405
if entry is None or entry.direction is None:
406406
continue
@@ -428,9 +428,9 @@ def _apply_reconciliation(
428428
broker_by_instrument = {bp.instrument: bp for bp in broker_positions}
429429
restored: set[str] = set()
430430

431-
for inst, s in self._runner.strategies.items():
431+
for s in self._runner.strategies.values():
432432
strat = cast(ReconcilableStrategy, s)
433-
epic = str(inst)
433+
epic = str(s.instrument)
434434
entry = next((e for e in result.entries if e.instrument == epic), None)
435435
if entry is None:
436436
continue
@@ -511,9 +511,9 @@ async def post_warmup_check(self, restored_instruments: set[str]) -> None:
511511
or adopted might be stale (e.g. stop-loss breached, regime deactivated).
512512
This method checks each one and exits immediately if warranted.
513513
"""
514-
for inst, s in self._runner.strategies.items():
514+
for s in self._runner.strategies.values():
515515
strat = cast(ReconcilableStrategy, s)
516-
epic = str(inst)
516+
epic = str(s.instrument)
517517
if epic not in restored_instruments:
518518
continue
519519
if strat.position.is_flat():
@@ -558,9 +558,9 @@ def persist_positions(self, changed_epic: str = "") -> None:
558558
return
559559

560560
entries = []
561-
for inst, s in self._runner.strategies.items():
561+
for s in self._runner.strategies.values():
562562
strat = cast(ReconcilableStrategy, s)
563-
entries.append(strat.to_journal_entry(str(inst)))
563+
entries.append(strat.to_journal_entry(str(s.instrument)))
564564

565565
self._journal.save(entries)
566566

@@ -578,12 +578,12 @@ async def periodic_reconcile(self) -> None:
578578

579579
# Build current local state
580580
journal_positions: dict[str, JournalEntry] = {}
581-
for inst, s in self._runner.strategies.items():
581+
for s in self._runner.strategies.values():
582582
strat = cast(ReconcilableStrategy, s)
583-
epic = str(inst)
583+
epic = str(s.instrument)
584584
journal_positions[epic] = strat.to_journal_entry(epic)
585585

586-
managed_instruments = {str(inst) for inst in self._runner.strategies.keys()}
586+
managed_instruments = {str(s.instrument) for s in self._runner.strategies.values()}
587587

588588
# Exclude instruments with recent position changes to avoid settlement race
589589
skipped = self._recently_changed_instruments.copy()
@@ -613,7 +613,11 @@ async def periodic_reconcile(self) -> None:
613613
if entry.discrepancy == DiscrepancyType.MATCHED:
614614
continue
615615

616-
maybe_strat = self._runner.strategies.get(Instrument(entry.instrument))
616+
maybe_strat = next(
617+
(s for s in self._runner.strategies.values()
618+
if str(s.instrument) == entry.instrument),
619+
None,
620+
)
617621
if maybe_strat is None:
618622
continue
619623
strat = cast(ReconcilableStrategy, maybe_strat)

0 commit comments

Comments
 (0)