Source code for thetis.tracer_eq_2d

r"""
2D advection diffusion equation for tracers.

The advection-diffusion equation of tracer :math:`T` in non-conservative form reads

.. math::
    \frac{\partial T}{\partial t}
    + \nabla_h \cdot (\textbf{u} T)
    = \nabla_h \cdot (\mu_h \nabla_h T)
    :label: tracer_eq_2d

where :math:`\nabla_h` denotes horizontal gradient, :math:`\textbf{u}` are the horizontal
velocities, and
:math:`\mu_h` denotes horizontal diffusivity.

The advection-diffusion equation of depth-integrated tracer :math:`q=HT` in conservative form reads

.. math::
    \frac{\partial q}{\partial t}
    + \nabla_h \cdot (\textbf{u} q)
    = \nabla_h \cdot (\mu_h \nabla_h q)
    :label: cons_tracer_eq_2d

where :math:`\nabla_h` denotes horizontal gradient, :math:`\textbf{u}` are the horizontal
velocities, and
:math:`\mu_h` denotes horizontal diffusivity.
"""
from .utility import *
from .equation import Term, Equation

__all__ = [
    'TracerEquation2D',
    'TracerTerm',
    'HorizontalAdvectionTerm',
    'HorizontalDiffusionTerm',
    'SourceTerm',
    'ConservativeTracerTerm',
    'ConservativeHorizontalAdvectionTerm',
    'ConservativeHorizontalDiffusionTerm',
    'ConservativeSourceTerm',
]


[docs] class TracerTerm(Term): """ Generic tracer term that provides commonly used members and mapping for boundary functions. """ def __init__(self, index, label, function_space, depth, options, test_function=None, trial_function=None): """ :arg index: index for this tracer component within the vector :arg label: label for this tracer component :arg function_space: :class:`FunctionSpace` where the solution belongs :arg depth: :class:`DepthExpression` containing depth info :arg options: :class`ModelOptions2d` containing parameters :kwarg test_function: custom :class:`TestFunction` :kwarg trial_function: custom :class:`TrialFunction` """ super(TracerTerm, self).__init__(function_space, test_function=test_function, trial_function=trial_function) self.index = index self.label = label self.depth = depth self.options = options self.cellsize = CellSize(self.mesh) continuity = element_continuity(self.function_space.ufl_element()) self.horizontal_dg = continuity.horizontal == 'dg' # define measures with a reasonable quadrature degree p = self.function_space.ufl_element().degree() self.quad_degree = 2*p + 1 self.dx = dx(degree=self.quad_degree) self.dS = dS(degree=self.quad_degree) self.ds = ds(degree=self.quad_degree)
[docs] def get_bnd_functions(self, c_in, uv_in, elev_in, bnd_id, bnd_conditions): """ Returns external values of tracer and uv for all supported boundary conditions. Volume flux (flux) and normal velocity (un) are defined positive out of the domain. :arg c_in: Internal value of tracer :arg uv_in: Internal value of horizontal velocity :arg elev_in: Internal value of elevation :arg bnd_id: boundary id :type bnd_id: int :arg bnd_conditions: dict of boundary conditions: ``{bnd_id: {field: value, ...}, ...}`` """ funcs = bnd_conditions.get(bnd_id) if 'elev' in funcs: elev_ext = funcs['elev'] else: elev_ext = elev_in if 'value' in funcs: c_ext = funcs['value'] else: c_ext = c_in if 'uv' in funcs: uv_ext = self.corr_factor * funcs['uv'] elif 'flux' in funcs: h_ext = self.depth.get_total_depth(elev_ext) area = h_ext*self.boundary_len[bnd_id] # NOTE using external data only uv_ext = self.corr_factor * funcs['flux']/area*self.normal elif 'un' in funcs: uv_ext = funcs['un']*self.normal else: uv_ext = uv_in return c_ext, uv_ext, elev_ext
[docs] def component(self, f): """ Return the component of a function corresponding to this tracer. """ return f if len(f.dof_dset.dim) == 1 else split(f)[self.index]
[docs] class HorizontalAdvectionTerm(TracerTerm): r""" Advection of tracer term, :math:`\bar{\textbf{u}} \cdot \nabla T` The weak form is .. math:: \int_\Omega \bar{\textbf{u}} \cdot \boldsymbol{\psi} \cdot \nabla T dx = - \int_\Omega \nabla_h \cdot (\bar{\textbf{u}} \boldsymbol{\psi}) \cdot T dx + \int_\Gamma \text{avg}(T) \cdot \text{jump}(\boldsymbol{\psi} (\bar{\textbf{u}}\cdot\textbf{n})) dS where the right hand side has been integrated by parts; :math:`\textbf{n}` is the unit normal of the element interfaces, and :math:`\text{jump}` and :math:`\text{avg}` denote the jump and average operators across the interface. For the continuous Galerkin method we use .. math:: \int_\Omega \bar{\textbf{u}} \cdot \boldsymbol{\psi} \cdot \nabla T dx = - \int_\Omega \nabla_h \cdot (\bar{\textbf{u}} \boldsymbol{\psi}) \cdot T dx. """
[docs] def residual(self, solution, solution_old, fields, fields_old, bnd_conditions): if fields_old.get('uv_2d') is None: return 0 elev = fields_old['elev_2d'] self.corr_factor = fields_old.get('tracer_advective_velocity_factor') c = self.component(solution) uv = self.corr_factor * fields_old['uv_2d'] # FIXME is this an option? lax_friedrichs_factor = fields_old.get('lax_friedrichs_tracer_scaling_factor') f = 0 f += -(Dx(uv[0] * self.test, 0) * c + Dx(uv[1] * self.test, 1) * c) * self.dx if self.horizontal_dg: # add interface term uv_av = avg(uv) un_av = (uv_av[0]*self.normal('-')[0] + uv_av[1]*self.normal('-')[1]) s = 0.5*(sign(un_av) + 1.0) c_up = c('-')*s + c('+')*(1-s) f += c_up*(jump(self.test, uv[0] * self.normal[0]) + jump(self.test, uv[1] * self.normal[1])) * self.dS # Lax-Friedrichs stabilization if self.options.use_lax_friedrichs_tracer: gamma = 0.5*abs(un_av)*lax_friedrichs_factor f += gamma*dot(jump(self.test), jump(c))*self.dS for bnd_marker in self.boundary_markers: funcs = bnd_conditions.get(bnd_marker) ds_bnd = ds(int(bnd_marker), degree=self.quad_degree) c_in = c if funcs is not None: c_ext, uv_ext, eta_ext = self.get_bnd_functions(c_in, uv, elev, bnd_marker, bnd_conditions) uv_av = 0.5*(uv + uv_ext) un_av = self.normal[0]*uv_av[0] + self.normal[1]*uv_av[1] s = 0.5*(sign(un_av) + 1.0) c_up = c_in*s + c_ext*(1-s) f += c_up*(uv_av[0]*self.normal[0] + uv_av[1]*self.normal[1])*self.test*ds_bnd else: f += c_in * (uv[0]*self.normal[0] + uv[1]*self.normal[1])*self.test*ds_bnd return -f
[docs] class HorizontalDiffusionTerm(TracerTerm): r""" Horizontal diffusion term :math:`-\nabla_h \cdot (\mu_h \nabla_h T)` Using the symmetric interior penalty method the weak form becomes .. math:: -\int_\Omega \nabla_h \cdot (\mu_h \nabla_h T) \phi dx =& \int_\Omega \mu_h (\nabla_h \phi) \cdot (\nabla_h T) dx \\ &- \int_{\mathcal{I}_h\cup\mathcal{I}_v} \text{jump}(\phi \textbf{n}_h) \cdot \text{avg}(\mu_h \nabla_h T) dS - \int_{\mathcal{I}_h\cup\mathcal{I}_v} \text{jump}(T \textbf{n}_h) \cdot \text{avg}(\mu_h \nabla \phi) dS \\ &+ \int_{\mathcal{I}_h\cup\mathcal{I}_v} \sigma \text{avg}(\mu_h) \text{jump}(T \textbf{n}_h) \cdot \text{jump}(\phi \textbf{n}_h) dS \\ &- \int_\Gamma \mu_h (\nabla_h \phi) \cdot \textbf{n}_h ds where :math:`\sigma` is a penalty parameter, see Hillewaert (2013). For the continuous Galerkin method we use .. math:: -\int_\Omega \nabla_h \cdot (\mu_h \nabla_h T) \phi dx = \int_\Omega \mu_h (\nabla_h \phi) \cdot (\nabla_h T) dx. Hillewaert, Koen (2013). Development of the discontinuous Galerkin method for high-resolution, large scale CFD and acoustics in industrial geometries. PhD Thesis. Université catholique de Louvain. https://dial.uclouvain.be/pr/boreal/object/boreal:128254/ """
[docs] def residual(self, solution, solution_old, fields, fields_old, bnd_conditions): diffusivity_h = fields_old.get(f'diffusivity_h-{self.label}') if diffusivity_h is None: return 0 c = self.component(solution) diff_tensor = as_matrix([[diffusivity_h, 0, ], [0, diffusivity_h, ]]) grad_test = grad(self.test) diff_flux = dot(diff_tensor, grad(c)) sipg_factor = self.options.sipg_factor_tracer f = 0 f += inner(grad_test, diff_flux)*self.dx if self.horizontal_dg: cell = self.mesh.ufl_cell() p = self.function_space.ufl_element().degree() cp = (p + 1) * (p + 2) / 2 if cell == triangle else (p + 1)**2 l_normal = CellVolume(self.mesh) / FacetArea(self.mesh) # by default the factor is multiplied by 2 to ensure convergence sigma = sipg_factor * cp / l_normal sp = sigma('+') sm = sigma('-') sigma_max = conditional(sp > sm, sp, sm) ds_interior = self.dS f += sigma_max * inner( jump(self.test, self.normal), dot(avg(diff_tensor), jump(c, self.normal)))*ds_interior f += -inner(avg(dot(diff_tensor, grad(self.test))), jump(c, self.normal))*ds_interior f += -inner(jump(self.test, self.normal), avg(dot(diff_tensor, grad(c))))*ds_interior for bnd_marker in self.boundary_markers: funcs = bnd_conditions.get(bnd_marker) ds_bnd = ds(int(bnd_marker), degree=self.quad_degree) c_in = c elev = fields_old['elev_2d'] self.corr_factor = fields_old.get('tracer_advective_velocity_factor') uv = self.corr_factor * fields_old['uv_2d'] if funcs is not None: if 'diff_flux' in funcs: f += -self.test*funcs['diff_flux']*ds_bnd else: c_ext, uv_ext, eta_ext = self.get_bnd_functions(c_in, uv, elev, bnd_marker, bnd_conditions) uv_av = 0.5*(uv + uv_ext) un_av = self.normal[0]*uv_av[0] + self.normal[1]*uv_av[1] s = 0.5*(sign(un_av) + 1.0) c_up = c_in*s + c_ext*(1-s) diff_flux_up = dot(diff_tensor, grad(c_up)) f += -self.test*dot(diff_flux_up, self.normal)*ds_bnd return -f
[docs] class SourceTerm(TracerTerm): r""" Generic source term The weak form reads .. math:: F_s = \int_\Omega \sigma \phi dx where :math:`\sigma` is a user defined scalar :class:`Function`. """
[docs] def residual(self, solution, solution_old, fields, fields_old, bnd_conditions): f = 0 source = fields_old.get(f'source-{self.label}') if source is not None: f += -inner(source, self.test)*self.dx return -f
[docs] class ConservativeTracerTerm(TracerTerm): """ Generic depth-integrated tracer term that provides commonly used members and mapping for boundary functions. """ def __init__(self, index, label, function_space, depth, options, test_function=None, trial_function=None): """ :arg index: index for this tracer component within the vector :arg label: label for this tracer component :arg function_space: :class:`FunctionSpace` where the solution belongs :arg depth: :class: `DepthExpression` containing depth info :arg options: :class`ModelOptions2d` containing parameters :kwarg test_function: custom :class:`TestFunction`. :kwarg trial_function: custom :class:`TrialFunction`. """ super(ConservativeTracerTerm, self).__init__(index, label, function_space, depth, options, test_function=test_function, trial_function=trial_function)
# TODO: at the moment this is the same as TracerTerm, but we probably want to overload its # get_bnd_functions method
[docs] class ConservativeHorizontalAdvectionTerm(ConservativeTracerTerm): r""" Advection of tracer term, :math:`\nabla \cdot \bar{\textbf{u}} \nabla q` The weak form is .. math:: \int_\Omega \boldsymbol{\psi} \nabla\cdot \bar{\textbf{u}} \nabla q dx = - \int_\Omega \left(\nabla_h \boldsymbol{\psi})\right) \cdot \bar{\textbf{u}} \cdot q dx + \int_\Gamma \text{avg}(q\bar{\textbf{u}}\cdot\textbf{n}) \cdot \text{jump}(\boldsymbol{\psi}) dS where the right hand side has been integrated by parts; :math:`\textbf{n}` is the unit normal of the element interfaces, and :math:`\text{jump}` and :math:`\text{avg}` denote the jump and average operators across the interface. """
[docs] def residual(self, solution, solution_old, fields, fields_old, bnd_conditions=None): if fields_old.get('uv_2d') is None: return 0 elev = fields_old['elev_2d'] self.corr_factor = fields_old.get('tracer_advective_velocity_factor') c = self.component(solution) uv = self.corr_factor * fields_old['uv_2d'] uv_p1 = fields_old.get('uv_p1') uv_mag = fields_old.get('uv_mag') lax_friedrichs_factor = fields_old.get('lax_friedrichs_tracer_scaling_factor') f = 0 f += -(Dx(self.test, 0) * uv[0] * c + Dx(self.test, 1) * uv[1] * c) * self.dx if self.horizontal_dg: # add interface term uv_av = avg(uv) un_av = (uv_av[0]*self.normal('-')[0] + uv_av[1]*self.normal('-')[1]) s = 0.5*(sign(un_av) + 1.0) flux_up = c('-')*uv('-')*s + c('+')*uv('+')*(1-s) f += (flux_up[0] * jump(self.test, self.normal[0]) + flux_up[1] * jump(self.test, self.normal[1])) * self.dS # Lax-Friedrichs stabilization if self.options.use_lax_friedrichs_tracer: if uv_p1 is not None: gamma = 0.5*abs((avg(uv_p1)[0]*self.normal('-')[0] + avg(uv_p1)[1]*self.normal('-')[1]))*lax_friedrichs_factor elif uv_mag is not None: gamma = 0.5*avg(uv_mag)*lax_friedrichs_factor else: gamma = 0.5*abs(un_av)*lax_friedrichs_factor f += gamma*dot(jump(self.test), jump(c))*self.dS if bnd_conditions is not None: for bnd_marker in self.boundary_markers: funcs = bnd_conditions.get(bnd_marker) ds_bnd = ds(int(bnd_marker), degree=self.quad_degree) c_in = c if funcs is not None: c_ext, uv_ext, eta_ext = self.get_bnd_functions(c_in, uv, elev, bnd_marker, bnd_conditions) uv_av = 0.5*(uv + uv_ext) un_av = self.normal[0]*uv_av[0] + self.normal[1]*uv_av[1] s = 0.5*(sign(un_av) + 1.0) flux_up = c_in*uv*s + c_ext*uv_ext*(1-s) f += (flux_up[0]*self.normal[0] + flux_up[1]*self.normal[1])*self.test*ds_bnd else: f += c_in * (uv[0]*self.normal[0] + uv[1]*self.normal[1])*self.test*ds_bnd return -f
[docs] class ConservativeHorizontalDiffusionTerm(ConservativeTracerTerm, HorizontalDiffusionTerm): r""" Horizontal diffusion term :math:`-\nabla_h \cdot (\mu_h \nabla_h q)` Using the symmetric interior penalty method the weak form becomes .. math:: -\int_\Omega \nabla_h \cdot (\mu_h \nabla_h q) \phi dx =& \int_\Omega \mu_h (\nabla_h \phi) \cdot (\nabla_h q) dx \\ &- \int_{\mathcal{I}_h\cup\mathcal{I}_v} \text{jump}(\phi \textbf{n}_h) \cdot \text{avg}(\mu_h \nabla_h q) dS - \int_{\mathcal{I}_h\cup\mathcal{I}_v} \text{jump}(q \textbf{n}_h) \cdot \text{avg}(\mu_h \nabla \phi) dS \\ &+ \int_{\mathcal{I}_h\cup\mathcal{I}_v} \sigma \text{avg}(\mu_h) \text{jump}(q \textbf{n}_h) \cdot \text{jump}(\phi \textbf{n}_h) dS \\ &- \int_\Gamma \mu_h (\nabla_h \phi) \cdot \textbf{n}_h ds where :math:`\sigma` is a penalty parameter, see Hillewaert (2013). Hillewaert, Koen (2013). Development of the discontinuous Galerkin method for high-resolution, large scale CFD and acoustics in industrial geometries. PhD Thesis. Université catholique de Louvain. https://dial.uclouvain.be/pr/boreal/object/boreal:128254/ """
# TODO: at the moment the same as HorizontalDiffusionTerm # do we need additional H-derivative term? # would also become different if ConservativeTracerTerm gets different bc options
[docs] class ConservativeSourceTerm(ConservativeTracerTerm): r""" Generic source term The weak form reads .. math:: F_s = \int_\Omega \sigma \phi dx where :math:`\sigma` is a user defined scalar :class:`Function`. """
[docs] def residual(self, solution, solution_old, fields, fields_old, bnd_conditions=None): f = 0 source = fields_old.get(f'source-{self.label}') if source is not None: H = self.depth.get_total_depth(fields_old['elev_2d']) f += -inner(H*source, self.test)*self.dx return -f
[docs] class TracerEquation2D(Equation): """ Several 2D tracer advection-diffusion equations :eq:`tracer_eq` in conservative or non-conservative form, solved as a mixed system. """ def __init__(self, system, function_space, depth, options, velocity): """ :arg system: comma separated tracer fields under consideration :arg function_space: :class:`FunctionSpace` where the solution belongs :arg depth: :class: `DepthExpression` containing depth info :arg options: :class`ModelOptions2d` containing parameters :arg velocity: velocity field associated with the shallow water model """ super().__init__(function_space) self.depth = depth self.options = options self.velocity = velocity test_functions = TestFunctions(function_space) trial_functions = TrialFunctions(function_space) for i, label in enumerate(system.split(',')): args = (i, label, function_space[i], depth, options) kwargs = {'trial_function': trial_functions[i]} if options.use_supg_tracer: # TODO: allow SUPG for only some fields kwargs['test_function'] = self.apply_supg(test_functions[i]) else: kwargs['test_function'] = test_functions[i] if options.tracer[label].use_conservative_form: self.add_conservative_terms(*args, **kwargs) else: self.add_nonconservative_terms(*args, **kwargs)
[docs] def add_nonconservative_terms(self, *args, **kwargs): self.add_term(HorizontalAdvectionTerm(*args, **kwargs), 'explicit', args[1]) self.add_term(HorizontalDiffusionTerm(*args, **kwargs), 'explicit', args[1]) self.add_term(SourceTerm(*args, **kwargs), 'source', args[1])
[docs] def add_conservative_terms(self, *args, **kwargs): self.add_term(ConservativeHorizontalAdvectionTerm(*args, **kwargs), 'explicit', args[1]) self.add_term(ConservativeHorizontalDiffusionTerm(*args, **kwargs), 'explicit', args[1]) self.add_term(ConservativeSourceTerm(*args, **kwargs), 'source', args[1])
[docs] def apply_supg(self, test_function): """ Apply SUPG stabilization in the sense of modifying the test function. """ unorm = self.options.horizontal_velocity_scale self.cellsize = anisotropic_cell_size(self.function_space.mesh()) tau = 0.5*self.cellsize/unorm D = self.options.horizontal_diffusivity_scale Pe = 0.5*unorm*self.cellsize/D tau = conditional(D > 0, min_value(tau, Pe/3), tau) inc = tau*dot(self.velocity, grad(self.test)) return test_function + conditional(unorm > 0, inc, 0)