Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added alpha/AutoRegression/ar_model.pth
Binary file not shown.
145 changes: 145 additions & 0 deletions alpha/AutoRegression/autoregression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from pathlib import Path

class AR(nn.Module):
def __init__(self, lag):
super().__init__()
self.lag = lag
self.linear = nn.Linear(lag, 1, bias=True)

def forward(self, x):
return self.linear(x)

def train_one_epoch(loader: DataLoader, model: nn.Module, loss_fn, optimizer):
model.train()
size = len(loader.dataset)
running_loss = 0.0
for _, (xb, yb) in enumerate(loader):
xb = xb.float()
yb = yb.float()
optimizer.zero_grad()
pred = model(xb)
loss = loss_fn(pred, yb)
loss.backward()
optimizer.step()
running_loss += loss.item() * xb.size(0)
epoch_loss = running_loss / size
print(f"Average training loss: {epoch_loss: .10e}")
return epoch_loss

def train(train_dataset: Dataset, epoches: int, ar_model: nn.Module):
loss_fn = nn.L1Loss()
optimizer = torch.optim.Adam(ar_model.parameters(), lr=1e-3)
loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=False)
for _ in range(epoches):
Comment thread
CYX22222003 marked this conversation as resolved.
train_one_epoch(loader, ar_model, loss_fn, optimizer)

def build_train(data, T):
X = []
y = []
n = len(data)
for i in range(n - T):
window = data[i : i + T]
target = data[i + T]
X.append(window)
y.append(target)

return np.array(X), np.array(y)

class ArDataset(Dataset):
def __init__(self, X, y):
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32)
self.X = X.float()
self.y = y.float()
self.y = self.y.unsqueeze(1)

def __len__(self):
return self.X.shape[0]

def __getitem__(self, idx):
return self.X[idx], self.y[idx]

def infer_mu(model, X):
model.eval()
with torch.no_grad():
X_tensor = torch.tensor(X, dtype=torch.float32)
mu = model(X_tensor).squeeze(1).cpu().numpy()
return mu

def load_ar_model(model_path="./ar_model.pth", lag=90):
model = AR(lag=lag)
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()
return model

base_dir = Path(__file__).resolve().parent
model_path = base_dir / "ar_model.pth"


class _LazyARModel:
"""
Lazily loads and caches the AR model on first use.

This avoids loading the model at import time, so that import errors
due to missing/corrupt weights or missing dependencies do not
prevent the rest of the package from being used.
"""

def __init__(self, path):
self._path = path
self._model = None

def _load(self):
if self._model is None:
# Delegate to the existing loader; any exceptions will be
# raised at first use rather than at import time.
self._model = load_ar_model(self._path)
return self._model

def __getattr__(self, name):
# Proxy attribute access to the underlying model instance.
return getattr(self._load(), name)

def __call__(self, *args, **kwargs):
# Forward calls to the underlying model
return self._load()(*args, **kwargs)


trained_ar_model = _LazyARModel(model_path)
# if __name__ == "__main__":
# print("1. Define model")
# ar_model = AR(lag=90)

# print("2. Load data")
# data = load_data()
# log_ret = data["log_return"].to_numpy()
# log_ret = log_ret[np.isfinite(log_ret)]
# X, y = build_train(log_ret, 90)
# print(X[0], y[0])

# print("3. Prepare training")
# train_idx = int(len(X) * 0.85)
# X_train = X[:train_idx]
# y_train = y[:train_idx]
# dataset = ArDataset(X_train, y_train)

# print("4. Train")
# train(dataset, 10, ar_model)

# print("5. Test inference")
# ar_model.eval()
# X_test = X[train_idx:]
# y_test = y[train_idx:]

# mu_test = infer_mu(ar_model, X_test)
# residuals = y_test - mu_test

# mae = np.mean(np.abs(residuals))
# print("Test MAE:", mae)

# print("6. Save model")
# torch.save(ar_model.state_dict(), "./ar_model.pth")
16 changes: 16 additions & 0 deletions alpha/AutoRegression/data_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import numpy as np
import pandas as pd

def load_data(data_path: str = "./binance/BTC_USDT-5m.feather",
start_date: str = "2021-01-01"):
df = pd.read_feather(data_path)
df = df[df["date"] > start_date]
df["log_return"] = np.log(df['close'] / df['close'].shift(1))
print(df.head())
Comment thread
CYX22222003 marked this conversation as resolved.
return df

def get_log_return_series():
df = load_data()
log_ret = df["log_return"].to_numpy()
log_ret = log_ret[np.isfinite(log_ret)]
return log_ret
32 changes: 32 additions & 0 deletions alpha/AutoregressionAlpha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import numpy as np
import torch
from pandas import DataFrame
from alpha.interface import IAlpha
from alpha.AutoRegression.autoregression import trained_ar_model, build_train
Comment thread
CYX22222003 marked this conversation as resolved.

Comment thread
CYX22222003 marked this conversation as resolved.
AR_MODEL_PATH = "./ar_model.pth"
Comment thread
CYX22222003 marked this conversation as resolved.
AR_LAG = 90

class AutoregressionAlpha(IAlpha):
def process(self) -> DataFrame:
df = self.dataframe.copy()

df["log_return"] = np.log(df["close"] / df["close"].shift(1))
log_ret = df["log_return"].to_numpy()
log_ret = log_ret[np.isfinite(log_ret)]
Comment thread
CYX22222003 marked this conversation as resolved.

if len(log_ret) <= AR_LAG:
df["ar_pred"] = np.nan
return df
X, _ = build_train(log_ret, AR_LAG)
# Load AR model
ar_model = trained_ar_model
ar_model.eval()
with torch.no_grad():
X_tensor = torch.tensor(X, dtype=torch.float32)
preds = ar_model(X_tensor).squeeze(1).cpu().numpy()
# Align predictions with dataframe index
ar_pred = np.full(df.shape[0], np.nan)
ar_pred[AR_LAG+1:] = preds
df["ar_pred"] = ar_pred
return df
19 changes: 18 additions & 1 deletion alpha/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,21 @@ def process(self) -> DataFrame:
This is to decouple the pipulation of indicator from IStrategy
"""
pass


fwd_ret_timeframe = [1, 5, 10, 20, 90]
class AlphaEvaluator:
def __init__(self, dataframe: DataFrame, alpha: type[IAlpha], metadata: dict = None):
self.df = dataframe
self.alpha = alpha(dataframe, metadata if metadata is not None else {})

def evaluate_information_coefficient(self, alpha_names):
df_processed = self.alpha.process()
out = {}
for a in alpha_names:
for t in fwd_ret_timeframe:
fwd_ret = df_processed['close'].pct_change().shift(-t)
temp_df = DataFrame({'alpha': df_processed[a], 'fwd_ret': fwd_ret})
temp_df = temp_df.dropna(subset=['alpha', 'fwd_ret'])
ic = temp_df['alpha'].corr(temp_df['fwd_ret'], method='spearman')
out[(a, t)] = ic
return out
61 changes: 59 additions & 2 deletions tests/test_alpha.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

talib = pytest.importorskip("talib", reason="TA-Lib C library not installed")

from alpha.interface import IAlpha
from alpha.interface import IAlpha, AlphaEvaluator
from alpha.SimpleEmaFactors import EmaAlpha
Comment thread
CYX22222003 marked this conversation as resolved.
from alpha.RsiAlpha import RsiAlpha
from alpha.MacdAlpha import MacdAlpha
from alpha.BollingerAlpha import BollingerAlpha
from alpha.AutoregressionAlpha import AutoregressionAlpha
from alpha.EventLstmAlpha import EventLstmAlpha # type: ignore

def _make_ohlcv(n=100):
Expand Down Expand Up @@ -160,6 +161,62 @@ def test_pctb_near_0_5_for_mean(self):
# Mean %B across a random walk should be roughly centered around 0.5
assert 0.2 < valid.mean() < 0.8

class TestAutoregressionAlpha:
def test_process_adds_ar_pred_column(self):
df = _make_ohlcv(200)
df = df[["date", "close"]].copy()
result = AutoregressionAlpha(df).process()
assert "ar_pred" in result.columns, "Missing ar_pred column"

def test_ar_pred_nan_for_short_series(self):
df = _make_ohlcv(50)
df = df[["date", "close"]].copy()
result = AutoregressionAlpha(df).process()
assert result["ar_pred"].isna().all(), "ar_pred should be NaN for short series"

def test_ar_pred_has_valid_values(self):
df = _make_ohlcv(200)
df = df[["date", "close"]].copy()
result = AutoregressionAlpha(df).process()
ar_pred = result["ar_pred"].iloc[91:]
assert ar_pred.notna().any(), "ar_pred should have valid predictions after lag"

class AlphaStub(IAlpha):
def process(self):
df = self.dataframe.copy()
df["alpha"] = df["close"].pct_change()
return df

class TestAlphaInformationEvaluation:
def test_evaluate_information_coefficient_returns_dict(self):
df = _make_ohlcv(120)
evaluator = AlphaEvaluator(df, AlphaStub)
result = evaluator.evaluate_information_coefficient(["alpha"])
assert isinstance(result, dict)
expected_keys = [("alpha", t) for t in [1, 5, 10, 20, 90]]
for key in expected_keys:
assert key in result
assert isinstance(result[key], (float, type(None)))

def test_ic_values_are_finite_or_nan(self):
df = _make_ohlcv(120)
evaluator = AlphaEvaluator(df, AlphaStub)
result = evaluator.evaluate_information_coefficient(["alpha"])
for ic in result.values():
assert (ic is None) or (isinstance(ic, float))

def test_ic_on_constant_alpha_is_nan(self):
class ConstAlpha(IAlpha):
def process(self):
df = self.dataframe.copy()
df["alpha"] = 1.0
return df
df = _make_ohlcv(120)
evaluator = AlphaEvaluator(df, ConstAlpha)
result = evaluator.evaluate_information_coefficient(["alpha"])
for ic in result.values():
assert ic != ic

class TestEventLstmAlpha:
@pytest.mark.slow
def test_event_lstm_alpha_with_fake_data(self):
Expand Down Expand Up @@ -196,4 +253,4 @@ def test_event_lstm_alpha_with_fake_data(self):
assert valid_preds.notna().all(), "Predictions should be filled"

# Predictions should be within [0,1] because close prices are normalized
assert (valid_preds >= 0).all() and (valid_preds <= 1).all()
assert (valid_preds >= 0).all() and (valid_preds <= 1).all()
Loading