diff --git a/src/backtest.py b/src/backtest.py index 8855473..626cb98 100644 --- a/src/backtest.py +++ b/src/backtest.py @@ -1,21 +1,17 @@ ''' -PorQua : a python library for portfolio optimization and backtesting -PorQua is part of GeomScale project +PorQua: A Python Library for Portfolio Optimization and Backtesting +Part of the GeomScale project Copyright (c) 2024 Cyril Bachelard Copyright (c) 2024 Minh Ha Ho -Licensed under GNU LGPL.3, see LICENCE file +Licensed under GNU LGPL.3; see LICENCE file. ''' - ############################################################################ ### CLASSES BacktestData, BacktestService, Backtest ############################################################################ - - - import os from typing import Optional import pickle @@ -30,17 +26,12 @@ from builders import SelectionItemBuilder, OptimizationItemBuilder - - - class BacktestData(): - def __init__(self): pass class BacktestService(): - def __init__(self, data: BacktestData, selection_item_builders: dict[str, SelectionItemBuilder], @@ -54,7 +45,7 @@ def __init__(self, self.optimization_item_builders = optimization_item_builders self.settings = settings if settings is not None else {} self.settings.update(kwargs) - # Initialize the selection and optimization data + self.selection = Selection() self.optimization_data = OptimizationData([]) @@ -135,23 +126,19 @@ def build_selection(self, rebdate: str) -> None: def build_optimization(self, rebdate: str) -> None: - # Initialize the optimization constraints - self.optimization.constraints = Constraints(selection = self.selection.selected) + self.optimization.constraints = Constraints(selection=self.selection.selected) - # Loop over the optimization_item_builders items for item_builder in self.optimization_item_builders.values(): item_builder(self, rebdate) return None def prepare_rebalancing(self, rebalancing_date: str) -> None: - self.build_selection(rebdate = rebalancing_date) - self.build_optimization(rebdate = rebalancing_date) + self.build_selection(rebdate=rebalancing_date) + self.build_optimization(rebdate=rebalancing_date) return None - class Backtest: - def __init__(self) -> None: self._strategy = Strategy([]) self._output = {} @@ -165,16 +152,16 @@ def output(self): return self._output def append_output(self, - date_key = None, - output_key = None, - value = None): + date_key=None, + output_key=None, + value=None): if value is None: return True if date_key in self.output.keys(): if output_key in self.output[date_key].keys(): - raise Warning(f"Output key '{output_key}' for date key '{date_key}' \ - already exists and will be overwritten.") + raise Warning(f"Output key '{output_key}' for date key '{date_key}' " + "already exists and will be overwritten.") self.output[date_key][output_key] = value else: self.output[date_key] = {} @@ -185,13 +172,11 @@ def append_output(self, def rebalance(self, bs: BacktestService, rebalancing_date: str) -> None: - # Prepare the rebalancing, i.e., the optimization problem - bs.prepare_rebalancing(rebalancing_date = rebalancing_date) + bs.prepare_rebalancing(rebalancing_date=rebalancing_date) - # Solve the optimization problem try: - bs.optimization.set_objective(optimization_data = bs.optimization_data) + bs.optimization.set_objective(optimization_data=bs.optimization_data) bs.optimization.solve() except Exception as error: raise RuntimeError(error) @@ -199,27 +184,25 @@ def rebalance(self, return None def run(self, bs: BacktestService) -> None: - for rebalancing_date in bs.settings['rebdates']: - if not bs.settings.get('quiet'): print(f'Rebalancing date: {rebalancing_date}') - self.rebalance(bs = bs, - rebalancing_date = rebalancing_date) + self.rebalance(bs=bs, + rebalancing_date=rebalancing_date) # Append portfolio to strategy weights = bs.optimization.results['weights'] - portfolio = Portfolio(rebalancing_date = rebalancing_date, weights = weights) + portfolio = Portfolio(rebalancing_date=rebalancing_date, weights=weights) self.strategy.portfolios.append(portfolio) - # Append stuff to output if a custom append function is provided + # Append additional output if a custom append function is provided append_fun = bs.settings.get('append_fun') if append_fun is not None: - append_fun(backtest = self, - bs = bs, - rebalancing_date = rebalancing_date, - what = bs.settings.get('append_fun_args')) + append_fun(backtest=self, + bs=bs, + rebalancing_date=rebalancing_date, + what=bs.settings.get('append_fun_args')) return None @@ -228,7 +211,7 @@ def save(self, path: Optional[str] = None) -> None: try: if path is not None and filename is not None: - filename = os.path.join(path, filename) #// alternatively, use pathlib package + filename = os.path.join(path, filename) # Alternatively, use pathlib package with open(filename, "wb") as f: pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL) except Exception as ex: @@ -237,34 +220,30 @@ def save(self, return None - -# -------------------------------------------------------------------------- -# Helper functions -# -------------------------------------------------------------------------- - def append_custom(backtest: Backtest, bs: BacktestService, rebalancing_date: Optional[str] = None, what: Optional[list] = None) -> None: if what is None: - what = ['w_dict', 'objective'] + what = ['portfolio_ids', 'objective'] for key in what: - if key == 'w_dict': - w_dict = bs.optimization.results['w_dict'] - for key in w_dict.keys(): - weights = w_dict[key] - if hasattr(weights, 'to_dict'): - weights = weights.to_dict() - portfolio = Portfolio(rebalancing_date = rebalancing_date, weights = weights) - backtest.append_output(date_key = rebalancing_date, - output_key = f'weights_{key}', - value = pd.Series(portfolio.weights)) + if key == 'portfolio_ids': + portfolio_ids = bs.optimization.results.get('portfolio_ids', {}) + for pid, indices in portfolio_ids.items(): + + weights = bs.optimization.results.get('weights', {}) + + segment_weights = {i: weights[i] for i in indices if i in weights} + portfolio = Portfolio(rebalancing_date=rebalancing_date, weights=segment_weights) + backtest.append_output(date_key=rebalancing_date, + output_key=f'weights_portfolio_{pid}', + value=pd.Series(portfolio.weights)) else: - if not key in bs.optimization.results.keys(): + if key not in bs.optimization.results.keys(): continue - backtest.append_output(date_key = rebalancing_date, - output_key = key, - value = bs.optimization.results[key]) + backtest.append_output(date_key=rebalancing_date, + output_key=key, + value=bs.optimization.results[key]) return None diff --git a/src/constraints.py b/src/constraints.py index dc2b21c..38a626c 100644 --- a/src/constraints.py +++ b/src/constraints.py @@ -8,21 +8,27 @@ Licensed under GNU LGPL.3, see LICENCE file ''' - - - import warnings import pandas as pd import numpy as np from typing import Dict - - - +############################################################################### +# Constraints Class +############################################################################### class Constraints: def __init__(self, selection="NA") -> None: + """ + Initialize a Constraints instance. + + Args: + selection (str or iterable of str): A character vector for asset selection. + Each element must be a string. + Raises: + ValueError: If any element in selection is not a string. + """ if not all(isinstance(item, str) for item in selection): raise ValueError("argument 'selection' has to be a character vector.") @@ -31,12 +37,22 @@ def __init__(self, selection="NA") -> None: self.box = {'box_type': 'NA', 'lower': None, 'upper': None} self.linear = {'Amat': None, 'sense': None, 'rhs': None} self.l1 = {} - return None def __str__(self) -> str: + """ + Return a string representation of the Constraints instance, + listing all attributes and their current values. + """ return ' '.join(f'\n{key}:\n\n{vars(self)[key]}\n' for key in vars(self).keys()) def add_budget(self, rhs=1, sense='=') -> None: + """ + Add or update the budget constraint. + + Args: + rhs: Right-hand side value of the budget constraint. + sense (str): Constraint sense (e.g., '='). Defaults to '='. + """ if self.budget.get('rhs') is not None: warnings.warn("Existing budget constraint is overwritten\n") @@ -44,12 +60,19 @@ def add_budget(self, rhs=1, sense='=') -> None: self.budget = {'Amat': a_values, 'sense': sense, 'rhs': rhs} - return None - def add_box(self, - box_type="LongOnly", - lower=None, - upper=None) -> None: + def add_box(self, box_type="LongOnly", lower=None, upper=None) -> None: + """ + Add or update the box constraint. + + Args: + box_type (str): The type of box constraint. Options are "LongOnly", "LongShort", or "Unbounded". + lower: The lower bound(s). + upper: The upper bound(s). + + Raises: + ValueError: If any lower bound is higher than the corresponding upper bound. + """ boxcon = box_constraint(box_type, lower, upper) if np.isscalar(boxcon['lower']): @@ -61,7 +84,6 @@ def add_box(self, raise ValueError("Some lower bounds are higher than the corresponding upper bounds.") self.box = boxcon - return None def add_linear(self, Amat: pd.DataFrame = None, @@ -69,6 +91,19 @@ def add_linear(self, sense: str = '=', rhs=None, name: str = None) -> None: + """ + Add or update linear constraints. + + Args: + Amat (pd.DataFrame, optional): Constraint matrix. + a_values (pd.Series, optional): Alternative to Amat. If provided, used to construct Amat. + sense (str): Constraint sense. Defaults to '='. + rhs: Right-hand side value(s) for the constraint. + name (str, optional): Name for the constraint (used as index in Amat). + + Raises: + ValueError: If neither Amat nor a_values is provided. + """ if Amat is None: if a_values is None: raise ValueError("Either 'Amat' or 'a_values' must be provided.") @@ -91,27 +126,42 @@ def add_linear(self, Amat.fillna(0, inplace=True) self.linear = {'Amat': Amat, 'sense': sense, 'rhs': rhs} - return None - - # name: turnover or leverage - def add_l1(self, - name: str, - rhs=None, - x0=None, - *args, **kwargs) -> None: + + def add_l1(self, name: str, rhs=None, x0=None, *args, **kwargs) -> None: + """ + Add or update an l1-type constraint (e.g., for turnover or leverage). + + Args: + name (str): Name of the constraint. + rhs: Right-hand side value. + x0 (optional): An initial guess. + *args: Additional arguments. + **kwargs: Additional keyword arguments. + + Raises: + TypeError: If rhs is not provided. + """ if rhs is None: raise TypeError("argument 'rhs' is required.") con = {'rhs': rhs} - if x0: + if x0 is not None: con['x0'] = x0 for i, arg in enumerate(args): con[f'arg{i}'] = arg for key, value in kwargs.items(): con[key] = value self.l1[name] = con - return None def to_GhAb(self, lbub_to_G: bool = False) -> Dict[str, pd.DataFrame]: + """ + Convert constraints into matrices/vectors for optimization. + + Args: + lbub_to_G (bool): If True, convert lower and upper bounds from the box constraint into G and h matrices. + + Returns: + Dict[str, pd.DataFrame]: Dictionary with keys 'G', 'h', 'A', and 'b' corresponding to constraint matrices/vectors. + """ A = None b = None G = None @@ -128,7 +178,10 @@ def to_GhAb(self, lbub_to_G: bool = False) -> Dict[str, pd.DataFrame]: if lbub_to_G: I = np.eye(len(self.selection)) G_tmp = np.concatenate((-I, I), axis=0) - h_tmp = np.concatenate((-self.box["lower"], self.box["upper"]), axis=0) + # Explicitly convert to NumPy arrays before concatenation + lower_arr = np.array(self.box["lower"]) + upper_arr = np.array(self.box["upper"]) + h_tmp = np.concatenate((-lower_arr, upper_arr), axis=0) G = np.vstack((G, G_tmp)) if (G is not None) else G_tmp h = np.concatenate((h, h_tmp), axis=None) if h is not None else h_tmp @@ -150,8 +203,8 @@ def to_GhAb(self, lbub_to_G: bool = False) -> Dict[str, pd.DataFrame]: A = np.vstack((A, A_tmp)) if A is not None else A_tmp b = np.concatenate((b, b_tmp), axis=None) if b is not None else b_tmp if idx_eq.sum() < Amat.shape[0]: - G_tmp = Amat[idx_eq == False].to_numpy() - h_tmp = rhs[idx_eq == False].to_numpy() + G_tmp = Amat[np.logical_not(idx_eq)].to_numpy() + h_tmp = rhs[np.logical_not(idx_eq)].to_numpy() else: G_tmp = Amat.to_numpy() h_tmp = rhs.to_numpy() @@ -160,24 +213,41 @@ def to_GhAb(self, lbub_to_G: bool = False) -> Dict[str, pd.DataFrame]: G = np.vstack((G, G_tmp)) if G is not None else G_tmp h = np.concatenate((h, h_tmp), axis=None) if h is not None else h_tmp - # To ensure A and G are matrices (even with only 1 row) + # To ensure A and G are matrices (even if only one row) A = A.reshape(-1, A.shape[-1]) if A is not None else None G = G.reshape(-1, G.shape[-1]) if G is not None else None return {'G': G, 'h': h, 'A': A, 'b': b} +############################################################################### +# Helper Functions +############################################################################### +def match_arg(x, lst): + """ + Return the first element from lst that contains x. -# -------------------------------------------------------------------------- -# Helper functions -# -------------------------------------------------------------------------- + Args: + x: Substring to match. + lst: List of strings. -def match_arg(x, lst): + Returns: + The first matching string. + """ return [el for el in lst if x in el][0] -def box_constraint(box_type="LongOnly", - lower=None, - upper=None) -> dict: +def box_constraint(box_type="LongOnly", lower=None, upper=None) -> dict: + """ + Define a box constraint based on the type. + + Args: + box_type (str): Constraint type; options: "LongOnly", "LongShort", or "Unbounded". + lower: Lower bound(s). + upper: Upper bound(s). + + Returns: + dict: A dictionary with keys 'box_type', 'lower', and 'upper'. + """ box_type = match_arg(box_type, ["LongOnly", "LongShort", "Unbounded"]) if box_type == "Unbounded": @@ -192,13 +262,12 @@ def box_constraint(box_type="LongOnly", lower = 0 upper = 1 else: - lower = upper * 0 + lower = 0 # lower set to 0 when only upper is provided else: if not np.isscalar(lower): if any(l < 0 for l in lower): raise ValueError("Inconsistent lower bounds for box_type 'LongOnly'. " "Change box_type to LongShort or ensure that lower >= 0.") - upper = lower * 0 + 1 if upper is None else upper return {'box_type': box_type, 'lower': lower, 'upper': upper} @@ -208,6 +277,19 @@ def linear_constraint(Amat=None, rhs=float("inf"), index_or_name=None, a_values=None) -> dict: + """ + Create a dictionary representing a linear constraint. + + Args: + Amat: Constraint matrix. + sense (str): Constraint sense. Defaults to "=". + rhs: Right-hand side value. Defaults to infinity. + index_or_name: Optional index or name. + a_values: Optional additional values. + + Returns: + dict: A dictionary representing the linear constraint. + """ ans = {'Amat': Amat, 'sense': sense, 'rhs': rhs} @@ -216,4 +298,3 @@ def linear_constraint(Amat=None, if a_values is not None: ans['a_values'] = a_values return ans - diff --git a/src/helper_functions.py b/src/helper_functions.py index 868deab..86d1bde 100644 --- a/src/helper_functions.py +++ b/src/helper_functions.py @@ -8,13 +8,10 @@ Licensed under GNU LGPL.3, see LICENCE file ''' - ############################################################################ ### HELPER FUNCTIONS ############################################################################ - - from typing import Optional import pandas as pd import numpy as np @@ -24,20 +21,27 @@ from portfolio import Portfolio, Strategy +############################################################################ +# Matrix Utilities +############################################################################ +def nearestPD(A: np.ndarray) -> np.ndarray: + """ + Find the nearest positive-definite matrix to input matrix A. -def nearestPD(A): - """Find the nearest positive-definite matrix to input - - A Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1], which - credits [2]. The code below is written by Cyril. + This is a Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1], + which credits [2]. The code below is written by Cyril. [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd - [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite - matrix" (1988): https://doi.org/10.1016/0024-3795(88)90223-6 - """ + matrix" (1988): https://doi.org/10.1016/0024-3795(88)90223-6 + Args: + A (np.ndarray): The input matrix. + + Returns: + np.ndarray: The nearest positive-definite matrix. + """ B = (A + A.T) / 2 _, s, V = np.linalg.svd(B) H = np.dot(V.T, np.dot(np.diag(s), V)) @@ -58,53 +62,109 @@ def nearestPD(A): return A3 -def isPD(B): - """Returns true when input is positive-definite, via Cholesky""" +def isPD(B: np.ndarray) -> bool: + """ + Returns true when input matrix B is positive-definite via Cholesky decomposition. + + Args: + B (np.ndarray): The matrix to check. + + Returns: + bool: True if B is positive-definite, False otherwise. + """ try: _ = np.linalg.cholesky(B) return True except np.linalg.LinAlgError: return False -def serialize_solution(name_suffix, solution, runtime): + +############################################################################ +# Serialization and Data Conversion Helpers +############################################################################ + +def serialize_solution(name_suffix: str, solution, runtime: float) -> None: + """ + Serialize the solution object and save it to a pickle file. + + Args: + name_suffix (str): Suffix for the filename. + solution: An optimization solution with attributes x, obj, and methods for residuals. + runtime (float): The runtime of the solution. + """ result = { - 'solution' : solution.x, - 'objective' : solution.obj, - 'primal_residual' :solution.primal_residual(), - 'dual_residual' : solution.dual_residual(), - 'duality_gap' : solution.duality_gap(), - 'runtime' : runtime + 'solution': solution.x, + 'objective': solution.obj, + 'primal_residual': solution.primal_residual(), + 'dual_residual': solution.dual_residual(), + 'duality_gap': solution.duality_gap(), + 'runtime': runtime } with open(f'{name_suffix}.pickle', 'wb') as handle: pickle.dump(result, handle, protocol=pickle.HIGHEST_PROTOCOL) + def to_numpy(data): + """ + Convert data to a NumPy array if possible. + + Args: + data: Input data that might have a 'to_numpy' method. + + Returns: + The NumPy array or None if data is None. + """ return None if data is None else data.to_numpy() if hasattr(data, 'to_numpy') else data -def output_to_strategies(output: dict) -> dict[int, Strategy]: +############################################################################ +# Portfolio Strategy Conversion +############################################################################ +def output_to_strategies(output: dict) -> dict: + """ + Convert output dictionary into a dictionary of Strategy instances. + + Each key in the output corresponds to a rebalance date and each strategy is + constructed from the weights provided. + + Args: + output (dict): Dictionary with keys as rebalance dates and values as dictionaries + containing weights for each strategy. + + Returns: + dict: A dictionary mapping strategy keys (e.g., 'q1', 'q2', ...) to Strategy objects. + """ N = len(output[list(output.keys())[0]]) strategy_dict = {} for i in range(N): - strategy_dict[f'q{i+1}'] = Strategy([]) + key = f'q{i+1}' + strategy_dict[key] = Strategy([]) for rebdate in output.keys(): weights = output[rebdate][f'weights_{i+1}'] if hasattr(weights, 'to_dict'): - weights = weights.to_dict() + weights = weights.to_dict() portfolio = Portfolio(rebdate, weights) - strategy_dict[f'q{i+1}'].portfolios.append(portfolio) + strategy_dict[key].portfolios.append(portfolio) return strategy_dict - -#------------------- Machine learning helpers ------------------- +############################################################################ +# Machine Learning Helpers +############################################################################ def calculate_rmse(y_true, y_pred): """ - Calculate the Root Mean Squared Error (RMSE) + Calculate the Root Mean Squared Error (RMSE). + + Args: + y_true: Ground truth values. + y_pred: Predicted values (can be a pandas object with a 'values' attribute). + + Returns: + float: The RMSE value. """ rmse = np.sqrt(np.mean((np.array(y_true) - np.array(y_pred.values)) ** 2)) return rmse @@ -112,18 +172,35 @@ def calculate_rmse(y_true, y_pred): def calculate_mape(y_true, y_pred): """ - Calculate the Mean Absolute Percentage Error (MAPE) % + Calculate the Mean Absolute Percentage Error (MAPE) in percent. + + Args: + y_true: Ground truth values. + y_pred: Predicted values. + + Returns: + float: The MAPE value. """ y_pred, y_true = np.array(y_pred), np.array(y_true) mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100 return mape -def show_result(predictions, y_test, y_actual, method = None): + +def show_result(predictions, y_test, y_actual, method=None): + """ + Display RMSE and MAPE metrics and plot predictions versus true values. + + Args: + predictions: Predicted values. + y_test: Values used to calculate metrics. + y_actual: Actual values for plotting. + method (optional): Method or model name for display purposes. + """ print(f'RMSE of linear regression: {calculate_rmse(y_test, predictions)}') print(f'MAPE of linear regression: {calculate_mape(y_test, predictions)}') - plt.plot(y_actual, color = 'cyan') - plt.plot(predictions, color = 'green') + plt.plot(y_actual, color='cyan') + plt.plot(predictions, color='green') plt.legend(["True values", "Prediction"]) plt.title(method) plt.show() diff --git a/src/optimization.py b/src/optimization.py index e87ba19..b3c0d58 100644 --- a/src/optimization.py +++ b/src/optimization.py @@ -8,146 +8,150 @@ Licensed under GNU LGPL.3, see LICENCE file ''' - ############################################################################ -### OPTIMIZATION +# OPTIMIZATION MODULE ############################################################################ - - from abc import ABC, abstractmethod from typing import Optional import numpy as np import pandas as pd - from helper_functions import to_numpy from covariance import Covariance from mean_estimation import MeanEstimator from constraints import Constraints from optimization_data import OptimizationData -import qp_problems - -# https://github.com/qpsolvers/qpsolvers - - - - - +import qp_problems # https://github.com/qpsolvers/qpsolvers class OptimizationParameter(dict): + """ + Dictionary-like container for optimization parameters with sensible defaults. + """ def __init__(self, **kwargs): - super(OptimizationParameter, self).__init__(**kwargs) + super().__init__(**kwargs) self.__dict__ = self - if not self.get('solver_name'): self['solver_name'] = 'cvxopt' - if not self.get('verbose'): self['verbose'] = True - if not self.get('allow_suboptimal'): self['allow_suboptimal'] = False + self.setdefault('solver_name', 'cvxopt') + self.setdefault('verbose', True) + self.setdefault('allow_suboptimal', False) class Objective(dict): - - def __init__(self, *args, **kwargs): - super(Objective, self).__init__(*args, **kwargs) + """ + Container for the optimization objective. + """ + pass class Optimization(ABC): + """ + Abstract base class for optimization models. + """ def __init__(self, - params: OptimizationParameter = None, - constraints: Constraints = None, + params: Optional[OptimizationParameter] = None, + constraints: Optional[Constraints] = None, **kwargs): - self.params = OptimizationParameter(**kwargs) if params is None else params + self.params = params if params is not None else OptimizationParameter(**kwargs) self.objective = Objective() - self.constraints = Constraints() if constraints is None else constraints + self.constraints = constraints if constraints is not None else Constraints() self.model = None self.results = None @abstractmethod def set_objective(self, optimization_data: OptimizationData) -> None: + """ + Construct the objective based on the optimization data. + Must be implemented in subclasses. + """ raise NotImplementedError("Method 'set_objective' must be implemented in derived class.") - @abstractmethod def solve(self) -> bool: + """ + Solve the optimization problem using the qpsolvers backend. + Returns a boolean indicating if the solution was found. + """ self.solve_qpsolvers() - return self.results['status'] + return self.results.get('status', False) def solve_qpsolvers(self) -> None: + """ + Build and solve the quadratic programming problem. + """ self.model_qpsolvers() self.model.solve() universe = self.constraints.selection solution = self.model['solution'] status = solution.found - weights = pd.Series(solution.x[:len(universe)] if status else [None] * len(universe), - index=universe) - + weights = pd.Series( + solution.x[:len(universe)] if status else [None] * len(universe), + index=universe + ) self.results = {'weights': weights.to_dict(), - 'status': self.model['solution'].found} - - return None + 'status': status} def model_qpsolvers(self) -> None: - # Ensure that P and q are numpy arrays - if 'P' in self.objective.keys(): + """ + Prepare the QP model using the objective and constraints. + """ + # Ensure that P is provided; default q to zeros if missing + if 'P' in self.objective: P = to_numpy(self.objective['P']) else: raise ValueError("Missing matrix 'P' in objective.") - if 'q' in self.objective.keys(): - q = to_numpy(self.objective['q']) - else: - q = np.zeros(len(self.constraints.selection)) + q = to_numpy(self.objective.get('q', np.zeros(len(self.constraints.selection)))) self.objective['P'] = P self.objective['q'] = q universe = self.constraints.selection - - # constraints - constraints = self.constraints - GhAb = constraints.to_GhAb() - - lb = constraints.box['lower'].to_numpy() if constraints.box['box_type'] != 'NA' else None - ub = constraints.box['upper'].to_numpy() if constraints.box['box_type'] != 'NA' else None - - self.model = qp_problems.QuadraticProgram(P=self.objective['P'], - q=self.objective['q'], - constant=self.objective.get('constant'), - G=GhAb['G'], - h=GhAb['h'], - A=GhAb['A'], - b=GhAb['b'], - lb=lb, - ub=ub, - params=self.params) - - # Choose which reference position to be used - tocon = self.constraints.l1.get('turnover') - x0 = tocon['x0'] if tocon is not None and tocon.get('x0') is not None else self.params.get('x0') + ghab = self.constraints.to_GhAb() + + lb = (self.constraints.box['lower'].to_numpy() + if self.constraints.box['box_type'] != 'NA' else None) + ub = (self.constraints.box['upper'].to_numpy() + if self.constraints.box['box_type'] != 'NA' else None) + + self.model = qp_problems.QuadraticProgram( + P=self.objective['P'], + q=self.objective['q'], + constant=self.objective.get('constant'), + G=ghab['G'], + h=ghab['h'], + A=ghab['A'], + b=ghab['b'], + lb=lb, + ub=ub, + params=self.params + ) + + # Set reference position for turnover constraints if provided + turnover = self.constraints.l1.get('turnover') + x0 = (turnover.get('x0') if turnover is not None and turnover.get('x0') is not None + else self.params.get('x0')) x_init = {asset: x0.get(asset, 0) for asset in universe} if x0 is not None else None - # Transaction cost in the objective transaction_cost = self.params.get('transaction_cost') if transaction_cost is not None and x_init is not None: self.model.linearize_turnover_objective(pd.Series(x_init), transaction_cost) + elif turnover and not transaction_cost and x_init is not None: + self.model.linearize_turnover_constraint(pd.Series(x_init), turnover['rhs']) - # Turnover constraint - if tocon and not transaction_cost and x_init is not None: - self.model.linearize_turnover_constraint(pd.Series(x_init), tocon['rhs']) - - # Leverage constraint + # Leverage constraint linearization levcon = self.constraints.l1.get('leverage') if levcon is not None: self.model.linearize_leverage_constraint(N=len(universe), leverage_budget=levcon['rhs']) - return None - - class EmptyOptimization(Optimization): + """ + Dummy optimization implementation. + """ - def set_objective(self) -> None: + def set_objective(self, optimization_data: OptimizationData) -> None: pass def solve(self) -> bool: @@ -155,29 +159,36 @@ def solve(self) -> bool: class MeanVariance(Optimization): + """ + Mean-Variance optimization implementation. + """ def __init__(self, covariance: Optional[Covariance] = None, mean_estimator: Optional[MeanEstimator] = None, **kwargs): super().__init__(**kwargs) - self.covariance = Covariance() if covariance is None else covariance - self.mean_estimator = MeanEstimator() if mean_estimator is None else MeanEstimator + self.covariance = covariance if covariance is not None else Covariance() + # Fix: use provided mean_estimator or instantiate one + self.mean_estimator = mean_estimator if mean_estimator is not None else MeanEstimator() self.params.setdefault('risk_aversion', 1) def set_objective(self, optimization_data: OptimizationData) -> None: - covmat = self.covariance.estimate(X = optimization_data['return_series']) - covmat = covmat * self.params['risk_aversion'] * 2 - mu = self.mean_estimator.estimate(X = optimization_data['return_series']) * (-1) - self.objective = Objective(q = mu, - P = covmat) - return None - - def solve(self) -> bool: + returns = optimization_data['return_series'] + covmat = self.covariance.estimate(X=returns) + # Scale covariance matrix by risk aversion (multiplied by 2 for QP formulation) + covmat *= self.params['risk_aversion'] * 2 + mu = self.mean_estimator.estimate(X=returns) * (-1) + self.objective = Objective(q=mu, P=covmat) + + def solve(self) -> bool: return super().solve() class QEQW(Optimization): + """ + Equal-Weighted optimization with a covariance-based objective. + """ def __init__(self, **kwargs): super().__init__(**kwargs) @@ -188,80 +199,80 @@ def set_objective(self, optimization_data: OptimizationData) -> None: covmat = self.covariance.estimate(X=X) * 2 mu = np.zeros(X.shape[1]) self.objective = Objective(P=covmat, q=mu) - return None def solve(self) -> bool: return super().solve() - class LeastSquares(Optimization): + """ + Least squares optimization. + """ - def __init__(self, - covariance: Optional[Covariance] = None, - **kwargs): + def __init__(self, covariance: Optional[Covariance] = None, **kwargs): super().__init__(**kwargs) self.covariance = covariance def set_objective(self, optimization_data: OptimizationData) -> None: - X = optimization_data['return_series'] y = optimization_data['bm_series'] + if self.params.get('log_transform'): X = np.log(1 + X) y = np.log(1 + y) - # 0.5 * w * P * w' - q * w' + constant + # 0.5 * w' P w - q' w + constant P = 2 * (X.T @ X) q = to_numpy(-2 * X.T @ y).reshape((-1,)) constant = to_numpy(y.T @ y).item() + # Add L2 penalty if provided l2_penalty = self.params.get('l2_penalty') - if l2_penalty is not None and l2_penalty != 0: + if l2_penalty: P += 2 * l2_penalty * np.eye(X.shape[1]) - self.objective = Objective(P=P, - q=q, - constant=constant) - return None + self.objective = Objective(P=P, q=q, constant=constant) def solve(self) -> bool: return super().solve() class WeightedLeastSquares(Optimization): + """ + Weighted least squares optimization. + """ def set_objective(self, optimization_data: OptimizationData) -> None: - X = optimization_data['return_series'] y = optimization_data['bm_series'] + if self.params.get('log_transform'): X = np.log(1 + X) y = np.log(1 + y) tau = self.params['tau'] lambda_val = np.exp(-np.log(2) / tau) - i = np.arange(X.shape[0]) - wt_tmp = lambda_val ** i - wt = np.flip(wt_tmp / np.sum(wt_tmp) * len(wt_tmp)) - W = np.diag(wt) + indices = np.arange(X.shape[0]) + weights_tmp = lambda_val ** indices + # Flip weights so that more recent observations get higher weight + weights_norm = np.flip(weights_tmp / np.sum(weights_tmp) * len(weights_tmp)) + W = np.diag(weights_norm) - P = 2 * ((X.T).to_numpy() @ W @ X) - q = -2 * (X.T).to_numpy() @ W @ y - constant = (y.T).to_numpy() @ W @ y + # Convert DataFrame columns to numpy arrays explicitly + P = 2 * (X.T.to_numpy() @ W @ X) + q = -2 * (X.T.to_numpy() @ W @ y) + constant = (y.T.to_numpy() @ W @ y) - self.objective = Objective(P=P, - q=q, - constant=constant) - return None + self.objective = Objective(P=P, q=q, constant=constant) def solve(self) -> bool: return super().solve() - class LAD(Optimization): - # Least Absolute Deviation (same as mean absolute deviation, MAD) + """ + Least Absolute Deviation (LAD) optimization, also known as Mean Absolute Deviation (MAD). + """ def __init__(self, **kwargs): super().__init__(**kwargs) @@ -271,6 +282,7 @@ def __init__(self, **kwargs): def set_objective(self, optimization_data: OptimizationData) -> None: X = optimization_data['return_series'] y = optimization_data['bm_series'] + if self.params.get('use_level'): X = (1 + X).cumprod() y = (1 + y).cumprod() @@ -280,39 +292,38 @@ def set_objective(self, optimization_data: OptimizationData) -> None: self.objective = Objective(X=X, y=y) - return None - def solve(self) -> bool: - # Note: Should use an interior point linear solver instead of qpsolvers + # Note: In practice an interior point linear solver might be preferred. self.model_qpsolvers() self.model.solve() - weights = pd.Series(self.model['solution'].x[0:len(self.constraints.selection)], + weights = pd.Series(self.model['solution'].x[:len(self.constraints.selection)], index=self.constraints.selection) self.results = {'weights': weights.to_dict()} return True def model_qpsolvers(self) -> None: - # Data and constraints + """ + Build the QP model for the LAD problem. + """ X = to_numpy(self.objective['X']) y = to_numpy(self.objective['y']) - GhAb = self.constraints.to_GhAb() + ghab = self.constraints.to_GhAb() N = X.shape[1] T = X.shape[0] # Inequality constraints - G_tilde = np.pad(GhAb['G'], [(0, 0), (0, 2 * T)]) if GhAb['G'] is not None else None - h_tilde = GhAb['h'] + G_tilde = np.pad(ghab['G'], [(0, 0), (0, 2 * T)]) if ghab['G'] is not None else None + h_tilde = ghab['h'] # Equality constraints - A = GhAb['A'] - meq = 0 if A is None else 1 if A.ndim == 1 else A.shape[0] - - A_tilde = np.zeros(shape=(T, N + 2 * T)) if A is None else np.pad(A, [(0, T), (0, 2 * T)]) - A_tilde[meq:(T + meq), 0:N] = X - A_tilde[meq:(T + meq), N:(N + T)] = np.eye(T) - A_tilde[meq:(T + meq), (N + T):] = -np.eye(T) + A = ghab['A'] + meq = 0 if A is None else (1 if A.ndim == 1 else A.shape[0]) + A_tilde = np.zeros((T, N + 2 * T)) if A is None else np.pad(A, [(0, T), (0, 2 * T)]) + A_tilde[meq:(T + meq), :N] = X + A_tilde[meq:(T + meq), N:N + T] = np.eye(T) + A_tilde[meq:(T + meq), N + T:] = -np.eye(T) - b_tilde = y if GhAb['b'] is None else np.append(GhAb['b'], y) + b_tilde = y if ghab['b'] is None else np.append(ghab['b'], y) lb = to_numpy(self.constraints.box['lower']) if self.constraints.box['box_type'] != 'NA' else np.full(N, -np.inf) lb = np.pad(lb, (0, 2 * T)) @@ -320,98 +331,104 @@ def model_qpsolvers(self) -> None: ub = to_numpy(self.constraints.box['upper']) if self.constraints.box['box_type'] != 'NA' else np.full(N, np.inf) ub = np.pad(ub, (0, 2 * T), constant_values=np.inf) - # Objective function + # Objective function for LAD q = np.append(np.zeros(N), np.ones(2 * T)) P = np.diag(np.zeros(N + 2 * T)) - if 'leverage' in self.constraints.l1.keys(): + # Leverage constraints handling + if 'leverage' in self.constraints.l1: lev_budget = self.constraints.l1['leverage']['rhs'] - # Auxiliary variables to deal with the abs() function A_tilde = np.pad(A_tilde, [(0, 0), (0, 2 * N)]) lev_eq = np.hstack((np.eye(N), np.zeros((N, 2 * T)), -np.eye(N), np.eye(N))) A_tilde = np.vstack((A_tilde, lev_eq)) - b_tilde = np.append(b_tilde, np.zeros()) - + b_tilde = np.append(b_tilde, np.zeros(N)) G_tilde = np.pad(G_tilde, [(0, 0), (0, 2 * N)]) lev_ineq = np.append(np.zeros(N + 2 * T), np.ones(2 * N)) G_tilde = np.vstack((G_tilde, lev_ineq)) - h_tilde = np.append(GhAb['h'], [lev_budget]) - + h_tilde = np.append(ghab['h'], [lev_budget]) lb = np.pad(lb, (0, 2 * N)) ub = np.pad(lb, (0, 2 * N), constant_values=np.inf) - self.model = qp_problems.QuadraticProgram(P=P, - q=q, - G=G_tilde, - h=h_tilde, - A=A_tilde, - b=b_tilde, - lb=lb, - ub=ub, - params=self.params) - return None - + self.model = qp_problems.QuadraticProgram( + P=P, + q=q, + G=G_tilde, + h=h_tilde, + A=A_tilde, + b=b_tilde, + lb=lb, + ub=ub, + params=self.params + ) class PercentilePortfolios(Optimization): + """ + Constructs portfolios based on percentiles of score distributions. + """ def __init__(self, field: Optional[str] = None, estimator: Optional[MeanEstimator] = None, - n_percentiles = 5, # creates quintile portfolios by default. + n_percentiles: int = 5, # Defaults to quintile portfolios. **kwargs): super().__init__(**kwargs) self.estimator = estimator - self.params = {'solver_name': 'percentile', - 'n_percentiles': n_percentiles, - 'field': field} + # Overwrite parameters with percentile-specific values + self.params = OptimizationParameter(solver_name='percentile', + n_percentiles=n_percentiles, + field=field) def set_objective(self, optimization_data: OptimizationData) -> None: - + """ + Define the objective based on score percentiles. + """ field = self.params.get('field') if self.estimator is not None: if field is not None: - raise ValueError('Either specify a "field" or pass an "estimator", but not both.') - else: - scores = self.estimator.estimate(X = optimization_data['return_series']) + raise ValueError('Specify either a "field" or pass an "estimator", but not both.') + scores = self.estimator.estimate(X=optimization_data['return_series']) else: if field is not None: scores = optimization_data['scores'][field] else: score_weights = self.params.get('score_weights') if score_weights is not None: - # Compute weighted average scores = ( - optimization_data['scores'][score_weights.keys()] - .multiply(score_weights.values()) + optimization_data['scores'][list(score_weights.keys())] + .multiply(list(score_weights.values())) .sum(axis=1) ) else: - scores = optimization_data['scores'].mean(axis = 1).squeeze() + scores = optimization_data['scores'].mean(axis=1).squeeze() - # Add tiny noise to zeros since otherwise there might be two threshold values == 0 + # Avoid duplicated thresholds at zero by adding minimal noise scores[scores == 0] = np.random.normal(0, 1e-10, scores[scores == 0].shape) - self.objective = Objective(scores = -scores) - - return None + self.objective = Objective(scores=-scores) def solve(self) -> bool: - + """ + Constructs percentile-based portfolios and assigns long/short weights. + """ scores = self.objective['scores'] - N = self.params['n_percentiles'] - q_vec = np.linspace(0, 100, N + 1) - th = np.percentile(scores, q_vec) - lID = [] - w_dict = {} - for i in range(1, len(th)): + n_percentiles = self.params['n_percentiles'] + percentile_edges = np.linspace(0, 100, n_percentiles + 1) + thresholds = np.percentile(scores, percentile_edges) + portfolio_ids = {} + for i in range(1, len(thresholds)): if i == 1: - lID.append(list(scores.index[scores <= th[i]])) + indices = scores.index[scores <= thresholds[i]] else: - lID.append(list(scores.index[np.logical_and(scores > th[i-1], scores <= th[i])])) - w_dict[i] = scores[lID[i-1]] * 0 + 1 / len(lID[i-1]) - weights = scores * 0 - weights[w_dict[1].keys()] = 1 / len(w_dict[1].keys()) - weights[w_dict[N].keys()] = -1 / len(w_dict[N].keys()) + indices = scores.index[(scores > thresholds[i - 1]) & (scores <= thresholds[i])] + portfolio_ids[i] = indices + # Build weights: assign long to the bottom percentile and short to the top percentile + weights = scores * 0.0 + if portfolio_ids.get(1): + long_weight = 1 / len(portfolio_ids[1]) + weights.loc[portfolio_ids[1]] = long_weight + if portfolio_ids.get(n_percentiles): + short_weight = -1 / len(portfolio_ids[n_percentiles]) + weights.loc[portfolio_ids[n_percentiles]] = short_weight self.results = {'weights': weights.to_dict(), - 'w_dict': w_dict} + 'portfolio_ids': portfolio_ids} return True diff --git a/src/portfolio.py b/src/portfolio.py index dbef928..fdf39d7 100644 --- a/src/portfolio.py +++ b/src/portfolio.py @@ -14,20 +14,16 @@ import numpy as np - - - class Portfolio: - def __init__(self, rebalancing_date: str = None, - weights: dict = {}, + weights: dict = None, name: str = None, - init_weights: dict = {}): + init_weights: dict = None): self.rebalancing_date = rebalancing_date - self.weights = weights + self.weights = weights if weights is not None else {} self.name = name - self.init_weights = init_weights + self.init_weights = init_weights if init_weights is not None else {} @staticmethod def empty() -> 'Portfolio': @@ -76,12 +72,14 @@ def float_weights(self, return_series: pd.DataFrame, end_date: str, rescale: bool = False): - if self.weights is not None: - return floating_weights(X=return_series, - w=self.weights, - start_date=self.rebalancing_date, - end_date=end_date, - rescale=rescale) + if self.weights is not None and self.weights != {}: + return floating_weights( + X=return_series, + w=self.weights, + start_date=self.rebalancing_date, + end_date=end_date, + rescale=rescale + ) else: return None @@ -90,39 +88,51 @@ def initial_weights(self, return_series: pd.DataFrame, end_date: str, rescale: bool = True) -> dict[str, float]: - if not hasattr(self, '_initial_weights'): - if self.rebalancing_date is not None and self.weights is not None: + if self.rebalancing_date is not None and self.weights: w_init = dict.fromkeys(selection, 0) - w_float = self.float_weights(return_series=return_series, - end_date=end_date, - rescale=rescale) - w_floated = w_float.iloc[-1] - - w_init.update({key: w_floated[key] for key in w_init.keys() & w_floated.keys()}) - self._initial_weights = w_init + w_float = self.float_weights( + return_series=return_series, + end_date=end_date, + rescale=rescale + ) + if w_float is None or w_float.empty: + self._initial_weights = None + else: + # Use intersection of keys safely by converting to sets. + common_keys = set(w_init.keys()).intersection(set(w_float.columns)) + w_floated = w_float.iloc[-1] + w_init.update({key: w_floated[key] for key in common_keys}) + self._initial_weights = w_init else: - self._initial_weights = None # {key: 0 for key in selection} - + self._initial_weights = None return self._initial_weights def turnover(self, portfolio: "Portfolio", return_series: pd.DataFrame, rescale=True): - if portfolio.rebalancing_date is not None and portfolio.rebalancing_date < self.rebalancing_date: - w_init = portfolio.initial_weights(selection=self.weights.keys(), - return_series=return_series, - end_date=self.rebalancing_date, - rescale=rescale) + # Fix: Compare dates as datetime objects rather than as raw strings. + date_self = pd.to_datetime(self.rebalancing_date) if self.rebalancing_date else None + date_other = pd.to_datetime(portfolio.rebalancing_date) if portfolio.rebalancing_date else None + + if date_other is not None and date_self is not None and date_other < date_self: + w_init = portfolio.initial_weights( + selection=list(self.weights.keys()), + return_series=return_series, + end_date=self.rebalancing_date, + rescale=rescale + ) else: - w_init = self.initial_weights(selection=portfolio.weights.keys(), - return_series=return_series, - end_date=portfolio.rebalancing_date, - rescale=rescale) - + w_init = self.initial_weights( + selection=list(portfolio.weights.keys()), + return_series=return_series, + end_date=portfolio.rebalancing_date, + rescale=rescale + ) + if w_init is None: + return 0.0 return pd.Series(w_init).sub(pd.Series(portfolio.weights), fill_value=0).abs().sum() class Strategy: - def __init__(self, portfolios: list[Portfolio]): self.portfolios = portfolios @@ -140,7 +150,6 @@ def portfolios(self, new_portfolios: list[Portfolio]): def clear(self) -> None: self.portfolios.clear() - return None def get_rebalancing_dates(self): return [portfolio.rebalancing_date for portfolio in self.portfolios] @@ -158,25 +167,29 @@ def get_weights_df(self) -> pd.DataFrame: return pd.DataFrame(weights_dict).T def get_portfolio(self, rebalancing_date: str) -> Portfolio: - if rebalancing_date in self.get_rebalancing_dates(): - idx = self.get_rebalancing_dates().index(rebalancing_date) + dates = self.get_rebalancing_dates() + if rebalancing_date in dates: + idx = dates.index(rebalancing_date) return self.portfolios[idx] else: raise ValueError(f'No portfolio found for rebalancing date {rebalancing_date}') def has_previous_portfolio(self, rebalancing_date: str) -> bool: dates = self.get_rebalancing_dates() - ans = False - if len(dates) > 0: - ans = dates[0] < rebalancing_date - return ans + if dates: + # Compare as datetime objects for safety. + first_date = pd.to_datetime(dates[0]) + curr_date = pd.to_datetime(rebalancing_date) + return first_date < curr_date + return False def get_previous_portfolio(self, rebalancing_date: str) -> Portfolio: if not self.has_previous_portfolio(rebalancing_date): return Portfolio.empty() else: - yesterday = [x for x in self.get_rebalancing_dates() if x < rebalancing_date][-1] - return self.get_portfolio(yesterday) + previous_dates = [x for x in self.get_rebalancing_dates() if pd.to_datetime(x) < pd.to_datetime(rebalancing_date)] + previous_date = sorted(previous_dates)[-1] + return self.get_portfolio(previous_date) def get_initial_portfolio(self, rebalancing_date: str) -> Portfolio: if self.has_previous_portfolio(rebalancing_date=rebalancing_date): @@ -193,65 +206,72 @@ def number_of_assets(self, th: float = 0.0001) -> pd.Series: def turnover(self, return_series, rescale=True) -> pd.Series: dates = self.get_rebalancing_dates() - turnover = {} - for rebalancing_date in dates: - previous_portfolio = self.get_previous_portfolio(rebalancing_date) - current_portfolio = self.get_portfolio(rebalancing_date) - turnover[rebalancing_date] = current_portfolio.turnover(portfolio=previous_portfolio, - return_series=return_series, - rescale=rescale) - return pd.Series(turnover) + turnover_dict = {} + for r_date in dates: + previous_portfolio = self.get_previous_portfolio(r_date) + current_portfolio = self.get_portfolio(r_date) + turnover_dict[r_date] = current_portfolio.turnover( + portfolio=previous_portfolio, + return_series=return_series, + rescale=rescale + ) + return pd.Series(turnover_dict) def simulate(self, - return_series=None, + return_series: pd.DataFrame = None, fc: float = 0, vc: float = 0, n_days_per_year: int = 252) -> pd.Series: - rebdates = self.get_rebalancing_dates() ret_list = [] - for rebdate in rebdates: - next_rebdate = rebdates[rebdates.index(rebdate) + 1] if rebdate < rebdates[-1] else return_series.index[-1] + for i, rebdate in enumerate(rebdates): + # Fix: Compare dates as datetimes. + if i < len(rebdates) - 1: + next_rebdate = rebdates[i + 1] + else: + next_rebdate = return_series.index[-1] portfolio = self.get_portfolio(rebdate) - w_float = portfolio.float_weights(return_series=return_series, - end_date=next_rebdate, - rescale=False) # Note that rescale is hardcoded to False. + w_float = portfolio.float_weights( + return_series=return_series, + end_date=next_rebdate, + rescale=False + ) short_positions = list(filter(lambda x: x < 0, portfolio.weights.values())) long_positions = list(filter(lambda x: x >= 0, portfolio.weights.values())) margin = abs(sum(short_positions)) cash = max(min(1 - sum(long_positions), 1), 0) loan = 1 - (sum(long_positions) + cash) - (sum(short_positions) + margin) + w_float.insert(0, 'margin', margin) w_float.insert(0, 'cash', cash) w_float.insert(0, 'loan', loan) level = w_float.sum(axis=1) - ret_tmp = level.pct_change(1) # 1 for one day lookback + ret_tmp = level.pct_change(1) # One day lookback. ret_list.append(ret_tmp) portf_ret = pd.concat(ret_list).dropna() if vc != 0: - to = self.turnover(return_series=return_series, - rescale=False) # Note that rescale is hardcoded to False. + to = self.turnover(return_series=return_series, rescale=False) varcost = to * vc - portf_ret[0] -= varcost[0] - portf_ret[varcost[1:].index] -= varcost[1:].values + portf_ret = portf_ret.subtract(varcost, fill_value=0) + if fc != 0: - n_days = (portf_ret.index[1:] - portf_ret.index[:-1]).to_numpy().astype('timedelta64[D]').astype(int) - fixcost = (1 + fc) ** (n_days / n_days_per_year) - 1 - portf_ret[1:] -= fixcost + date_index = pd.to_datetime(portf_ret.index) + n_days = (date_index[1:] - date_index[:-1]).days + fixcost = (1 + fc) ** (np.array(n_days) / n_days_per_year) - 1 + fixcost_series = pd.Series(fixcost, index=portf_ret.index[1:]) + portf_ret.loc[fixcost_series.index] = portf_ret.loc[fixcost_series.index] - fixcost_series return portf_ret - - # -------------------------------------------------------------------------- # Helper functions # -------------------------------------------------------------------------- -def floating_weights(X, w, start_date, end_date, rescale=True): +def floating_weights(X: pd.DataFrame, w: dict, start_date, end_date, rescale=True): start_date = pd.to_datetime(start_date) end_date = pd.to_datetime(end_date) if start_date < X.index[0]: diff --git a/src/selection.py b/src/selection.py index 562f8db..9a147a7 100644 --- a/src/selection.py +++ b/src/selection.py @@ -15,13 +15,10 @@ -from typing import Union, Optional +from typing import Union, Optional, List import pandas as pd - - class Selection: - def __init__(self, ids: pd.Index = pd.Index([])): self._filtered: dict[str, Union[pd.Series, pd.DataFrame]] = {} self.selected = ids @@ -31,68 +28,60 @@ def selected(self) -> pd.Index: return self._selected @selected.setter - def selected(self, value): + def selected(self, value: pd.Index): if not isinstance(value, pd.Index): - raise ValueError( - "Inconsistent input type for selected.setter. Needs to be a pd.Index." - ) + raise ValueError("Inconsistent input type for selected.setter. Needs to be a pd.Index.") self._selected = value @property def filtered(self): return self._filtered - def get_selected(self, filter_names: Optional[list[str]] = None) -> pd.Index: + def get_selected(self, filter_names: Optional[List[str]] = None) -> pd.Index: if filter_names is not None: df = self.df_binary(filter_names) else: df = self.df_binary() + #return only rows where all binary columns equal 1 return df[df.eq(1).all(axis=1)].index def clear(self) -> None: self.selected = pd.Index([]) self._filtered = {} - def add_filtered(self, - filter_name: str, - value: Union[pd.Series, pd.DataFrame]) -> None: - - # Check input types + def add_filtered(self, filter_name: str, value: Union[pd.Series, pd.DataFrame]) -> None: + #check input types if not isinstance(filter_name, str) or not filter_name.strip(): raise ValueError("Argument 'filter_name' must be a nonempty string.") if not isinstance(value, pd.Series) and not isinstance(value, pd.DataFrame): - raise ValueError( - 'Inconsistent input type. Needs to be a pd.Series or a pd.DataFrame.' - ) + raise ValueError("Inconsistent input type. Needs to be a pd.Series or a pd.DataFrame.") # Ensure that column 'binary' is of type int if it exists if isinstance(value, pd.Series): if value.name == 'binary': if not value.isin([0, 1]).all(): raise ValueError("Column 'binary' must contain only 0s and 1s.") - else: - value = value.astype(int) - - if isinstance(value, pd.DataFrame): + value = value.astype(int) + elif isinstance(value, pd.DataFrame): if 'binary' in value.columns: if not value['binary'].isin([0, 1]).all(): raise ValueError("Column 'binary' must contain only 0s and 1s.") - else: - value['binary'] = value['binary'].astype(int) + value['binary'] = value['binary'].astype(int) - # Add to filtered + #add to filtered self._filtered[filter_name] = value - # Reset selected + #reset selected based on the updated filters self.selected = self.get_selected() - return None - - def df(self, filter_names: Optional[list[str]] = None) -> pd.DataFrame: + def df(self, filter_names: Optional[List[str]] = None) -> pd.DataFrame: if filter_names is None: - filter_names = self.filtered.keys() - return pd.concat( + filter_names = list(self.filtered.keys()) + #if there are no filters, return an empty DataFrame + if not filter_names: + return pd.DataFrame() + df_concat = pd.concat( { key: ( pd.DataFrame(self.filtered[key]) @@ -101,13 +90,17 @@ def df(self, filter_names: Optional[list[str]] = None) -> pd.DataFrame: ) for key in filter_names }, - axis = 1, + axis=1, ) + return df_concat - def df_binary(self, filter_names: Optional[list[str]] = None) -> pd.DataFrame: - + def df_binary(self, filter_names: Optional[List[str]] = None) -> pd.DataFrame: if filter_names is None: - filter_names = self.filtered.keys() - df = self.df(filter_names = filter_names).filter(like = 'binary').dropna() - df.columns = df.columns.droplevel(1) + filter_names = list(self.filtered.keys()) + if not filter_names: + return pd.DataFrame() + df = self.df(filter_names=filter_names).filter(like='binary').dropna() + if isinstance(df.columns, pd.MultiIndex) and df.columns.nlevels > 1: + df.columns = df.columns.droplevel(1) return df +