Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions src/qp_problems.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,157 @@ def serialize(self, path, **kwargs):
def load(path, **kwargs):
with open(path, 'rb'):
return pickle.load(path, kwargs)

# This class converts a financial optimization problem to a standard linear optimization.
class LinearProgram(dict):
def __init__(self, *args, **kwargs):
super(LinearProgram, self).__init__(*args, **kwargs)

# Ensure 'params' exists in self
if 'params' not in self:
self['params'] = {} # Initialize it if missing

# Avoid KeyError by providing a default solver name
self.solver = self['params'].get('solver_name', 'cvxopt')

self['P'] = None # No quadratic term for linear programs
self['q'] = None
self['G'] = None
self['h'] = None
self['A'] = None
self['b'] = None
self['lb'] = None
self['ub'] = None

# Ensure constraints and variables lists exist
self.constraints = []
self.variables = []

def set_objective(self, q) -> None:
"""Set the linear objective function coefficients."""
self.q = q

def add_constraint(self, A: np.ndarray, b: np.ndarray, sense: str) -> None:
"""Add a constraint to the linear program."""
if self.get('A') is None:
self['A'] = A
self['b'] = b
else:
self['A'] = np.vstack([self['A'], A])
self['b'] = np.hstack([self['b'], b])

def add_variable(self, name: str, lb: float = 0, ub: float = None) -> None:
"""Add a variable to the linear program."""
if self.get('lb') is None:
self['lb'] = np.array([lb])
else:
self['lb'] = np.append(self['lb'], lb)

if self.get('ub') is None:
self['ub'] = np.array([ub if ub is not None else float('inf')])
else:
self['ub'] = np.append(self['ub'], ub if ub is not None else float('inf'))

def linearize_constraints(self, x_init: np.ndarray) -> None:
# Dimensions
n = len(self.get('q'))
m = 0 if self.get('G') is None else self.get('G').shape[0]

# Objective (for linear programs, it's just the linear term, no quadratic part)
P = np.zeros((n, n)) if self.get('P') is None else self['P']
q = np.pad(self['q'], (0, n)) if self.get('q') is not None else np.zeros(n)

# Inequality constraints
G = np.zeros(shape=(m + n, n))
if self.get('G') is not None:
G[0:m, :] = self.get('G')
G[m:(m + n), :] = np.eye(n)
h = self.get('h') if self.get('h') is not None else np.empty(shape=(0,))
h = np.append(h, np.append(x_init, np.zeros(n)))

# Equality constraints (if any)
A = np.pad(self['A'], [(0, 0), (0, n)]) if self.get('A') is not None else None

lb = np.pad(self['lb'], (0, n)) if self.get('lb') is not None else None
ub = np.pad(self['ub'], (0, n), constant_values=float('inf')) if self.get('ub') is not None else None

# Update the dictionary with the new linearized values
self.update({'P': P,
'q': q,
'G': G,
'h': h,
'A': A,
'lb': lb,
'ub': ub})

def is_feasible(self) -> bool:
problem = qpsolvers.Problem(P=np.zeros(self.get('P').shape),
q=np.zeros(self.get('P').shape[0]),
G=self.get('G'),
h=self.get('h'),
A=self.get('A'),
b=self.get('b'),
lb=self.get('lb'),
ub=self.get('ub'))

# Convert to sparse matrices for best performance
if self.solver in SPARSE_SOLVERS:
if self['params'].get('sparse'):
if problem.P is not None:
problem.P = scipy.sparse.csc_matrix(problem.P)
if problem.A is not None:
problem.A = scipy.sparse.csc_matrix(problem.A)
if problem.G is not None:
problem.G = scipy.sparse.csc_matrix(problem.G)
solution = qpsolvers.solve_problem(problem=problem,
solver=self.solver,
initvals=self.get('x0'),
verbose=False)
return solution.found

def solve(self) -> None:
# Ensure the dimensions of matrices are valid
P = self.get('P')
if P is not None and not isPD(P):
self['P'] = nearestPD(P)

problem = qpsolvers.Problem(P=self.get('P'),
q=self.get('q'),
G=self.get('G'),
h=self.get('h'),
A=self.get('A'),
b=self.get('b'),
lb=self.get('lb'),
ub=self.get('ub'))

# Convert to sparse matrices for best performance
if self.solver in SPARSE_SOLVERS:
if self['params'].get('sparse'):
if problem.P is not None:
problem.P = scipy.sparse.csc_matrix(problem.P)
if problem.A is not None:
problem.A = scipy.sparse.csc_matrix(problem.A)
if problem.G is not None:
problem.G = scipy.sparse.csc_matrix(problem.G)

solution = qpsolvers.solve_problem(problem=problem,
solver=self.solver,
initvals=self.get('x0'),
verbose=False)
self['solution'] = solution
return None

# 0.5 * x' * P * x + q' * x + const
def objective_value(self, x: np.ndarray, with_const: bool = True) -> float:
const = 0 if self.get('constant') is None or not with_const else self['constant']
return (0.5 * (x @ self.get('P') @ x) + self.get('q') @ x).item() + const

def serialize(self, path, **kwargs):
with open(path, 'wb') as f:
pickle.dump(self, f, kwargs)

@staticmethod
def load(path, **kwargs):
with open(path, 'rb') as f:
return pickle.load(f, kwargs)

53 changes: 53 additions & 0 deletions src/test_linear_program.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest
import numpy as np
from qp_problems import LinearProgram

class TestLinearProgram(unittest.TestCase):
def setUp(self):
"""Initialize a LinearProgram instance before each test."""
self.lp = LinearProgram(params={'solver_name': 'some_solver'})

def test_set_objective(self):
"""Test setting the objective function."""
q = np.array([1, 2])
self.lp.set_objective(q)
self.assertTrue(hasattr(self.lp, 'q'))
np.testing.assert_array_equal(self.lp.q, q)

def test_add_variables(self):
"""Test adding variables."""
self.lp.add_variable('x1', 0, 10)
self.lp.add_variable('x2', 0, 5)
self.assertEqual(len(self.lp.variables), 2)

def test_add_constraints(self):
"""Test adding constraints."""
A = np.array([[1, 1], [2, 1]])
b = np.array([5, 8])
self.lp.add_constraint(A, b, '<=')
self.assertEqual(len(self.lp.constraints), 2)

def test_solve(self):
"""Test solving the linear program."""
self.lp.set_objective(np.array([1, 2]))
self.lp.add_variable('x1', 0, 10)
self.lp.add_variable('x2', 0, 5)
self.lp.add_constraint(np.array([[1, 1], [2, 1]]), np.array([5, 8]), '<=')

result = self.lp.solve()
self.assertIsNotNone(result)
self.assertTrue(hasattr(self.lp, 'solution'))

def test_objective_value(self):
"""Test retrieving the objective value after solving."""
self.lp.set_objective(np.array([1, 2]))
self.lp.add_variable('x1', 0, 10)
self.lp.add_variable('x2', 0, 5)
self.lp.add_constraint(np.array([[1, 1], [2, 1]]), np.array([5, 8]), '<=')
self.lp.solve()

obj_val = self.lp.objective_value()
self.assertIsInstance(obj_val, (int, float))

if __name__ == '__main__':
unittest.main()