"""
Classes for computing diagnostics.
"""
from .utility import *
from .configuration import *
from abc import ABCMeta, abstractmethod
__all__ = ["VorticityCalculator2D", "HessianRecoverer2D", "KineticEnergyCalculator",
"ShallowWaterDualWeightedResidual2D", "TracerDualWeightedResidual2D"]
class DiagnosticCalculator(FrozenHasTraits):
"""
Base class that defines the API for all diagnostic calculators.
"""
__metaclass__ = ABCMeta
def __call__(self):
self.solve()
@abstractmethod
def solve(self):
pass
[docs]
class VorticityCalculator2D(DiagnosticCalculator):
r"""
Linear solver for recovering fluid vorticity,
interpreted as a scalar field:
.. math::
\omega = -v_x + u_y,
for a velocity field :math:`\mathbf{u} = (u, v)`.
It is recommended that the vorticity is sought
in :math:`\mathbb P1` space.
"""
uv_2d = FiredrakeVectorExpression(
Constant(as_vector([0.0, 0.0])), help='Horizontal velocity').tag(config=True)
@unfrozen
@PETSc.Log.EventDecorator("thetis.VorticityCalculator2D.__init__")
def __init__(self, uv_2d, vorticity_2d, **kwargs):
"""
:arg uv_2d: vector expression for the horizontal velocity.
:arg vorticity_2d: :class:`Function` to hold calculated vorticity.
:kwargs: to be passed to the :class:`LinearVariationalSolver`.
"""
self.uv_2d = uv_2d
fs = vorticity_2d.function_space()
dim = fs.mesh().topological_dimension()
if dim != 2:
raise ValueError(f'Dimension {dim} not supported')
if element_continuity(fs.ufl_element()).horizontal != 'cg':
raise ValueError('Vorticity must be calculated in a continuous space')
n = FacetNormal(fs.mesh())
# Weak formulation
test = TestFunction(fs)
a = TrialFunction(fs)*test*dx
L = -inner(perp(self.uv_2d), grad(test))*dx \
+ dot(perp(self.uv_2d), test*n)*ds \
+ dot(avg(perp(self.uv_2d)), jump(test, n))*dS
# Setup vorticity solver
prob = LinearVariationalProblem(a, L, vorticity_2d)
kwargs.setdefault('solver_parameters', {
"ksp_type": "cg",
"pc_type": "bjacobi",
"sub_pc_type": "ilu",
})
self.solver = LinearVariationalSolver(prob, **kwargs)
[docs]
@PETSc.Log.EventDecorator("thetis.VorticityCalculator2D.solve")
def solve(self):
self.solver.solve()
[docs]
class HessianRecoverer2D(DiagnosticCalculator):
r"""
Linear solver for recovering Hessians.
Hessians are recoved using double :math:`L^2`
projection, which is implemented using a
mixed finite element method.
It is recommended that gradients and Hessians
are sought in :math:`\mathbb P1` space of
appropriate dimension.
"""
field_2d = FiredrakeScalarExpression(
Constant(0.0), help='Field to be recovered').tag(config=True)
@unfrozen
@PETSc.Log.EventDecorator("thetis.HessianRecoverer2D.__init__")
def __init__(self, field_2d, hessian_2d, gradient_2d=None, **kwargs):
"""
:arg field_2d: scalar expression to recover the Hessian of.
:arg hessian_2d: :class:`Function` to hold recovered Hessian.
:kwarg gradient_2d: :class:`Function` to hold recovered gradient.
:kwargs: to be passed to the :class:`LinearVariationalSolver`.
"""
self.field_2d = field_2d
self.hessian_2d = hessian_2d
self.gradient_2d = gradient_2d
Sigma = hessian_2d.function_space()
mesh = Sigma.mesh()
dim = mesh.topological_dimension()
if dim != 2:
raise ValueError(f'Dimension {dim} not supported')
n = FacetNormal(mesh)
# Extract function spaces
if element_continuity(Sigma.ufl_element()).horizontal != 'cg':
raise ValueError('Hessians must be calculated in a continuous space')
if Sigma.dof_dset.dim != (2, 2):
raise ValueError('Expecting a square tensor function')
if gradient_2d is None:
V = get_functionspace(mesh, 'CG', 1, vector=True)
else:
V = gradient_2d.function_space()
if element_continuity(V.ufl_element()).horizontal != 'cg':
raise ValueError('Gradients must be calculated in a continuous space')
if V.dof_dset.dim != (2,):
raise ValueError('Expecting a 2D vector function')
# Setup function spaces
W = V*Sigma
g, H = TrialFunctions(W)
phi, tau = TestFunctions(W)
sol = Function(W)
self._gradient, self._hessian = sol.subfunctions
# The formulation is chosen such that f does not need to have any
# finite element derivatives
a = inner(tau, H)*dx \
+ inner(div(tau), g)*dx \
+ inner(phi, g)*dx \
- dot(g, dot(tau, n))*ds \
- dot(avg(g), jump(tau, n))*dS
L = self.field_2d*dot(phi, n)*ds \
+ avg(self.field_2d)*jump(phi, n)*dS \
- self.field_2d*div(phi)*dx
# Apply stationary preconditioners in the Schur complement to get away
# with applying GMRES to the whole mixed system
sp = {
"mat_type": "aij",
"ksp_type": "gmres",
"ksp_max_it": 20,
"pc_type": "fieldsplit",
"pc_fieldsplit_type": "schur",
"pc_fieldsplit_0_fields": "1",
"pc_fieldsplit_1_fields": "0",
"pc_fieldsplit_schur_precondition": "selfp",
"fieldsplit_0_ksp_type": "preonly",
"fieldsplit_1_ksp_type": "preonly",
"fieldsplit_1_pc_type": "gamg",
"fieldsplit_1_mg_levels_ksp_max_it": 5,
}
if COMM_WORLD.size == 1:
sp["fieldsplit_0_pc_type"] = "ilu"
sp["fieldsplit_1_mg_levels_pc_type"] = "ilu"
else:
sp["fieldsplit_0_pc_type"] = "bjacobi"
sp["fieldsplit_0_sub_ksp_type"] = "preonly"
sp["fieldsplit_0_sub_pc_type"] = "ilu"
sp["fieldsplit_1_mg_levels_pc_type"] = "bjacobi"
sp["fieldsplit_1_mg_levels_sub_ksp_type"] = "preonly"
sp["fieldsplit_1_mg_levels_sub_pc_type"] = "ilu"
# Setup solver
prob = LinearVariationalProblem(a, L, sol)
kwargs.setdefault('solver_parameters', sp)
self.solver = LinearVariationalSolver(prob, **kwargs)
[docs]
@PETSc.Log.EventDecorator("thetis.HessianRecoverer2D.solve")
def solve(self):
self.solver.solve()
self.hessian_2d.assign(self._hessian)
if self.gradient_2d is not None:
self.gradient_2d.assign(self._gradient)
[docs]
class KineticEnergyCalculator(DiagnosticCalculator):
r"""
Class for calculating dynamic pressure (i.e. kinetic energy),
.. math::
E_K = \frac12 \rho \|\mathbf{u}\|^2,
where :math:`\rho` is the water density and :math:`\mathbf{u}`
is the velocity.
"""
density = FiredrakeScalarExpression(
physical_constants['rho0'], help='Fluid density').tag(config=True)
@unfrozen
@PETSc.Log.EventDecorator("thetis.KineticEnergyCalculator.__init__")
def __init__(self, uv, ke, density=None, horizontal=False, project=False):
"""
:arg uv: scalar expression for the fluid velocity.
:arg ke: :class:`Function` to hold calculated kinetic energy.
:kwarg density: fluid density.
:kwarg horizontal: consider the horizontal components of velocity only?
:kwarg project: project, rather than interpolate?
"""
if density is not None:
self.density = density
self.ke = ke
u_sq = uv[0]*uv[0] + uv[1]*uv[1] if horizontal else dot(uv, uv)
self.ke_expr = 0.5*self.density*u_sq
if project:
self.projector = Projector(self.ke_expr, self.ke)
else:
self.interpolator = Interpolator(self.ke_expr, self.ke)
[docs]
@PETSc.Log.EventDecorator("thetis.KineticEnergyCalculator.solve")
def solve(self):
if hasattr(self, 'projector'):
self.projector.project()
else:
assert hasattr(self, 'interpolator')
self.interpolator.interpolate()
class DualWeightedResidual2D(DiagnosticCalculator):
r"""
Class for computing contributions to dual weighted residual (DWR)
error indicators.
Suppose we have a weak formulation
.. math::
F(u_h; v) = 0,\quad\forall v\in V,
where :math:`F(u_h;\cdot)` is the weak residual of the forward PDE
and :math:`u_h` is its weak solution. The DWR is obtained by
replacing the test function :math:`v` with the (exact) adjoint
solution :math:`u^*`.
In practice, we do not have the exact adjoint solution, so it is
common practice to approximate it in some enriched finite element
space.
"""
__metaclass__ = ABCMeta
error = None
@unfrozen
@PETSc.Log.EventDecorator("thetis.DualWeightedResidual.__init__")
def __init__(self, solver_obj, dual):
"""
:arg solver_obj: :class:`FlowSolver2d` instance
:arg dual: a :class:`Function` that approximates the true adjoint solution,
which will replace the test function
"""
mesh2d = solver_obj.mesh2d
if mesh2d.topological_dimension() != 2:
dim = mesh2d.topological_dimension()
raise ValueError(f"Expected a mesh of dimension 2, not {dim}")
if mesh2d != dual.ufl_domain():
raise ValueError(f"Mismatching meshes ({mesh2d} vs {func.ufl_domain()})")
self.F = replace(self.form, {TestFunction(self.space): dual})
@abstractmethod
def form(self):
pass
@abstractmethod
def space(self):
pass
@PETSc.Log.EventDecorator("thetis.DualWeightedResidual.solve")
def solve(self):
self.error = form2indicator(self.F)
[docs]
class ShallowWaterDualWeightedResidual2D(DualWeightedResidual2D):
"""
Class for computing dual weighted residual contributions
for the shallow water equations.
"""
def __init__(self, solver_obj, dual):
"""
:arg solver_obj: :class:`FlowSolver2d` instance
:arg dual: a :class:`Function` that approximates the true adjoint solution,
which will replace the test function
"""
self.solver_obj = solver_obj
options = solver_obj.options
if options.swe_timestepper_type not in ("SteadyState", "CrankNicolson"):
typ = options.swe_timestepper_type
raise NotImplementedError(f"Error indication not yet supported for {typ}")
super().__init__(solver_obj, dual)
@property
def form(self):
ts = self.solver_obj.timestepper
return ts.F if not hasattr(ts, "timesteppers") else ts.timesteppers.swe2d.F
@property
def space(self):
return self.solver_obj.function_spaces.V_2d
[docs]
class TracerDualWeightedResidual2D(DualWeightedResidual2D):
"""
Class for computing dual weighted residual contributions
for 2D tracer transport problems.
"""
def __init__(self, solver_obj, dual, label="tracer_2d"):
"""
:arg solver_obj: :class:`FlowSolver2d` instance
:arg dual: a :class:`Function` that approximates the true adjoint solution,
which will replace the test function
"""
self.solver_obj = solver_obj
self.label = label
options = solver_obj.options
if options.tracer_timestepper_type not in ("SteadyState", "CrankNicolson"):
typ = options.tracer_timestepper_type
raise NotImplementedError(f"Error indication not yet supported for {typ}")
super().__init__(solver_obj, dual)
@property
def form(self):
return self.solver_obj.timestepper.timesteppers[self.label].F
@property
def space(self):
return self.solver_obj.function_spaces.Q_2d