From 503e37366d33824e798b59fb35952c44b435851b Mon Sep 17 00:00:00 2001 From: Alexandru Fikl Date: Sat, 20 Sep 2025 20:00:46 +0300 Subject: [PATCH] Add Elasticity/Stokes using biharmonic/Laplace kernels --- doc/conf.py | 3 + doc/symbolic.rst | 17 + pyproject.toml | 2 + pytential/qbx/__init__.py | 5 +- pytential/symbolic/elasticity.py | 807 +++++++++++++++++ pytential/symbolic/mappers.py | 7 +- pytential/symbolic/pde/system_utils.py | 646 ++++++++++++++ pytential/symbolic/primitives.py | 29 +- pytential/symbolic/stokes.py | 1137 +++++++++++++++--------- pytential/utils.py | 93 +- test/extra_int_eq_data.py | 2 +- test/test_pde_system_utils.py | 243 +++++ test/test_stokes.py | 374 +++++++- test/test_tools.py | 20 + 14 files changed, 2900 insertions(+), 485 deletions(-) create mode 100644 pytential/symbolic/elasticity.py create mode 100644 pytential/symbolic/pde/system_utils.py create mode 100644 test/test_pde_system_utils.py diff --git a/doc/conf.py b/doc/conf.py index 7b6d3117d..246fe3ca5 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -80,6 +80,7 @@ # sumpy "ExpansionBase": "class:sumpy.expansion.ExpansionBase", "ExpansionFactoryBase": "class:sumpy.expansion.ExpansionFactoryBase", + "ExpressionKernel": "class:sumpy.kernel.ExpressionKernel", "Kernel": "class:sumpy.kernel.Kernel", "P2PBase": "class:sumpy.p2p.P2PBase", # pytential @@ -91,7 +92,9 @@ "KernelArgumentMapping": "obj:pytential.symbolic.primitives.KernelArgumentMapping", "Operand": "obj:pytential.symbolic.primitives.Operand", "QBXForcedLimit": "obj:pytential.symbolic.primitives.QBXForcedLimit", + "Side": "obj:pytential.symbolic.primitives.Side", "TargetOrDiscretization": "obj:pytential.target.TargetOrDiscretization", + "VectorExpression": "obj:pytential.symbolic.elasticity.VectorExpression", "pytential.symbolic.dof_desc.DOFDescriptorLike": "data:pytential.symbolic.dof_desc.DOFDescriptorLike", "sym.DOFDescriptor": "class:pytential.symbolic.dof_desc.DOFDescriptor", diff --git a/doc/symbolic.rst b/doc/symbolic.rst index 20366692d..e79f7844f 100644 --- a/doc/symbolic.rst +++ b/doc/symbolic.rst @@ -38,6 +38,11 @@ Maxwell's equations .. automodule:: pytential.symbolic.pde.maxwell +Elasticity equations +^^^^^^^^^^^^^^^^^^^^ + +.. automodule:: pytential.symbolic.elasticity + Stokes' equations ^^^^^^^^^^^^^^^^^ @@ -48,6 +53,11 @@ Scalar Beltrami equations .. automodule:: pytential.symbolic.pde.beltrami +Rewriting expressions with layer potentials +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. automodule:: pytential.symbolic.pde.system_utils + Internal affairs ---------------- @@ -62,3 +72,10 @@ How a symbolic operator gets executed .. automodule:: pytential.symbolic.execution .. automodule:: pytential.symbolic.compiler + +Rewriting expressions with ``IntG``\ s internals +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. automethod:: pytential.symbolic.pde.system_utils.convert_target_transformation_to_source +.. automethod:: pytential.symbolic.pde.system_utils.rewrite_int_g_using_base_kernel + diff --git a/pyproject.toml b/pyproject.toml index 82fde8ffa..23c88f335 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,6 +145,8 @@ extend-ignore-re = [ [tool.typos.default.extend-words] "nd" = "nd" +# short for multi-indices +mis = "mis" [tool.basedpyright] reportImplicitStringConcatenation = "none" diff --git a/pytential/qbx/__init__.py b/pytential/qbx/__init__.py index 842c14919..322d75330 100644 --- a/pytential/qbx/__init__.py +++ b/pytential/qbx/__init__.py @@ -1047,9 +1047,12 @@ def get_flat_strengths_from_densities( places, sym.weights_and_area_elements(places.ambient_dim, dofdesc=dofdesc), )(actx) + + from numbers import Number + density_dofarrays = [evaluate(density) for density in densities] for i, ary in enumerate(density_dofarrays): - if not isinstance(ary, DOFArray): + if not isinstance(ary, DOFArray | Number): raise ValueError( f"DOFArray expected for density '{densities[i]}', " f"{type(ary)} received instead") diff --git a/pytential/symbolic/elasticity.py b/pytential/symbolic/elasticity.py new file mode 100644 index 000000000..f33b88945 --- /dev/null +++ b/pytential/symbolic/elasticity.py @@ -0,0 +1,807 @@ +from __future__ import annotations + + +__copyright__ = """ +Copyright (C) 2017 Natalie Beams +Copyright (C) 2022 Isuru Fernando +""" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from functools import cached_property +from typing import TYPE_CHECKING, TypeAlias, cast + +import numpy as np +from typing_extensions import override + +from pytools import obj_array +from sumpy.kernel import ( + AxisSourceDerivative, + AxisTargetDerivative, + BiharmonicKernel, + ElasticityKernel, + Kernel, + LaplaceKernel, + StokesletKernel, + StressletKernel, + TargetPointMultiplier, +) +from sumpy.symbolic import SpatialConstant + +from pytential import sym +from pytential.symbolic.pde.system_utils import rewrite_using_base_kernel + + +if TYPE_CHECKING: + from pymbolic.typing import ArithmeticExpression + from pytools.obj_array import ObjectArray1D + + from pytential.symbolic.primitives import QBXForcedLimit + +__doc__ = """ +.. autoclass:: VectorExpression + +.. autoclass:: Method + :members: + +.. autofunction:: make_elasticity_wrapper +.. autofunction:: make_elasticity_double_layer_wrapper + +.. autoclass:: ElasticityWrapperBase +.. autoclass:: ElasticityDoubleLayerWrapperBase + +.. autoclass:: ElasticityWrapperNaive +.. autoclass:: ElasticityDoubleLayerWrapperNaive + +.. autoclass:: ElasticityWrapperBiharmonic +.. autoclass:: ElasticityDoubleLayerWrapperBiharmonic + +.. autoclass:: ElasticityWrapperYoshida +.. autoclass:: ElasticityDoubleLayerWrapperYoshida +""" + +VectorExpression: TypeAlias = "ObjectArray1D[ArithmeticExpression]" + + +# {{{ ElasiticityWrapper ABCs + +@dataclass +class ElasticityWrapperBase(ABC): + """Wrapper class for the :class:`~sumpy.kernel.ElasticityKernel` kernel. + + This class is meant to shield the user from the messiness of writing + out every term in the expansion of the double-indexed elasticity kernel + applied to a density vector. + + The :meth:`apply` function returns the integral expressions needed for + the vector displacement resulting from convolution with the vector density, + and is meant to work similarly to calling + :func:`~pytential.symbolic.primitives.S` (which is + :class:`~pytential.symbolic.primitives.IntG`). + + Similar functions are available for other useful things related to + the flow: :meth:`apply_derivative` (target derivative). + + .. autoattribute:: dim + .. autoattribute:: mu + .. autoattribute:: nu + + .. automethod:: apply + .. automethod:: apply_derivative + """ + + dim: int + """Ambient dimension of the representation.""" + mu: float | SpatialConstant + r"""Expression or value for the shear modulus :math:`\mu`.""" + nu: float | SpatialConstant + r"""Expression or value for Poisson's ratio :math:`\nu`.""" + + @abstractmethod + def apply(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + """Symbolic expressions for the elasticity single-layer potential. + + This constructs an object array of symbolic expressions for the vector + resulting from integrating the dyadic elasticity kernel with the density + *density_vec_sym*. + + :arg density_vec_sym: a symbolic vector variable for the density vector. + :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on + to :class:`~pytential.symbolic.primitives.IntG`. + :arg extra_deriv_dirs: adds target derivatives to all the integral + objects with the given derivative axis. + """ + + def apply_derivative(self, + deriv_dir: int, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + """Symbolic derivative of the elasticity single-layer kernel. + + This constructs an object array of symbolic expressions for the vector + resulting from integrating the *deriv_dir* target derivative of the + dyadic elasticity kernel with the density *density_vec_sym*. + + :arg deriv_dir: integer denoting the axis direction for the derivative, + i.e. the directions *x*, *y*, *z* correspond to the integers *0*, + *1*, *2*. + :arg density_vec_sym: a symbolic vector variable for the density vector. + :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on + to :class:`~pytential.symbolic.primitives.IntG`. + """ + return self.apply(density_vec_sym, qbx_forced_limit, (deriv_dir,)) + + +@dataclass +class ElasticityDoubleLayerWrapperBase(ABC): + """Wrapper class for the double layer of + :class:`~sumpy.kernel.ElasticityKernel` kernel. + + This class is meant to shield the user from the messiness of writing + out every term in the expansion of the triple-indexed elasticity double-layer + kernel applied to both a normal vector and the density vector. + + The :meth:`apply` function returns the integral expressions needed for + convolving the kernel with a vector density, and is meant to work + similarly to :func:`~pytential.symbolic.primitives.D` (which is + :class:`~pytential.symbolic.primitives.IntG`). + + Similar functions are available for other useful things related to + the flow: :meth:`apply_derivative` (target derivative). + + .. autoattribute:: dim + .. autoattribute:: mu + .. autoattribute:: nu + + .. automethod:: apply + .. automethod:: apply_derivative + """ + + dim: int + """Ambient dimension of the representation.""" + mu: float | SpatialConstant + r"""Expression or value for the shear modulus :math:`\mu`.""" + nu: float | SpatialConstant + r"""Expression or value for Poisson's ration :math:`\nu`.""" + + @abstractmethod + def apply(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + """Symbolic expressions for the elasticity double-layer potential. + + This constructs an object array of symbolic expressions for the vector + resulting from integrating the triadic kernel with the density + *density_vec_sym* and the source direction vector *dir_vec_sym*. + + :arg density_vec_sym: a symbolic vector variable for the density vector. + :arg dir_vec_sym: a symbolic vector variable for the direction vector. + :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on + to :class:`~pytential.symbolic.primitives.IntG`. + :arg extra_deriv_dirs: adds target derivatives to all the integral + objects with the given derivative axis. + """ + + def apply_derivative(self, + deriv_dir: int, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + """Symbolic derivative of the elasticity double-layer potential. + + This constructs an object array of symbolic expressions for the vector + resulting from integrating the *deriv_dir* target derivative of the + triadic elasticity kernel with variable *density_vec_sym* and the + source direction *dir_vec_sym*. + + :arg deriv_dir: integer denoting the axis direction for the derivative, + i.e. the directions *x*, *y*, *z* correspond to the integers *0*, + *1*, *2*. + :arg density_vec_sym: a symbolic vector variable for the density vector. + :arg dir_vec_sym: a symbolic vector variable for the normal direction. + :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on + to :class:`~pytential.symbolic.primitives.IntG`. + """ + return self.apply(density_vec_sym, dir_vec_sym, qbx_forced_limit, (deriv_dir,)) + +# }}} + + +# {{{ Naive and Biharmonic impl + +def _create_int_g(knl: Kernel, + deriv_dirs: tuple[int, ...], + density: ArithmeticExpression, + *, + qbx_forced_limit: QBXForcedLimit, + **kwargs: float | SpatialConstant) -> ArithmeticExpression: + for deriv_dir in deriv_dirs: + knl = AxisTargetDerivative(deriv_dir, knl) + + kernel_arg_names = { + karg.loopy_arg.name + for karg in (*knl.get_args(), *knl.get_source_args())} + + # When the kernel is Laplace, mu and nu are not kernel arguments + # Also when nu==0.5 is not a kernel argument to StokesletKernel + for var_name in ("mu", "nu"): + if var_name not in kernel_arg_names: + kwargs.pop(var_name) + + return sym.int_g_vec( + knl, density, qbx_forced_limit, + kernel_arguments=cast("dict[str, ArithmeticExpression]", kwargs), + ) + + +@dataclass +class _ElasticityWrapperNaiveOrBiharmonic: + dim: int + mu: float | SpatialConstant + nu: float | SpatialConstant + base_kernel: Kernel | None + + def __post_init__(self): + if not (self.dim == 3 or self.dim == 2): + raise ValueError( + f"unsupported dimension given to {type(self).__name__!r}: {self.dim}" + ) + + @cached_property + def kernel_dict(self) -> dict[tuple[int, int], Kernel]: + d: dict[tuple[int, int], Kernel] = {} + + # The dictionary allows us to exploit symmetry -- that :math:`T_{01}` + # is identical to :math:`T_{10}` -- and avoid creating multiple + # expansions for the same kernel in a different ordering. + for i in range(self.dim): + for j in range(i, self.dim): + if self.nu == 0.5: + d[i, j] = StokesletKernel(dim=self.dim, icomp=i, jcomp=j) + else: + d[i, j] = ElasticityKernel(dim=self.dim, icomp=i, jcomp=j) + d[j, i] = d[i, j] + + return d + + def _get_int_g(self, + idx: tuple[int, int], + density_sym: ArithmeticExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + deriv_dirs: tuple[int, ...]) -> ArithmeticExpression: + """ + Returns the convolution of the elasticity kernel given by `idx` + and its derivatives. + """ + res = _create_int_g( + self.kernel_dict[idx], + deriv_dirs, + density=density_sym*dir_vec_sym[idx[-1]], + qbx_forced_limit=qbx_forced_limit, + mu=self.mu, + nu=self.nu) + + return res / (2 * (1 - self.nu)) + + def apply(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + + sym_expr = np.zeros((self.dim,), dtype=object) + dir_vec_sym = obj_array.new_1d([1] * self.dim) + + # For stokeslet, there's no direction vector involved + # passing a list of ones instead to remove its usage. + for comp in range(self.dim): + for i in range(self.dim): + sym_expr[comp] += self._get_int_g( + (comp, i), + density_vec_sym[i], + dir_vec_sym, + qbx_forced_limit, + deriv_dirs=extra_deriv_dirs) + + return np.array(rewrite_using_base_kernel( + sym_expr, + base_kernel=self.base_kernel)) + + +class ElasticityWrapperNaive(_ElasticityWrapperNaiveOrBiharmonic, + ElasticityWrapperBase): + r"""Elasticity single-layer wrapper based on the standard elasticity kernel. + + This representation uses the elasticity kernel denoted by + :attr:`Method.Naive`. + """ + + def __init__(self, + dim: int, + mu: float | SpatialConstant, + nu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, nu=nu, base_kernel=None) + ElasticityWrapperBase.__init__(self, dim=dim, mu=mu, nu=nu) + + +class ElasticityWrapperBiharmonic(_ElasticityWrapperNaiveOrBiharmonic, + ElasticityWrapperBase): + r"""Elasticity single-layer wrapper based on the biharmonic kernel. + + This representation uses the biharmonic kernel denoted by + :attr:`Method.Biharmonic`. + """ + + def __init__(self, + dim: int, + mu: float | SpatialConstant, + nu: float | SpatialConstant) -> None: + super().__init__( + dim=dim, mu=mu, nu=nu, + base_kernel=BiharmonicKernel(dim)) + ElasticityWrapperBase.__init__(self, dim=dim, mu=mu, nu=nu) + + +# }}} + + +# {{{ ElasticityDoubleLayerWrapper Naive and Biharmonic impl + +# NOTE: this is used in the `kernel_dict` to store the Laplace kernel +LAPLACIAN_INDEX = (-1, -1, -1) + + +@dataclass +class _ElasticityDoubleLayerWrapperNaiveOrBiharmonic: + dim: int + mu: float | SpatialConstant + nu: float | SpatialConstant + base_kernel: Kernel | None + + def __post_init__(self): + if not (self.dim == 3 or self.dim == 2): + raise ValueError("unsupported dimension given to " + f"ElasticityDoubleLayerWrapper: {self.dim}") + + @cached_property + def kernel_dict(self) -> dict[tuple[int, int, int], Kernel]: + d: dict[tuple[int, int, int], Kernel] = {} + + for i in range(self.dim): + for j in range(i, self.dim): + for k in range(j, self.dim): + d[i, j, k] = ( + StressletKernel(dim=self.dim, icomp=i, jcomp=j, kcomp=k)) + + # The dictionary allows us to exploit symmetry -- that + # :math:`T_{012}` is identical to :math:`T_{120}` -- and avoid creating + # multiple expansions for the same kernel in a different ordering. + for i in range(self.dim): + for j in range(self.dim): + for k in range(self.dim): + if (i, j, k) in d: + continue + s = cast("tuple[int, int, int]", tuple(sorted([i, j, k]))) + d[i, j, k] = d[s] + + # For elasticity (nu != 0.5), we need the laplacian of the + # BiharmonicKernel which is the LaplaceKernel. + if self.nu != 0.5: + d[LAPLACIAN_INDEX] = LaplaceKernel(self.dim) + + return d + + def _get_int_g(self, + idx: tuple[int, int, int], + density_sym: ArithmeticExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + deriv_dirs: tuple[int, ...]) -> ArithmeticExpression: + """ + Returns the convolution of the double layer of the elasticity kernel + given by `idx` and its derivatives. + """ + + nu = self.nu + kernel_indices = [idx] + dir_vec_indices = [idx[-1]] + coeffs = [1] + + kernel_indices = [idx, LAPLACIAN_INDEX, LAPLACIAN_INDEX, LAPLACIAN_INDEX] + dir_vec_indices = [idx[-1], idx[1], idx[0], idx[2]] + coeffs = [1, (1 - 2*nu)/self.dim, -(1 - 2*nu)/self.dim, -(1 - 2*nu)] + extra_deriv_dirs_vec = [[], [idx[0]], [idx[1]], [idx[2]]] + if idx[0] != idx[1]: + coeffs[-1] = 0 + + result = 0 + for kernel_idx, dir_vec_idx, coeff, extra_deriv_dirs in zip( + kernel_indices, + dir_vec_indices, + coeffs, + extra_deriv_dirs_vec, strict=True): + if coeff == 0: + continue + + knl = self.kernel_dict[kernel_idx] + result += coeff * _create_int_g( + knl, + (*deriv_dirs, *extra_deriv_dirs), + density=density_sym*dir_vec_sym[dir_vec_idx], + qbx_forced_limit=qbx_forced_limit, + mu=self.mu, + nu=self.nu) + + return result/(2*(1 - nu)) + + def apply(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + + sym_expr = np.zeros((self.dim,), dtype=object) + + for comp in range(self.dim): + for i in range(self.dim): + for j in range(self.dim): + sym_expr[comp] += self._get_int_g((comp, i, j), + density_vec_sym[i], dir_vec_sym, + qbx_forced_limit, deriv_dirs=extra_deriv_dirs) + + return np.array(rewrite_using_base_kernel( + sym_expr, + base_kernel=self.base_kernel)) + + def apply_single_and_double_layer( + self, + stokeslet_density_vec_sym: VectorExpression, + stresslet_density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + stokeslet_weight: ArithmeticExpression, + stresslet_weight: ArithmeticExpression, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + + stokeslet_obj = _ElasticityWrapperNaiveOrBiharmonic( + dim=self.dim, + mu=self.mu, nu=self.nu, + base_kernel=self.base_kernel) + + sym_expr = 0 + if stresslet_weight != 0: + sym_expr += stresslet_weight * self.apply( + stresslet_density_vec_sym, dir_vec_sym, + qbx_forced_limit, extra_deriv_dirs) + + if stokeslet_weight != 0: + sym_expr += stokeslet_weight * stokeslet_obj.apply( + stokeslet_density_vec_sym, + qbx_forced_limit, extra_deriv_dirs) + + return sym_expr + + +class ElasticityDoubleLayerWrapperNaive( + _ElasticityDoubleLayerWrapperNaiveOrBiharmonic, + ElasticityDoubleLayerWrapperBase): + r"""Elasticity double-layer wrapper based on the standard elasticity kernel. + + This representation uses the elasticity kernel denoted by + :attr:`Method.Naive`. + """ + + def __init__(self, + dim: int, + mu: float | SpatialConstant, + nu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, nu=nu, base_kernel=None) + ElasticityDoubleLayerWrapperBase.__init__( + self, dim=dim, mu=mu, nu=nu) + + +class ElasticityDoubleLayerWrapperBiharmonic( + _ElasticityDoubleLayerWrapperNaiveOrBiharmonic, + ElasticityDoubleLayerWrapperBase): + r"""Elasticity double-layer wrapper based on the biharmonic kernel. + + This representation uses the biharmonic kernel denoted by + :attr:`Method.Biharmonic`. + """ + + def __init__(self, + dim: int, + mu: float | SpatialConstant, + nu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, nu=nu, base_kernel=BiharmonicKernel(dim)) + ElasticityDoubleLayerWrapperBase.__init__( + self, dim=dim, mu=mu, nu=nu) + +# }}} + + +# {{{ dispatch function + +class Method(Enum): + """Method to represent the elasticity kernel.""" + Naive = 1 + """The naive representation is given by the standard tensor expression of + kernel using e.g. :class:`~sumpy.kernel.ElasticityKernel`. + """ + Laplace = 2 + """A representation of the elasticity kernel in terms of the Laplace kernel.""" + Biharmonic = 3 + """A representation of the elasticity kernel in terms of the biharmonic kernel. + """ + + +def make_elasticity_wrapper( + dim: int, + mu: float | str | SpatialConstant = "mu", + nu: float | str | SpatialConstant = "nu", + method: Method = Method.Naive) -> ElasticityWrapperBase: + """Creates an appropriate :class:`ElasticityWrapperBase` object. + + :param dim: ambient dimension of the representation. + :param mu: expression or value for the shear modulus. + :param nu: expression or value for Poisson's ratio. If this is set to + *0.5* exactly, an appropriate Stokes object is created instead. + :param method: method to use - defaults to the :attr:`Method.Naive`. + """ + if isinstance(mu, str): + mu = SpatialConstant(mu) + + if isinstance(nu, str): + nu = SpatialConstant(nu) + + if nu == 0.5: + from pytential.symbolic.stokes import make_stokeslet_wrapper + return make_stokeslet_wrapper(dim=dim, mu=mu, method=method) + + if method == Method.Naive: + return ElasticityWrapperNaive(dim=dim, mu=mu, nu=nu) + elif method == Method.Biharmonic: + return ElasticityWrapperBiharmonic(dim=dim, mu=mu, nu=nu) + elif method == Method.Laplace: + return ElasticityWrapperYoshida(dim=dim, mu=mu, nu=nu) + else: + raise ValueError(f"invalid 'method': {method}") + + +def make_elasticity_double_layer_wrapper( + dim: int, + mu: float | str | SpatialConstant = "mu", + nu: float | str | SpatialConstant = "nu", + method: Method = Method.Naive) -> ElasticityDoubleLayerWrapperBase: + """Creates an appropriate :class:`ElasticityDoubleLayerWrapperBase` object. + + :param dim: ambient dimension of the representation. + :param mu: expression or value for the shear modulus. + :param nu: expression or value for Poisson's ratio. If this is set to + *0.5* exactly, an appropriate Stokes object is created instead. + :param method: method to use - defaults to the :attr:`Method.Naive`. + """ + if isinstance(mu, str): + mu = SpatialConstant(mu) + + if isinstance(nu, str): + nu = SpatialConstant(nu) + + if nu == 0.5: + from pytential.symbolic.stokes import make_stresslet_wrapper + return make_stresslet_wrapper(dim, mu=mu, method=method) + + if method == Method.Naive: + return ElasticityDoubleLayerWrapperNaive(dim=dim, mu=mu, nu=nu) + elif method == Method.Biharmonic: + return ElasticityDoubleLayerWrapperBiharmonic(dim=dim, mu=mu, nu=nu) + elif method == Method.Laplace: + return ElasticityDoubleLayerWrapperYoshida(dim=dim, mu=mu, nu=nu) + else: + raise ValueError(f"invalid 'method': {method}") + + +# }}} + + +# {{{ Yoshida + +@dataclass +class ElasticityDoubleLayerWrapperYoshida(ElasticityDoubleLayerWrapperBase): + r"""Elasticity double-layer wrapper based on [Yoshida2001]_. + + This representation uses the Laplace kernel denoted by :attr:`Method.Laplace` + and can only be used in 3D. + + .. [Yoshida2001] K.-I. Yoshida, N. Nishimura, S. Kobayashi, + *Application of Fast Multipole Galerkin Boundary Integral Equation Method + to Elastostatic Crack Problems in 3D*, + International Journal for Numerical Methods in Engineering, Vol. 50, pp. + 525--547, 2001, + `DOI `__. + """ + + def __post_init__(self): + if not self.dim == 3: + raise ValueError("unsupported dimension given to " + f"ElasticityDoubleLayerWrapperYoshida: {self.dim}") + + @cached_property + def laplace_kernel(self): + return LaplaceKernel(dim=3) + + @override + def apply(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + return self.apply_single_and_double_layer( + obj_array.new_1d([0]*self.dim), + density_vec_sym, + dir_vec_sym, + qbx_forced_limit, + 0, 1, + extra_deriv_dirs) + + def apply_single_and_double_layer( + self, + stokeslet_density_vec_sym: VectorExpression, + stresslet_density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + stokeslet_weight: ArithmeticExpression, + stresslet_weight: ArithmeticExpression, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + + mu = self.mu + nu = self.nu + lam = 2*nu*mu/(1-2*nu) + stokeslet_weight *= -1 + + def C(i: int, j: int, k: int, l: int) -> ArithmeticExpression: # noqa: E741 + res = 0 + if i == j and k == l: + res += lam + if i == k and j == l: + res += mu + if i == l and j == k: + res += mu + + return res * stresslet_weight + + def add_extra_deriv_dirs(target_kernel: Kernel) -> Kernel: + for deriv_dir in extra_deriv_dirs: + target_kernel = AxisTargetDerivative(deriv_dir, target_kernel) + + return target_kernel + + def P(i: int, j: int, int_g: sym.IntG) -> ArithmeticExpression: + target_kernel = add_extra_deriv_dirs( + TargetPointMultiplier(j, AxisTargetDerivative(i, int_g.target_kernel)) + ) + + res = -int_g.copy(target_kernel=target_kernel) + if i == j: + res += (3 - 4*nu)*int_g.copy( + target_kernel=add_extra_deriv_dirs(int_g.target_kernel)) + + return res / (4*mu*(1 - nu)) + + def Q(i: int, int_g: sym.IntG) -> ArithmeticExpression: + target_kernel = add_extra_deriv_dirs( + AxisTargetDerivative(i, int_g.target_kernel) + ) + res = int_g.copy(target_kernel=target_kernel) + + return res / (4*mu*(1 - nu)) + + sym_expr = np.zeros((3,), dtype=object) + + source = sym.nodes(3).as_vector() + normal = dir_vec_sym + sigma = stresslet_density_vec_sym + + kernel = self.laplace_kernel + source_kernels: tuple[Kernel, ...] = tuple( + [AxisSourceDerivative(i, kernel) for i in range(3)] + + [kernel]) + + for i in range(3): + for k in range(3): + densities = obj_array.new_1d([0] * len(source_kernels)) + for l in range(3): # noqa: E741 + for j in range(3): + for m in range(3): + densities[l] += C(k, l, m, j)*normal[m]*sigma[j] + densities[3] += stokeslet_weight * stokeslet_density_vec_sym[k] + + int_g = sym.IntG( + target_kernel=kernel, + source_kernels=tuple(source_kernels), + densities=densities, + source=sym.DEFAULT_DOFDESC, + target=sym.DEFAULT_DOFDESC, + qbx_forced_limit=qbx_forced_limit) + sym_expr[i] += P(i, k, int_g) + + densities = obj_array.new_1d([0] * len(source_kernels)) + for k in range(3): + for m in range(3): + for j in range(3): + for l in range(3): # noqa: E741 + densities[l] += C(k, l, m, j)*normal[m]*sigma[j]*source[k] + if k == l: + densities[3] += C(k, l, m, j)*normal[m]*sigma[j] + densities[3] += ( + stokeslet_weight * source[k] * stokeslet_density_vec_sym[k]) + + int_g = sym.IntG( + target_kernel=kernel, + source_kernels=tuple(source_kernels), + densities=densities, + source=sym.DEFAULT_DOFDESC, + target=sym.DEFAULT_DOFDESC, + qbx_forced_limit=qbx_forced_limit) + sym_expr[i] += Q(i, int_g) + + return sym_expr + + +@dataclass +class ElasticityWrapperYoshida(ElasticityWrapperBase): + r"""Elasticity single-layer wrapper based on [Yoshida2001]_. + + This representation uses the Laplace kernel denoted by :attr:`Method.Laplace` + and can only be used in 3D. + """ + + def __post_init__(self): + if not self.dim == 3: + raise ValueError("unsupported dimension given to " + f"ElasticityDoubleLayerWrapperYoshida: {self.dim}") + + @cached_property + def stresslet(self): + return ElasticityDoubleLayerWrapperYoshida(3, self.mu, self.nu) + + @override + def apply(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + return self.stresslet.apply_single_and_double_layer( + density_vec_sym, + obj_array.new_1d([0]*self.dim), + obj_array.new_1d([0]*self.dim), + qbx_forced_limit, + 1, 0, + extra_deriv_dirs) + +# }}} diff --git a/pytential/symbolic/mappers.py b/pytential/symbolic/mappers.py index b361a50b1..31f244a60 100644 --- a/pytential/symbolic/mappers.py +++ b/pytential/symbolic/mappers.py @@ -300,7 +300,12 @@ def map_common_subexpression(self, expr: p.CommonSubexpression): # {{{ FlattenMapper class FlattenMapper(FlattenMapperBase, IdentityMapper): - pass + def map_int_g(self, expr): + densities, kernel_arguments, changed = rec_int_g_arguments(self, expr) + if not changed: + return expr + + return replace(expr, densities=densities, kernel_arguments=kernel_arguments) def flatten(expr): diff --git a/pytential/symbolic/pde/system_utils.py b/pytential/symbolic/pde/system_utils.py new file mode 100644 index 000000000..481f84b8a --- /dev/null +++ b/pytential/symbolic/pde/system_utils.py @@ -0,0 +1,646 @@ +from __future__ import annotations + + +__copyright__ = "Copyright (C) 2020 Isuru Fernando" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +import logging +from dataclasses import dataclass, replace +from typing import TYPE_CHECKING, Any, cast + +import numpy as np + +import sumpy.symbolic as sp +from pytools import ( + generate_nonnegative_integer_tuples_summing_to_at_most as gnitstam, + memoize_on_first_arg, +) +from sumpy.kernel import ( + AxisSourceDerivative, + AxisTargetDerivative, + DirectionalSourceDerivative, + ExpressionKernel, + Kernel, + KernelWrapper, + TargetPointMultiplier, +) + +from pytential import sym +from pytential.symbolic.mappers import IdentityMapper, flatten +from pytential.utils import chop, solve_from_lu + + +if TYPE_CHECKING: + from collections.abc import Mapping, Sequence + + from pymbolic.typing import ArithmeticExpression + +logger = logging.getLogger(__name__) + +__all__ = ( + "rewrite_using_base_kernel", + "get_deriv_relation", + ) + +__doc__ = """ +.. autoexception:: RewriteFailedError +.. autofunction:: rewrite_using_base_kernel + +.. autoclass:: DerivRelation +.. autofunction:: get_deriv_relation +""" + + +# {{{ rewrite_using_base_kernel + +_NO_ARG_SENTINEL = object() + + +class RewriteFailedError(RuntimeError): + """An error raised by :func:`rewrite_using_base_kernel` when an expression + cannot be rewritten using the given kernel. + """ + + +def rewrite_using_base_kernel( + exprs: Sequence[ArithmeticExpression], + base_kernel: Kernel | None = _NO_ARG_SENTINEL) -> list[ArithmeticExpression]: + """ + Rewrites a list of expressions with :class:`~pytential.symbolic.primitives.IntG` + objects using *base_kernel*. + + For example, if *base_kernel* is the biharmonic kernel, and a Laplace kernel + is encountered, this will (forcibly) rewrite the Laplace kernel in terms of + derivatives of the Biharmonic kernel. + + If *base_kernel* is *None*, the expression list is returned as is. + To perform these manipulation, we assume that potentials are smooth, i.e. that + Schwarz's theorem holds. If applied to on-surface evaluation, then the layer + potentials to which this is applied must be one-sided limits, and the potential + must be non-singular (as might occur due to corners). + + If *base_kernel* is not given, the method will try to find a base kernel. + This is currently not implemented and will raise a ``NotImplementedError``. + + The routine will fail with a :exc:`RewriteFailedError` when *base_kernel* is + given, but method fails to find a way to rewrite. + """ + if base_kernel is None: + return list(exprs) + + if base_kernel == _NO_ARG_SENTINEL: + raise NotImplementedError + + mapper = RewriteUsingBaseKernelMapper(base_kernel) + return [cast("ArithmeticExpression", mapper(expr)) for expr in exprs] + + +class RewriteUsingBaseKernelMapper(IdentityMapper): + r"""Rewrites :class:`~pytential.symbolic.primitives.IntG`\ s using a base + kernel. + + First this method replaces ``IntG``s with + :class:`sumpy.kernel.AxisTargetDerivative` to ``IntG``s + :class:`sumpy.kernel.AxisSourceDerivative` and + ``IntG``s with :class:`sumpy.kernel.TargetPointMultiplier` to ``IntG``s + without them using :class:`sumpy.kernel.ExpressionKernel`. Then, it converts + them to the base kernel by finding a relationship between the derivatives. + + .. automethod:: __init__ + """ + + def __init__(self, base_kernel): + self.base_kernel = base_kernel + + def map_int_g(self, expr): + # Convert IntGs with TargetPointMultiplier/AxisTargetDerivative to a sum of + # IntGs without TargetPointMultipliers + new_int_gs = convert_target_transformation_to_source(expr) + # Convert IntGs with different kernels to expressions containing + # IntGs with base_kernel or its derivatives + return sum(rewrite_int_g_using_base_kernel(new_int_g, + self.base_kernel) for new_int_g in new_int_gs) + + +def _get_sympy_kernel_expression( + expr: ArithmeticExpression, + kernel_arguments: Mapping[str, Any]) -> sp.Basic: + """Convert a :mod:`pymbolic` expression to :mod:`sympy` expression + after substituting kernel arguments. + + For example, ``exp(I*k*r)/r`` with ``{k: 1}`` is converted to the sympy + expression ``exp(I*r)/r``. + """ + from pymbolic.mapper.substitutor import substitute + from sumpy.symbolic import PymbolicToSympyMapperWithSymbols + + pymbolic_expr = substitute(expr, kernel_arguments) + + res = PymbolicToSympyMapperWithSymbols()(pymbolic_expr) + return res + + +def _monom_to_expr(monom: Sequence[int], + variables: Sequence[sp.Basic | ArithmeticExpression] + ) -> sp.Basic | ArithmeticExpression: + """Convert a monomial to an expression using given variables. + + For example, ``[3, 2, 1]`` with variables ``[x, y, z]`` is converted to + ``x^3 y^2 z``. + """ + prod: ArithmeticExpression = 1 + for i, nrepeats in enumerate(monom): + for _ in range(nrepeats): + prod *= variables[i] + + return prod + + +def convert_target_transformation_to_source(int_g: sym.IntG) -> list[sym.IntG]: + r"""Convert an ``IntG`` with :class:`~sumpy.kernel.AxisTargetDerivative` + or :class:`~sumpy.kernel.TargetPointMultiplier` to a list + of ``IntG``\ s without them and only source dependent transformations. + + The sum of the list returned is equivalent to the input *int_g*. + + For example:: + + IntG(d/dx r, sigma) -> [IntG(d/dy r, -sigma)] + IntG(x*r, sigma) -> [IntG(r, sigma*y), IntG(r*(x -y), sigma)] + """ + import sympy + + from pymbolic.interop.sympy import SympyToPymbolicMapper + conv = SympyToPymbolicMapper() + + knl = int_g.target_kernel + if not knl.is_translation_invariant: + from warnings import warn + + warn(f"Translation variant kernel ({knl}) found.", stacklevel=2) + return [int_g] + + # we use a symbol for d = (x - y) + ds = sympy.symbols(f"d0:{knl.dim}") + sources = sympy.symbols(f"y0:{knl.dim}") + # instead of just x, we use x = (d + y) + targets = [d + source for d, source in zip(ds, sources, strict=True)] + orig_expr = sympy.Function("f")(*ds) # pylint: disable=not-callable + expr = orig_expr + found = False + while isinstance(knl, KernelWrapper): + if isinstance(knl, TargetPointMultiplier): + expr = targets[knl.axis] * expr + found = True + elif isinstance(knl, AxisTargetDerivative): + # sympy can't differentiate w.r.t target because + # it's not a symbol, but d/d(x) = d/d(d) + expr = expr.diff(ds[knl.axis]) + found = True + else: + from warnings import warn + + warn( + f"Unknown target kernel ({knl}) found. " + "Returning IntG expression unchanged.", stacklevel=2) + return [int_g] + knl = knl.inner_kernel + + if not found: + return [int_g] + + int_g = replace(int_g, target_kernel=knl) + + sources_pymbolic = sym.nodes(knl.dim).as_vector() + expr = expr.expand() + # Now the expr is an Add and looks like + # u_{d[0], d[1]}(d, y)*d[0]*y[1] + u(d, y) * d[1] + # or a single term like u(d, y) * d[1] + if isinstance(expr, sympy.Add): + kernel_terms = expr.args + else: + kernel_terms = [expr] + + result = [] + for kernel_term in kernel_terms: + deriv_factors = kernel_term.atoms(sympy.Derivative) + if len(deriv_factors) == 1: + # for eg: if kernel_terms is u_{d[0], d[1]}(d, y) * d[0] * y[1] + # deriv_factor is u_{d[0], d[1]} + (deriv_factor,) = deriv_factors + # eg: remaining_factors is d[0] * y[1] + remaining_factors = sympy.Poly(kernel_term.xreplace( + {deriv_factor: 1}), *ds, *sources) + # eg: derivatives is (d[0], 1), (d[1], 1) + derivatives = deriv_factor.args[1:] + elif len(deriv_factors) == 0: + # for eg: we have a term like u(d, y) * d[1] + # remaining_factors = d[1] + remaining_factors = sympy.Poly(kernel_term.xreplace( + {orig_expr: 1}), *ds, *sources) + derivatives = [] + else: + raise AssertionError("impossible condition") + + # apply the derivatives + new_source_kernels = [] + for source_kernel in int_g.source_kernels: + knl = source_kernel + for axis_var, nrepeats in derivatives: + axis = ds.index(axis_var) + for _ in range(nrepeats): + knl = AxisSourceDerivative(axis, knl) + new_source_kernels.append(knl) + new_int_g = replace(int_g, source_kernels=tuple(new_source_kernels)) + + (monom, coeff,) = remaining_factors.terms()[0] + # Now from d[0]*y[1], we separate the two terms + # d terms end up in the expression and y terms end up in the density + d_terms, y_terms = monom[:len(ds)], monom[len(ds):] + expr_multiplier = _monom_to_expr(d_terms, ds) + density_multiplier = _monom_to_expr(y_terms, sources_pymbolic) \ + * conv(coeff) + # since d/d(d) = - d/d(y), we multiply by -1 to get source derivatives + density_multiplier *= (-1)**int(sum(nrepeats for _, nrepeats in derivatives)) + new_int_gs = _multiply_int_g(new_int_g, sp.sympify(expr_multiplier), + density_multiplier) + result.extend(new_int_gs) + return result + + +def _multiply_int_g( + int_g: sym.IntG, + expr_multiplier: sp.Basic, + density_multiplier: ArithmeticExpression) -> list[sym.IntG]: + """Multiply the expression in ``IntG`` with the *expr_multiplier* + which is a symbolic (:mod:`sympy` or :mod:`symengine`) expression and + multiply the densities with *density_multiplier* which is a :mod:`pymbolic` + expression. + """ + from pymbolic import substitute + + result = [] + + base_kernel = int_g.target_kernel.get_base_kernel() + sym_d = sp.make_sym_vector("d", base_kernel.dim) + base_kernel_expr = _get_sympy_kernel_expression(base_kernel.expression, + int_g.kernel_arguments) + subst = {sym.var(f"d{i}"): sym.var("d")[i] for i in + range(base_kernel.dim)} + conv = sp.SympyToPymbolicMapper() + + if expr_multiplier == 1: + # if there's no expr_multiplier, only multiply the densities + return [replace( + int_g, + densities=tuple(density*density_multiplier for density in int_g.densities)) + ] + + for knl, density in zip(int_g.source_kernels, int_g.densities, strict=True): + if expr_multiplier == 1: + new_knl = knl.get_base_kernel() + else: + new_expr = conv(knl.postprocess_at_source(base_kernel_expr, sym_d) + * expr_multiplier) + new_expr = substitute(new_expr, subst) + new_knl = ExpressionKernel(knl.dim, flatten(new_expr), + knl.get_base_kernel().global_scaling_const, + knl.is_complex_valued) + result.append(replace( + int_g, + target_kernel=new_knl, + densities=(density*density_multiplier,), + source_kernels=(new_knl,) + )) + return result + + +def rewrite_int_g_using_base_kernel( + int_g: sym.IntG, base_kernel: ExpressionKernel) -> ArithmeticExpression: + r"""Rewrite an ``IntG`` to an expression with ``IntG``\ s having the + base kernel *base_kernel*. + """ + result: ArithmeticExpression = 0 + for knl, density in zip(int_g.source_kernels, int_g.densities, strict=True): + result += _rewrite_int_g_using_base_kernel( + replace(int_g, source_kernels=(knl,), densities=(density,)), + base_kernel) + + return result + + +def _rewrite_int_g_using_base_kernel( + int_g: sym.IntG, base_kernel: ExpressionKernel) -> ArithmeticExpression: + r"""Rewrites an ``IntG`` with only one source kernel to an expression with + ``IntG``\ s having the base kernel *base_kernel*. + """ + target_kernel = int_g.target_kernel.replace_base_kernel(base_kernel) + dim = target_kernel.dim + + result: ArithmeticExpression = 0 + + density, = int_g.densities + source_kernel, = int_g.source_kernels + deriv_relation = get_deriv_relation_kernel(source_kernel.get_base_kernel(), + base_kernel, hashable_kernel_arguments=( + sym.hashable_kernel_args(int_g.kernel_arguments))) + + const = deriv_relation.const + # NOTE: we set a dofdesc here to force the evaluation of this integral + # on the source instead of the target when using automatic tagging + # see :meth:`pytential.symbolic.mappers.LocationTagger._default_dofdesc` + if int_g.source.geometry is None: + dd = int_g.source.copy(geometry=sym.DEFAULT_SOURCE) + else: + dd = int_g.source + const *= sym.integral(dim, dim-1, density, dofdesc=dd) + + if const != 0 and target_kernel != target_kernel.get_base_kernel(): + # There might be some TargetPointMultipliers hanging around. + # FIXME: handle them instead of bailing out + return int_g + + if const != 0 and source_kernel != source_kernel.get_base_kernel(): + # We only handle the case where any source transformation is a derivative + # and the constant when applied becomes zero. We bail out if not + knl = source_kernel + while isinstance(knl, KernelWrapper): + if not isinstance(knl, + AxisSourceDerivative | DirectionalSourceDerivative): + return int_g + knl = knl.inner_kernel + const = 0 + + result += const + + new_kernel_args = filter_kernel_arguments([base_kernel], + int_g.kernel_arguments) + + for mi, c in deriv_relation.linear_combination: + knl = source_kernel.replace_base_kernel(base_kernel) + for d, val in enumerate(mi): + for _ in range(val): + knl = AxisSourceDerivative(d, knl) + c *= -1 + result += replace( + int_g, + source_kernels=(knl,), + target_kernel=target_kernel, + densities=(density * c,), + kernel_arguments=new_kernel_args) + + return result + + +@dataclass +class DerivRelation: + """A class to hold the relationship between a kernel and a base kernel. + + The relation is given by:: + + kernel = const + sum(deriv(base_kernel, mi) * coeff) + + .. autoattribute:: const + .. autoattribute:: linear_combination + """ + + const: ArithmeticExpression + """A constant to add to the combination.""" + linear_combination: Sequence[tuple[tuple[int, ...], ArithmeticExpression]] + """A list of pairs ``(mi, coeffs)``.""" + + +def get_deriv_relation( + kernels: Sequence[ExpressionKernel], + base_kernel: ExpressionKernel, + kernel_arguments: Mapping[str, Any], + tol: float = 1e-10, + order: int | None = None, + ) -> list[DerivRelation]: + r""" + Given a sequence of *kernels*, a *base_kernel* and an *order*, this + gives a relation between the *base_kernel* and each of the *kernels*. + For each kernel in *kernels* we have that the kernel is equal to the + linear combination of derivatives of *base_kernel* up to the order + *order* and a constant. i.e. + + .. math:: + + K = \sum_{m \in M(order)} \partial^m baseKernel \partial x^m + const. + + This is done by sampling the baseKernel and its derivatives at random + points to get a matrix ``A``, then sampling the kernel at the same + points to get a matrix ``b`` and solving for the system ``Ax = b`` using + an LU factorization of ``A``. The solution ``x`` is the vector of weights + in the linear combination. To represent a constant in the relation we + add a column of ones into ``A``. + + When *order* is not given, the algorithm starts with one and increases + the order upto the order of the PDE satisfied by the *base_kernel* until + a relation is found. + """ + res = [] + for knl in kernels: + res.append(get_deriv_relation_kernel(knl, base_kernel, + hashable_kernel_arguments=sym.hashable_kernel_args(kernel_arguments), + tol=tol, order=order)) + return res + + +@memoize_on_first_arg +def get_deriv_relation_kernel( + kernel: ExpressionKernel, + base_kernel: ExpressionKernel, + hashable_kernel_arguments: tuple[tuple[str, Any], ...], + tol: float = 1e-10, + order: int | None = None, + ) -> DerivRelation: + """Takes a *kernel* and a base_kernel* as input and re-writes the + *kernel* as a linear combination of derivatives of *base_kernel* up-to + order *order* and a constant. + + :param tol: an upper limit for small numbers that are replaced with zero + in the numerical procedure. + :returns: the constant and a list of (multi-index, coeff) to represent the + linear combination of derivatives as a *DerivRelation* object. + """ + kernel_arguments = dict(hashable_kernel_arguments) + lu, rand, mis = _get_base_kernel_matrix_lu_factorization( + base_kernel, + order=order, + hashable_kernel_arguments=hashable_kernel_arguments) + dim = base_kernel.dim + sym_vec = sp.make_sym_vector("d", dim) + sympy_conv = sp.SympyToPymbolicMapper() + + expr = _get_sympy_kernel_expression(kernel.expression, kernel_arguments) + vec = [] + for i in range(len(mis)): + vec.append(evalf(expr.xreplace(dict(zip(sym_vec, rand[:, i], strict=True))))) + vec = sp.Matrix(vec) + result = [] + const = 0 + logger.debug("%s = ", kernel) + + sol = solve_from_lu(lu.L, lu.U, lu.perm, vec, lambda expr: expr.expand()) + for i, coeff in enumerate(sol): + coeff = chop(coeff, tol) + if coeff == 0: + continue + if mis[i] != (-1,)*dim: + coeff *= _get_sympy_kernel_expression(kernel.global_scaling_const, + kernel_arguments) + coeff /= _get_sympy_kernel_expression(base_kernel.global_scaling_const, + kernel_arguments) + result.append((mis[i], sympy_conv(coeff))) + logger.debug(" + %s.diff(%s)*%s", base_kernel, mis[i], coeff) + else: + const = sympy_conv(coeff * _get_sympy_kernel_expression( + kernel.global_scaling_const, kernel_arguments)) + logger.debug(" + %s", const) + return DerivRelation(const, result) + + +@dataclass +class LUFactorization: + L: sp.Matrix + U: sp.Matrix + perm: Sequence[tuple[int, int]] + + +@memoize_on_first_arg +def _get_base_kernel_matrix_lu_factorization( + base_kernel: ExpressionKernel, + hashable_kernel_arguments: tuple[tuple[str, Any], ...], + order: int | None = None, + retries: int = 3, + ) -> tuple[LUFactorization, np.ndarray, list[tuple[int, ...]]]: + """ + Takes a *base_kernel* and samples the kernel and its derivatives upto + order *order*. + + :returns: a tuple with the LU factorization of the sampled matrix, + the sampled points, and the multi-indices corresponding to the + derivatives represented by the rows of the matrix. + """ + dim = base_kernel.dim + + pde = base_kernel.get_pde_as_diff_op() + if order is None: + order = pde.order + + if order > pde.order: + raise NotImplementedError("Computing derivative relation when " + "the base kernel's derivatives are linearly dependent has not" + "been implemented yet.") + + mis = sorted(gnitstam(order, dim), key=sum) + # (-1, -1, -1) represents a constant + # ((0,0,0) would be "function with no derivatives") + mis.append((-1,)*dim) + + if order == pde.order: + pde_mis = [ident.mi for eq in pde.eqs for ident in eq.keys()] + pde_mis = [mi for mi in pde_mis if sum(mi) == order] + logger.debug("Removing %s to avoid linear dependent mis", pde_mis[-1]) + mis.remove(pde_mis[-1]) + + rng = np.random.default_rng() + rand: np.ndarray = rng.integers(1, 10**15, size=(dim, len(mis))).astype(object) + for i in range(rand.shape[0]): + for j in range(rand.shape[1]): + rand[i, j] = sp.sympify(rand[i, j])/10**15 + sym_vec = sp.make_sym_vector("d", dim) + + base_expr = _get_sympy_kernel_expression(base_kernel.expression, + dict(hashable_kernel_arguments)) + + mat = [] + for rand_vec_idx in range(rand.shape[1]): + row = [] + for mi in mis[:-1]: + expr = base_expr + for var_idx, nderivs in enumerate(mi): + if nderivs == 0: + continue + expr = expr.diff(sym_vec[var_idx], nderivs) + replace_dict = dict(zip(sym_vec, rand[:, rand_vec_idx], strict=True)) + eval_expr = evalf(expr.xreplace(replace_dict)) + row.append(eval_expr) + row.append(1) + mat.append(row) + + sym_mat = sp.Matrix(mat) + failed = False + try: + L, U, perm = sym_mat.LUdecomposition() + except RewriteFailedError: + # symengine throws an error when rank deficient + # and sympy returns U with last row zero + failed = True + + if not sp.USE_SYMENGINE and all(expr == 0 for expr in U[-1, :]): + failed = True + + if failed: + if retries == 0: + # The derivatives of the base kernel are not linearly + # independent. + # TODO: Extract a linearly independent set and return them + raise NotImplementedError("Computing derivative relation when " + "the base kernel's derivatives are linearly dependent has not " + "been implemented yet.") + return _get_base_kernel_matrix_lu_factorization( + base_kernel, + hashable_kernel_arguments=hashable_kernel_arguments, + order=order, + retries=retries-1, + ) + + return LUFactorization(L, U, perm), rand, mis + + +def evalf(expr, prec=100): + """Evaluate an expression numerically using ``prec`` number of bits.""" + from sumpy.symbolic import USE_SYMENGINE + if USE_SYMENGINE: + return expr.n(prec=prec) + else: + import sympy + dps = int(sympy.log(2**prec, 10)) + return expr.n(n=dps) + + +def filter_kernel_arguments(knls, kernel_arguments): + """From a dictionary of kernel arguments, filter out arguments + that are not needed for the kernels given as a list and return a new + dictionary. + """ + kernel_arg_names = set() + + for kernel in knls: + for karg in (kernel.get_args() + kernel.get_source_args()): + kernel_arg_names.add(karg.loopy_arg.name) + + return {k: v for (k, v) in kernel_arguments.items() if k in kernel_arg_names} + +# }}} diff --git a/pytential/symbolic/primitives.py b/pytential/symbolic/primitives.py index fe4daff35..1b25d298f 100644 --- a/pytential/symbolic/primitives.py +++ b/pytential/symbolic/primitives.py @@ -65,7 +65,6 @@ ShapeT, from_numpy, ) -from sumpy.kernel import Kernel from sumpy.symbolic import SpatialConstant from pytential.symbolic.dof_desc import ( @@ -92,6 +91,7 @@ from pymbolic.mapper.stringifier import StringifyMapper from pymbolic.primitives import CommonSubexpression, Quotient from pytools import P + from sumpy.kernel import Kernel __doc__ = """ @@ -149,6 +149,8 @@ .. autoclass:: ArithmeticExpressionT +.. autoclass:: Side + .. autoclass:: QBXForcedLimit .. class:: P @@ -377,6 +379,18 @@ """ __all__ = ( + # re-exported from pymbolic + "cse", "make_sym_vector", "var", + + # re-exported from sumpy + "SpatialConstant", + + # re-exported from dof_desc + "DEFAULT_DOFDESC", "DEFAULT_SOURCE", "DEFAULT_TARGET", + "QBX_SOURCE_STAGE1", "QBX_SOURCE_STAGE2", "QBX_SOURCE_QUAD_STAGE2", + "GRANULARITY_NODE", "GRANULARITY_CENTER", "GRANULARITY_ELEMENT", + "DOFDescriptor", "DOFDescriptorLike", "as_dofdesc", + "Operand", "OperandTc", "Side", "QBXForcedLimit", "ArithmeticExpressionT", @@ -387,7 +401,7 @@ "ExpressionNode", "ErrorExpression", - "var", "SpatialConstant", "make_sym_mv", "make_sym_surface_mv", + "make_sym_mv", "make_sym_surface_mv", "real", "imag", "conj", "abs", "sqrt", @@ -425,11 +439,7 @@ "pretty", - "DEFAULT_SOURCE", "DEFAULT_TARGET", - "QBX_SOURCE_STAGE1", "QBX_SOURCE_STAGE2", "QBX_SOURCE_QUAD_STAGE2", - "GRANULARITY_NODE", "GRANULARITY_CENTER", "GRANULARITY_ELEMENT", - "DOFDescriptor", "DOFDescriptorLike", "as_dofdesc", - ) +) # {{{ helpers @@ -2324,6 +2334,8 @@ def Sp( "Choosing default 'avg'.", stacklevel=2) qbx_forced_limit = "avg" + from sumpy.kernel import Kernel + if ambient_dim is None and isinstance(kernel, Kernel): ambient_dim = kernel.dim @@ -2356,6 +2368,7 @@ def Spp( "Choosing default '+1'.", stacklevel=2) qbx_forced_limit = +1 + from sumpy.kernel import Kernel if ambient_dim is None and isinstance(kernel, Kernel): ambient_dim = kernel.dim @@ -2388,6 +2401,7 @@ def D( "Choosing default 'avg'.", stacklevel=2) qbx_forced_limit = "avg" + from sumpy.kernel import Kernel if ambient_dim is None and isinstance(kernel, Kernel): ambient_dim = kernel.dim @@ -2422,6 +2436,7 @@ def Dp( "Choosing default '+1'.", stacklevel=2) qbx_forced_limit = +1 + from sumpy.kernel import Kernel if ambient_dim is None and isinstance(kernel, Kernel): ambient_dim = kernel.dim diff --git a/pytential/symbolic/stokes.py b/pytential/symbolic/stokes.py index 4ae4cafbc..dc7fc4fe4 100644 --- a/pytential/symbolic/stokes.py +++ b/pytential/symbolic/stokes.py @@ -1,7 +1,10 @@ from __future__ import annotations -__copyright__ = "Copyright (C) 2017 Natalie Beams" +__copyright__ = """ +Copyright (C) 2017 Natalie Beams" +Copyright (C) 2022 Isuru Fernando +""" __license__ = """ Permission is hereby granted, free of charge, to any person obtaining a copy @@ -23,16 +26,57 @@ THE SOFTWARE. """ -import numpy as np +from abc import ABC, abstractmethod +from dataclasses import dataclass +from functools import cached_property +from typing import TYPE_CHECKING, Literal -from sumpy.kernel import LaplaceKernel, StokesletKernel, StressletKernel +import numpy as np +from typing_extensions import override + +from pytools import MovedFunctionDeprecationWrapper, obj_array +from sumpy.kernel import ( + AxisSourceDerivative, + AxisTargetDerivative, + BiharmonicKernel, + Kernel, + LaplaceKernel, + TargetPointMultiplier, +) +from sumpy.symbolic import SpatialConstant from pytential import sym +from pytential.symbolic.elasticity import ( + ElasticityDoubleLayerWrapperBase, + ElasticityWrapperBase, + Method, + VectorExpression, + _ElasticityDoubleLayerWrapperNaiveOrBiharmonic, + _ElasticityWrapperNaiveOrBiharmonic, +) +from pytential.symbolic.pde.system_utils import rewrite_using_base_kernel + +if TYPE_CHECKING: + from pymbolic.typing import ArithmeticExpression + + from pytential.symbolic.primitives import QBXForcedLimit, Side __doc__ = """ -.. autoclass:: StokesletWrapper -.. autoclass:: StressletWrapper +.. autofunction:: make_stokeslet_wrapper +.. autofunction:: make_stresslet_wrapper + +.. autoclass:: StokesletWrapperBase +.. autoclass:: StressletWrapperBase + +.. autoclass:: StokesletWrapperNaive +.. autoclass:: StressletWrapperNaive + +.. autoclass:: StokesletWrapperBiharmonic +.. autoclass:: StressletWrapperBiharmonic + +.. autoclass:: StokesletWrapperTornberg +.. autoclass:: StressletWrapperTornberg .. autoclass:: StokesOperator .. autoclass:: HsiaoKressExteriorStokesOperator @@ -40,37 +84,16 @@ """ -# {{{ StokesletWrapper +# {{{ StokesletWrapper/StressletWrapper base classes -class StokesletWrapper: +class StokesletWrapperBase(ElasticityWrapperBase, ABC): """Wrapper class for the :class:`~sumpy.kernel.StokesletKernel` kernel. - This class is meant to shield the user from the messiness of writing - out every term in the expansion of the double-indexed Stokeslet kernel - applied to the density vector. The object is created - to do some of the set-up and bookkeeping once, rather than every - time we want to create a symbolic expression based on the kernel -- say, - once when we solve for the density, and once when we want a symbolic - representation for the solution, for example. - - The :meth:`apply` function returns the integral expressions needed for - the vector velocity resulting from convolution with the vector density, - and is meant to work similarly to calling - :func:`~pytential.symbolic.primitives.S` (which is - :class:`~pytential.symbolic.primitives.IntG`). - - Similar functions are available for other useful things related to - the flow: :meth:`apply_pressure`, :meth:`apply_derivative` (target derivative), - :meth:`apply_stress` (applies symmetric viscous stress tensor in - the requested direction). - - .. attribute:: kernel_dict - - The dictionary allows us to exploit symmetry -- that - :math:`S_{01}` is identical to :math:`S_{10}` -- and avoid creating - multiple expansions for the same kernel in a different ordering. + In addition to the methods in + :class:`~pytential.symbolic.elasticity.ElasticityWrapperBase`, this class + also provides :meth:`apply_stress`, which applies symmetric viscous stress tensor + in the requested direction, and :meth:`apply_pressure`. - .. automethod:: __init__ .. automethod:: apply .. automethod:: apply_pressure .. automethod:: apply_derivative @@ -79,137 +102,110 @@ class StokesletWrapper: dim: int - def __init__(self, dim: int): - self.dim = dim + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, nu=0.5) + + def apply_pressure(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> ArithmeticExpression: + """Symbolic expression for pressure field associated with the Stokeslet.""" + # NOTE: the pressure representation does not differ depending on the + # representation and is implemented in base class only + lknl = LaplaceKernel(dim=self.dim) + + sym_expr = 0 + for i in range(self.dim): + deriv_dirs = (*extra_deriv_dirs, i) + knl = lknl + for deriv_dir in deriv_dirs: + knl = AxisTargetDerivative(deriv_dir, knl) + sym_expr += sym.int_g_vec(knl, density_vec_sym[i], + qbx_forced_limit=qbx_forced_limit) + return sym_expr + + @abstractmethod + def apply_stress(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + r"""Symbolic expression for viscous stress applied to a direction. - if dim == 2: - self.kernel_dict = { - (2, 0): StokesletKernel(dim=2, icomp=0, jcomp=0), - (1, 1): StokesletKernel(dim=2, icomp=0, jcomp=1), - (0, 2): StokesletKernel(dim=2, icomp=1, jcomp=1) - } + Returns a vector of symbolic expressions for the force resulting + from the viscous stress - elif dim == 3: - self.kernel_dict = { - (2, 0, 0): StokesletKernel(dim=3, icomp=0, jcomp=0), - (1, 1, 0): StokesletKernel(dim=3, icomp=0, jcomp=1), - (1, 0, 1): StokesletKernel(dim=3, icomp=0, jcomp=2), - (0, 2, 0): StokesletKernel(dim=3, icomp=1, jcomp=1), - (0, 1, 1): StokesletKernel(dim=3, icomp=1, jcomp=2), - (0, 0, 2): StokesletKernel(dim=3, icomp=2, jcomp=2) - } + .. math:: - else: - raise ValueError("unsupported dimension given to StokesletWrapper") + -p \delta_{ij} + \mu (\nabla_i u_j + \nabla_j u_i) - def apply(self, density_vec_sym, mu_sym, qbx_forced_limit): - """Symbolic expressions for integrating Stokeslet kernel. + applied in the direction of *dir_vec_sym*. - Returns an object array of symbolic expressions for the vector - resulting from integrating the dyadic Stokeslet kernel with - variable *density_vec_sym*. + Note that this computation is very similar to computing + a double-layer potential with the Stresslet kernel in + :class:`StressletWrapperBase`. The difference is that here the direction + vector is applied at the target points, while in the Stresslet the + direction is applied at the source points. :arg density_vec_sym: a symbolic vector variable for the density vector. - :arg mu_sym: a symbolic variable for the viscosity. + :arg dir_vec_sym: a symbolic vector for the application direction. :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on to :class:`~pytential.symbolic.primitives.IntG`. """ - sym_expr = np.empty((self.dim,), dtype=object) - - for comp in range(self.dim): - - # Start variable count for kernel with 1 for the requested result - # component - base_count = np.zeros(self.dim, dtype=np.int32) - base_count[comp] += 1 - - for i in range(self.dim): - var_ctr = base_count.copy() - var_ctr[i] += 1 - ctr_key = tuple(var_ctr) - - if i < 1: - sym_expr[comp] = sym.int_g_vec( - self.kernel_dict[ctr_key], density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit, mu=mu_sym) - else: - sym_expr[comp] = sym_expr[comp] + sym.int_g_vec( - self.kernel_dict[ctr_key], density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit, mu=mu_sym) - - return sym_expr - - def apply_pressure(self, density_vec_sym, mu_sym, qbx_forced_limit): - """Symbolic expression for pressure field associated with the Stokeslet.""" - - from pytential.symbolic.mappers import DerivativeTaker - kernel = LaplaceKernel(dim=self.dim) - - for i in range(self.dim): +class StressletWrapperBase(ElasticityDoubleLayerWrapperBase, ABC): + """Wrapper class for the :class:`~sumpy.kernel.StressletKernel` kernel. - if i < 1: - sym_expr = DerivativeTaker(i).map_int_g( - sym.int_g_vec(kernel, density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit)) - else: - sym_expr = sym_expr + (DerivativeTaker(i).map_int_g( - sym.int_g_vec(kernel, density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit))) + In addition to the methods in + :class:`pytential.symbolic.elasticity.ElasticityDoubleLayerWrapperBase`, this + class also provides :meth:`apply_stress` which applies symmetric viscous stress + tensor in the requested direction and :meth:`apply_pressure`. - return sym_expr + .. automethod:: apply + .. automethod:: apply_pressure + .. automethod:: apply_derivative + .. automethod:: apply_stress + """ - def apply_derivative(self, deriv_dir, density_vec_sym, - mu_sym, qbx_forced_limit): - """Symbolic derivative of velocity from Stokeslet. + dim: int - Returns an object array of symbolic expressions for the vector - resulting from integrating the *deriv_dir* target derivative of the - dyadic Stokeslet kernel with variable *density_vec_sym*. + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, nu=0.5) - :arg deriv_dir: integer denoting the axis direction for the derivative. - :arg density_vec_sym: a symbolic vector variable for the density vector. - :arg mu_sym: a symbolic variable for the viscosity. - :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on - to :class:`~pytential.symbolic.primitives.IntG`. + def apply_pressure(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> ArithmeticExpression: + """Symbolic expression for pressure field associated with the Stresslet. """ + # NOTE: the pressure representation does not differ depending on the + # representation and is implemented in base class only - from pytential.symbolic.mappers import DerivativeTaker - - sym_expr = np.empty((self.dim,), dtype=object) + import itertools + lknl = LaplaceKernel(dim=self.dim) - for comp in range(self.dim): + factor = (2. * self.mu) - # Start variable count for kernel with 1 for the requested result - # component - base_count = np.zeros(self.dim, dtype=np.int32) - base_count[comp] += 1 + sym_expr = 0 - for i in range(self.dim): - var_ctr = base_count.copy() - var_ctr[i] += 1 - ctr_key = tuple(var_ctr) - - if i < 1: - sym_expr[comp] = DerivativeTaker(deriv_dir).map_int_g( - sym.int_g_vec(self.kernel_dict[ctr_key], - density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym)) - - else: - sym_expr[comp] = sym_expr[comp] + DerivativeTaker( - deriv_dir).map_int_g( - sym.int_g_vec(self.kernel_dict[ctr_key], - density_vec_sym[i], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym)) + for i, j in itertools.product(range(self.dim), range(self.dim)): + deriv_dirs = (*extra_deriv_dirs, i, j) + knl = lknl + for deriv_dir in deriv_dirs: + knl = AxisTargetDerivative(deriv_dir, knl) + sym_expr += factor * sym.int_g_vec(knl, + density_vec_sym[i] * dir_vec_sym[j], + qbx_forced_limit=qbx_forced_limit) return sym_expr - def apply_stress(self, density_vec_sym, dir_vec_sym, - mu_sym, qbx_forced_limit): + @abstractmethod + def apply_stress(self, + density_vec_sym: VectorExpression, + normal_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: r"""Symbolic expression for viscous stress applied to a direction. Returns a vector of symbolic expressions for the force resulting @@ -221,301 +217,451 @@ def apply_stress(self, density_vec_sym, dir_vec_sym, applied in the direction of *dir_vec_sym*. - Note that this computation is very similar to computing - a double-layer potential with the Stresslet kernel in - :class:`StressletWrapper`. The difference is that here the direction - vector is applied at the target points, while in the Stresslet the - direction is applied at the source points. - :arg density_vec_sym: a symbolic vector variable for the density vector. + :arg normal_vec_sym: a symbolic vector variable for the normal vectors + (outward facing normals at source locations). :arg dir_vec_sym: a symbolic vector for the application direction. - :arg mu_sym: a symbolic variable for the viscosity. :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on to :class:`~pytential.symbolic.primitives.IntG`. """ + raise NotImplementedError - import itertools +# }}} - sym_expr = np.empty((self.dim,), dtype=object) - stresslet_obj = StressletWrapper(dim=self.dim) +# {{{ Stokeslet/StressletWrapper Naive and Biharmonic + +class _StokesletWrapperNaiveOrBiharmonic(_ElasticityWrapperNaiveOrBiharmonic, + StokesletWrapperBase): + def __init__(self, dim: int, mu: float | SpatialConstant, + base_kernel: Kernel | None) -> None: + super().__init__(dim=dim, mu=mu, nu=0.5, base_kernel=base_kernel) + StokesletWrapperBase.__init__(self, dim=dim, mu=mu) + + @override + def apply_pressure(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> ArithmeticExpression: + sym_expr = super().apply_pressure(density_vec_sym, qbx_forced_limit, + extra_deriv_dirs=extra_deriv_dirs) + res, = rewrite_using_base_kernel([sym_expr], base_kernel=self.base_kernel) + return res + + @override + def apply_stress(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + + sym_expr = np.zeros((self.dim,), dtype=object) + stresslet_obj = _StressletWrapperNaiveOrBiharmonic( + dim=self.dim, + mu=self.mu, nu=0.5, + base_kernel=self.base_kernel) + + # For stokeslet, there's no direction vector involved + # passing a list of ones instead to remove its usage. for comp in range(self.dim): - - # Start variable count for kernel with 1 for the requested result - # component - base_count = np.zeros(self.dim, dtype=np.int32) - base_count[comp] += 1 - - for i, j in itertools.product(range(self.dim), range(self.dim)): - var_ctr = base_count.copy() - var_ctr[i] += 1 - var_ctr[j] += 1 - ctr_key = tuple(var_ctr) - - if i + j < 1: - sym_expr[comp] = dir_vec_sym[i] * sym.int_g_vec( - stresslet_obj.kernel_dict[ctr_key], - density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, mu=mu_sym) - - else: - sym_expr[comp] = sym_expr[comp] + dir_vec_sym[i] * sym.int_g_vec( - stresslet_obj.kernel_dict[ctr_key], - density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym) + for i in range(self.dim): + for j in range(self.dim): + sym_expr[comp] += dir_vec_sym[i] * stresslet_obj._get_int_g( + (comp, i, j), + density_vec_sym[j], + obj_array.new_1d([1] * self.dim), + qbx_forced_limit, + deriv_dirs=()) return sym_expr -# }}} +class _StressletWrapperNaiveOrBiharmonic( + _ElasticityDoubleLayerWrapperNaiveOrBiharmonic, + StressletWrapperBase): + + def __init__(self, dim: int, mu: float | SpatialConstant, + base_kernel: Kernel | None) -> None: + super().__init__(dim=dim, mu=mu, nu=0.5, base_kernel=base_kernel) + StressletWrapperBase.__init__(self, dim=dim, mu=mu) + + @override + def apply_pressure(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> ArithmeticExpression: + sym_expr = super().apply_pressure( + density_vec_sym, + dir_vec_sym, + qbx_forced_limit, + extra_deriv_dirs=extra_deriv_dirs) + + res, = rewrite_using_base_kernel([sym_expr], base_kernel=self.base_kernel) + return res + + @override + def apply_stress(self, + density_vec_sym: VectorExpression, + normal_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: -# {{{ StressletWrapper + sym_expr = np.empty((self.dim,), dtype=object) -class StressletWrapper: - """Wrapper class for the :class:`~sumpy.kernel.StressletKernel` kernel. + # Build velocity derivative matrix + sym_grad_matrix = np.empty((self.dim, self.dim), dtype=object) + for i in range(self.dim): + sym_grad_matrix[:, i] = self.apply_derivative(i, density_vec_sym, + normal_vec_sym, qbx_forced_limit) - This class is meant to shield the user from the messiness of writing - out every term in the expansion of the triple-indexed Stresslet - kernel applied to both a normal vector and the density vector. - The object is created to do some of the set-up and bookkeeping once, - rather than every time we want to create a symbolic expression based - on the kernel -- say, once when we solve for the density, and once when - we want a symbolic representation for the solution, for example. + for comp in range(self.dim): - The :meth:`apply` function returns the integral expressions needed for - convolving the kernel with a vector density, and is meant to work - similarly to :func:`~pytential.symbolic.primitives.S` (which is - :class:`~pytential.symbolic.primitives.IntG`). + # First, add the pressure term: + sym_expr[comp] = - dir_vec_sym[comp] * self.apply_pressure( + density_vec_sym, normal_vec_sym, + qbx_forced_limit) - Similar functions are available for other useful things related to - the flow: :meth:`apply_pressure`, :meth:`apply_derivative` (target derivative), - :meth:`apply_stress` (applies symmetric viscous stress tensor in - the requested direction). + # Now add the velocity derivative components + for j in range(self.dim): + sym_expr[comp] = sym_expr[comp] + ( + dir_vec_sym[j] * self.mu * ( + sym_grad_matrix[comp][j] + + sym_grad_matrix[j][comp]) + ) + return sym_expr - .. attribute:: kernel_dict - The dictionary allows us to exploit symmetry -- that - :math:`T_{012}` is identical to :math:`T_{120}` -- and avoid creating - multiple expansions for the same kernel in a different ordering. +class StokesletWrapperNaive(_StokesletWrapperNaiveOrBiharmonic, + ElasticityWrapperBase): + r"""Stokeslet wrapper based on the full Stokeslet kernel. - .. automethod:: __init__ - .. automethod:: apply - .. automethod:: apply_pressure - .. automethod:: apply_derivative - .. automethod:: apply_stress + This representation uses the Stokeslet kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Naive`. """ - dim: int + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, base_kernel=None) + ElasticityWrapperBase.__init__(self, dim=dim, mu=mu, nu=0.5) - def __init__(self, dim: int): - self.dim = dim - - if dim == 2: - self.kernel_dict = { - (3, 0): StressletKernel(dim=2, icomp=0, jcomp=0, kcomp=0), - (2, 1): StressletKernel(dim=2, icomp=0, jcomp=0, kcomp=1), - (1, 2): StressletKernel(dim=2, icomp=0, jcomp=1, kcomp=1), - (0, 3): StressletKernel(dim=2, icomp=1, jcomp=1, kcomp=1) - } - - elif dim == 3: - self.kernel_dict = { - (3, 0, 0): StressletKernel(dim=3, icomp=0, jcomp=0, kcomp=0), - (2, 1, 0): StressletKernel(dim=3, icomp=0, jcomp=0, kcomp=1), - (2, 0, 1): StressletKernel(dim=3, icomp=0, jcomp=0, kcomp=2), - (1, 2, 0): StressletKernel(dim=3, icomp=0, jcomp=1, kcomp=1), - (1, 1, 1): StressletKernel(dim=3, icomp=0, jcomp=1, kcomp=2), - (1, 0, 2): StressletKernel(dim=3, icomp=0, jcomp=2, kcomp=2), - (0, 3, 0): StressletKernel(dim=3, icomp=1, jcomp=1, kcomp=1), - (0, 2, 1): StressletKernel(dim=3, icomp=1, jcomp=1, kcomp=2), - (0, 1, 2): StressletKernel(dim=3, icomp=1, jcomp=2, kcomp=2), - (0, 0, 3): StressletKernel(dim=3, icomp=2, jcomp=2, kcomp=2) - } - - else: - raise ValueError("unsupported dimension given to StressletWrapper") - - def apply(self, density_vec_sym, dir_vec_sym, mu_sym, qbx_forced_limit): - """Symbolic expressions for integrating Stresslet kernel. - - Returns an object array of symbolic expressions for the vector - resulting from integrating the dyadic Stresslet kernel with - variable *density_vec_sym* and source direction vectors *dir_vec_sym*. - :arg density_vec_sym: a symbolic vector variable for the density vector. - :arg dir_vec_sym: a symbolic vector variable for the direction vector. - :arg mu_sym: a symbolic variable for the viscosity. - :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on - to :class:`~pytential.symbolic.primitives.IntG`. - """ +class StressletWrapperNaive(_StressletWrapperNaiveOrBiharmonic, + ElasticityDoubleLayerWrapperBase): + r"""Stresslet wrapper based on the full Stresslet kernel. - import itertools + This representation uses the Stresslet kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Naive`. + """ - sym_expr = np.empty((self.dim,), dtype=object) + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, base_kernel=None) + ElasticityDoubleLayerWrapperBase.__init__(self, dim=dim, mu=mu, nu=0.5) - for comp in range(self.dim): - # Start variable count for kernel with 1 for the requested result - # component - base_count = np.zeros(self.dim, dtype=np.int32) - base_count[comp] += 1 - - for i, j in itertools.product(range(self.dim), range(self.dim)): - var_ctr = base_count.copy() - var_ctr[i] += 1 - var_ctr[j] += 1 - ctr_key = tuple(var_ctr) - - if i + j < 1: - sym_expr[comp] = sym.int_g_vec( - self.kernel_dict[ctr_key], - dir_vec_sym[i] * density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, mu=mu_sym) - - else: - sym_expr[comp] = sym_expr[comp] + sym.int_g_vec( - self.kernel_dict[ctr_key], - dir_vec_sym[i] * density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym) +class StokesletWrapperBiharmonic(_StokesletWrapperNaiveOrBiharmonic, + ElasticityWrapperBase): + r"""Stokeslet wrapper based on the biharmonic kernel. - return sym_expr + This representation uses the Stokeslet kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Biharmonic`. + """ - def apply_pressure(self, density_vec_sym, dir_vec_sym, mu_sym, qbx_forced_limit): - """Symbolic expression for pressure field associated with the Stresslet.""" + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, base_kernel=BiharmonicKernel(dim)) + ElasticityWrapperBase.__init__(self, dim=dim, mu=mu, nu=0.5) - import itertools - from pytential.symbolic.mappers import DerivativeTaker - kernel = LaplaceKernel(dim=self.dim) +class StressletWrapperBiharmonic(_StressletWrapperNaiveOrBiharmonic, + ElasticityDoubleLayerWrapperBase): + r"""Stresslet wrapper based on the biharmonic kernel. - factor = (2. * mu_sym) + This representation uses the Stresslet kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Biharmonic`. + """ - for i, j in itertools.product(range(self.dim), range(self.dim)): + def __init__(self, dim: int, mu: float | SpatialConstant) -> None: + super().__init__(dim=dim, mu=mu, base_kernel=BiharmonicKernel(dim)) + ElasticityDoubleLayerWrapperBase.__init__(self, dim=dim, mu=mu, nu=0.5) - if i + j < 1: - sym_expr = factor * DerivativeTaker(i).map_int_g( - DerivativeTaker(j).map_int_g( - sym.int_g_vec(kernel, - density_vec_sym[i] * dir_vec_sym[j], - qbx_forced_limit=qbx_forced_limit))) - else: - sym_expr = sym_expr + ( - factor * DerivativeTaker(i).map_int_g( - DerivativeTaker(j).map_int_g( - sym.int_g_vec(kernel, - density_vec_sym[i] * dir_vec_sym[j], - qbx_forced_limit=qbx_forced_limit)))) +# }}} - return sym_expr - def apply_derivative(self, deriv_dir, density_vec_sym, dir_vec_sym, - mu_sym, qbx_forced_limit): - """Symbolic derivative of velocity from stresslet. +# {{{ Stokeslet/Stresslet using Laplace (Tornberg) - Returns an object array of symbolic expressions for the vector - resulting from integrating the *deriv_dir* target derivative of the - dyadic Stresslet kernel with variable *density_vec_sym* and source - direction vectors *dir_vec_sym*. +@dataclass +class StokesletWrapperTornberg(StokesletWrapperBase): + """A Stokeslet wrapper based on [Tornberg2008]_. - :arg deriv_dir: integer denoting the axis direction for the derivative. - :arg density_vec_sym: a symbolic vector variable for the density vector. - :arg dir_vec_sym: a symbolic vector variable for the normal direction. - :arg mu_sym: a symbolic variable for the viscosity. - :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on - to :class:`~pytential.symbolic.primitives.IntG`. - """ + This representation uses the Laplace kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Laplace`. - import itertools + .. [Tornberg2008] A.-K. Tornberg, L. Greengard, + *A Fast Multipole Method for the Three-Dimensional Stokes Equations*, + Journal of Computational Physics, Vol. 227, pp. 1613--1619, 2008, + `DOI `__. + """ - from pytential.symbolic.mappers import DerivativeTaker + dim: int + mu: float | SpatialConstant + nu: float | SpatialConstant + + def __post_init__(self): + if self.nu != 0.5: + raise ValueError("nu != 0.5 is not supported") + + @override + def apply(self, + density_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + stresslet = StressletWrapperTornberg(self.dim, self.mu, self.nu) + + return stresslet.apply_single_and_double_layer( + density_vec_sym, + obj_array.new_1d([0]*self.dim), + obj_array.new_1d([0]*self.dim), + qbx_forced_limit=qbx_forced_limit, + stokeslet_weight=1, + stresslet_weight=0, + extra_deriv_dirs=extra_deriv_dirs) + + @override + def apply_stress(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + raise NotImplementedError - sym_expr = np.empty((self.dim,), dtype=object) - for comp in range(self.dim): +@dataclass +class StressletWrapperTornberg(StressletWrapperBase): + """A Stresslet wrapper based on [Tornberg2008]_. - # Start variable count for kernel with 1 for the requested result - # component - base_count = np.zeros(self.dim, dtype=np.int32) - base_count[comp] += 1 - - for i, j in itertools.product(range(self.dim), range(self.dim)): - var_ctr = base_count.copy() - var_ctr[i] += 1 - var_ctr[j] += 1 - ctr_key = tuple(var_ctr) - - if i + j < 1: - sym_expr[comp] = DerivativeTaker(deriv_dir).map_int_g( - sym.int_g_vec(self.kernel_dict[ctr_key], - dir_vec_sym[i] * density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym)) - - else: - sym_expr[comp] = sym_expr[comp] + DerivativeTaker( - deriv_dir).map_int_g( - sym.int_g_vec(self.kernel_dict[ctr_key], - dir_vec_sym[i] * density_vec_sym[j], - qbx_forced_limit=qbx_forced_limit, - mu=mu_sym)) + This representation uses the Laplace kernel denoted by + :attr:`~pytential.symbolic.elasticity.Method.Laplace`. + """ - return sym_expr + dim: int + mu: float | SpatialConstant + nu: float | SpatialConstant + + def __post_init__(self) -> None: + if self.nu != 0.5: + raise ValueError("nu != 0.5 is not supported") + + @cached_property + def laplace_kernel(self) -> Kernel: + return LaplaceKernel(dim=self.dim) + + @override + def apply(self, + density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + return self.apply_single_and_double_layer( + obj_array.new_1d([0]*self.dim), + density_vec_sym, + dir_vec_sym, + qbx_forced_limit=qbx_forced_limit, + stokeslet_weight=0, + stresslet_weight=1, + extra_deriv_dirs=extra_deriv_dirs) + + @override + def apply_stress(self, + density_vec_sym: VectorExpression, + normal_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + raise NotImplementedError - def apply_stress(self, density_vec_sym, normal_vec_sym, dir_vec_sym, - mu_sym, qbx_forced_limit): - r"""Symbolic expression for viscous stress applied to a direction. + def _create_int_g(self, + target_kernel: Kernel, + source_kernels: tuple[Kernel, ...], + densities: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> sym.IntG | Literal[0]: + new_source_kernels: list[Kernel] = [] + new_densities: list[ArithmeticExpression] = [] + for source_kernel, density in zip(source_kernels, densities, strict=True): + if density != 0.0: + new_source_kernels.append(source_kernel) + new_densities.append(density) + + if not new_densities: + return 0 + + return sym.IntG( + target_kernel=target_kernel, + source_kernels=tuple(new_source_kernels), + densities=tuple(new_densities), + source=sym.DEFAULT_DOFDESC, + target=sym.DEFAULT_DOFDESC, + qbx_forced_limit=qbx_forced_limit) + + def apply_single_and_double_layer( + self, + stokeslet_density_vec_sym: VectorExpression, + stresslet_density_vec_sym: VectorExpression, + dir_vec_sym: VectorExpression, + qbx_forced_limit: QBXForcedLimit, + stokeslet_weight: ArithmeticExpression, + stresslet_weight: ArithmeticExpression, + extra_deriv_dirs: tuple[int, ...] = ()) -> VectorExpression: + sym_expr = np.zeros((self.dim,), dtype=object) + + source = sym.nodes(self.dim).as_vector() + + # The paper in [1] ignores the scaling we use Stokeslet/Stresslet + # and gives formulae for the kernel expression only + # stokeslet_weight = StokesletKernel.global_scaling_const / + # LaplaceKernel.global_scaling_const + # stresslet_weight = StressletKernel.global_scaling_const / + # LaplaceKernel.global_scaling_const + stresslet_weight *= 3.0 + stokeslet_weight *= -0.5*self.mu**(-1) + + common_source_kernels: tuple[Kernel, ...] = tuple( + [AxisSourceDerivative(k, self.laplace_kernel) for k in range(self.dim)] + + [self.laplace_kernel]) - Returns a vector of symbolic expressions for the force resulting - from the viscous stress + for i in range(self.dim): + for j in range(self.dim): + densities = obj_array.new_1d([ + stresslet_weight / 6.0 * ( + stresslet_density_vec_sym[k] * dir_vec_sym[j] + + stresslet_density_vec_sym[j] * dir_vec_sym[k]) + for k in range(self.dim) + ] + [stokeslet_weight * stokeslet_density_vec_sym[j]]) + + target_kernel = TargetPointMultiplier(j, + AxisTargetDerivative(i, self.laplace_kernel)) + for deriv_dir in extra_deriv_dirs: + target_kernel = AxisTargetDerivative(deriv_dir, target_kernel) + + sym_expr[i] -= self._create_int_g( + target_kernel=target_kernel, + source_kernels=tuple(common_source_kernels), + densities=densities, + qbx_forced_limit=qbx_forced_limit) + + if i == j: + target_kernel = self.laplace_kernel + for deriv_dir in extra_deriv_dirs: + target_kernel = AxisTargetDerivative(deriv_dir, target_kernel) + + sym_expr[i] += self._create_int_g( + target_kernel=target_kernel, + source_kernels=common_source_kernels, + densities=densities, + qbx_forced_limit=qbx_forced_limit) + + common_density0 = sum( + source[k] * stresslet_density_vec_sym[k] for k in range(self.dim)) + common_density1 = sum( + source[k] * dir_vec_sym[k] for k in range(self.dim)) + common_density2 = sum( + source[k] * stokeslet_density_vec_sym[k] for k in range(self.dim)) + + densities = obj_array.new_1d([ + stresslet_weight / 6.0 * ( + common_density0 * dir_vec_sym[k] + + common_density1 * stresslet_density_vec_sym[k]) + for k in range(self.dim) + ] + [stokeslet_weight * common_density2]) + + target_kernel = AxisTargetDerivative(i, self.laplace_kernel) + for deriv_dir in extra_deriv_dirs: + target_kernel = AxisTargetDerivative(deriv_dir, target_kernel) + + sym_expr[i] += self._create_int_g( + target_kernel=target_kernel, + source_kernels=tuple(common_source_kernels), + densities=densities, + qbx_forced_limit=qbx_forced_limit) - .. math:: + return sym_expr - -p \delta_{ij} + \mu (\nabla_i u_j + \nabla_j u_i) +# }}} - applied in the direction of *dir_vec_sym*. - :arg density_vec_sym: a symbolic vector variable for the density vector. - :arg normal_vec_sym: a symbolic vector variable for the normal vectors - (outward facing normals at source locations). - :arg dir_vec_sym: a symbolic vector for the application direction. - :arg mu_sym: a symbolic variable for the viscosity. - :arg qbx_forced_limit: the *qbx_forced_limit* argument to be passed on - to :class:`~pytential.symbolic.primitives.IntG`. - """ +# {{{ StokesletWrapper dispatch method - sym_expr = np.empty((self.dim,), dtype=object) +def make_stokeslet_wrapper( + dim: int, + mu: float | str | SpatialConstant = "mu", + method: Method | None = None + ) -> StokesletWrapperBase: + """Creates an appropriate :class:`StokesletWrapperBase` object. - # Build velocity derivative matrix - sym_grad_matrix = np.empty((self.dim, self.dim), dtype=object) - for i in range(self.dim): - sym_grad_matrix[:, i] = self.apply_derivative(i, density_vec_sym, - normal_vec_sym, mu_sym, qbx_forced_limit) + :param dim: ambient dimension of the representation. + :param nu: expression or value for the viscosity. + :param method: method to use - defaults to the + :attr:`~pytential.symbolic.elasticity.Method.Naive` method. + """ + if method is None: + import warnings + warnings.warn( + "'method' argument not given. Falling back to 'naive'. " + "This argument will be required in the future.", stacklevel=2) + + method = Method.Naive + + if isinstance(mu, str): + mu = SpatialConstant(mu) + + if method == Method.Naive: + return StokesletWrapperNaive(dim=dim, mu=mu) + elif method == Method.Biharmonic: + return StokesletWrapperBiharmonic(dim=dim, mu=mu) + elif method == Method.Laplace: + return StokesletWrapperTornberg(dim=dim, mu=mu, nu=0.5) + else: + raise ValueError(f"invalid 'method': {method}") + + +def make_stresslet_wrapper( + dim: int, + mu: float | str | SpatialConstant = "mu", + method: Method | None = None + ) -> StressletWrapperBase: + """Creates an appropriate :class:`StressletWrapperBase` object. + + :param dim: ambient dimension of the representation. + :param nu: expression or value for the viscosity. + :param method: method to use - defaults to the + :attr:`~pytential.symbolic.elasticity.Method.Naive` method. + """ - for comp in range(self.dim): + if method is None: + import warnings + warnings.warn( + "'method' argument not given. Falling back to 'naive'. " + "This argument will be required in the future.", stacklevel=2) - # First, add the pressure term: - sym_expr[comp] = - dir_vec_sym[comp] * self.apply_pressure( - density_vec_sym, normal_vec_sym, - mu_sym, qbx_forced_limit) + method = Method.Naive - # Now add the velocity derivative components - for j in range(self.dim): - sym_expr[comp] = sym_expr[comp] + ( - dir_vec_sym[j] * mu_sym * ( - sym_grad_matrix[comp][j] - + sym_grad_matrix[j][comp]) - ) + if isinstance(mu, str): + mu = SpatialConstant(mu) + + if method == Method.Naive: + return StressletWrapperNaive(dim=dim, mu=mu) + elif method == Method.Biharmonic: + return StressletWrapperBiharmonic(dim=dim, mu=mu) + elif method == Method.Laplace: + return StressletWrapperTornberg(dim=dim, mu=mu, nu=0.5) + else: + raise ValueError(f"invalid 'method': {method}") - return sym_expr + +StokesletWrapper = MovedFunctionDeprecationWrapper(make_stokeslet_wrapper) +StressletWrapper = MovedFunctionDeprecationWrapper(make_stresslet_wrapper) # }}} # {{{ base Stokes operator -class StokesOperator: +class StokesOperator(ABC): """ .. attribute:: ambient_dim .. attribute:: side @@ -529,52 +675,104 @@ class StokesOperator: .. automethod:: pressure """ - def __init__(self, ambient_dim, side): + ambient_dim: int + side: Side + stokeslet: StokesletWrapperBase + stresslet: StressletWrapperBase + + def __init__(self, + ambient_dim: int, + side: Side, + stokeslet: StokesletWrapperBase | None = None, + stresslet: StressletWrapperBase | None = None, + mu: float | str | SpatialConstant | None = None) -> None: """ :arg ambient_dim: dimension of the ambient space. :arg side: :math:`+1` for exterior or :math:`-1` for interior. """ - if side not in [+1, -1]: raise ValueError(f"invalid evaluation side: {side}") + if mu is not None: + import warnings + warnings.warn( + "Explicitly giving 'mu' is deprecated. Pass in the 'stokeslet' " + "and 'stresslet' arguments explicitly instead.", + DeprecationWarning, stacklevel=2) + + if isinstance(mu, str): + mu = SpatialConstant(mu) + + # attempt to get mu from the inputs + if mu is None and stokeslet is not None: + mu = stokeslet.mu + + if mu is None and stresslet is not None: + mu = stresslet.mu + + # if not found, use the default + if mu is None: + mu = SpatialConstant("mu") + + if stresslet is None: + stresslet = make_stresslet_wrapper(dim=ambient_dim, mu=mu) + + if stokeslet is None: + stokeslet = make_stokeslet_wrapper(dim=ambient_dim, mu=mu) + + assert mu == stokeslet.mu + assert mu == stresslet.mu + self.ambient_dim = ambient_dim self.side = side - - self.stresslet = StressletWrapper(dim=self.ambient_dim) - self.stokeslet = StokesletWrapper(dim=self.ambient_dim) + self.stokeslet = stokeslet + self.stresslet = stresslet @property def dim(self): return self.ambient_dim - 1 - def get_density_var(self, name="sigma"): + def get_density_var(self, name: str = "sigma") -> VectorExpression: """ :returns: a symbolic vector corresponding to the density. """ return sym.make_sym_vector(name, self.ambient_dim) - def prepare_rhs(self, b, *, mu): + def prepare_rhs(self, b: VectorExpression) -> VectorExpression: """ :returns: a (potentially) modified right-hand side *b* that matches requirements of the representation. """ return b - def operator(self, sigma): + @abstractmethod + def operator(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = "avg") -> VectorExpression: """ :returns: the integral operator that should be solved to obtain the density *sigma*. """ raise NotImplementedError - def velocity(self, sigma, *, normal, mu, qbx_forced_limit=None): + @abstractmethod + def velocity(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = None, + ) -> VectorExpression: """ :returns: a representation of the velocity field in the Stokes flow. """ raise NotImplementedError - def pressure(self, sigma, *, normal, mu, qbx_forced_limit=None): + @abstractmethod + def pressure(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = None, + ) -> ArithmeticExpression: """ :returns: a representation of the pressure in the Stokes flow. """ @@ -598,7 +796,17 @@ class HsiaoKressExteriorStokesOperator(StokesOperator): .. automethod:: __init__ """ - def __init__(self, *, omega, alpha=None, eta=None): + omega: VectorExpression + alpha: float + eta: float + + def __init__(self, *, + omega: VectorExpression, + alpha: float = 1.0, + eta: float = 1.0, + stokeslet: StokesletWrapperBase | None = None, + stresslet: StressletWrapperBase | None = None, + mu: float | str | SpatialConstant | None = None) -> None: r""" :arg omega: farfield behaviour of the velocity field, as defined by :math:`A` in [HsiaoKress1985]_ Equation 2.3. @@ -606,7 +814,9 @@ def __init__(self, *, omega, alpha=None, eta=None): :arg eta: real parameter :math:`\eta > 0`. Choosing this parameter well can have a non-trivial effect on the conditioning. """ - super().__init__(ambient_dim=2, side=+1) + super().__init__( + ambient_dim=2, side=+1, + stokeslet=stokeslet, stresslet=stresslet, mu=mu) # NOTE: in [hsiao-kress], there is an analysis on a circle, which # recommends values in @@ -614,61 +824,76 @@ def __init__(self, *, omega, alpha=None, eta=None): # so we choose alpha = eta = 1, which seems to be in line with some # of the presented numerical results too. - if alpha is None: - alpha = 1.0 - - if eta is None: - eta = 1.0 - self.omega = omega self.alpha = alpha self.eta = eta - def _farfield(self, mu, qbx_forced_limit): - length = sym.integral(self.ambient_dim, self.dim, 1) - return self.stokeslet.apply( - -self.omega / length, - mu, - qbx_forced_limit=qbx_forced_limit) + def _farfield(self, qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + source_dofdesc = sym.DOFDescriptor(None, discr_stage=sym.QBX_SOURCE_STAGE1) - def _operator(self, sigma, normal, mu, qbx_forced_limit): - slp_qbx_forced_limit = qbx_forced_limit - if slp_qbx_forced_limit == "avg": - slp_qbx_forced_limit = +1 + length = sym.integral(self.ambient_dim, self.dim, 1, dofdesc=source_dofdesc) + result = self.stresslet.apply_single_and_double_layer( + -self.omega / length, + obj_array.new_1d([0]*self.ambient_dim), + obj_array.new_1d([0]*self.ambient_dim), + qbx_forced_limit=qbx_forced_limit, + stokeslet_weight=1, + stresslet_weight=0) + return result + + def _operator(self, + sigma: VectorExpression, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: # NOTE: we set a dofdesc here to force the evaluation of this integral # on the source instead of the target when using automatic tagging # see :meth:`pytential.symbolic.mappers.LocationTagger._default_dofdesc` dd = sym.DOFDescriptor(None, discr_stage=sym.QBX_SOURCE_STAGE1) - int_sigma = sym.integral(self.ambient_dim, self.dim, sigma, dofdesc=dd) - meanless_sigma = sym.cse(sigma - sym.mean(self.ambient_dim, self.dim, sigma)) - - op_k = self.stresslet.apply(sigma, normal, mu, - qbx_forced_limit=qbx_forced_limit) - op_s = ( - self.alpha / (2.0 * np.pi) * int_sigma - - self.stokeslet.apply(meanless_sigma, mu, - qbx_forced_limit=slp_qbx_forced_limit) - ) - - return op_k + self.eta * op_s - - def prepare_rhs(self, b, *, mu): - return b + self._farfield(mu, qbx_forced_limit=+1) - def operator(self, sigma, *, normal, mu): + meanless_sigma = sym.cse( + sigma - sym.mean(self.ambient_dim, self.dim, sigma, dofdesc=dd) + ) + + result = self.eta * self.alpha / (2.0 * np.pi) * int_sigma + result += self.stresslet.apply_single_and_double_layer( + meanless_sigma, + sigma, + normal, + qbx_forced_limit=qbx_forced_limit, + stokeslet_weight=-self.eta, + stresslet_weight=1) + + return result + + @override + def prepare_rhs(self, b: VectorExpression) -> VectorExpression: + return b + self._farfield(qbx_forced_limit=+1) + + @override + def operator(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = "avg") -> VectorExpression: # NOTE: H. K. 1985 Equation 2.18 - return -0.5 * self.side * sigma - self._operator(sigma, normal, mu, "avg") - - def velocity(self, sigma, *, normal, mu, qbx_forced_limit=2): + lp = self._operator(sigma, normal, qbx_forced_limit) + return -0.5 * self.side * sigma - lp + + @override + def velocity(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = 2) -> VectorExpression: # NOTE: H. K. 1985 Equation 2.16 - return ( - -self._farfield(mu, qbx_forced_limit) - - self._operator(sigma, normal, mu, qbx_forced_limit) - ) - - def pressure(self, sigma, *, normal, mu, qbx_forced_limit=2): + lp = self._operator(sigma, normal, qbx_forced_limit) + return -self._farfield(qbx_forced_limit) - lp + + @override + def pressure(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = 2) -> ArithmeticExpression: # FIXME: H. K. 1985 Equation 2.17 raise NotImplementedError @@ -686,13 +911,22 @@ class HebekerExteriorStokesOperator(StokesOperator): .. automethod:: __init__ """ - def __init__(self, *, eta=None): + eta: float + laplace_kernel: LaplaceKernel + + def __init__(self, *, + eta: float | None = None, + stokeslet: StokesletWrapperBase | None = None, + stresslet: StressletWrapperBase | None = None, + mu: float | str | SpatialConstant | None = None) -> None: r""" :arg eta: a parameter :math:`\eta > 0`. Choosing this parameter well can have a non-trivial effect on the conditioning of the operator. """ - super().__init__(ambient_dim=3, side=+1) + super().__init__( + ambient_dim=3, side=+1, + stokeslet=stokeslet, stresslet=stresslet, mu=mu) # NOTE: eta is chosen here based on H. 1986 Figure 1, which is # based on solving on the unit sphere @@ -700,28 +934,45 @@ def __init__(self, *, eta=None): eta = 0.75 self.eta = eta - - def _operator(self, sigma, normal, mu, qbx_forced_limit): - slp_qbx_forced_limit = qbx_forced_limit - if slp_qbx_forced_limit == "avg": - slp_qbx_forced_limit = self.side - - op_w = self.stresslet.apply(sigma, normal, mu, - qbx_forced_limit=qbx_forced_limit) - op_v = self.stokeslet.apply(sigma, mu, - qbx_forced_limit=slp_qbx_forced_limit) - - return op_w + self.eta * op_v - - def operator(self, sigma, *, normal, mu): + self.laplace_kernel = LaplaceKernel(3) + + def _operator(self, + sigma: VectorExpression, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit) -> VectorExpression: + result = self.stresslet.apply_single_and_double_layer( + sigma, + sigma, + normal, + qbx_forced_limit=qbx_forced_limit, + stokeslet_weight=self.eta, + stresslet_weight=1, + extra_deriv_dirs=()) + + return result + + @override + def operator(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = "avg") -> VectorExpression: # NOTE: H. 1986 Equation 17 - return -0.5 * self.side * sigma - self._operator(sigma, normal, mu, "avg") - - def velocity(self, sigma, *, normal, mu, qbx_forced_limit=2): + op = self._operator(sigma, normal, qbx_forced_limit) + return -0.5 * self.side * sigma - op + + @override + def velocity(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = 2) -> VectorExpression: # NOTE: H. 1986 Equation 16 - return -self._operator(sigma, normal, mu, qbx_forced_limit) + return -self._operator(sigma, normal, qbx_forced_limit) - def pressure(self, sigma, *, normal, mu, qbx_forced_limit=2): + @override + def pressure(self, + sigma: VectorExpression, *, + normal: VectorExpression, + qbx_forced_limit: QBXForcedLimit | None = 2) -> ArithmeticExpression: # FIXME: not given in H. 1986, but should be easy to derive using the # equivalent single-/double-layer pressure kernels raise NotImplementedError diff --git a/pytential/utils.py b/pytential/utils.py index 4b0db55f5..ce8197a58 100644 --- a/pytential/utils.py +++ b/pytential/utils.py @@ -27,11 +27,13 @@ """ import sys -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any + +import sumpy.symbolic as sym if TYPE_CHECKING: - from collections.abc import Callable, Sequence + from collections.abc import Callable, Iterable, Sequence from useful_types import SupportsRichComparison @@ -51,6 +53,92 @@ def sort_arrays_together( return zip(*sorted(zip(*arys, strict=True), key=key), strict=True) +def chop(expr: sym.Basic, tol) -> sym.Basic: + """Given a symbolic expression, remove all occurrences of numbers + with absolute value less than a given tolerance and replace floating + point numbers that are close to an integer up to a given relative + tolerance by the integer. + """ + nums = expr.atoms(sym.Number) + replace_dict: dict[Any, float] = {} + for num in nums: + if float(abs(num)) < tol: + replace_dict[num] = 0 + else: + new_num = float(num) + if abs((int(new_num) - new_num)/new_num) < tol: + new_num = int(new_num) + + replace_dict[num] = new_num + + return expr.xreplace(replace_dict) + + +def forward_substitution( + L: sym.Matrix, + b: sym.Matrix, + postprocess_division: Callable[[sym.Basic], sym.Basic], + ) -> sym.Matrix: + """Given a lower triangular matrix *L* and a column vector *b*, + solve the system ``Lx = b`` and apply the callable *postprocess_division* + on each expression at the end of division calls. + """ + n = len(b) + res = sym.Matrix(b) + for i in range(n): + for j in range(i): + res[i] -= L[i, j]*res[j] + res[i] = postprocess_division(res[i] / L[i, i]) + return res + + +def backward_substitution( + U: sym.Matrix, + b: sym.Matrix, + postprocess_division: Callable[[sym.Basic], sym.Basic], + ) -> sym.Matrix: + """Given an upper triangular matrix *U* and a column vector *b*, + solve the system ``Ux = b`` and apply the callable *postprocess_division* + on each expression at the end of division calls. + """ + n = len(b) + res = sym.Matrix(b) + for i in range(n-1, -1, -1): + for j in range(n - 1, i, -1): + res[i] -= U[i, j]*res[j] + res[i] = postprocess_division(res[i] / U[i, i]) + return res + + +def solve_from_lu( + L: sym.Matrix, + U: sym.Matrix, + perm: Iterable[tuple[int, int]], + b: sym.Matrix, + postprocess_division: Callable[[sym.Basic], sym.Basic] + ) -> sym.Matrix: + """Given an LU factorization and a vector, solve a linear + system with intermediate results expanded to avoid + an explosion of the expression trees + + :param L: lower triangular matrix + :param U: upper triangular matrix + :param perm: permutation matrix + :param b: a column vector to solve for + :param postprocess_division: callable that is called after each division + """ + # Permute first + res = sym.Matrix(b) + for p, q in perm: + res[p], res[q] = res[q], res[p] + + return backward_substitution( + U, + forward_substitution(L, res, postprocess_division), + postprocess_division, + ) + + def pytest_teardown_function(): from pyopencl.tools import clear_first_arg_caches clear_first_arg_caches() @@ -72,5 +160,4 @@ def pytest_teardown_function(): libc = ctypes.CDLL("libc.so.6") libc.malloc_trim(0) - # vim: foldmethod=marker diff --git a/test/extra_int_eq_data.py b/test/extra_int_eq_data.py index b6e304afe..fb9f44a71 100644 --- a/test/extra_int_eq_data.py +++ b/test/extra_int_eq_data.py @@ -643,7 +643,7 @@ def get_mesh(self, resolution, mesh_order): # now centered at origin and extends to -1,1 from meshmode.mesh.processing import perform_flips - return perform_flips(mesh, np.ones(mesh.nelements), dtype=np.bool) + return perform_flips(mesh, np.ones(mesh.nelements)) @dataclass diff --git a/test/test_pde_system_utils.py b/test/test_pde_system_utils.py new file mode 100644 index 000000000..e3ce8055b --- /dev/null +++ b/test/test_pde_system_utils.py @@ -0,0 +1,243 @@ +from __future__ import annotations + + +__copyright__ = "Copyright (C) 2022 Isuru Fernando" + +__license__ = """ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +""" + +from functools import partial + +import numpy as np + +import pymbolic.primitives as prim +from sumpy.kernel import ( + AxisSourceDerivative, + AxisTargetDerivative, + BiharmonicKernel, + ExpressionKernel, + HelmholtzKernel, + LaplaceKernel, + StokesletKernel, + TargetPointMultiplier, +) +from sumpy.symbolic import USE_SYMENGINE + +from pytential import sym +from pytential.symbolic.mappers import flatten +from pytential.symbolic.pde.system_utils import ( + convert_target_transformation_to_source, + rewrite_int_g_using_base_kernel, +) + + +class RealExpressionKernel(ExpressionKernel): + @property + def is_complex_valued(self) -> bool: + return False + + +class ComplexExpressionKernel(ExpressionKernel): + @property + def is_complex_valued(self) -> bool: + return True + + +def test_convert_target_deriv(): + knl = LaplaceKernel(2) + dsource = partial(AxisSourceDerivative, inner_kernel=knl) + + int_g = sym.IntG( + AxisTargetDerivative(0, knl), + (dsource(1), knl), + (1, 2), qbx_forced_limit=1) + expected_int_g = sym.IntG( + knl, + (AxisSourceDerivative(0, dsource(1)), dsource(0)), + (-1, -2), qbx_forced_limit=1) + + expr = sum(convert_target_transformation_to_source(int_g)) + assert flatten(expr) == expected_int_g + + +def test_convert_target_point_multiplier(): + ambient_dim = 3 + knl = LaplaceKernel(ambient_dim) + dsource = partial(AxisSourceDerivative, inner_kernel=knl) + + int_g = sym.IntG( + TargetPointMultiplier(0, knl), + (dsource(1), knl), + (1, 2), qbx_forced_limit=1) + + d = sym.make_sym_vector("d", ambient_dim) + r2 = d[2]**2 + d[1]**2 + d[0]**2 + + eknl0 = RealExpressionKernel(ambient_dim, + d[1]*d[0]*r2**prim.Quotient(-3, 2), + knl.global_scaling_const) + eknl2 = RealExpressionKernel(ambient_dim, + d[0]*r2**prim.Quotient(-1, 2), + knl.global_scaling_const) + + r2 = d[0]**2 + d[1]**2 + d[2]**2 + + eknl1 = RealExpressionKernel(ambient_dim, + d[0]*d[1]*r2**prim.Quotient(-3, 2), + knl.global_scaling_const) + eknl3 = RealExpressionKernel(ambient_dim, + d[0]*r2**prim.Quotient(-1, 2), + knl.global_scaling_const) + + xs = sym.nodes(3).as_vector() + + possible_int_g1 = flatten( + sym.IntG(eknl0, (eknl0,), (1,), qbx_forced_limit=1) + + sym.IntG(eknl2, (eknl2,), (2,), qbx_forced_limit=1) + + sym.IntG(knl, (dsource(1), knl), (xs[0], 2*xs[0]), qbx_forced_limit=1)) + possible_int_g2 = flatten( + sym.IntG(eknl1, (eknl1,), (1,), qbx_forced_limit=1) + + sym.IntG(eknl3, (eknl3,), (2,), qbx_forced_limit=1) + + sym.IntG(knl, (dsource(1), knl), (xs[0], 2*xs[0]), qbx_forced_limit=1)) + + expr = flatten(sum(convert_target_transformation_to_source(int_g))) + assert expr in (possible_int_g1, possible_int_g2) + + +def test_product_rule(): + ambient_dim = 3 + knl = LaplaceKernel(ambient_dim) + dsource = partial(AxisSourceDerivative, inner_kernel=knl) + + int_g = sym.IntG( + AxisTargetDerivative(0, TargetPointMultiplier(0, knl)), + (knl,), (1,), qbx_forced_limit=1) + + d = sym.make_sym_vector("d", 3) + r2 = d[2]**2 + d[1]**2 + d[0]**2 + + eknl0 = RealExpressionKernel(ambient_dim, + d[0]**2*r2**prim.Quotient(-3, 2), + knl.global_scaling_const) + + r2 = d[0]**2 + d[1]**2 + d[2]**2 + + eknl1 = RealExpressionKernel(ambient_dim, + d[0]**2*r2**prim.Quotient(-3, 2), + knl.global_scaling_const) + + xs = sym.nodes(3).as_vector() + + possible_int_g1 = flatten( + sym.IntG(eknl0, (eknl0,), (-1,), qbx_forced_limit=1) + + sym.IntG(knl, (dsource(0),), (xs[0]*(-1),), qbx_forced_limit=1) + ) + possible_int_g2 = flatten( + sym.IntG(eknl1, (eknl1,), (-1,), qbx_forced_limit=1) + + sym.IntG(knl, (dsource(0),), (xs[0]*(-1),), qbx_forced_limit=1)) + + expr = flatten(sum(convert_target_transformation_to_source(int_g))) + assert expr in [possible_int_g1, possible_int_g2] + + +def test_convert_helmholtz(): + ambient_dim = 3 + knl = HelmholtzKernel(ambient_dim) + int_g = sym.IntG( + TargetPointMultiplier(0, knl), + (knl,), (1,), qbx_forced_limit=1, kernel_arguments={"k": 1}) + + d = sym.make_sym_vector("d", 3) + exp = prim.Variable("exp") + + if USE_SYMENGINE: + r2 = d[2]**2 + d[1]**2 + d[0]**2 + eknl = ComplexExpressionKernel( + ambient_dim, + exp(1j*r2**prim.Quotient(1, 2))*d[0] * r2**prim.Quotient(-1, 2), + knl.global_scaling_const) + else: + r2 = d[0]**2 + d[1]**2 + d[2]**2 + eknl = ComplexExpressionKernel( + ambient_dim, + d[0]*r2**prim.Quotient(-1, 2) * exp(1j*r2**prim.Quotient(1, 2)), + knl.global_scaling_const) + + xs = sym.nodes(3).as_vector() + expected_int_g = flatten( + sym.IntG(eknl, (eknl,), (1,), qbx_forced_limit=1, kernel_arguments={"k": 1}) + + sym.IntG(knl, (knl,), (xs[0],), qbx_forced_limit=1, kernel_arguments={"k": 1}) + ) + + expr = flatten(sum(convert_target_transformation_to_source(int_g))) + assert expr == expected_int_g + + +def test_convert_int_g_base(): + ambient_dim = 3 + knl = LaplaceKernel(ambient_dim) + int_g = sym.IntG(knl, (knl,), (1,), qbx_forced_limit=1) + + base_knl = BiharmonicKernel(ambient_dim) + expected_int_g = sum(sym.IntG( + base_knl, + (AxisSourceDerivative(d, AxisSourceDerivative(d, base_knl)),), + (-1,), qbx_forced_limit=1) + for d in range(ambient_dim)) + + expr = flatten(rewrite_int_g_using_base_kernel(int_g, base_kernel=base_knl)) + assert expr == flatten(expected_int_g) + + +def test_convert_int_g_base_with_const(): + ambient_dim = 2 + knl = StokesletKernel(ambient_dim, 0, 0) + base_knl = BiharmonicKernel(ambient_dim) + + dd = sym.DOFDescriptor(sym.DEFAULT_SOURCE) + int_g = sym.IntG(knl, (knl,), (1,), qbx_forced_limit=1, kernel_arguments={"mu": 2}) + + expected_int_g = flatten( + (-0.1875) / np.pi * sym.integral(ambient_dim, ambient_dim - 1, 1, dofdesc=dd) + + sym.IntG(base_knl, + (AxisSourceDerivative(1, AxisSourceDerivative(1, base_knl)),), + (0.5,), qbx_forced_limit=1)) + + expr = flatten(rewrite_int_g_using_base_kernel(int_g, base_kernel=base_knl)) + assert expr == expected_int_g + + +def test_convert_int_g_base_with_const_and_deriv(): + ambient_dim = 2 + knl = StokesletKernel(ambient_dim, 0, 0) + base_knl = BiharmonicKernel(ambient_dim) + + int_g = sym.IntG( + knl, + (AxisSourceDerivative(0, knl),), (1,), qbx_forced_limit=1, + kernel_arguments={"mu": 2}) + + expected_int_g = sym.IntG( + base_knl, + (AxisSourceDerivative( + 1, AxisSourceDerivative( + 1, AxisSourceDerivative(0, base_knl))),), + (0.5,), qbx_forced_limit=1) + + expr = flatten(rewrite_int_g_using_base_kernel(int_g, base_kernel=base_knl)) + assert expr == expected_int_g diff --git a/test/test_stokes.py b/test/test_stokes.py index 487b19429..789417a09 100644 --- a/test/test_stokes.py +++ b/test/test_stokes.py @@ -36,15 +36,21 @@ from meshmode.discretization import Discretization from meshmode.discretization.poly_element import InterpolatoryQuadratureGroupFactory from pytools import obj_array +from sumpy.symbolic import SpatialConstant from pytential import GeometryCollection, bind, sym +from pytential.symbolic.elasticity import ( + ElasticityDoubleLayerWrapperBase, + Method, + make_elasticity_double_layer_wrapper, + make_elasticity_wrapper, +) +from pytential.symbolic.stokes import StokesletWrapper +from pytential.utils import pytest_teardown_function as teardown_function # noqa: F401 logger = logging.getLogger(__name__) -from pytential.utils import pytest_teardown_function as teardown_function # noqa: F401 - - pytest_generate_tests = pytest_generate_tests_for_array_contexts([ PytestPyOpenCLArrayContextFactory, ]) @@ -73,11 +79,13 @@ def dof_array_rel_error(actx, x, xref, p=None): def run_exterior_stokes(actx_factory, *, ambient_dim, target_order, qbx_order, resolution, - fmm_order=False, # FIXME: FMM is slower than direct evaluation + fmm_order=None, # FIXME: FMM is slower than direct evaluation source_ovsmp=None, radius=1.5, aspect_ratio=1.0, mu=1.0, + nu=0.4, visualize=False, + method="naive", _target_association_tolerance=0.05, _expansions_in_tree_have_extent=True): @@ -157,27 +165,44 @@ def run_exterior_stokes(actx_factory, *, # {{{ symbolic sym_normal = sym.make_sym_vector("normal", ambient_dim) - sym_mu = sym.var("mu") + sym_mu = SpatialConstant("mu2") + + if nu == 0.5: + sym_nu = 0.5 + else: + sym_nu = SpatialConstant("nu2") + + stokeslet = make_elasticity_wrapper(ambient_dim, mu=sym_mu, + nu=sym_nu, method=Method[method]) + stresslet = make_elasticity_double_layer_wrapper(ambient_dim, mu=sym_mu, + nu=sym_nu, method=Method[method]) if ambient_dim == 2: from pytential.symbolic.stokes import HsiaoKressExteriorStokesOperator sym_omega = sym.make_sym_vector("omega", ambient_dim) - op = HsiaoKressExteriorStokesOperator(omega=sym_omega) + op = HsiaoKressExteriorStokesOperator(omega=sym_omega, stokeslet=stokeslet, + stresslet=stresslet) elif ambient_dim == 3: from pytential.symbolic.stokes import HebekerExteriorStokesOperator - op = HebekerExteriorStokesOperator() + op = HebekerExteriorStokesOperator(stokeslet=stokeslet, stresslet=stresslet) else: raise AssertionError() sym_sigma = op.get_density_var("sigma") sym_bc = op.get_density_var("bc") - sym_op = op.operator(sym_sigma, normal=sym_normal, mu=sym_mu) - sym_rhs = op.prepare_rhs(sym_bc, mu=mu) + sym_op = op.operator(sym_sigma, normal=sym_normal) + sym_rhs = op.prepare_rhs(sym_bc) - sym_velocity = op.velocity(sym_sigma, normal=sym_normal, mu=sym_mu) + sym_velocity = op.velocity(sym_sigma, normal=sym_normal) - sym_source_pot = op.stokeslet.apply(sym_sigma, sym_mu, qbx_forced_limit=None) + if ambient_dim == 3: + sym_source_pot = op.stokeslet.apply(sym_sigma, qbx_forced_limit=None) + else: + # Use the naive method here as biharmonic requires source derivatives + # of point_source + sym_source_pot = make_elasticity_wrapper(ambient_dim, mu=sym_mu, + nu=sym_nu, method=Method.Naive).apply(sym_sigma, qbx_forced_limit=None) # }}} @@ -198,20 +223,43 @@ def run_exterior_stokes(actx_factory, *, omega = bind(places, total_charge * sym.Ones())(actx) if ambient_dim == 2: - bc_context = {"mu": mu, "omega": omega} - op_context = {"mu": mu, "omega": omega, "normal": normal} + bc_context = {"mu2": mu, "omega": omega} + op_context = {"mu2": mu, "omega": omega, "normal": normal} else: bc_context = {} - op_context = {"mu": mu, "normal": normal} + op_context = {"mu2": mu, "normal": normal} + direct_context = {"mu2": mu} - bc = bind(places, sym_source_pot, - auto_where=("point_source", "source"))(actx, sigma=charges, mu=mu) + if sym_nu != 0.5: + bc_context["nu2"] = nu + op_context["nu2"] = nu + direct_context["nu2"] = nu + + bc_op = bind(places, sym_source_pot, + auto_where=("point_source", "source")) + bc = bc_op(actx, sigma=charges, **direct_context) rhs = bind(places, sym_rhs)(actx, bc=bc, **bc_context) bound_op = bind(places, sym_op) # }}} + fmm_timing_data = {} + bound_op.eval({"sigma": rhs, **op_context}, array_context=actx, + timing_data=fmm_timing_data) + + def print_timing_data(timings, name): + result = dict.fromkeys(next(timings.values()), value=0) + total = 0 + for k, timing in timings.items(): + for k, v in timing.items(): + result[k] += v["wall_elapsed"] + total += v["wall_elapsed"] + result["total"] = total + print(f"{name}={result}") + + # print_timing_data(fmm_timing_data, method) + # {{{ solve from pytential.linalg.gmres import gmres @@ -224,7 +272,6 @@ def run_exterior_stokes(actx_factory, *, progress=visualize, stall_iterations=0, hard_failure=True) - sigma = result.solution # }}} @@ -234,7 +281,8 @@ def run_exterior_stokes(actx_factory, *, velocity = bind(places, sym_velocity, auto_where=("source", "point_target"))(actx, sigma=sigma, **op_context) ref_velocity = bind(places, sym_source_pot, - auto_where=("point_source", "point_target"))(actx, sigma=charges, mu=mu) + auto_where=("point_source", "point_target"))(actx, sigma=charges, + **direct_context) v_error = [0.0] * ambient_dim v_error[:ambient_dim] = [ @@ -271,11 +319,19 @@ def run_exterior_stokes(actx_factory, *, return h_max, v_error -@pytest.mark.parametrize("ambient_dim", [ - 2, - pytest.param(3, marks=pytest.mark.slowtest) +@pytest.mark.parametrize("ambient_dim, method, nu", [ + (2, "Naive", 0.5), + (2, "Laplace", 0.5), + (2, "Biharmonic", 0.5), + pytest.param(3, "Naive", 0.5, marks=pytest.mark.slowtest), + pytest.param(3, "Biharmonic", 0.5, marks=pytest.mark.slowtest), + pytest.param(3, "Laplace", 0.5, marks=pytest.mark.slowtest), + + (2, "Biharmonic", 0.4), + pytest.param(3, "Biharmonic", 0.4, marks=pytest.mark.slowtest), + pytest.param(3, "Laplace", 0.4, marks=pytest.mark.slowtest), ]) -def test_exterior_stokes(actx_factory, ambient_dim, visualize=False): +def test_exterior_stokes(actx_factory, ambient_dim, method, nu, visualize=False): if visualize: logging.basicConfig(level=logging.INFO) @@ -287,8 +343,10 @@ def test_exterior_stokes(actx_factory, ambient_dim, visualize=False): qbx_order = 3 if ambient_dim == 2: + fmm_order = 10 resolutions = [20, 35, 50] elif ambient_dim == 3: + fmm_order = 6 resolutions = [0, 1, 2] else: raise ValueError(f"unsupported dimension: {ambient_dim}") @@ -297,9 +355,12 @@ def test_exterior_stokes(actx_factory, ambient_dim, visualize=False): h_max, errors = run_exterior_stokes(actx_factory, ambient_dim=ambient_dim, target_order=target_order, + fmm_order=fmm_order, qbx_order=qbx_order, source_ovsmp=source_ovsmp, resolution=resolution, + method=method, + nu=nu, visualize=visualize) for eoc, e in zip(eocs, errors, strict=True): @@ -311,12 +372,17 @@ def test_exterior_stokes(actx_factory, ambient_dim, visualize=False): error_format="%.8e", eoc_format="%.2f")) + orders_lost = 0 + if method == "Biharmonic": + orders_lost += 1 + elif nu != 0.5: + orders_lost += 0.5 for eoc in eocs: # This convergence data is not as clean as it could be. See # https://github.com/inducer/pytential/pull/32 # for some discussion. order = min(target_order, qbx_order) - assert eoc.order_estimate() > order - 0.5 + assert eoc.order_estimate() > order - 0.5 - orders_lost # }}} @@ -408,15 +474,14 @@ class StokesletIdentity: """[Pozrikidis1992] Problem 3.1.1""" def __init__(self, ambient_dim): - from pytential.symbolic.stokes import StokesletWrapper self.ambient_dim = ambient_dim - self.stokeslet = StokesletWrapper(self.ambient_dim) + self.stokeslet = StokesletWrapper(self.ambient_dim, mu=1) def apply_operator(self): sym_density = sym.normal(self.ambient_dim).as_vector() return self.stokeslet.apply( sym_density, - mu_sym=1, qbx_forced_limit=+1) + qbx_forced_limit=+1) def ref_result(self): return obj_array.new_1d([1.0e-15 * sym.Ones()] * self.ambient_dim) @@ -467,15 +532,14 @@ class StressletIdentity: """[Pozrikidis1992] Equation 3.2.7""" def __init__(self, ambient_dim): - from pytential.symbolic.stokes import StokesletWrapper self.ambient_dim = ambient_dim - self.stokeslet = StokesletWrapper(self.ambient_dim) + self.stokeslet = StokesletWrapper(self.ambient_dim, mu=1) def apply_operator(self): sym_density = sym.normal(self.ambient_dim).as_vector() return self.stokeslet.apply_stress( sym_density, sym_density, - mu_sym=1, qbx_forced_limit="avg") + qbx_forced_limit="avg") def ref_result(self): return -0.5 * sym.normal(self.ambient_dim).as_vector() @@ -520,6 +584,258 @@ def test_stresslet_identity(actx_factory, cls, visualize=False): # }}} +# {{{ test Stokes PDE + +def run_stokes_pde(actx_factory, case, identity, resolution, visualize=False): + from sumpy.point_calculus import CalculusPatch + + from pytential.target import PointsTarget + actx = actx_factory() + + dim = case.ambient_dim + qbx = case.get_layer_potential(actx, resolution, case.target_order) + + h_min = actx.to_numpy(bind(qbx, sym.h_min(dim))(actx)) + h_max = actx.to_numpy(bind(qbx, sym.h_max(dim))(actx)) + + if dim == 2: + h = h_max + else: + h = h_max / 5 + + cp = CalculusPatch([0, 0, 0][:dim], h=h, order=case.target_order + 1) + targets = PointsTarget(actx.freeze(actx.from_numpy(cp.points))) + + places = GeometryCollection({case.name: qbx, "cp": targets}, + auto_where=(case.name, "cp")) + + potential = bind(places, identity.apply_operator())(actx) + potential_host = actx.to_numpy(potential) + result = identity.apply_pde_using_calculus_patch(cp, potential_host) + result_pytential = actx.to_numpy(bind(places, + identity.apply_pde_using_pytential())(actx)) + + m = np.max([np.linalg.norm(p, ord=np.inf) for p in potential_host]) + error = ( + [np.linalg.norm(x, ord=np.inf)/m for x in result] + + [np.linalg.norm(x, ord=np.inf)/m for x in result_pytential]) + + logger.info( + "resolution %4d h_min %.5e h_max %.5e error %s", + resolution, h_min, h_max, *error[:places.ambient_dim]) + + return h_max, error + + +class StokesPDE: + def __init__(self, ambient_dim, wrapper): + self.ambient_dim = ambient_dim + self.wrapper = wrapper + + @property + def dim(self) -> int: + return self.ambient_dim + 1 + + def apply_operator(self): + dim = self.ambient_dim + args = { + "density_vec_sym": [1]*dim, + "qbx_forced_limit": None, + } + if isinstance(self.wrapper, ElasticityDoubleLayerWrapperBase): + args["dir_vec_sym"] = sym.normal(self.ambient_dim).as_vector() + + velocity = self.wrapper.apply(**args) + pressure = self.wrapper.apply_pressure(**args) + return obj_array.new_1d([*velocity, pressure]) + + def apply_pde_using_pytential(self): + dim = self.ambient_dim + args = { + "density_vec_sym": [1]*dim, + "qbx_forced_limit": None, + } + if isinstance(self.wrapper, ElasticityDoubleLayerWrapperBase): + args["dir_vec_sym"] = sym.normal(self.ambient_dim).as_vector() + + d_u = [self.wrapper.apply(**args, extra_deriv_dirs=(i,)) + for i in range(dim)] + dd_u = [self.wrapper.apply(**args, extra_deriv_dirs=(i, i)) + for i in range(dim)] + laplace_u = [sum(dd_u[j][i] for j in range(dim)) for i in range(dim)] + d_p = [self.wrapper.apply_pressure(**args, extra_deriv_dirs=(i,)) + for i in range(dim)] + eqs = [laplace_u[i] - d_p[i] for i in range(dim)] + \ + [sum(d_u[i][i] for i in range(dim))] + return obj_array.new_1d(eqs) + + def apply_pde_using_calculus_patch(self, cp, potential): + dim = self.ambient_dim + velocity = potential[:dim] + pressure = potential[dim] + eqs = [cp.laplace(velocity[i]) - cp.diff(i, pressure) for i in range(dim)] \ + + [sum(cp.diff(i, velocity[i]) for i in range(dim))] + return obj_array.new_1d(eqs) + + +class ElasticityPDE: + def __init__(self, ambient_dim, wrapper): + self.ambient_dim = ambient_dim + self.wrapper = wrapper + + @property + def dim(self) -> int: + return self.ambient_dim + + def apply_operator(self): + dim = self.ambient_dim + args = { + "density_vec_sym": [1]*dim, + "qbx_forced_limit": None, + } + if isinstance(self.wrapper, ElasticityDoubleLayerWrapperBase): + args["dir_vec_sym"] = sym.normal(self.ambient_dim).as_vector() + + return self.wrapper.apply(**args) + + def apply_pde_using_pytential(self): + dim = self.ambient_dim + args = { + "density_vec_sym": [1]*dim, + "qbx_forced_limit": None, + } + if isinstance(self.wrapper, ElasticityDoubleLayerWrapperBase): + args["dir_vec_sym"] = sym.normal(self.ambient_dim).as_vector() + + mu = self.wrapper.mu + nu = self.wrapper.nu + assert nu != 0.5 + lam = 2*nu*mu/(1-2*nu) + + derivs = {} + + for i in range(dim): + for j in range(i + 1): + derivs[i, j] = self.wrapper.apply(**args, extra_deriv_dirs=(i, j)) + derivs[j, i] = derivs[i, j] + + laplace_u = sum(derivs[i, i] for i in range(dim)) + grad_of_div_u = [sum(derivs[i, j][j] for j in range(dim)) + for i in range(dim)] + + # Navier-Cauchy equations + eqs = [(lam + mu)*grad_of_div_u[i] + mu*laplace_u[i] for i in range(dim)] + return obj_array.new_1d(eqs) + + def apply_pde_using_calculus_patch(self, cp, potential): + dim = self.ambient_dim + mu = self.wrapper.mu + nu = self.wrapper.nu + assert nu != 0.5 + lam = 2*nu*mu/(1-2*nu) + + laplace_u = [cp.laplace(potential[i]) for i in range(dim)] + grad_of_div_u = [ + sum(cp.diff(j, cp.diff(i, potential[j])) for j in range(dim)) + for i in range(dim)] + + # Navier-Cauchy equations + eqs = [(lam + mu)*grad_of_div_u[i] + mu*laplace_u[i] for i in range(dim)] + return obj_array.new_1d(eqs) + + +@pytest.mark.parametrize("dim, method, nu, is_double_layer", [ + # Single layer + pytest.param(2, "Biharmonic", 0.4, False), + pytest.param(2, "Biharmonic", 0.5, False), + pytest.param(2, "Laplace", 0.5, False), + pytest.param(3, "Laplace", 0.5, False), + pytest.param(3, "Laplace", 0.4, False), + pytest.param(2, "Naive", 0.4, False, marks=pytest.mark.slowtest), + pytest.param(3, "Naive", 0.4, False, marks=pytest.mark.slowtest), + pytest.param(2, "Naive", 0.5, False, marks=pytest.mark.slowtest), + pytest.param(3, "Naive", 0.5, False, marks=pytest.mark.slowtest), + # FIXME: re-enable when merge_int_g_exprs is in + pytest.param(3, "Biharmonic", 0.4, False, marks=pytest.mark.skip), + pytest.param(3, "Biharmonic", 0.5, False, marks=pytest.mark.skip), + # FIXME: re-enable when StokesletWrapperYoshida is implemented for 2D + pytest.param(2, "Laplace", 0.4, False, marks=pytest.mark.xfail), + + # Double layer + pytest.param(2, "Laplace", 0.5, True), + pytest.param(3, "Laplace", 0.5, True), + pytest.param(3, "Laplace", 0.4, True), + pytest.param(2, "Naive", 0.4, True, marks=pytest.mark.slowtest), + pytest.param(3, "Naive", 0.4, True, marks=pytest.mark.slowtest), + pytest.param(2, "Naive", 0.5, True, marks=pytest.mark.slowtest), + pytest.param(3, "Naive", 0.5, True, marks=pytest.mark.slowtest), + # FIXME: re-enable when merge_int_g_exprs is in + pytest.param(2, "Biharmonic", 0.4, True, marks=pytest.mark.skip), + pytest.param(2, "Biharmonic", 0.5, True, marks=pytest.mark.skip), + pytest.param(3, "Biharmonic", 0.4, True, marks=pytest.mark.skip), + pytest.param(3, "Biharmonic", 0.5, True, marks=pytest.mark.skip), + # FIXME: re-enable when StressletWrapperYoshida is implemented for 2D + pytest.param(2, "Laplace", 0.4, True, marks=pytest.mark.xfail), + ]) +def test_elasticity_pde(actx_factory, dim, method, nu, is_double_layer, + visualize=False): + if visualize: + logging.basicConfig(level=logging.INFO) + + if dim == 2: + case_cls = eid.CircleTestCase + resolutions = [4, 8, 16] + else: + case_cls = eid.SpheroidTestCase + resolutions = [0, 1, 2] + + source_ovsmp = 4 if dim == 2 else 8 + case = case_cls(fmm_backend=None, + target_order=5, qbx_order=3, source_ovsmp=source_ovsmp, + resolutions=resolutions) + + if nu == 0.5: + pde_class = StokesPDE + else: + pde_class = ElasticityPDE + + if is_double_layer: + identity = pde_class(dim, make_elasticity_double_layer_wrapper( + case.ambient_dim, mu=1, nu=nu, method=Method[method])) + else: + identity = pde_class(dim, make_elasticity_wrapper( + case.ambient_dim, mu=1, nu=nu, method=Method[method])) + + from pytools.convergence import EOCRecorder + eocs = [EOCRecorder() for _ in range(2*identity.dim)] + + for resolution in case.resolutions: + h_max, errors = run_stokes_pde( + actx_factory, case, identity, + resolution=resolution, + visualize=visualize) + + for eoc, e in zip(eocs, errors, strict=True): + eoc.add_data_point(h_max, e) + + for eoc in eocs: + print(eoc.pretty_print( + abscissa_format="%.8e", + error_format="%.8e", + eoc_format="%.2f")) + + for eoc in eocs: + order = min(case.target_order, case.qbx_order) + # Sometimes the error has already converged to a value close to + # machine epsilon. In that case we have to reduce the resolution + # to increase the error to observe the error behaviour, but when + # reducing the resolution is not possible, we check that all the + # errors are below a certain threshold. + assert eoc.order_estimate() > order - 1 or eoc.max_error() < 1e-10 + +# }}} + + # You can test individual routines by typing # $ python test_stokes.py 'test_routine()' diff --git a/test/test_tools.py b/test/test_tools.py index e320e3e5b..107dcdf1b 100644 --- a/test/test_tools.py +++ b/test/test_tools.py @@ -286,8 +286,28 @@ def test_add_geometry_to_collection(actx_factory): _add_geometry_to_collection(actx, places, sources) _add_geometry_to_collection(actx, places, targets) + # }}} +# {{{ test solve_from_lu + +def test_solve_from_lu(): + import sumpy.symbolic as sym + + from pytential.utils import solve_from_lu + x, y, z = sym.symbols("x, y, z") + m = sym.Matrix([[0, x, y], [1, 0, x], [y, 2, 5]]) + L, U, perm = m.LUdecomposition() + + b = sym.Matrix([z, 1, 2]) + sol = solve_from_lu(L, U, perm, b, lambda x: x.expand()) + expected = m.solve(b) + + assert (sol - expected).expand().applyfunc(lambda x: x.simplify()) \ + == sym.Matrix([0, 0, 0]) + + +# }}} # You can test individual routines by typing # $ python test_tools.py 'test_routine()'