Commit bfd8f7a: correlations
Idan Yael committed Feb 8, 2022
1 parent 5e9143f commit bfd8f7a
Showing 14 changed files with 249 additions and 14 deletions.
6 changes: 6 additions & 0 deletions src/assets/correlation_config.json
@@ -0,0 +1,6 @@
{
"groups": [
["AAPL", "MSFT"],
["CHKP", "FB"]
]
}
8 changes: 8 additions & 0 deletions src/correlation.json
@@ -0,0 +1,8 @@
{
"groups": [
[
"AAPL",
"MSFT"
]
]
}
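
Note: the two new JSON files share the same schema. "groups" is a list of symbol groups, and correlations are computed only between symbols that belong to the same group. A minimal standalone sketch (not part of the commit; group_of is a hypothetical helper) of loading such a config and resolving a symbol's group, mirroring what AssetCorrelationProcessor does further down:

import json
from typing import List, Optional

with open('src/correlation.json', 'r') as f:
    groups: List[List[str]] = json.load(f).get('groups', [])

def group_of(symbol: str) -> Optional[List[str]]:
    # Return the first group containing the symbol, or None if it is ungrouped.
    for group in groups:
        if symbol in group:
            return group
    return None

print(group_of('AAPL'))  # ['AAPL', 'MSFT']
print(group_of('TSLA'))  # None
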
4 changes: 2 additions & 2 deletions src/main.py
@@ -11,8 +11,8 @@
# LoadersPipelines.build_technicals_calculator().run()
# LoadersPipelines.build_returns_calculator().run()

# LoadersPipelines.build_technicals_with_buckets_calculator('bins.json', BIN_COUNT).run()
LoadersPipelines.build_technicals_with_buckets_calculator('bins.json', BIN_COUNT, 'correlation.json').run()
# LoadersPipelines.build_technicals_with_buckets_matcher('bins.json').run()
BacktestPipelines.build_mongodb_history_buckets_backtester('bins.json').run()
# BacktestPipelines.build_mongodb_history_buckets_backtester('bins.json').run()
# BacktestPipelines.build_mongodb_history_similarity_backtester('bins.json').run()

28 changes: 20 additions & 8 deletions src/pipeline/builders/loaders.py
@@ -4,12 +4,14 @@
from assets.assets_provider import AssetsProvider
from entities.timespan import TimeSpan
from pipeline.processor import Processor
from pipeline.processors.assets_correlation import AssetCorrelationProcessor
from pipeline.processors.candle_cache import CandleCache
from pipeline.processors.mongodb_sink import MongoDBSinkProcessor
from pipeline.processors.returns import ReturnsCalculatorProcessor
from pipeline.processors.technicals import TechnicalsProcessor
from pipeline.processors.technicals_buckets_matcher import TechnicalsBucketsMatcher
from pipeline.processors.technicals_normalizer import TechnicalsNormalizerProcessor
from pipeline.processors.timespan_change import TimeSpanChangeProcessor
from pipeline.reverse_source import ReverseSource
from pipeline.runner import PipelineRunner
from pipeline.source import Source
@@ -19,7 +21,7 @@
from providers.ib.interactive_brokers_connector import InteractiveBrokersConnector
from storage.mongodb_storage import MongoDBStorage

DEFAULT_DAYS_BACK = 365 * 3
DEFAULT_DAYS_BACK = 365 * 1


class LoadersPipelines:
@@ -63,19 +65,26 @@ def _build_mongo_source(days_back: int) -> Source:
return source

@staticmethod
def _build_technicals_base_processor_chain(bins_file_path: Optional[str] = None) -> Processor:
def _build_technicals_base_processor_chain(bins_file_path: Optional[str] = None,
correlations_file_path: Optional[str] = None) -> Processor:
mongodb_storage = MongoDBStorage()
sink = MongoDBSinkProcessor(mongodb_storage)
cache_processor = CandleCache(sink)

bucket_matcher: Optional[TechnicalsBucketsMatcher] = None
latest_processor = cache_processor

if bins_file_path:
bucket_matcher = TechnicalsBucketsMatcher(bins_file_path, next_processor=cache_processor)
bucket_matcher = TechnicalsBucketsMatcher(bins_file_path, next_processor=latest_processor)
latest_processor = bucket_matcher

if correlations_file_path:
asset_correlation = AssetCorrelationProcessor(correlations_file_path, next_processor=latest_processor)
latest_processor = asset_correlation

technical_normalizer = TechnicalsNormalizerProcessor(
next_processor=bucket_matcher if bins_file_path else cache_processor)
technical_normalizer = TechnicalsNormalizerProcessor(next_processor=latest_processor)
technicals = TechnicalsProcessor(technical_normalizer)
return technicals
timespan_change_processor = TimeSpanChangeProcessor(TimeSpan.Day, technicals)
return timespan_change_processor

@staticmethod
def build_technicals_calculator(days_back: int = DEFAULT_DAYS_BACK) -> PipelineRunner:
@@ -85,9 +94,11 @@ def build_technicals_calculator(days_back: int = DEFAULT_DAYS_BACK) -> PipelineRunner:

@staticmethod
def build_technicals_with_buckets_calculator(bins_file_path: str, bins_count: int,
correlations_file_path: str,
days_back: int = DEFAULT_DAYS_BACK) -> PipelineRunner:
source = LoadersPipelines._build_mongo_source(days_back)
technicals = LoadersPipelines._build_technicals_base_processor_chain()
technicals = LoadersPipelines._build_technicals_base_processor_chain(
correlations_file_path=correlations_file_path)

symbols = AssetsProvider.get_sp500_symbols()
technicals_binner = TechnicalsBinner(symbols, bins_count, bins_file_path)
@@ -96,6 +107,7 @@ def build_technicals_with_buckets_calculator(bins_file_path: str, bins_count: int,

@staticmethod
def build_technicals_with_buckets_matcher(bins_file_path: str,
correlations_file_path: str,
days_back: int = DEFAULT_DAYS_BACK) -> PipelineRunner:
source = LoadersPipelines._build_mongo_source(days_back)

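Note: with this change the chain is built back to front through a latest_processor variable, so optional stages can be spliced in without nested conditionals. A sketch (not part of the commit) of the resulting processor order when both a bins file and a correlations file are supplied; candles flow top to bottom, and the new AssetCorrelationProcessor sits between the normalizer and the bucket matcher:

# TimeSpanChangeProcessor(TimeSpan.Day)
#   -> TechnicalsProcessor
#     -> TechnicalsNormalizerProcessor
#       -> AssetCorrelationProcessor      (new in this commit)
#         -> TechnicalsBucketsMatcher
#           -> CandleCache
#             -> MongoDBSinkProcessor
# Building the chain may require a reachable MongoDB instance, since the sink
# constructs a MongoDBStorage.
chain = LoadersPipelines._build_technicals_base_processor_chain(
    bins_file_path='bins.json',
    correlations_file_path='correlation.json')
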
5 changes: 4 additions & 1 deletion src/pipeline/processor.py
@@ -17,7 +17,10 @@ def process(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.process(context, candle)

def reprocess(self, context: SharedContext, candle: Candle):
if self.next_processor:
self.next_processor.reprocess(context, candle)

def event(self, context: SharedContext, event: Event):
if self.next_processor:
self.next_processor.event(context, event)
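
Note: the new reprocess hook mirrors process: by default it just forwards the candle to the next processor, so a processor that amends an already-seen candle (as AssetCorrelationProcessor does below) can push the updated candle back down the chain. A hypothetical subclass for illustration only (ReprocessLogger is not part of the commit):

from entities.candle import Candle
from pipeline.processor import Processor
from pipeline.shared_context import SharedContext


class ReprocessLogger(Processor):
    # Illustration only: log every candle that gets pushed back through the chain.
    def reprocess(self, context: SharedContext, candle: Candle):
        print(f'reprocessing {candle.symbol} at {candle.timestamp}')
        super().reprocess(context, candle)  # keep forwarding down the chain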

90 changes: 90 additions & 0 deletions src/pipeline/processors/assets_correlation.py
@@ -0,0 +1,90 @@
import json
from typing import Optional, List, Dict

from scipy import spatial

from entities.candle import Candle
from entities.event import Event
from entities.generic_candle_attachment import GenericCandleAttachment
from pipeline.processor import Processor
from pipeline.processors.candle_cache import CandleCache
from pipeline.processors.technicals import IndicatorValue
from pipeline.shared_context import SharedContext

CORRELATIONS_ATTACHMENT_KEY = 'correlations'
CORRELATION_ELEMENTS_COUNT = 4


class AssetCorrelation(GenericCandleAttachment[IndicatorValue]):
pass


AssetCorrelation()


class CorrelationConfig:
def __init__(self, groups: List[List[str]]) -> None:
self.groups: List[List[str]] = groups


class AssetCorrelationProcessor(Processor):
def __init__(self, config_path: str, next_processor: Optional[Processor]) -> None:
with open(config_path, 'r') as config_content:
c: Dict = json.loads(config_content.read())
self.config = CorrelationConfig(c.get('groups', []))

super().__init__(next_processor)

def process(self, context: SharedContext, candle: Candle):
super().process(context, candle)

def event(self, context: SharedContext, event: Event):
if event == event.TimeSpanChange:
self._calculate_correlations(context)

super().event(context, event)

def _calculate_correlations(self, context: SharedContext):
cache_reader = CandleCache.context_reader(context)
symbols = cache_reader.get_symbols_list()

for symbol in symbols:
self._calculate_symbol_correlations(context, symbol)

def _calculate_symbol_correlations(self, context: SharedContext, symbol: str):
cache_reader = CandleCache.context_reader(context)
asset_correlation = AssetCorrelation()

group_symbols = self._get_symbol_group(symbol)

if group_symbols:
current_symbol_candles = cache_reader.get_symbol_candles(symbol) or []
current_symbol_values = self._get_correlation_measurable_values(current_symbol_candles)

for paired_symbol in group_symbols:
if paired_symbol == symbol:
continue

symbol_candles = cache_reader.get_symbol_candles(paired_symbol) or []
symbol_values = self._get_correlation_measurable_values(symbol_candles)

if len(symbol_values) != len(current_symbol_values) or len(current_symbol_values) <= CORRELATION_ELEMENTS_COUNT:
continue

correlation = spatial.distance.correlation(current_symbol_values[-CORRELATION_ELEMENTS_COUNT:],
symbol_values[-CORRELATION_ELEMENTS_COUNT:])
asset_correlation.set(paired_symbol, correlation)

latest_candle = current_symbol_candles[-1]
latest_candle.attachments.add_attachement(CORRELATIONS_ATTACHMENT_KEY, asset_correlation)

self.reprocess(context, latest_candle)

def _get_symbol_group(self, symbol: str) -> Optional[List[str]]:
for group in self.config.groups:
if symbol in group:
return group

@staticmethod
def _get_correlation_measurable_values(candles: List[Candle]) -> List[float]:
return [c.close for c in candles]
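
Note (not stated in the commit): scipy.spatial.distance.correlation returns the correlation distance, i.e. 1 minus the Pearson correlation of the two series, so values near 0 mean the two assets moved together and values near 2 mean they moved in opposite directions. A quick standalone check:

from scipy import spatial

up = [1.0, 2.0, 3.0, 4.0, 5.0]
down = [5.0, 4.0, 3.0, 2.0, 1.0]

print(spatial.distance.correlation(up, up))    # ~0.0, perfectly correlated
print(spatial.distance.correlation(up, down))  # ~2.0, perfectly anti-correlated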
16 changes: 16 additions & 0 deletions src/pipeline/processors/candle_cache.py
@@ -36,12 +36,28 @@ def get_symbol_candles(self, symbol: str) -> Optional[List[Candle]]:
if data and symbol in data:
return data[symbol]

def get_symbols_list(self) -> Optional[List[str]]:
data = self.context.get_kv_data(CONTEXT_IDENT)
if data:
return list(data.keys())


class CandleCache(Processor):
def __init__(self, next_processor: Optional[Processor] = None) -> None:
super().__init__(next_processor)
self.data: CacheData = {}

def reprocess(self, context: SharedContext, candle: Candle):
context_reader = CandleCacheContextReader(context)
candles = context_reader.get_symbol_candles(candle.symbol)

for i in range(len(candles)):
if candles[i].timestamp == candle.timestamp:
candles[i] = candle
break

super().reprocess(context, candle)

def process(self, context: SharedContext, candle: Candle):
context_writer = CandleCacheContextWriter(context)
context_writer.put_candle(candle)
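Note: CandleCache.reprocess swaps the cached candle that carries the same timestamp for the updated one, so later readers of the cache (such as the binner terminator) see the freshly attached correlations. The replacement logic, shown standalone with a stand-in candle type (FakeCandle is illustration only):

from collections import namedtuple

FakeCandle = namedtuple('FakeCandle', ['timestamp', 'close'])

cached = [FakeCandle(1, 10.0), FakeCandle(2, 11.0), FakeCandle(3, 12.0)]
updated = FakeCandle(2, 99.0)

for i in range(len(cached)):
    if cached[i].timestamp == updated.timestamp:
        cached[i] = updated  # same slot, updated payload
        break

print(cached[1])  # FakeCandle(timestamp=2, close=99.0)
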
2 changes: 1 addition & 1 deletion src/pipeline/processors/technicals.py
@@ -9,7 +9,7 @@
from pipeline.processors.candle_cache import CandleCache
from pipeline.shared_context import SharedContext

CONTEXT_IDENT = 'Technicals'

INDICATORS_ATTACHMENT_KEY = 'indicators'
TechnicalsData = Dict[str, Dict[str, List[float]]]

4 changes: 4 additions & 0 deletions src/pipeline/sources/mongodb_source.py
@@ -1,3 +1,4 @@
import logging
from datetime import datetime
from typing import Iterator, List, Optional

@@ -8,6 +9,7 @@


class MongoDBSource(Source):
logger = logging.getLogger('MongoDBSource')

def __init__(self, mongo_storage: MongoDBStorage, symbols: List[str], timespan: TimeSpan,
from_time: datetime, to_time: Optional[datetime] = datetime.now()) -> None:
@@ -18,7 +20,9 @@ def __init__(self, mongo_storage: MongoDBStorage, symbols: List[str], timespan: TimeSpan,
self.symbols = symbols

def read(self) -> Iterator[Candle]:
self.logger.info('Fetching candles from mongo source...')
all_candles = self.mongo_storage.get_candles(self.timespan, self.from_time, self.to_time)
self.logger.info('Got candles, starting iteration')
for c in all_candles:
if c.symbol in self.symbols:
yield c
12 changes: 12 additions & 0 deletions src/pipeline/terminators/technicals_binner.py
@@ -4,9 +4,12 @@
import math
from typing import List, Dict, Tuple

import numpy

from entities.bucket import Bucket
from entities.bucketscontainer import BucketsContainer
from entities.candle import Candle
from pipeline.processors.assets_correlation import AssetCorrelation, CORRELATIONS_ATTACHMENT_KEY
from pipeline.processors.candle_cache import CandleCache
from pipeline.processors.technicals import IndicatorValue
from pipeline.processors.technicals_normalizer import NormalizedIndicators, NORMALIZED_INDICATORS_ATTACHMENT_KEY
@@ -44,6 +47,15 @@ def _process_candle(self, candle: Candle):

self.values[indicator].append(value)

asset_correlation: AssetCorrelation = candle.attachments.get_attachment(
CORRELATIONS_ATTACHMENT_KEY)
if asset_correlation:
values = []
for s, v in asset_correlation.items():
values.append(v)

self.values['avg_correlation'] = numpy.average(values)

def _calculate_bins(self):
for indicator, values in self.values.items():
if isinstance(values[0], float):
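Note: the binner now folds the per-pair correlation distances attached to a candle into a single 'avg_correlation' value via numpy.average. A standalone illustration (the symbols and distances below are made up):

import numpy

# Per-pair correlation distances as they might appear in the attachment.
distances = {'MSFT': 0.25, 'GOOG': 0.75}

avg_correlation = numpy.average(list(distances.values()))
print(avg_correlation)  # 0.5
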
9 changes: 9 additions & 0 deletions tests/configs/correlations.json
@@ -0,0 +1,9 @@
{
"groups": [
[
"X",
"Y",
"Z"
]
]
}
1 change: 1 addition & 0 deletions tests/fakes/source.py
@@ -8,6 +8,7 @@ class FakeSource(Source):
def __init__(self, candles: List[Candle]) -> None:
super().__init__()
self.candles = candles
self.candles.sort(key=lambda c: c.timestamp)

def read(self) -> Iterator[Candle]:
for c in self.candles:
13 changes: 11 additions & 2 deletions tests/unit/__init__.py
@@ -6,12 +6,21 @@
TEST_SYMBOL = 'X'


def generate_candle(time_span: TimeSpan, timestamp: datetime) -> Candle:
return Candle(symbol=TEST_SYMBOL, time_span=time_span, timestamp=timestamp,
def generate_candle_with_symbol(symbol: str, time_span: TimeSpan, timestamp: datetime) -> Candle:
return Candle(symbol=symbol, time_span=time_span, timestamp=timestamp,
open=0.0, close=0.0, high=0.0, low=0.0, volume=0.0)


def generate_candle(time_span: TimeSpan, timestamp: datetime) -> Candle:
return generate_candle_with_symbol(TEST_SYMBOL, time_span, timestamp)


def generate_candle_with_price(time_span: TimeSpan, timestamp: datetime, price: float) -> Candle:
candle = generate_candle(time_span, timestamp)
candle.open = candle.close = candle.high = candle.low = candle.volume = price
return candle

def generate_candle_with_price_and_symbol(symbol: str, time_span: TimeSpan, timestamp: datetime, price: float) -> Candle:
candle = generate_candle_with_symbol(symbol, time_span, timestamp)
candle.open = candle.close = candle.high = candle.low = candle.volume = price
return candle
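
Note: a hypothetical use of the new helpers (not an actual test in this commit): generating aligned daily price series for two symbols from the same correlation group, so a correlation test has matching candle counts and timestamps:

from datetime import datetime, timedelta

from entities.timespan import TimeSpan

candles = []
start = datetime(2022, 1, 1)
for i in range(10):
    ts = start + timedelta(days=i)
    # 'X' and 'Y' match the group defined in tests/configs/correlations.json
    candles.append(generate_candle_with_price_and_symbol('X', TimeSpan.Day, ts, float(i + 1)))
    candles.append(generate_candle_with_price_and_symbol('Y', TimeSpan.Day, ts, 2.0 * (i + 1)))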