
Commit

draft sectors data gather new and update
Signed-off-by: ivelin <[email protected]>
ivelin committed Feb 13, 2024
1 parent 331b183 commit 2aade62
Showing 5 changed files with 109 additions and 32 deletions.
4 changes: 2 additions & 2 deletions canswim.sh
@@ -1,7 +1,7 @@
#!/usr/bin/bash
args=("$@")
conda activate canswim
pip install -e ./
#conda activate canswim
#pip install -e ./
python -m canswim "${args[@]}"

# run dashboard
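With the environment activation and editable install commented out, the wrapper script now assumes the canswim conda environment is already set up by the caller; under that assumption the data-gathering task added in this commit would presumably be run as ./canswim.sh gatherdata, which the script simply forwards to python -m canswim gatherdata.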
17 changes: 10 additions & 7 deletions src/canswim/__main__.py
@@ -18,7 +18,7 @@
from dotenv import load_dotenv
import os
import argparse
from canswim import model_search, train, dashboard
from canswim import model_search, train, dashboard, gather_data

# Instantiate the parser
parser = argparse.ArgumentParser(
@@ -32,14 +32,15 @@
parser.add_argument(
"task",
type=str,
help="""Which %(prog)s task to run: \n
`dashboard` for stock charting and scans of recorded forecasts.\n
`modelsearch` to find and save optimal hyperparameters for model training.\n
`train` for continuous model training.\n
`finetune` to fine tune pretrained model.\n
help="""Which %(prog)s task to run:
`dashboard` for stock charting and scans of recorded forecasts.
`gatherdata` to gather 3rd party stock market data and save to HF Hub.
`modelsearch` to find and save optimal hyperparameters for model training.
`train` for continuous model training.
`finetune` to fine tune pretrained model.
`forecast` to run forecast on stocks and upload dataset to HF Hub.
""",
choices=["dashboard", "modelsearch", "train", "finetune", "forecast"],
choices=["dashboard", "gatherdata", "modelsearch", "train", "finetune", "forecast"],
)

args = parser.parse_args()
@@ -63,6 +64,8 @@
match args.task:
case "modelsearch":
model_search.main()
case "gatherdata":
gather_data.main()
case "train":
train.main()
case "finetune":
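For reference, once the gatherdata choice and its matching case branch are in place, the new step can be run from the CLI as python -m canswim gatherdata, or reached programmatically. A minimal sketch of the programmatic equivalent (the dotenv call mirrors __main__.py's setup, but its exact arguments here are an assumption):

# Illustrative programmatic equivalent of `python -m canswim gatherdata`
from dotenv import load_dotenv
from canswim import gather_data

load_dotenv()          # __main__.py imports load_dotenv; exact usage is assumed here
gather_data.main()     # same entry point the "gatherdata" case dispatches to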
1 change: 1 addition & 0 deletions src/canswim/covariates.py
@@ -121,6 +121,7 @@ def prepare_earn_series(self, tickers=None):
assert not t_earn.index.duplicated().any()
assert not t_earn.index.isnull().any()
t_earn = self.align_earn_to_business_days(t_earn)
# drop rows with duplicate datetime index values
t_earn = (
t_earn.reset_index()
.drop_duplicates(subset="Date", keep="last")
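The newly added comment documents the dedup idiom that follows; on a small synthetic earnings frame (the column name and values are illustrative, not taken from the project) its effect looks like this:

import pandas as pd

t_earn = pd.DataFrame(
    {"EPS": [0.5, 0.7, 0.8]},
    index=pd.DatetimeIndex(["2024-01-02", "2024-01-02", "2024-01-03"], name="Date"),
)
t_earn = (
    t_earn.reset_index()
    .drop_duplicates(subset="Date", keep="last")   # keep the most recent row per date
    .set_index("Date")
)
assert t_earn.index.is_unique                      # 2024-01-02 now appears once (EPS 0.7)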
107 changes: 87 additions & 20 deletions src/canswim/gather_data.py
@@ -19,6 +19,17 @@
from canswim.hfhub import HFHub
import typing
from fmpsdk.url_methods import __return_json_v3, __validate_period
from pandas.api.types import is_datetime64_any_dtype as is_datetime

def get_ydelta(datetime_col):
"""Return the max datetime value in a datetime formatted dataframe column"""
assert len(datetime_col) > 0
assert is_datetime(datetime_col)
now = pd.Timestamp.now()
latest_saved_record = datetime_col.max()
ydelta = (now - latest_saved_record) // pd.Timedelta(52, "w")
return ydelta
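In other words, get_ydelta floors the age of the newest saved record to whole 52-week years; the gather methods below add 1 to it so the yfinance download period safely covers the gap since the last save. A small illustrative check (the dates are made up):

import pandas as pd

saved_dates = pd.Series(pd.to_datetime(["2022-01-03", "2023-06-30", "2023-11-15"]))
# If today were 2024-02-13, the newest record (2023-11-15) is about 13 weeks old,
# so get_ydelta returns 0 and the callers below request a "1y" download (0 + 1).
years_behind = get_ydelta(saved_dates)
print(f"{1 + years_behind}y")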


class MarketDataGatherer:

@@ -95,38 +106,84 @@ def gather_stock_tickers(self):
stocks_df.to_csv(stocks_file)
logger.info(f"Saved stock set to {stocks_file}")


def gather_broad_market_data(self):
## Prepare data for broad market indices
# Capture the S&P 500, NASDAQ 100 and Russell 2000 indices and their equal-weighted counterparts,
# as well as the VIX volatility index, the DXY US Dollar index (DX-Y.NYB), and the 13-week (^IRX), 5-year (^FVX) and 10-year (^TNX) US Treasury yields
broad_market_indicies = (
"^SPX ^SPXEW ^NDX ^NDXE ^RUT ^R2ESC ^VIX DX-Y.NYB ^IRX ^FVX ^TNX"
)
bm_file = "data/data-3rd-party/broad_market.parquet"
latest_saved = None
data_file = "data/data-3rd-party/broad_market.parquet"
period = "max"
old_df = None
try:
tmp_s = pd.read_parquet(bm_file)
latest_saved =
old_df = pd.read_parquet(data_file)
ydelta = 1 + get_ydelta(old_df.index)
logger.info("Loaded saved broad market data. Sample: \n{bm}", bm=old_df)
logger.info(f"Columns: \n{old_df.columns}")
logger.info(f"Latest saved record is less than {ydelta} year(s) ago")
period = f"{ydelta}y"
except Exception as e:
logger.info(f"Could not read file: {sectors_file}")
broad_market = yf.download(broad_market_indicies, period="max", group_by="tickers")
logger.info("Broad market data gathered. Sample: {bm}", bm=broad_market)
broad_market.to_parquet(bm_file)
logger.info(f"Saved broad market data to {bm_file}")
bm = pd.read_parquet(bm_file)
logger.info(f"Sanity check passed for broad market data. Loaded OK from {bm_file}")

logger.info(f"Could not load data from file: {data_file}. Error: {e}")
new_df = yf.download(broad_market_indicies, period=period, group_by="tickers")
logger.info("New broad market data gathered. Sample: \n{bm}", bm=new_df)
logger.info(f"Columns: \n{new_df.columns}")
assert sorted(old_df.columns) == sorted(new_df.columns)
merged_df = pd.concat([old_df, new_df], axis=0)
logger.info(f"bm_df concat\n {merged_df}")
assert sorted(merged_df.columns) == sorted(old_df.columns)
assert len(merged_df) == len(old_df) + len(new_df)
merged_df = (
merged_df.reset_index()
.drop_duplicates(subset=[('Date','')], keep="last")
.set_index("Date")
)
logger.info("Updated broad market data ready. Sample: \n{bm}", bm=merged_df)
assert merged_df.index.is_unique
merged_df.to_parquet(data_file)
logger.info(f"Saved broad market data to {data_file}")
_bm = pd.read_parquet(data_file)
assert sorted(_bm.columns) == sorted(merged_df.columns)
assert len(_bm) == len(merged_df)
logger.info(f"Sanity check passed for broad market data. Loaded OK from {data_file}")

def gather_sectors_data(self):
"""Gather historic price and volume data for key market sectors"""
sector_indicies = "XLE ^SP500-15 ^SP500-20 ^SP500-25 ^SP500-30 ^SP500-35 ^SP500-40 ^SP500-45 ^SP500-50 ^SP500-55 ^SP500-60"
sectors_file = "data/data-3rd-party/sectors.parquet"
... check if file already exists and adjust download period accordingly
sectors = yf.download(sector_indicies, period="max")
logger.info(f"Sector indicies data gathered. Sample: {sectors}")
sectors.to_parquet(sectors_file)
logger.info(f"Saved sectors data to {sectors_file}")
tmp_s = pd.read_parquet(sectors_file)
logger.info(f"Sanity check passed for sector data. Loaded OK from {sectors_file}")
data_file = "data/data-3rd-party/sectors.parquet"
period = "max"
old_df = None
try:
old_df = pd.read_parquet(data_file)
ydelta = 1 + get_ydelta(old_df.index)
logger.info("Loaded saved sectors data. Sample: \n{df}", df=old_df)
logger.info(f"Columns: \n{old_df.columns}")
logger.info(f"Latest saved record is less than {ydelta} year(s) ago")
period = f"{ydelta}y"
except Exception as e:
logger.info(f"Could not load data from file: {data_file}. Error: {e}")
new_df = yf.download(sector_indicies, period=period, group_by="tickers")
logger.info(f"Sector indicies data gathered. Sample: \n{new_df}")
logger.info(f"Columns: \n{new_df.columns}")
assert sorted(old_df.columns) == sorted(new_df.columns)
merged_df = pd.concat([old_df, new_df], axis=0)
logger.info(f"merged_df concat\n {merged_df}")
assert sorted(merged_df.columns) == sorted(old_df.columns)
assert len(merged_df) == len(old_df) + len(new_df)
merged_df = (
merged_df.reset_index()
.drop_duplicates(subset=[('Date','')], keep="last")
.set_index("Date")
)
logger.info(f"Updated sectors data ready. Sample: \n{merged_df}")
assert merged_df.index.is_unique
merged_df.to_parquet(data_file)
logger.info(f"Saved sectors data to {data_file}")
_df = pd.read_parquet(data_file)
assert sorted(_df.columns) == sorted(merged_df.columns)
assert len(_df) == len(merged_df)
logger.info(f"Sanity check passed for sectors data. Loaded OK from {data_file}")

def gather_stock_price_data(self):
stock_price_data = yf.download(
@@ -470,3 +527,13 @@ def upload_data_to_hfhub(self):
folder_path=data_path,
token=HF_TOKEN,
)
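The visible tail of upload_data_to_hfhub suggests a whole-folder upload to the Hugging Face Hub; a minimal sketch of such an upload with huggingface_hub is shown below (the repo id and local path are placeholders, and the exact call the project uses may differ):

import os
from huggingface_hub import HfApi

api = HfApi()
api.upload_folder(
    repo_id="your-user/canswim-data",     # placeholder dataset repo id
    repo_type="dataset",
    folder_path="data/data-3rd-party",    # local folder with the gathered parquet files
    token=os.getenv("HF_TOKEN"),
)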


# main function
def main():
g = MarketDataGatherer()
g.gather_broad_market_data()
g.gather_sectors_data()

if __name__ == "__main__":
main()
12 changes: 9 additions & 3 deletions src/canswim/model.py
@@ -269,13 +269,19 @@ def __prepare_data_splits(self):
logger.info(f"Total # stocks in train list: {len(self.target_train_list)}")
if len(self.target_train_list) > 0:
logger.info(
f"Sample train series start time: {self.target_train_list[0].start_time()}, end time: {self.target_train_list[0].end_time()}, "
f"Sample train series start time: {self.target_train_list[0].start_time()}, end time: {self.target_train_list[0].end_time()}"
)
logger.info(
f"Sample val series start time: {self.target_val_list[0].start_time()}, end time: {self.target_val_list[0].end_time()}"
)
logger.info(
f"Sample test series start time: {self.target_test_list[0].start_time()}, end time: {self.target_test_list[0].end_time()}, "
f"Sample test series start time: {self.target_test_list[0].start_time()}, end time: {self.target_test_list[0].end_time()}"
)
logger.info(
f"Sample past covariates columns: {len(self.past_cov_list[0].columns)}"
)
logger.info(
f"Sample future covariates columns: {len(self.future_cov_list[0].columns)}"
)
# update targets series dict
updated_target_series = {}
@@ -983,7 +989,7 @@ def _optuna_objective(self, trial):
# for convenience, print some optimization trials information

def find_model(self, n_trials: int = 100, study_name: str = "canswim-study"):
study = optuna.create_study(direction="minimize", study_name="canswim-study", storage="sqlite:///data/optuna_study.db")
study = optuna.create_study(direction="minimize", study_name=study_name, storage="sqlite:///data/optuna_study.db")
study.optimize(
self._optuna_objective,
n_trials=n_trials,
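Passing study_name through (instead of the previously hard-coded string) lets several hyperparameter searches share the same SQLite storage under different names; a minimal, self-contained sketch of that pattern follows — the toy objective and the load_if_exists flag are illustrative, not from the project:

import optuna

study = optuna.create_study(
    direction="minimize",
    study_name="canswim-study-demo",                 # any distinct name per search
    storage="sqlite:///data/optuna_study.db",
    load_if_exists=True,                             # resume the study if it already exists
)
study.optimize(lambda trial: trial.suggest_float("x", -1.0, 1.0) ** 2, n_trials=10)
print(study.best_value)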
