Skip to content
238 changes: 195 additions & 43 deletions optimism/FunctionSpace.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
from jax.scipy.linalg import solve
from jaxtyping import Array, Float
from optimism import Interpolants
from optimism import Mesh
from optimism import QuadratureRule
from typing import Tuple
import equinox as eqx
import jax
import jax.numpy as np
from collections import namedtuple
import numpy as onp

import jax
import jax.numpy as np
from jax.scipy.linalg import solve

class EssentialBC(eqx.Module):
nodeSet: str
component: int
from optimism import Interpolants
from optimism import Mesh


class FunctionSpace(eqx.Module):
FunctionSpace = namedtuple('FunctionSpace', ['shapes', 'vols', 'shapeGrads', 'mesh', 'quadratureRule', 'isAxisymmetric'])
FunctionSpace.__doc__ = \
"""Data needed for calculus on functions in the discrete function space.

In describing the shape of the attributes, ``ne`` is the number of
Expand All @@ -37,12 +32,8 @@ class FunctionSpace(eqx.Module):
isAxisymmetric: boolean indicating if the function space data are
axisymmetric.
"""
shapes: Float[Array, "ne nqpe nn"]
vols: Float[Array, "ne nqpe"]
shapeGrads: Float[Array, "ne nqpe nn nd"]
mesh: Mesh.Mesh
quadratureRule: QuadratureRule.QuadratureRule
isAxisymmetric: bool

EssentialBC = namedtuple('EssentialBC', ['nodeSet', 'component'])


def construct_function_space(mesh, quadratureRule, mode2D='cartesian'):
Expand Down Expand Up @@ -363,42 +354,49 @@ def integrate_function_on_edges(functionSpace, func, U, quadRule, edges):
return np.sum(integrate_on_edges(functionSpace, func, U, quadRule, edges))


class DofManager(eqx.Module):
# TODO get type hints below correct
# TODO this one could be moved to jax types if we move towards
# TODO jit safe preconditioners/solvers
fieldShape: Tuple[int, int]
isBc: any
isUnknown: any
ids: any
unknownIndices: any
bcIndices: any
dofToUnknown: any
HessRowCoords: any
HessColCoords: any
hessian_bc_mask: any
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you started making changes to FunctionSpace.py before some newer changes were made to make it an Equinox module. Can you keep the Equinox module implementations of FunctionSpace and DofManager classes and implement your DOFManagerMPC class as a module? Let me know if you want my help.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Ryan,
I updated the DofManagerMPC class using the updated FunctionSpace with the equinox module and pushed it to the forked repository.

def create_nodeset_layers(mesh):
coords = mesh.coords
tol = 1e-8
# Create unique layers of nodesets along the y-direction
unique_layers = sorted(onp.unique(coords[:,1]))
Ny = int(input("Enter the expected number of nodeset layers in y-direction: "))
assert len(unique_layers) == Ny, f"ERROR - Expected {Ny} layers, but found {len(unique_layers)}."

layer_rows = []

for i, y_val in enumerate(unique_layers):
nodes_in_layer = onp.flatnonzero(onp.abs(coords[:, 1] - y_val) < tol)
layer_rows.append(nodes_in_layer)

max_nodes_per_layer = max(len(row) for row in layer_rows)
y_layers = -1 * np.ones((len(layer_rows), max_nodes_per_layer), dtype=int) # Initialize with -1

for i, row in enumerate(layer_rows):
y_layers = y_layers.at[i, :len(row)].set(row) # Fill each row with nodes from the layer

# # For debugging
# print("Layers in y-direction: ", y_layers)
return y_layers

class DofManager:
def __init__(self, functionSpace, dim, EssentialBCs):
self.fieldShape = Mesh.num_nodes(functionSpace.mesh), dim
isBc = onp.full(self.fieldShape, False, dtype=bool)
self.isBc = onp.full(self.fieldShape, False, dtype=bool)
for ebc in EssentialBCs:
isBc[functionSpace.mesh.nodeSets[ebc.nodeSet], ebc.component] = True
self.isBc = isBc
self.isBc[functionSpace.mesh.nodeSets[ebc.nodeSet], ebc.component] = True
self.isUnknown = ~self.isBc

self.ids = onp.arange(self.isBc.size).reshape(self.fieldShape)
self.ids = np.arange(self.isBc.size).reshape(self.fieldShape)

self.unknownIndices = self.ids[self.isUnknown]
self.bcIndices = self.ids[self.isBc]

ones = onp.ones(self.isBc.size, dtype=int) * -1
dofToUnknown = ones
dofToUnknown[self.unknownIndices] = onp.arange(self.unknownIndices.size)
self.dofToUnknown = dofToUnknown
ones = np.ones(self.isBc.size, dtype=int) * -1
self.dofToUnknown = ones.at[self.unknownIndices].set(np.arange(self.unknownIndices.size))

self.HessRowCoords, self.HessColCoords = self._make_hessian_coordinates(functionSpace.mesh.conns)

self.hessian_bc_mask = self._make_hessian_bc_mask(onp.array(functionSpace.mesh.conns))
self.hessian_bc_mask = self._make_hessian_bc_mask(functionSpace.mesh.conns)


def get_bc_size(self):
Expand Down Expand Up @@ -450,7 +448,7 @@ def _make_hessian_coordinates(self, conns):
rowCoords[rangeBegin:rangeEnd] = elHessCoords.ravel()
colCoords[rangeBegin:rangeEnd] = elHessCoords.T.ravel()

rangeBegin += onp.square(nElUnknowns[e])
rangeBegin += np.square(nElUnknowns[e])
return rowCoords, colCoords


Expand All @@ -466,3 +464,157 @@ def _make_hessian_bc_mask(self, conns):
hessian_bc_mask[e,eFlag,:] = False
hessian_bc_mask[e,:,eFlag] = False
return hessian_bc_mask

# Different class for Multi-Point Constrained Problem
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Different class for Multi-Point Constrained Problem
# DOF Manager for Multi-Point Constrained Problem

class DofManagerMPC:
def __init__(self, functionSpace, dim, EssentialBCs, mesh):
self.fieldShape = Mesh.num_nodes(functionSpace.mesh), dim
self.isBc = onp.full(self.fieldShape, False, dtype=bool)
for ebc in EssentialBCs:
self.isBc[functionSpace.mesh.nodeSets[ebc.nodeSet], ebc.component] = True
self.isUnknown = ~self.isBc

self.ids = np.arange(self.isBc.size).reshape(self.fieldShape)

# Create layers and assign master and slave rows
self.layers = create_nodeset_layers(mesh)
print("These are the layers created for the mesh: ", self.layers)
self.master_layer = int(input("Choose the row index for master nodes: "))
self.slave_layer = int(input("Choose the row index for slave nodes: "))

# Generate master and slave arrays
self.master_array = self._create_layer_array(self.master_layer)
self.slave_array = self._create_layer_array(self.slave_layer)

# Create DOF mappings for unknowns
self.unknownIndices = self.ids[self.isUnknown]
self.bcIndices = self.ids[self.isBc]

ones = np.ones(self.isBc.size, dtype=int) * -1
self.dofToUnknown = ones.at[self.unknownIndices].set(np.arange(self.unknownIndices.size))

# Compute Hessian-related data
self.HessRowCoords, self.HessColCoords = self._make_hessian_coordinates(functionSpace.mesh.conns)
self.hessian_bc_mask = self._make_hessian_bc_mask(functionSpace.mesh.conns)

def get_bc_size(self):
return np.sum(self.isBc).item()

def get_unknown_size(self):
return np.sum(self.isUnknown).item()

def create_field(self, Uu, Ubc=0.0):
U = np.zeros(self.isBc.shape).at[self.isBc].set(Ubc)
return U.at[self.isUnknown].set(Uu)

def get_bc_values(self, U):
return U[self.isBc]

def get_unknown_values(self, U):
return U[self.isUnknown]

def get_master_values(self, U):
return U[self.master_array[:, 1:]]

def get_slave_values(self, U):
return U[self.slave_array[:, 1:]]

def _create_layer_array(self, layer_index):
"""Creates an array for the specified layer (master or slave) with DOF mappings."""
layer_nodes = self.layers[layer_index]
layer_array = []
for node in layer_nodes:
node_dofs = self.ids[node]
layer_array.append([node, *node_dofs])
return onp.array(layer_array, dtype=int)

def _make_hessian_coordinates(self, conns):
"""Creates row and column coordinates for the Hessian, considering master and slave nodes."""
nElUnknowns = onp.zeros(conns.shape[0], dtype=int)
nHessianEntries = 0
for e, eNodes in enumerate(conns):
elUnknownFlags = self.isUnknown[eNodes, :].ravel()
nElUnknowns[e] = onp.sum(elUnknownFlags)

# Include master and slave nodes in the size calculation
eNodes_list = onp.asarray(eNodes).tolist()
elMasterNodes = set(eNodes_list).intersection(set(self.master_array[:, 0]))
elSlaveNodes = set(eNodes_list).intersection(set(self.slave_array[:, 0]))

nElMasters = sum(len(self.master_array[onp.where(self.master_array[:, 0] == node)[0], 1:].ravel()) for node in elMasterNodes)
nElSlaves = sum(len(self.slave_array[onp.where(self.slave_array[:, 0] == node)[0], 1:].ravel()) for node in elSlaveNodes)

totalDOFs = nElUnknowns[e] + nElMasters + nElSlaves
nHessianEntries += totalDOFs ** 2

# Allocate sufficient space for Hessian coordinates
rowCoords = onp.zeros(nHessianEntries, dtype=int)
colCoords = onp.zeros(nHessianEntries, dtype=int)
rangeBegin = 0

for e, eNodes in enumerate(conns):
eNodes_list = onp.asarray(eNodes).tolist()
elDofs = self.ids[eNodes, :]
elUnknownFlags = self.isUnknown[eNodes, :]
elUnknowns = self.dofToUnknown[elDofs[elUnknownFlags]]

# Identify master and slave DOFs for the element
elMasterNodes = set(eNodes_list).intersection(set(self.master_array[:, 0]))
elSlaveNodes = set(eNodes_list).intersection(set(self.slave_array[:, 0]))

elMasterDofs = []
for node in elMasterNodes:
master_idx = onp.where(self.master_array[:, 0] == node)[0]
elMasterDofs.extend(self.master_array[master_idx, 1:].ravel())

elSlaveDofs = []
for node in elSlaveNodes:
slave_idx = onp.where(self.slave_array[:, 0] == node)[0]
elSlaveDofs.extend(self.slave_array[slave_idx, 1:].ravel())

# Combine unknowns, master, and slave DOFs
elAllUnknowns = onp.concatenate([elUnknowns, elMasterDofs, elSlaveDofs])
elHessCoords = onp.tile(elAllUnknowns, (len(elAllUnknowns), 1))

nElHessianEntries = len(elAllUnknowns) ** 2
rangeEnd = rangeBegin + nElHessianEntries

# Assign values to the Hessian coordinate arrays
rowCoords[rangeBegin:rangeEnd] = elHessCoords.ravel()
colCoords[rangeBegin:rangeEnd] = elHessCoords.T.ravel()

rangeBegin += nElHessianEntries

return rowCoords, colCoords


def _make_hessian_bc_mask(self, conns):
"""Creates a mask for BCs in the Hessian, considering master and slave nodes."""
nElements, nNodesPerElement = conns.shape
nFields = self.ids.shape[1]
nDofPerElement = nNodesPerElement * nFields

hessian_bc_mask = onp.full((nElements, nDofPerElement, nDofPerElement), True, dtype=bool)
for e, eNodes in enumerate(conns):
# Get boundary condition flags for all DOFs
eFlag = self.isBc[eNodes, :].ravel()

# Identify master and slave nodes
eNodes_list = onp.asarray(eNodes).tolist()
masterFlag = onp.isin(eNodes_list, self.master_array[:, 0])
slaveFlag = onp.isin(eNodes_list, self.slave_array[:, 0])

# Expand master and slave flags to match the DOF shape
masterFlag_expanded = onp.repeat(masterFlag, nFields)
slaveFlag_expanded = onp.repeat(slaveFlag, nFields)

# Combine flags
combinedFlag = eFlag | masterFlag_expanded | slaveFlag_expanded

# Update Hessian mask
hessian_bc_mask[e, combinedFlag, :] = False
hessian_bc_mask[e, :, combinedFlag] = False

return hessian_bc_mask


74 changes: 73 additions & 1 deletion optimism/Objective.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,76 @@ def get_value(self, x):
def get_residual(self, x):
return self.gradient(self.scaling * x)



class ObjectiveMPC(Objective):
def __init__(self, f, x, p, dofManagerMPC, precondStrategy=None):
"""
Initialize the Objective class for MPC with full DOFs and master-slave constraints.

Parameters:
- f: Objective function.
- x: Initial guess for the primal DOF vector.
- p: Parameters for the objective function.
- dofManagerMPC: Instance of DofManagerMPC with master-slave relationships.
- precondStrategy: Preconditioning strategy (optional).
"""
super().__init__(f, x, p, precondStrategy)
self.dofManagerMPC = dofManagerMPC
self.master_dofs = dofManagerMPC.master_array[:, 1:]
self.slave_dofs = dofManagerMPC.slave_array[:, 1:]
self.slave_to_master_map = self.create_slave_to_master_map()

def create_slave_to_master_map(self):
"""
Create the mapping to express slave DOFs in terms of master DOFs.
Returns:
- C: Constraint matrix (2D array).
- d: Constant vector (1D array).
"""
num_slaves = len(self.slave_dofs)
num_masters = len(self.master_dofs)

# Initialize the constraint matrix with the correct dimensions
C = np.zeros((num_slaves, num_masters)) # Replace with actual mapping logic

# Initialize the constant vector
d = np.zeros(num_slaves) # Replace with actual constant values if needed

return C, d


def value(self, x_full):
# Ensure the constraints are satisfied
x_full = self.enforce_constraints(x_full)
return super().value(x_full)

def gradient(self, x_full):
x_full = self.enforce_constraints(x_full)
grad_full = super().gradient(x_full)
return grad_full

def hessian_vec(self, x_full, vx_full):
x_full = self.enforce_constraints(x_full)
hess_vec_full = super().hessian_vec(x_full, vx_full)
return hess_vec_full

def enforce_constraints(self, x_full):
"""
Enforce the master-slave relationship in the full DOF vector.
"""
if self.slave_to_master_map is not None:
C, d = self.slave_to_master_map

# # Debug shapes
# print("Shape of C:", C.shape)
# print("Shape of x_full[self.master_dofs]:", x_full[self.master_dofs].shape)
# print("Shape of d:", d.shape)

# Ensure broadcasting compatibility
slave_values = C @ x_full[self.master_dofs] + d[:, None]
# print("Shape of slave_values:", slave_values.shape)

x_full = x_full.at[self.slave_dofs].set(slave_values)
return x_full


Loading
Loading