"""
Module for 2D depth averaged solver
"""
from .utility import *
from . import shallowwater_eq
from . import timeintegrator
from . import rungekutta
from . import implicitexplicit
from . import coupled_timeintegrator_2d
from . import tracer_eq_2d
from . import sediment_eq_2d
from . import exner_eq
import weakref
import time as time_mod
from mpi4py import MPI
from . import exporter
from .turbines import TidalTurbineFarm, DiscreteTidalTurbineFarm
from .field_defs import field_metadata
from .options import ModelOptions2d
from . import callback
from .log import *
from collections import OrderedDict
import thetis.limiter as limiter
import numpy
import datetime
[docs]
class FlowSolver2d(FrozenClass):
"""
Main object for 2D depth averaged solver
**Example**
Create mesh
.. code-block:: python
from thetis import *
mesh2d = RectangleMesh(20, 20, 10e3, 10e3)
Create bathymetry function, set a constant value
.. code-block:: python
fs_p1 = FunctionSpace(mesh2d, 'CG', 1)
bathymetry_2d = Function(fs_p1, name='Bathymetry').assign(10.0)
Create solver object and set some options
.. code-block:: python
solver_obj = solver2d.FlowSolver2d(mesh2d, bathymetry_2d)
options = solver_obj.options
options.element_family = 'dg-dg'
options.polynomial_degree = 1
options.swe_timestepper_type = 'CrankNicolson'
options.simulation_export_time = 50.0
options.simulation_end_time = 3600.
options.timestep = 25.0
Assign initial condition for water elevation
.. code-block:: python
solver_obj.create_function_spaces()
init_elev = Function(solver_obj.function_spaces.H_2d)
coords = SpatialCoordinate(mesh2d)
init_elev.project(exp(-((coords[0] - 4e3)**2 + (coords[1] - 4.5e3)**2)/2.2e3**2))
solver_obj.assign_initial_conditions(elev=init_elev)
Run simulation
.. code-block:: python
solver_obj.iterate()
See the manual for more complex examples.
"""
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.__init__")
def __init__(self, mesh2d, bathymetry_2d, options=None, keep_log=False):
"""
:arg mesh2d: :class:`Mesh` object of the 2D mesh
:arg bathymetry_2d: Bathymetry of the domain. Bathymetry stands for
the mean water depth (positive downwards).
:type bathymetry_2d: :class:`Function`
:kwarg options: Model options (optional). Model options can also be
changed directly via the :attr:`.options` class property.
:type options: :class:`.ModelOptions2d` instance
:kwarg bool keep_log: append to an existing log file, or overwrite it?
"""
self._initialized = False
self.mesh2d = mesh2d
self.comm = mesh2d.comm
# add boundary length info
bnd_len = compute_boundary_length(self.mesh2d)
self.mesh2d.boundary_len = bnd_len
self.dt = None
"""Time step"""
self.options = ModelOptions2d()
"""
Dictionary of all options. A :class:`.ModelOptions2d` object.
"""
if options is not None:
self.options.update(options)
# simulation time step bookkeeping
self.simulation_time = 0
self.iteration = 0
self.i_export = 0
self.next_export_t = self.simulation_time + self.options.simulation_export_time
self.callbacks = callback.CallbackManager()
"""
:class:`.CallbackManager` object that stores all callbacks
"""
self.fields = FieldDict()
"""
:class:`.FieldDict` that holds all functions needed by the solver
object
"""
self.function_spaces = AttrDict()
"""
:class:`.AttrDict` that holds all function spaces needed by the
solver object
"""
self.fields.bathymetry_2d = bathymetry_2d
self.export_initial_state = True
"""Do export initial state. False if continuing a simulation"""
self.sediment_model = None
"""set up option for sediment model"""
self.bnd_functions = {'shallow_water': {}, 'tracer': {}, 'sediment': {}}
if 'tracer_2d' in field_metadata:
field_metadata.pop('tracer_2d')
self.solve_tracer = False
self.keep_log = keep_log
self._field_preproc_funcs = {}
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.compute_time_step")
def compute_time_step(self, u_scale=Constant(0.0)):
r"""
Computes maximum explicit time step from CFL condition.
.. math :: \Delta t = \frac{\Delta x}{U}
Assumes velocity scale :math:`U = \sqrt{g H} + U_{scale}` where
:math:`U_{scale}` is estimated advective velocity.
:kwarg u_scale: User provided maximum advective velocity scale
:type u_scale: float or :class:`Constant`
"""
csize = self.fields.h_elem_size_2d
bath = self.fields.bathymetry_2d
fs = bath.function_space()
bath_pos = Function(fs, name='bathymetry')
bath_pos.assign(bath)
min_depth = 0.05
bath_pos.dat.data[bath_pos.dat.data < min_depth] = min_depth
test = TestFunction(fs)
trial = TrialFunction(fs)
solution = Function(fs)
g = physical_constants['g_grav']
u = (sqrt(g * bath_pos) + u_scale)
a = inner(test, trial) * dx
l = inner(test, csize / u) * dx
solve(a == l, solution)
return solution
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.compute_mesh_stats")
def compute_mesh_stats(self):
"""
Computes number of elements, nodes etc and prints to sdtout
"""
nnodes = self.function_spaces.P1_2d.dim()
P1DG_2d = self.function_spaces.P1DG_2d
nelem2d = int(P1DG_2d.dim()/P1DG_2d.ufl_cell().num_vertices())
dofs_elev2d = self.function_spaces.H_2d.dim()
dofs_u2d = self.function_spaces.U_2d.dim()
dofs_tracer2d = self.function_spaces.Q_2d.dim()
dofs_elev2d_core = int(dofs_elev2d/self.comm.size)
dofs_tracer2d_core = int(dofs_tracer2d/self.comm.size)
min_h_size = self.comm.allreduce(self.fields.h_elem_size_2d.dat.data.min(), MPI.MIN)
max_h_size = self.comm.allreduce(self.fields.h_elem_size_2d.dat.data.max(), MPI.MAX)
if not self.options.tracer_only:
print_output(f'Element family: {self.options.element_family}, degree: {self.options.polynomial_degree}')
if self.solve_tracer:
print_output(f'Tracer element family: {self.options.tracer_element_family}, degree: 1')
print_output(f'2D cell type: {self.mesh2d.ufl_cell()}')
print_output(f'2D mesh: {nnodes} vertices, {nelem2d} elements')
print_output(f'Horizontal element size: {min_h_size:.2f} ... {max_h_size:.2f} m')
if not self.options.tracer_only:
print_output(f'Number of 2D elevation DOFs: {dofs_elev2d}')
print_output(f'Number of 2D velocity DOFs: {dofs_u2d}')
if self.solve_tracer:
print_output(f'Number of 2D tracer DOFs: {dofs_tracer2d}')
print_output(f'Number of cores: {self.comm.size}')
if not self.options.tracer_only:
print_output(f'Elevation DOFs per core: ~{dofs_elev2d_core:.1f}')
if self.solve_tracer:
print_output(f'Tracer DOFs per core: ~{dofs_tracer2d_core:.1f}')
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.set_time_step")
def set_time_step(self, alpha=0.05):
"""
Sets the model the model time step
If the time integrator supports automatic time step, and
:attr:`ModelOptions2d.timestepper_options.use_automatic_timestep` is
`True`, we compute the maximum time step allowed by the CFL condition.
Otherwise uses :attr:`ModelOptions2d.timestep`.
:kwarg float alpha: CFL number scaling factor
"""
automatic_timestep = False
sed_options = self.options.sediment_model_options
ts_options = (
self.options.swe_timestepper_options, self.options.tracer_timestepper_options,
sed_options.sediment_timestepper_options, sed_options.exner_timestepper_options,
self.options.nh_model_options.free_surface_timestepper_options,
)
for options in ts_options:
if hasattr(options, 'use_automatic_timestep') and options.use_automatic_timestep:
automatic_timestep = True
# TODO revisit math alpha is OBSOLETE
if automatic_timestep:
mesh2d_dt = self.compute_time_step(u_scale=self.options.horizontal_velocity_scale)
dt = self.options.cfl_2d*alpha*float(mesh2d_dt.dat.data.min())
dt = self.comm.allreduce(dt, op=MPI.MIN)
self.dt = dt
else:
assert self.options.timestep is not None
assert self.options.timestep > 0.0
self.dt = self.options.timestep
if self.comm.rank == 0:
print_output('dt = {:}'.format(self.dt))
sys.stdout.flush()
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.set_wetting_and_drying_alpha")
def set_wetting_and_drying_alpha(self):
r"""
Compute a wetting and drying parameter :math:`\alpha` which ensures positive water
depth using the approximate method suggested by Karna et al. (2011).
This method takes
..math::
\alpha \approx \mid L_x \nabla h \mid,
where :math:`L_x` is the horizontal length scale of the mesh elements at the wet-dry
front and :math:`h` is the bathymetry profile.
This expression is interpolated into :math:`\mathbb P1` space in order to remove noise. Note
that we use the `interpolate` method, rather than the `project` method, in order to avoid
introducing new extrema.
NOTE: The minimum and maximum values at which to cap the alpha parameter may be specified via
:attr:`ModelOptions2d.wetting_and_drying_alpha_min` and
:attr:`ModelOptions2d.wetting_and_drying_alpha_max`.
"""
if not self.options.use_wetting_and_drying:
return
if self.options.use_automatic_wetting_and_drying_alpha:
min_alpha = self.options.wetting_and_drying_alpha_min
max_alpha = self.options.wetting_and_drying_alpha_max
# Take the dot product and threshold it
alpha = dot(get_cell_widths_2d(self.mesh2d), abs(grad(self.fields.bathymetry_2d)))
if max_alpha is not None:
alpha = min_value(max_alpha, alpha)
if min_alpha is not None:
alpha = max_value(min_alpha, alpha)
# Interpolate into P1 space
self.options.wetting_and_drying_alpha = Function(self.function_spaces.P1_2d)
self.options.wetting_and_drying_alpha.interpolate(alpha)
# Print to screen and check validity
alpha = self.options.wetting_and_drying_alpha
if isinstance(alpha, Constant):
msg = "Using constant wetting and drying parameter (value {:.2f})"
assert alpha.values()[0] >= 0.0
print_output(msg.format(alpha.values()[0]))
elif isinstance(alpha, Function):
msg = "Using spatially varying wetting and drying parameter (min {:.2f} max {:.2f})"
with alpha.dat.vec_ro as v:
alpha_min, alpha_max = v.min()[1], v.max()[1]
assert alpha_min >= 0.0
print_output(msg.format(alpha_min, alpha_max))
else:
msg = "Wetting and drying parameter of type '{:}' not supported"
raise TypeError(msg.format(alpha.__class__.__name__))
[docs]
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.create_function_spaces")
def create_function_spaces(self):
"""
Creates function spaces
Function spaces are accessible via :attr:`.function_spaces`
object.
"""
on_the_sphere = self.mesh2d.geometric_dimension() == 3
if on_the_sphere:
assert self.options.element_family in ['rt-dg', 'bdm-dg'], \
'Spherical mesh requires \'rt-dg\' or \'bdm-dg\' ' \
'element family.'
# ----- function spaces: elev in H, uv in U, mixed is W
DG = 'DG' if self.mesh2d.ufl_cell().cellname() == 'triangle' else 'DQ'
self.function_spaces.P0_2d = get_functionspace(self.mesh2d, DG, 0, name='P0_2d')
self.function_spaces.P1_2d = get_functionspace(self.mesh2d, 'CG', 1, name='P1_2d')
self.function_spaces.P1v_2d = get_functionspace(self.mesh2d, 'CG', 1, name='P1v_2d',
vector=True)
self.function_spaces.P1DG_2d = get_functionspace(self.mesh2d, DG, 1, name='P1DG_2d')
self.function_spaces.P1DGv_2d = get_functionspace(self.mesh2d, DG, 1, name='P1DGv_2d',
vector=True)
# 2D velocity space
if self.options.element_family in ['rt-dg', 'bdm-dg']:
family_prefix = self.options.element_family.split('-')[0].upper()
family_suffix = {'triangle': 'F', 'quadrilateral': 'CF'}
cell = self.mesh2d.ufl_cell().cellname()
fam = family_prefix + family_suffix[cell]
degree = self.options.polynomial_degree + 1
self.function_spaces.U_2d = get_functionspace(self.mesh2d, fam, degree, name='U_2d')
self.function_spaces.H_2d = get_functionspace(self.mesh2d, DG, self.options.polynomial_degree, name='H_2d')
elif self.options.element_family == 'dg-cg':
self.function_spaces.U_2d = get_functionspace(self.mesh2d, DG, self.options.polynomial_degree, name='U_2d', vector=True)
self.function_spaces.H_2d = get_functionspace(self.mesh2d, 'CG', self.options.polynomial_degree+1, name='H_2d')
elif self.options.element_family == 'dg-dg':
self.function_spaces.U_2d = get_functionspace(self.mesh2d, DG, self.options.polynomial_degree, name='U_2d', vector=True)
self.function_spaces.H_2d = get_functionspace(self.mesh2d, DG, self.options.polynomial_degree, name='H_2d')
else:
raise Exception('Unsupported finite element family {:}'.format(self.options.element_family))
self.function_spaces.V_2d = MixedFunctionSpace([self.function_spaces.U_2d, self.function_spaces.H_2d])
if self.options.tracer_element_family == 'dg':
self.function_spaces.Q_2d = get_functionspace(self.mesh2d, 'DG', 1, name='Q_2d')
elif self.options.tracer_element_family == 'cg':
self.function_spaces.Q_2d = get_functionspace(self.mesh2d, 'CG', 1, name='Q_2d')
else:
raise Exception('Unsupported finite element family {:}'.format(self.options.tracer_element_family))
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.add_new_field")
def add_new_field(self, function, label, name, filename, shortname=None, unit='-', preproc_func=None):
"""
Add a field to :attr:`fields`.
:arg function: representation of the field as a :class:`Function`
:arg label: field label used internally by Thetis, e.g. 'tracer_2d'
:arg name: human readable name for the tracer field, e.g. 'Tracer concentration'
:arg filename: file name for outputs, e.g. 'Tracer2d'
:kwarg shortname: short version of name, e.g. 'Tracer'
:kwarg unit: units for field, e.g. '-'
:kwarg preproc_func: optional pre-processor function which will be called before exporting
"""
assert isinstance(function, Function)
assert isinstance(label, str)
assert isinstance(name, str)
assert isinstance(filename, str)
assert shortname is None or isinstance(shortname, str)
assert isinstance(unit, str)
assert preproc_func is None or callable(preproc_func)
assert label not in field_metadata, f"Field '{label}' already exists."
assert ' ' not in label, "Labels cannot contain spaces"
assert ' ' not in filename, "Filenames cannot contain spaces"
field_metadata[label] = {
'name': name,
'shortname': shortname or name,
'unit': unit,
'filename': filename,
}
self.fields[label] = function
if preproc_func is not None:
self._field_preproc_funcs[label] = preproc_func
[docs]
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.create_fields")
def create_fields(self):
"""
Creates field Functions
"""
if not hasattr(self.function_spaces, 'U_2d'):
self.create_function_spaces()
if self.options.log_output and not self.options.no_exports:
mode = "a" if self.keep_log else "w"
set_log_directory(self.options.output_directory, mode=mode)
# Add general fields
self.fields.h_elem_size_2d = Function(self.function_spaces.P1_2d)
get_horizontal_elem_size_2d(self.fields.h_elem_size_2d)
self.set_wetting_and_drying_alpha()
self.depth = DepthExpression(self.fields.bathymetry_2d,
use_nonlinear_equations=self.options.use_nonlinear_equations,
use_wetting_and_drying=self.options.use_wetting_and_drying,
wetting_and_drying_alpha=self.options.wetting_and_drying_alpha)
# Add fields for shallow water modelling
self.fields.solution_2d = Function(self.function_spaces.V_2d, name='solution_2d')
uv_2d, elev_2d = self.fields.solution_2d.subfunctions # correct treatment of the split 2d functions
self.fields.uv_2d = uv_2d
self.fields.elev_2d = elev_2d
# Add tracer fields
self.solve_tracer = len(self.options.tracer_fields.keys()) > 0
for system, parent in self.options.tracer_fields.copy().items():
labels = system.split(',')
num_labels = len(labels)
if parent is None:
Q_2d = self.function_spaces.Q_2d
fs = Q_2d if num_labels == 1 else MixedFunctionSpace([Q_2d]*len(labels))
parent = Function(fs)
if num_labels > 1:
self.fields[system] = parent
self.options.tracer_fields[system] = parent
children = [parent] if num_labels == 1 else parent.subfunctions
for label, function in zip(labels, children):
tracer = self.options.tracer[label]
function.rename(label)
self.add_new_field(function,
label,
tracer.metadata['name'],
tracer.metadata['filename'],
shortname=tracer.metadata['shortname'],
unit=tracer.metadata['unit'])
# Add suspended sediment field
if self.options.sediment_model_options.solve_suspended_sediment:
self.fields.sediment_2d = Function(self.function_spaces.Q_2d, name='sediment_2d')
# Add fields for non-hydrostatic mode
if self.options.nh_model_options.solve_nonhydrostatic_pressure:
q_degree = self.options.polynomial_degree
if self.options.nh_model_options.q_degree is not None:
q_degree = self.options.nh_model_options.q_degree
fs_q = get_functionspace(self.mesh2d, 'CG', q_degree)
self.fields.q_2d = Function(fs_q, name='q_2d') # 2D non-hydrostatic pressure at bottom
self.fields.w_2d = Function(self.function_spaces.H_2d, name='w_2d') # depth-averaged vertical velocity
[docs]
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.create_equations")
def create_equations(self):
"""
Creates equation instances
"""
if not hasattr(self.fields, 'uv_2d'):
self.create_fields()
self.equations = AttrDict()
# tidal farms, if any
if len(self.options.tidal_turbine_farms) + len(self.options.discrete_tidal_turbine_farms) > 0:
self.tidal_farms = []
p = self.function_spaces.U_2d.ufl_element().degree()
quad_degree = 2*p + 1
for subdomain, farm_options in self.options.tidal_turbine_farms.items():
fdx = dx(subdomain, degree=quad_degree)
self.tidal_farms.append(TidalTurbineFarm(farm_options.turbine_density,
fdx, farm_options))
for subdomain, farm_options in self.options.discrete_tidal_turbine_farms.items():
fdx = dx(subdomain, degree=farm_options.quadrature_degree)
self.tidal_farms.append(DiscreteTidalTurbineFarm(self.mesh2d, fdx, farm_options))
else:
self.tidal_farms = None
# Shallow water equations for hydrodynamic modelling
self.equations.sw = shallowwater_eq.ShallowWaterEquations(
self.fields.solution_2d.function_space(),
self.depth,
self.options,
tidal_farms=self.tidal_farms
)
self.equations.sw.bnd_functions = self.bnd_functions['shallow_water']
uv_2d, elev_2d = self.fields.solution_2d.subfunctions
# Passive tracer equations
for system, parent in self.options.tracer_fields.items():
self.equations[system] = tracer_eq_2d.TracerEquation2D(
system,
parent.function_space(),
self.depth,
self.options,
uv_2d,
)
# Equation for suspended sediment transport
sediment_options = self.options.sediment_model_options
if sediment_options.solve_suspended_sediment or sediment_options.solve_exner:
sediment_model_class = self.options.sediment_model_options.sediment_model_class
self.sediment_model = sediment_model_class(
self.options, self.mesh2d, uv_2d, elev_2d, self.depth)
if sediment_options.solve_suspended_sediment:
self.equations.sediment = sediment_eq_2d.SedimentEquation2D(
self.function_spaces.Q_2d, self.depth, self.options, self.sediment_model,
conservative=sediment_options.use_sediment_conservative_form)
# Exner equation for bedload transport
if sediment_options.solve_exner:
if element_continuity(self.fields.bathymetry_2d.function_space().ufl_element()).horizontal in ['cg']:
self.equations.exner = exner_eq.ExnerEquation(
self.fields.bathymetry_2d.function_space(), self.depth,
depth_integrated_sediment=sediment_options.use_sediment_conservative_form, sediment_model=self.sediment_model)
else:
raise NotImplementedError("Exner equation can currently only be implemented if the bathymetry is defined on a continuous space")
# Free surface equation for non-hydrostatic pressure
if self.options.nh_model_options.solve_nonhydrostatic_pressure:
print_output('Using non-hydrostatic pressure')
self.equations.fs = shallowwater_eq.FreeSurfaceEquation(
TestFunction(self.function_spaces.H_2d), self.function_spaces.H_2d, self.function_spaces.U_2d,
self.depth, self.options)
self.equations.fs.bnd_functions = self.bnd_functions['shallow_water']
# Setup slope limiters
if self.solve_tracer or sediment_options.solve_suspended_sediment:
if self.options.use_limiter_for_tracers and self.options.polynomial_degree > 0:
self.tracer_limiter = limiter.VertexBasedP1DGLimiter(self.function_spaces.Q_2d)
else:
self.tracer_limiter = None
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.get_swe_timestepper")
def get_swe_timestepper(self, integrator):
"""
Gets shallow water timestepper object with appropriate parameters
"""
fields = {
'linear_drag_coefficient': self.options.linear_drag_coefficient,
'quadratic_drag_coefficient': self.options.quadratic_drag_coefficient,
'manning_drag_coefficient': self.options.manning_drag_coefficient,
'nikuradse_bed_roughness': self.options.nikuradse_bed_roughness,
'viscosity_h': self.options.horizontal_viscosity,
'lax_friedrichs_velocity_scaling_factor': self.options.lax_friedrichs_velocity_scaling_factor,
'coriolis': self.options.coriolis_frequency,
'wind_stress': self.options.wind_stress,
'atmospheric_pressure': self.options.atmospheric_pressure,
'momentum_source': self.options.momentum_source_2d,
'volume_source': self.options.volume_source_2d,
}
bnd_conditions = self.bnd_functions['shallow_water']
if self.options.swe_timestepper_type == 'PressureProjectionPicard':
u_test = TestFunction(self.function_spaces.U_2d)
self.equations.mom = shallowwater_eq.ShallowWaterMomentumEquation(
u_test, self.function_spaces.U_2d, self.function_spaces.H_2d,
self.depth,
options=self.options,
tidal_farms=self.tidal_farms
)
self.equations.mom.bnd_functions = bnd_conditions
return integrator(self.equations.sw, self.equations.mom, self.fields.solution_2d,
fields, self.dt, self.options.swe_timestepper_options, bnd_conditions)
else:
return integrator(self.equations.sw, self.fields.solution_2d, fields, self.dt,
self.options.swe_timestepper_options, bnd_conditions)
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.get_tracer_timestepper")
def get_tracer_timestepper(self, integrator, system):
"""
Gets tracer timestepper object with appropriate parameters
"""
uv, elev = self.fields.solution_2d.subfunctions
fields = {
'elev_2d': elev,
'uv_2d': uv,
'lax_friedrichs_tracer_scaling_factor': self.options.lax_friedrichs_tracer_scaling_factor,
'tracer_advective_velocity_factor': self.options.tracer_advective_velocity_factor,
}
for label in system.split(','):
fields[f'diffusivity_h-{label}'] = self.options.tracer[label].diffusivity
fields[f'source-{label}'] = self.options.tracer[label].source
bcs = {}
if system in self.bnd_functions:
bcs = self.bnd_functions[system]
elif system[:-3] in self.bnd_functions:
# TODO: Is this safe for monolithic systems?
bcs = self.bnd_functions[system[:-3]]
# TODO: Different timestepper options for different systems
return integrator(self.equations[system], self.fields[system], fields, self.dt,
self.options.tracer_timestepper_options, bcs)
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.get_sediment_timestepper")
def get_sediment_timestepper(self, integrator):
"""
Gets sediment timestepper object with appropriate parameters
"""
uv, elev = self.fields.solution_2d.subfunctions
fields = {
'elev_2d': elev,
'uv_2d': uv,
'diffusivity_h-sediment_2d': self.options.sediment_model_options.horizontal_diffusivity,
'lax_friedrichs_tracer_scaling_factor': self.options.lax_friedrichs_tracer_scaling_factor,
'tracer_advective_velocity_factor': self.sediment_model.get_advective_velocity_correction_factor(),
}
return integrator(self.equations.sediment, self.fields.sediment_2d, fields, self.dt,
self.options.sediment_model_options.sediment_timestepper_options,
self.bnd_functions['sediment'])
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.get_exner_timestepper")
def get_exner_timestepper(self, integrator):
"""
Gets exner timestepper object with appropriate parameters
"""
uv, elev = self.fields.solution_2d.subfunctions
if not self.options.sediment_model_options.solve_suspended_sediment:
self.fields.sediment_2d = Function(elev.function_space()).interpolate(Constant(0.0))
fields = {
'elev_2d': elev,
'sediment': self.fields.sediment_2d,
'morfac': self.options.sediment_model_options.morphological_acceleration_factor,
'porosity': self.options.sediment_model_options.porosity,
}
# only pass SWE bcs, used to determine closed boundaries in bedload term
return integrator(self.equations.exner, self.fields.bathymetry_2d, fields, self.dt,
self.options.sediment_model_options.exner_timestepper_options,
self.bnd_functions['shallow_water'])
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.get_fs_timestepper")
def get_fs_timestepper(self, integrator):
"""
Gets free-surface correction timestepper object with appropriate parameters
"""
fields_fs = {
'uv': self.fields.uv_2d,
'volume_source': self.options.volume_source_2d,
}
return integrator(self.equations.fs, self.fields.elev_2d, fields_fs, self.dt,
self.options.nh_model_options.free_surface_timestepper_options,
self.bnd_functions['shallow_water'])
[docs]
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.create_timestepper")
def create_timestepper(self):
"""
Creates time stepper instance
"""
if not hasattr(self, 'equations'):
self.create_equations()
self.compute_mesh_stats()
self.set_time_step()
# ----- Time integrators
steppers = {
'SSPRK33': rungekutta.SSPRK33,
'ForwardEuler': timeintegrator.ForwardEuler,
'SteadyState': timeintegrator.SteadyState,
'BackwardEuler': rungekutta.BackwardEulerUForm,
'DIRK22': rungekutta.DIRK22UForm,
'DIRK33': rungekutta.DIRK33UForm,
'CrankNicolson': timeintegrator.CrankNicolson,
'PressureProjectionPicard': timeintegrator.PressureProjectionPicard,
'SSPIMEX': implicitexplicit.IMEXLPUM2,
}
if self.options.nh_model_options.solve_nonhydrostatic_pressure:
self.poisson_solver = DepthIntegratedPoissonSolver(
self.fields.q_2d, self.fields.uv_2d, self.fields.w_2d,
self.fields.elev_2d, self.depth, self.dt, self.bnd_functions,
solver_parameters=self.options.nh_model_options.solver_parameters
)
self.timestepper = coupled_timeintegrator_2d.NonHydrostaticTimeIntegrator2D(
weakref.proxy(self), steppers[self.options.swe_timestepper_type],
steppers[self.options.nh_model_options.free_surface_timestepper_type]
)
elif self.solve_tracer:
self.timestepper = coupled_timeintegrator_2d.GeneralCoupledTimeIntegrator2D(
weakref.proxy(self), {
'shallow_water': steppers[self.options.swe_timestepper_type],
'tracer': steppers[self.options.tracer_timestepper_type],
},
)
elif self.options.sediment_model_options.solve_suspended_sediment or self.options.sediment_model_options.solve_exner:
self.timestepper = coupled_timeintegrator_2d.GeneralCoupledTimeIntegrator2D(
weakref.proxy(self), {
'shallow_water': steppers[self.options.swe_timestepper_type],
'sediment': steppers[self.options.sediment_model_options.sediment_timestepper_type],
'exner': steppers[self.options.sediment_model_options.exner_timestepper_type],
},
)
else:
self.timestepper = self.get_swe_timestepper(steppers[self.options.swe_timestepper_type])
print_output('Using time integrator: {:}'.format(self.timestepper.__class__.__name__))
[docs]
@unfrozen
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.create_exporters")
def create_exporters(self):
"""
Creates file exporters
"""
if not hasattr(self, 'timestepper'):
self.create_timestepper()
self.exporters = OrderedDict()
if not self.options.no_exports:
e = exporter.ExportManager(self.options.output_directory,
self.options.fields_to_export,
self.fields,
field_metadata,
export_type='vtk',
verbose=self.options.verbose > 0,
preproc_funcs=self._field_preproc_funcs)
self.exporters['vtk'] = e
hdf5_dir = os.path.join(self.options.output_directory, 'hdf5')
e = exporter.ExportManager(hdf5_dir,
self.options.fields_to_export_hdf5,
self.fields,
field_metadata,
export_type='hdf5',
verbose=self.options.verbose > 0,
preproc_funcs=self._field_preproc_funcs)
self.exporters['hdf5'] = e
[docs]
def initialize(self):
"""
Creates function spaces, equations, time stepper and exporters
"""
if not hasattr(self.function_spaces, 'U_2d'):
self.create_function_spaces()
if not hasattr(self, 'equations'):
self.create_equations()
if not hasattr(self, 'timestepper'):
self.create_timestepper()
if not hasattr(self, 'exporters'):
self.create_exporters()
self._initialized = True
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.assign_initial_conditions")
def assign_initial_conditions(self, elev=None, uv=None, sediment=None, **tracers):
r"""
Assigns initial conditions
:kwarg elev: Initial condition for water elevation
:type elev: scalar :class:`Function`, :class:`Constant`, or an expression
:kwarg uv: Initial condition for depth averaged velocity
:type uv: vector valued :class:`Function`, :class:`Constant`, or an expression
:kwarg sediment: Initial condition for sediment concantration
:type sediment: scalar valued :class:`Function`, :class:`Constant`, or an expression
:kwarg tracers: Initial conditions for tracer fields
:type tracers: scalar valued :class:`Function`\s, :class:`Constant`\s, or an expressions
"""
if not self._initialized:
self.initialize()
uv_2d, elev_2d = self.fields.solution_2d.subfunctions
if elev is not None:
elev_2d.project(elev)
if uv is not None:
uv_2d.project(uv)
for l, func in tracers.items():
label = l if len(l) > 3 and l[-3:] == '_2d' else l + '_2d'
assert label in self.options.tracer, f"Unknown tracer label {label}"
self.fields[label].project(func)
sediment_options = self.options.sediment_model_options
if self.sediment_model is not None:
# update sediment model based on initial conditions for uv and elev
self.sediment_model.update()
if sediment_options.solve_suspended_sediment:
if sediment is not None:
self.fields.sediment_2d.project(sediment)
else:
sediment = self.sediment_model.get_equilibrium_tracer()
if sediment_options.use_sediment_conservative_form:
sediment = sediment * self.depth.get_total_depth(elev_2d)
self.fields.sediment_2d.project(sediment)
self.timestepper.initialize(self.fields.solution_2d)
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.add_callback")
def add_callback(self, callback, eval_interval='export'):
"""
Adds callback to solver object
:arg callback: :class:`.DiagnosticCallback` instance
:kwarg string eval_interval: Determines when callback will be evaluated,
either 'export' or 'timestep' for evaluating after each export or
time step.
"""
self.callbacks.add(callback, eval_interval)
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.export")
def export(self):
"""
Export all fields to disk
Also evaluates all callbacks set to 'export' interval.
"""
self.callbacks.evaluate(mode='export')
for e in self.exporters.values():
e.export()
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.load_state")
def load_state(self, i_stored, outputdir=None, t=None, iteration=None,
i_export=None, legacy_mode=False):
"""
Loads simulation state from hdf5 outputs.
Replaces :meth:`.assign_initial_conditions` in model initialization.
For example, continue simulation from export 40,
.. code-block:: python
solver_obj.load_state(40)
Continue simulation from another output directory,
.. code-block:: python
solver_obj.load_state(40, outputdir='outputs_spinup')
Continue simulation from another output directory and reset the
counters,
.. code-block:: python
solver_obj.load_state(40, outputdir='outputs_spinup',
iteration=0, t=0, i_export=0)
Model state can be loaded if all prognostic state variables have been
stored in hdf5 format. For the hydrodynamics the required state
variables are: `elev_2d`, and `uv_2d`.
Simulation time and iteration can be recovered automatically provided
that the model setup is the same (e.g. time step and export_time).
:arg int i_stored: export index to load
:kwarg string outputdir: (optional) directory where files are read from.
By default ``options.output_directory``.
:kwarg float t: simulation time. Overrides the time stamp inferred from
model options.
:kwarg int iteration: Overrides the iteration count inferred from model
options.
:kwarg int i_export: Set initial export index for the present run. By
default, `i_stored` is used.
:kwarg bool legacy_mode: Load legacy `DumbCheckpoint` files.
"""
if not self._initialized:
self.initialize()
if outputdir is None:
outputdir = self.options.output_directory
# create new ExportManager with desired outputdir
state_fields = ['uv_2d', 'elev_2d']
hdf5_dir = os.path.join(outputdir, 'hdf5')
e = exporter.ExportManager(hdf5_dir,
state_fields,
self.fields,
field_metadata,
export_type='hdf5',
legacy_mode=legacy_mode,
verbose=self.options.verbose > 0)
e.exporters['uv_2d'].load(i_stored, self.fields.uv_2d)
e.exporters['elev_2d'].load(i_stored, self.fields.elev_2d)
self.assign_initial_conditions()
# time stepper bookkeeping for export time step
if i_export is None:
i_export = i_stored
self.i_export = i_export
self.next_export_t = self.i_export*self.options.simulation_export_time
if iteration is None:
iteration = int(numpy.ceil(self.next_export_t/self.dt))
if t is None:
t = iteration*self.dt
self.iteration = iteration
self.simulation_time = t
# for next export
self.export_initial_state = outputdir != self.options.output_directory
if self.export_initial_state:
offset = 0
else:
offset = 1
self.next_export_t += self.options.simulation_export_time
for e in self.exporters.values():
e.set_next_export_ix(self.i_export + offset)
[docs]
def print_state(self, cputime, print_header=False):
"""
Print a summary of the model state on stdout
:arg float cputime: Measured CPU time in seconds
:kwarg print_header: Whether to print column header first
"""
# list of entries to print: (header, value, format)
entries = [
('exp', self.i_export, '5d'),
('iter', self.iteration, '5d'),
]
# generate simulation time string
if self.options.simulation_initial_date is not None:
now = self.options.simulation_initial_date + datetime.timedelta(seconds=self.simulation_time)
date_str = f'{now:%Y-%m-%d}'.rjust(11)
time_str = f'{now:%H:%M:%S}'.rjust(9)
entries += [
('date', date_str, '11s'),
('time', time_str, '9s'),
]
else:
time_str = f'{self.simulation_time:.2f}'.rjust(15)
entries += [
('time', time_str, '15s'),
]
if self.options.tracer_only:
for label in self.options.tracer:
norm_q = norm(self.fields[label])
e = (label, norm_q, '10.4f')
entries.append(e)
else:
norm_h = norm(self.fields.solution_2d.subfunctions[1])
norm_u = norm(self.fields.solution_2d.subfunctions[0])
entries += [
('eta norm', norm_h, '14.4f'),
('u norm', norm_u, '14.4f'),
]
entries.append(('Tcpu', cputime, '6.2f'))
if print_header:
# generate header
header = ' '.join([e[0].rjust(len(f'{e[1]:{e[2]}}')) for e in entries])
print_output(header)
# generate line
line = ' '.join([f'{e[1]:{e[2]}}' for e in entries])
print_output(line)
sys.stdout.flush()
[docs]
@PETSc.Log.EventDecorator("thetis.FlowSolver2d.iterate")
def iterate(self, update_forcings=None,
export_func=None):
"""
Runs the simulation
Iterates over the time loop until time ``options.simulation_end_time`` is reached.
Exports fields to disk on ``options.simulation_export_time`` intervals.
:kwarg update_forcings: User-defined function that takes simulation
time as an argument and updates time-dependent boundary conditions
(if any).
:kwarg export_func: User-defined function (with no arguments) that will
be called on every export.
"""
if not self._initialized:
self.initialize()
self.options.use_limiter_for_tracers &= self.options.polynomial_degree > 0
t_epsilon = 1.0e-5
cputimestamp = time_mod.perf_counter()
next_export_t = self.simulation_time + self.options.simulation_export_time
dump_hdf5 = self.options.export_diagnostics and not self.options.no_exports
if self.options.check_volume_conservation_2d:
c = callback.VolumeConservation2DCallback(self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
self.add_callback(c)
if self.options.check_tracer_conservation:
for label, tracer in self.options.tracer.items():
if tracer.use_conservative_form:
c = callback.ConservativeTracerMassConservation2DCallback(label,
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
else:
c = callback.TracerMassConservation2DCallback(label,
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
self.add_callback(c, eval_interval='export')
if self.options.sediment_model_options.check_sediment_conservation:
if self.options.sediment_model_options.use_sediment_conservative_form:
c = callback.ConservativeTracerMassConservation2DCallback('sediment_2d',
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
else:
c = callback.TracerMassConservation2DCallback('sediment_2d',
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
self.add_callback(c, eval_interval='export')
if self.options.check_tracer_overshoot:
for label in self.options.tracer:
c = callback.TracerOvershootCallBack(label,
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
self.add_callback(c, eval_interval='export')
if self.options.sediment_model_options.check_sediment_overshoot:
c = callback.TracerOvershootCallBack('sediment_2d',
self,
export_to_hdf5=dump_hdf5,
append_to_log=True)
self.add_callback(c, eval_interval='export')
initial_simulation_time = self.simulation_time
internal_iteration = 0
init_date = self.options.simulation_initial_date
end_date = self.options.simulation_end_date
if (init_date is not None and end_date is not None):
now = init_date + datetime.timedelta(initial_simulation_time)
assert end_date > now, f'Simulation end date must be greater than initial time {now}'
print_output(
f'Running simulation\n'
f' from {now:%Y-%m-%d %H:%M:%S %Z}\n'
f' to {end_date:%Y-%m-%d %H:%M:%S %Z}'
)
end_time = (end_date - now).total_seconds() + initial_simulation_time
if self.options.simulation_end_time is not None:
warning('Both simulation_end_date and simulation_end_time have been set, ignoring simulation_end_time.')
self.options.simulation_end_time = end_time
assert self.options.simulation_end_time is not None, 'simulation_end_time must be set'
# initial export
self.print_state(0.0, print_header=True)
if self.export_initial_state:
self.export()
if export_func is not None:
export_func()
if 'vtk' in self.exporters and isinstance(self.fields.bathymetry_2d, Function):
self.exporters['vtk'].export_bathymetry(self.fields.bathymetry_2d)
while self.simulation_time <= self.options.simulation_end_time - t_epsilon:
self.timestepper.advance(self.simulation_time, update_forcings)
# Move to next time step
self.iteration += 1
internal_iteration += 1
self.simulation_time = initial_simulation_time + internal_iteration*self.dt
self.callbacks.evaluate(mode='timestep')
# Write the solution to file
if self.simulation_time >= next_export_t - t_epsilon:
self.i_export += 1
next_export_t += self.options.simulation_export_time
cputime = time_mod.perf_counter() - cputimestamp
cputimestamp = time_mod.perf_counter()
self.print_state(cputime)
self.export()
if export_func is not None:
export_func()