diff --git a/src/qp_problems.py b/src/qp_problems.py index 44bbf7d..ea9b373 100644 --- a/src/qp_problems.py +++ b/src/qp_problems.py @@ -228,3 +228,157 @@ def serialize(self, path, **kwargs): def load(path, **kwargs): with open(path, 'rb'): return pickle.load(path, kwargs) + +# This class converts a financial optimization problem to a standard linear optimization. +class LinearProgram(dict): + def __init__(self, *args, **kwargs): + super(LinearProgram, self).__init__(*args, **kwargs) + + # Ensure 'params' exists in self + if 'params' not in self: + self['params'] = {} # Initialize it if missing + + # Avoid KeyError by providing a default solver name + self.solver = self['params'].get('solver_name', 'cvxopt') + + self['P'] = None # No quadratic term for linear programs + self['q'] = None + self['G'] = None + self['h'] = None + self['A'] = None + self['b'] = None + self['lb'] = None + self['ub'] = None + + # Ensure constraints and variables lists exist + self.constraints = [] + self.variables = [] + + def set_objective(self, q) -> None: + """Set the linear objective function coefficients.""" + self.q = q + + def add_constraint(self, A: np.ndarray, b: np.ndarray, sense: str) -> None: + """Add a constraint to the linear program.""" + if self.get('A') is None: + self['A'] = A + self['b'] = b + else: + self['A'] = np.vstack([self['A'], A]) + self['b'] = np.hstack([self['b'], b]) + + def add_variable(self, name: str, lb: float = 0, ub: float = None) -> None: + """Add a variable to the linear program.""" + if self.get('lb') is None: + self['lb'] = np.array([lb]) + else: + self['lb'] = np.append(self['lb'], lb) + + if self.get('ub') is None: + self['ub'] = np.array([ub if ub is not None else float('inf')]) + else: + self['ub'] = np.append(self['ub'], ub if ub is not None else float('inf')) + + def linearize_constraints(self, x_init: np.ndarray) -> None: + # Dimensions + n = len(self.get('q')) + m = 0 if self.get('G') is None else self.get('G').shape[0] + + # Objective (for linear programs, it's just the linear term, no quadratic part) + P = np.zeros((n, n)) if self.get('P') is None else self['P'] + q = np.pad(self['q'], (0, n)) if self.get('q') is not None else np.zeros(n) + + # Inequality constraints + G = np.zeros(shape=(m + n, n)) + if self.get('G') is not None: + G[0:m, :] = self.get('G') + G[m:(m + n), :] = np.eye(n) + h = self.get('h') if self.get('h') is not None else np.empty(shape=(0,)) + h = np.append(h, np.append(x_init, np.zeros(n))) + + # Equality constraints (if any) + A = np.pad(self['A'], [(0, 0), (0, n)]) if self.get('A') is not None else None + + lb = np.pad(self['lb'], (0, n)) if self.get('lb') is not None else None + ub = np.pad(self['ub'], (0, n), constant_values=float('inf')) if self.get('ub') is not None else None + + # Update the dictionary with the new linearized values + self.update({'P': P, + 'q': q, + 'G': G, + 'h': h, + 'A': A, + 'lb': lb, + 'ub': ub}) + + def is_feasible(self) -> bool: + problem = qpsolvers.Problem(P=np.zeros(self.get('P').shape), + q=np.zeros(self.get('P').shape[0]), + G=self.get('G'), + h=self.get('h'), + A=self.get('A'), + b=self.get('b'), + lb=self.get('lb'), + ub=self.get('ub')) + + # Convert to sparse matrices for best performance + if self.solver in SPARSE_SOLVERS: + if self['params'].get('sparse'): + if problem.P is not None: + problem.P = scipy.sparse.csc_matrix(problem.P) + if problem.A is not None: + problem.A = scipy.sparse.csc_matrix(problem.A) + if problem.G is not None: + problem.G = scipy.sparse.csc_matrix(problem.G) + solution = qpsolvers.solve_problem(problem=problem, + solver=self.solver, + initvals=self.get('x0'), + verbose=False) + return solution.found + + def solve(self) -> None: + # Ensure the dimensions of matrices are valid + P = self.get('P') + if P is not None and not isPD(P): + self['P'] = nearestPD(P) + + problem = qpsolvers.Problem(P=self.get('P'), + q=self.get('q'), + G=self.get('G'), + h=self.get('h'), + A=self.get('A'), + b=self.get('b'), + lb=self.get('lb'), + ub=self.get('ub')) + + # Convert to sparse matrices for best performance + if self.solver in SPARSE_SOLVERS: + if self['params'].get('sparse'): + if problem.P is not None: + problem.P = scipy.sparse.csc_matrix(problem.P) + if problem.A is not None: + problem.A = scipy.sparse.csc_matrix(problem.A) + if problem.G is not None: + problem.G = scipy.sparse.csc_matrix(problem.G) + + solution = qpsolvers.solve_problem(problem=problem, + solver=self.solver, + initvals=self.get('x0'), + verbose=False) + self['solution'] = solution + return None + + # 0.5 * x' * P * x + q' * x + const + def objective_value(self, x: np.ndarray, with_const: bool = True) -> float: + const = 0 if self.get('constant') is None or not with_const else self['constant'] + return (0.5 * (x @ self.get('P') @ x) + self.get('q') @ x).item() + const + + def serialize(self, path, **kwargs): + with open(path, 'wb') as f: + pickle.dump(self, f, kwargs) + + @staticmethod + def load(path, **kwargs): + with open(path, 'rb') as f: + return pickle.load(f, kwargs) + \ No newline at end of file diff --git a/src/test_linear_program.py b/src/test_linear_program.py new file mode 100644 index 0000000..5066219 --- /dev/null +++ b/src/test_linear_program.py @@ -0,0 +1,53 @@ +import unittest +import numpy as np +from qp_problems import LinearProgram + +class TestLinearProgram(unittest.TestCase): + def setUp(self): + """Initialize a LinearProgram instance before each test.""" + self.lp = LinearProgram(params={'solver_name': 'some_solver'}) + + def test_set_objective(self): + """Test setting the objective function.""" + q = np.array([1, 2]) + self.lp.set_objective(q) + self.assertTrue(hasattr(self.lp, 'q')) + np.testing.assert_array_equal(self.lp.q, q) + + def test_add_variables(self): + """Test adding variables.""" + self.lp.add_variable('x1', 0, 10) + self.lp.add_variable('x2', 0, 5) + self.assertEqual(len(self.lp.variables), 2) + + def test_add_constraints(self): + """Test adding constraints.""" + A = np.array([[1, 1], [2, 1]]) + b = np.array([5, 8]) + self.lp.add_constraint(A, b, '<=') + self.assertEqual(len(self.lp.constraints), 2) + + def test_solve(self): + """Test solving the linear program.""" + self.lp.set_objective(np.array([1, 2])) + self.lp.add_variable('x1', 0, 10) + self.lp.add_variable('x2', 0, 5) + self.lp.add_constraint(np.array([[1, 1], [2, 1]]), np.array([5, 8]), '<=') + + result = self.lp.solve() + self.assertIsNotNone(result) + self.assertTrue(hasattr(self.lp, 'solution')) + + def test_objective_value(self): + """Test retrieving the objective value after solving.""" + self.lp.set_objective(np.array([1, 2])) + self.lp.add_variable('x1', 0, 10) + self.lp.add_variable('x2', 0, 5) + self.lp.add_constraint(np.array([[1, 1], [2, 1]]), np.array([5, 8]), '<=') + self.lp.solve() + + obj_val = self.lp.objective_value() + self.assertIsInstance(obj_val, (int, float)) + +if __name__ == '__main__': + unittest.main()