"""
Defines custom callback functions used to compute various metrics at runtime.
"""
from .utility import *
from abc import ABC, abstractproperty, abstractmethod
import h5py
from collections import defaultdict
from .log import *
from firedrake import *
import numpy
[docs]
class CallbackManager(defaultdict):
"""
Stores callbacks in different categories and provides methods for
evaluating them.
Create callbacks and register them under ``'export'`` mode
.. code-block:: python
cb1 = VolumeConservation3DCallback(...)
cb2 = TracerMassConservationCallback(...)
cm = CallbackManager()
cm.add(cb1, 'export')
cm.add(cb2, 'export')
Evaluate callbacks, calls :func:`evaluate` method of all callbacks
registered in the given mode.
.. code-block:: python
cm.evaluate('export')
"""
def __init__(self):
super(CallbackManager, self).__init__(OrderedDict)
[docs]
def add(self, callback, mode):
"""
Add a callback under the given mode
:arg callback: a :class:`.DiagnosticCallback` object
:arg str mode: register callback under this mode
"""
key = callback.name
self[mode][key] = callback
[docs]
def evaluate(self, mode, index=None):
"""
Evaluate all callbacks registered under the given mode
:arg str mode: evaluate all callbacks under this mode
:kwarg int index: if provided, sets the export index. Default behavior
is to append to the file or stream.
"""
for key in sorted(self[mode]):
self[mode][key].evaluate(index=index)
[docs]
class DiagnosticHDF5(object):
"""
A HDF5 file for storing diagnostic time series arrays.
"""
@PETSc.Log.EventDecorator("thetis.DiagnosticHDF5.__init__")
def __init__(self, filename, varnames, array_dim=1, attrs=None,
var_attrs=None, comm=COMM_WORLD, new_file=True,
dtype='d', include_time=True):
"""
:arg str filename: Full filename of the HDF5 file.
:arg varnames: List of variable names that the diagnostic callback
provides
:kwarg array_dim: Dimension of the output array.
Can be a tuple for multi-dimensional output. Use "1" for scalars.
:kwarg dict attrs: Global attributes to be saved in the hdf5 file.
:kwarg dict var_attrs: nested dict of variable specific attributes,
e.g. {'time': {'units': 'seconds since 1950-01-01'}}
:kwarg comm: MPI communicator
:kwarg bool new_file: Define whether to create a new hdf5 file or
append to an existing one (if any)
:kwarg dtype: array datatype
:kwarg include_time: whether to include time array in the file
"""
self.comm = comm
self.filename = filename
self.varnames = varnames
self.nvars = len(varnames)
self.array_dim = array_dim
self.include_time = include_time
if comm.rank == 0 and new_file:
# create empty file with correct datasets
with h5py.File(filename, 'w') as hdf5file:
if include_time:
ds = hdf5file.create_dataset(
'time', (0, 1), maxshape=(None, 1), dtype=dtype)
if var_attrs is not None and 'time' in var_attrs:
ds.attrs.update(var_attrs['time'])
dim_list = array_dim
if isinstance(dim_list, tuple):
dim_list = list(dim_list)
elif not isinstance(dim_list, list):
dim_list = list([dim_list])
shape = tuple([0] + dim_list)
max_shape = tuple([None] + dim_list)
for var in self.varnames:
ds = hdf5file.create_dataset(
var, shape, maxshape=max_shape, dtype=dtype)
if var_attrs is not None and var in var_attrs:
ds.attrs.update(var_attrs[var])
if attrs is not None:
hdf5file.attrs.update(attrs)
def _expand_array(self, hdf5file, varname):
"""Expands array varname by 1 entry"""
arr = hdf5file[varname]
new_shape = list(arr.shape)
new_shape[0] += 1
arr.resize(tuple(new_shape))
def _expand(self, hdf5file):
"""Expands data arrays by 1 entry"""
for var in self.varnames:
self._expand_array(hdf5file, var)
if self.include_time:
self._expand_array(hdf5file, 'time')
def _nentries(self, hdf5file):
return hdf5file[self.varnames[0]].shape[0]
[docs]
@PETSc.Log.EventDecorator("thetis.DiagnosticHDF5.export")
def export(self, variables, time=None, index=None):
"""
Appends a new entry of (time, variables) to the file.
The HDF5 is updated immediately.
:arg variables: values of entry
:type variables: tuple of float
:kwarg time: time stamp of entry
:type time: float
:kwarg int index: If provided, defines the time index in the file
"""
if self.comm.rank == 0:
with h5py.File(self.filename, 'a') as hdf5file:
if index is not None:
nentries = self._nentries(hdf5file)
assert index <= nentries, 'time index out of range {:} <= {:} \n in file {:}'.format(index, nentries, self.filename)
expand_required = index == nentries
ix = index
if index is None or expand_required:
self._expand(hdf5file)
ix = self._nentries(hdf5file) - 1
if self.include_time:
assert time is not None, 'time should be provided as 2nd argument to export()'
hdf5file['time'][ix] = time
for i in range(self.nvars):
hdf5file[self.varnames[i]][ix, :] = variables[i]
hdf5file.close()
[docs]
class DiagnosticCallback(ABC):
"""
A base class for all Callback classes
"""
def __init__(self, solver_obj, array_dim=1, attrs=None,
outputdir=None,
export_to_hdf5=True,
append_to_log=True,
include_time=True,
hdf5_dtype='d',
start_time=None,
end_time=None):
"""
:arg solver_obj: Thetis solver object
:kwarg str outputdir: Custom directory where hdf5 files will be stored.
By default solver's output directory is used.
:kwarg array_dim: Dimension of the output array.
Can be a tuple for multi-dimensional output. Use "1" for scalars.
:kwarg dict attrs: Global attributes to be saved in the hdf5 file.
:kwarg bool export_to_hdf5: If True, diagnostics will be stored in hdf5
format
:kwarg bool append_to_log: If True, callback output messages will be
printed in log
:kwarg bool include_time: whether to include time in the hdf5 file
:kwarg hdf5_dtype: Precision to use in hdf5 output: `d` for double
precision (default), and `f` for single precision
:kwarg start_time: Optional start time for callback evaluation
:kwarg end_time: Optional end time for callback evaluation
"""
if attrs is None:
attrs = {}
self.solver_obj = solver_obj
self.outputdir = outputdir or self.solver_obj.options.output_directory
self.array_dim = array_dim
self.attrs = attrs
self.var_attrs = {}
self.append_to_hdf5 = export_to_hdf5
self.append_to_log = append_to_log
self.hdf5_dtype = hdf5_dtype
self.include_time = include_time
self._create_new_file = True
self._hdf5_initialized = False
self.start_time = start_time or -numpy.inf
self.end_time = end_time or numpy.inf
init_date = self.solver_obj.options.simulation_initial_date
if init_date is not None and include_time:
time_units = 'seconds since ' + init_date.isoformat()
self.var_attrs['time'] = {'units': time_units}
[docs]
def set_write_mode(self, mode):
"""
Define whether to create a new hdf5 file or append to an existing one
:arg str mode: Either 'create' (default) or 'append'
"""
assert mode in ['create', 'append']
self._create_new_file = mode == 'create'
def _create_hdf5_file(self):
"""
Creates an empty hdf5 file with correct datasets.
"""
if self.append_to_hdf5:
comm = self.solver_obj.comm
create_directory(self.outputdir, comm=comm)
fname = 'diagnostic_{:}.hdf5'.format(self.name.replace(' ', '_'))
fname = os.path.join(self.outputdir, fname)
self.hdf_exporter = DiagnosticHDF5(fname, self.variable_names,
array_dim=self.array_dim,
new_file=self._create_new_file,
attrs=self.attrs,
var_attrs=self.var_attrs,
comm=comm, dtype=self.hdf5_dtype,
include_time=self.include_time)
self._hdf5_initialized = True
@abstractproperty
def name(self):
"""The name of the diagnostic"""
pass
@abstractproperty
def variable_names(self):
"""Names of all scalar values"""
pass
@abstractmethod
def __call__(self):
"""
Evaluate the diagnostic value.
.. note::
This method must implement all MPI reduction operations (if any).
"""
pass
[docs]
@abstractmethod
def message_str(self, *args):
"""
A string representation.
:arg args: If provided, these will be the return value from
:meth:`__call__`.
"""
return "{} diagnostic".format(self.name)
[docs]
def push_to_log(self, time, args):
"""
Push callback status message to log
:arg time: time stamp of entry
:arg args: the return value from :meth:`__call__`.
"""
print_output(self.message_str(*args))
[docs]
def push_to_hdf5(self, time, args, index=None):
"""
Append values to HDF5 file.
:arg time: time stamp of entry
:arg args: the return value from :meth:`__call__`.
"""
if not self._hdf5_initialized:
self._create_hdf5_file()
self.hdf_exporter.export(args, time=time, index=index)
[docs]
def evaluate(self, index=None):
"""
Evaluates callback and pushes values to log and hdf file (if enabled)
"""
time = self.solver_obj.simulation_time
if time < self.start_time or time > self.end_time:
return
values = self.__call__()
if self.append_to_log:
self.push_to_log(time, values)
if self.append_to_hdf5:
self.push_to_hdf5(time, values, index=index)
[docs]
class ScalarConservationCallback(DiagnosticCallback):
"""Base class for callbacks that check conservation of a scalar quantity"""
variable_names = ['integral', 'relative_difference']
def __init__(self, scalar_callback, solver_obj, **kwargs):
"""
Creates scalar conservation check callback object
:arg scalar_callback: Python function that takes the solver object as
an argument and returns a scalar quantity of interest
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
super(ScalarConservationCallback, self).__init__(solver_obj, **kwargs)
self.scalar_callback = scalar_callback
self.initial_value = None
def __call__(self):
value = self.scalar_callback()
if self.initial_value is None:
self.initial_value = value
rel_diff = (value - self.initial_value)/self.initial_value
return value, rel_diff
[docs]
def message_str(self, *args):
line = '{0:s} rel. error {1:11.4e}'.format(self.name, args[1])
return line
[docs]
class VolumeConservation3DCallback(ScalarConservationCallback):
"""Checks conservation of 3D volume (volume of 3D mesh)"""
name = 'volume3d'
def __init__(self, solver_obj, **kwargs):
"""
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
def vol3d():
return comp_volume_3d(self.solver_obj.mesh)
super(VolumeConservation3DCallback, self).__init__(vol3d, solver_obj, **kwargs)
[docs]
class VolumeConservation2DCallback(ScalarConservationCallback):
"""Checks conservation of 2D volume (integral of water elevation field)"""
name = 'volume2d'
def __init__(self, solver_obj, **kwargs):
"""
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
def vol2d():
return comp_volume_2d(self.solver_obj.fields.elev_2d,
self.solver_obj.fields.bathymetry_2d)
super(VolumeConservation2DCallback, self).__init__(vol2d, solver_obj, **kwargs)
[docs]
class TracerMassConservation2DCallback(ScalarConservationCallback):
"""
Checks conservation of depth-averaged tracer mass
Depth-averaged tracer mass is defined as the integral of 2D tracer
multiplied by total depth.
"""
name = 'tracer mass'
def __init__(self, tracer_name, solver_obj, **kwargs):
"""
:arg tracer_name: Name of the tracer. Use canonical field names as in
:class:`.FieldDict`.
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
self.name = tracer_name + ' mass' # override name for given tracer
def mass():
H = solver_obj.depth.get_total_depth(solver_obj.fields.elev_2d)
return comp_tracer_mass_2d(solver_obj.fields[tracer_name], H)
super(TracerMassConservation2DCallback, self).__init__(mass, solver_obj, **kwargs)
[docs]
class ConservativeTracerMassConservation2DCallback(ScalarConservationCallback):
"""
Checks conservation of conservative tracer mass which is depth integrated.
"""
name = 'tracer mass'
def __init__(self, tracer_name, solver_obj, **kwargs):
"""
:arg tracer_name: Name of the tracer. Use canonical field names as in
:class:`.FieldDict`.
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
self.name = tracer_name + ' mass' # override name for given tracer
def mass():
# tracer is depth-integrated already, so just integrate over domain
return assemble(solver_obj.fields[tracer_name]*dx)
super(ConservativeTracerMassConservation2DCallback, self).__init__(mass, solver_obj, **kwargs)
[docs]
class TracerMassConservationCallback(ScalarConservationCallback):
"""Checks conservation of total tracer mass"""
name = 'tracer mass'
def __init__(self, tracer_name, solver_obj, **kwargs):
"""
:arg tracer_name: Name of the tracer. Use canonical field names as in
:class:`.FieldDict`.
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
self.name = tracer_name + ' mass' # override name for given tracer
def mass():
return comp_tracer_mass_3d(self.solver_obj.fields[tracer_name])
super(TracerMassConservationCallback, self).__init__(mass, solver_obj, **kwargs)
[docs]
class MinMaxConservationCallback(DiagnosticCallback):
"""Base class for callbacks that check conservation of a minimum/maximum"""
variable_names = ['min_value', 'max_value', 'undershoot', 'overshoot']
def __init__(self, minmax_callback, solver_obj, **kwargs):
"""
:arg minmax_callback: Python function that takes the solver object as
an argument and returns a (min, max) value tuple
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
super(MinMaxConservationCallback, self).__init__(solver_obj, **kwargs)
self.minmax_callback = minmax_callback
self.initial_value = None
def __call__(self):
value = self.minmax_callback()
if self.initial_value is None:
self.initial_value = value
overshoot = max(value[1] - self.initial_value[1], 0.0)
undershoot = min(value[0] - self.initial_value[0], 0.0)
return value[0], value[1], undershoot, overshoot
[docs]
def message_str(self, *args):
l = '{0:s} {1:g} {2:g}'.format(self.name, args[2], args[3])
return l
[docs]
class TracerOvershootCallBack(MinMaxConservationCallback):
"""Checks overshoots of the given tracer field."""
name = 'tracer overshoot'
def __init__(self, tracer_name, solver_obj, **kwargs):
"""
:arg tracer_name: Name of the tracer. Use canonical field names as in
:class:`.FieldDict`.
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
self.name = tracer_name + ' overshoot'
def minmax():
tracer_min = self.solver_obj.fields[tracer_name].dat.data.min()
tracer_max = self.solver_obj.fields[tracer_name].dat.data.max()
tracer_min = self.solver_obj.comm.allreduce(tracer_min, op=MPI.MIN)
tracer_max = self.solver_obj.comm.allreduce(tracer_max, op=MPI.MAX)
return tracer_min, tracer_max
super(TracerOvershootCallBack, self).__init__(minmax, solver_obj, **kwargs)
[docs]
class DetectorsCallback(DiagnosticCallback):
"""
Callback that evaluates the specified fields at the specified locations
"""
def __init__(self, solver_obj,
detector_locations,
field_names,
name,
detector_names=None,
**kwargs):
"""
:arg solver_obj: Thetis solver object
:arg detector_locations: List of x, y locations in which fields are to
be interpolated.
:arg field_names: List of fields to be interpolated.
:arg name: Unique name for this callback and its associated set of
detectors. This determines the name of the output h5 file
(prefixed with `diagnostic_`).
:arg detector_names: List of names for each of the detectors. If not
provided automatic names of the form `detectorNNN` are created
where NNN is the (0-padded) detector number.
:arg kwargs: any additional keyword arguments, see
:class:`.DiagnosticCallback`.
"""
# printing all detector output to log is probably not a useful default:
kwargs.setdefault('append_to_log', False)
self.field_dims = [solver_obj.fields[field_name].function_space().value_size
for field_name in field_names]
attrs = {
# use null-padded ascii strings, dtype='U' not supported in hdf5, see http://docs.h5py.org/en/latest/strings.html
'field_names': numpy.array(field_names, dtype='S'),
'field_dims': self.field_dims,
}
super().__init__(solver_obj, array_dim=sum(self.field_dims), attrs=attrs, **kwargs)
ndetectors = len(detector_locations)
if detector_names is None:
fill = len(str(ndetectors))
self.detector_names = ['detector{:0{fill}d}'.format(i, fill=fill) for i in range(ndetectors)]
else:
assert ndetectors == len(detector_names), "Different number of detector locations and names"
self.detector_names = detector_names
self._variable_names = self.detector_names
self.detector_locations = detector_locations
self.field_names = field_names
self._name = name
@property
def name(self):
return self._name
@property
def variable_names(self):
return self.detector_names
def _values_per_field(self, values):
"""
Given all values evaulated in a detector location, return the values per field"""
i = 0
result = []
for dim in self.field_dims:
result.append(values[i:i+dim])
i += dim
return result
[docs]
def message_str(self, *args):
return '\n'.join(
'In {}: '.format(name) + ', '.join(
'{}={}'.format(field_name, field_val) for field_name, field_val in zip(self.field_names, self._values_per_field(values)))
for name, values in zip(self.detector_names, args))
def _evaluate_field(self, field_name):
return self.solver_obj.fields[field_name](self.detector_locations)
def __call__(self):
"""
Evaluate all current fields in all detector locations
Returns a ndetectors x ndims array, where ndims is the sum of the
dimension of the fields.
"""
ndetectors = len(self.detector_locations)
field_vals = []
for field_name in self.field_names:
field_vals.append(numpy.reshape(self._evaluate_field(field_name), (ndetectors, -1)))
return numpy.hstack(field_vals)
[docs]
class AccumulatorCallback(DiagnosticCallback):
"""
Callback that evaluates a (scalar) functional involving integrals in both
time and space.
This callback can also be used to assemble time dependent objective
functionals for adjoint simulations. Time integration is achieved using
the trapezium rule.
"""
variable_names = ['spatial integral at current timestep']
def __init__(self, scalar_callback, solver_obj, **kwargs):
"""
:arg scalar_callback: Python function that returns a list of values of an objective functional.
:arg solver_obj: Thetis solver object
:arg kwargs: any additional keyword arguments, see DiagnosticCallback
"""
kwargs.setdefault('export_to_hdf5', False)
kwargs.setdefault('append_to_log', False)
super(AccumulatorCallback, self).__init__(solver_obj, **kwargs)
self.scalar_callback = scalar_callback # Evaluate functional
self.dt = solver_obj.options.timestep
self.integrant = 0.
self.old_value = None
def __call__(self):
scalar_value = self.scalar_callback()
if self.old_value is not None:
self.integrant += 0.5 * (self.old_value + scalar_value) * self.dt
self.old_value = scalar_value
return scalar_value
[docs]
def get_val(self):
return self.integrant
[docs]
def message_str(self, *args):
line = '{0:s} value {1:11.4e}'.format(self.name, args[0])
return line
[docs]
class TimeSeriesCallback2D(DiagnosticCallback):
"""
Extract a time series of a 2D field at a given (x,y) location
Currently implemented only for the 3D model.
"""
name = 'timeseries'
variable_names = ['value']
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback2D.__init__")
def __init__(self, solver_obj, fieldnames, x, y,
location_name, z=None,
outputdir=None, export_to_hdf5=True,
append_to_log=True,
tolerance=1e-3, eval_func=None,
start_time=None, end_time=None):
"""
:arg solver_obj: Thetis :class:`FlowSolver` object
:arg fieldnames: List of fields to extract
:arg x, y: location coordinates in model coordinate system.
:arg location_name: Unique name for this location. This
determines the name of the output h5 file (prefixed with
`diagnostic_`).
:kwarg str outputdir: Custom directory where hdf5 files will be stored.
By default solver's output directory is used.
:kwarg bool export_to_hdf5: If True, diagnostics will be stored in hdf5
format
:kwarg bool append_to_log: If True, callback output messages will be
printed in log
:kwarg start_time: Optional start time for timeseries extraction
:kwarg end_time: Optional end time for timeseries extraction
"""
assert export_to_hdf5 is True
self.fieldnames = fieldnames
self.location_name = location_name
attrs = {'x': x, 'y': y}
attrs['location_name'] = self.location_name
self.on_sphere = solver_obj.mesh2d.geometric_dimension() == 3
if self.on_sphere:
assert z is not None, 'z coordinate must be defined on a manifold mesh'
attrs['z'] = z
field_short_names = [f.split('_')[0] for f in self.fieldnames]
field_str = '-'.join(field_short_names)
self.variable_names = field_short_names
self.name += '_' + self.location_name
self.name += '_' + field_str
super(TimeSeriesCallback2D, self).__init__(
solver_obj,
outputdir=outputdir,
array_dim=1,
attrs=attrs,
export_to_hdf5=export_to_hdf5,
append_to_log=append_to_log,
start_time=start_time,
end_time=end_time)
self.x = x
self.y = y
self.z = z
self.tolerance = tolerance
self.eval_func = eval_func
self._initialized = False
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback2D._initialize")
def _initialize(self):
outputdir = self.outputdir
if outputdir is None:
outputdir = self.solver_obj.options.outputdir
# construct mesh points
xyz = (self.x, self.y, self.z) if self.on_sphere else (self.x, self.y)
self.xyz = numpy.array([xyz])
# test evaluation
try:
if self.eval_func is None:
self.solver_obj.fields.bathymetry_2d.at(self.xyz, tolerance=self.tolerance)
else:
self.eval_func(self.solver_obj.fields.bathymetry_2d, self.xyz, tolerance=self.tolerance)
except PointNotInDomainError as e:
error(
'{:}: Station "{:}" out of horizontal domain'.format(
self.__class__.__name__, self.location_name)
)
raise e
self._initialized = True
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback2D.__call__")
def __call__(self):
if not self._initialized:
self._initialize()
outvals = []
for fieldname in self.fieldnames:
try:
field = self.solver_obj.fields[fieldname]
if self.eval_func is None:
val = field.at(self.xyz, tolerance=self.tolerance)
else:
val = self.eval_func(field, self.xyz, tolerance=self.tolerance)
arr = numpy.array(val)
outvals.append(arr)
except PointNotInDomainError as e:
error('{:}: Cannot evaluate data at station {:}'.format(self.__class__.__name__, self.location_name))
raise e
return tuple(outvals)
[docs]
def message_str(self, *args):
out = ''
for fieldname, value in zip(self.fieldnames, args):
out += 'Evaluated {:} at {:}: {:.3g}\n'.format(
fieldname, self.location_name, value[0])
out = out[:-1] # suppress last line break
return out
[docs]
class TimeSeriesCallback3D(DiagnosticCallback):
"""
Extract a time series of a 3D field at a given (x,y,z) location
Currently implemented only for the 3D model.
"""
name = 'timeseries'
variable_names = ['value']
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback3D.__init__")
def __init__(self, solver_obj, fieldnames, x, y, z,
location_name,
outputdir=None, export_to_hdf5=True, append_to_log=True,
start_time=None, end_time=None):
"""
:arg solver_obj: Thetis :class:`FlowSolver` object
:arg fieldnames: List of fields to extract
:arg x, y, z: location coordinates in model coordinate system.
:arg location_name: Unique name for this location. This
determines the name of the output h5 file (prefixed with
`diagnostic_`).
:kwarg str outputdir: Custom directory where hdf5 files will be stored.
By default solver's output directory is used.
:kwarg bool export_to_hdf5: If True, diagnostics will be stored in hdf5
format
:kwarg bool append_to_log: If True, callback output messages will be
printed in log
:kwarg start_time: Optional start time for timeseries extraction
:kwarg end_time: Optional end time for timeseries extraction
"""
assert export_to_hdf5 is True
self.fieldnames = fieldnames
self.location_name = location_name
attrs = {'x': x, 'y': y, 'z': z}
attrs['location_name'] = self.location_name
field_short_names = [f.split('_')[0] for f in self.fieldnames]
field_str = '-'.join(field_short_names)
self.variable_names = field_short_names
self.name += '_' + self.location_name
self.name += '_' + field_str
super(TimeSeriesCallback3D, self).__init__(
solver_obj,
outputdir=outputdir,
array_dim=1,
attrs=attrs,
export_to_hdf5=export_to_hdf5,
append_to_log=append_to_log,
start_time=start_time,
end_time=end_time)
self.x = x
self.y = y
self.z = z
self._initialized = False
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback3D._initialize")
def _initialize(self):
outputdir = self.outputdir
if outputdir is None:
outputdir = self.solver_obj.options.outputdir
try:
min_z = -self.solver_obj.fields.bathymetry_2d.at((self.x, self.y))
except PointNotInDomainError as e:
error('{:}: Station "{:}" out of horizontal domain'.format(self.__class__.__name__, self.location_name))
raise e
if self.z < min_z:
new_z = min_z + 0.1
warning('Water depth too shallow at {:}; replacing z={:} by z={:}'.format(self.location_name, self.z, new_z))
self.z = new_z
# construct mesh points
self.xyz = numpy.array([[self.x, self.y, self.z]])
self._initialized = True
@PETSc.Log.EventDecorator("thetis.TimeSeriesCallback3D.__call__")
def __call__(self):
if not self._initialized:
self._initialize()
outvals = []
for fieldname in self.fieldnames:
try:
field = self.solver_obj.fields[fieldname]
arr = numpy.array(field.at(self.xyz))
outvals.append(arr)
except PointNotInDomainError as e:
error('{:}: Cannot evaluate data at station {:}'.format(self.__class__.__name__, self.location_name))
raise e
return tuple(outvals)
[docs]
def message_str(self, *args):
out = ''
for fieldname, value in zip(self.fieldnames, args):
out += 'Evaluated {:} at {:} {:.2f} m: {:.3g}\n'.format(
fieldname, self.location_name, self.z, value[0])
out = out[:-1] # suppress last line break
return out
[docs]
class VerticalProfileCallback(DiagnosticCallback):
"""
Extract a vertical profile of a 3D field at a given (x,y) location
Only for the 3D model.
"""
name = 'vertprofile'
variable_names = ['z_coord', 'value']
@PETSc.Log.EventDecorator("thetis.VerticalProfileCallback.__init__")
def __init__(self, solver_obj, fieldnames, x, y,
location_name,
npoints=48,
outputdir=None, export_to_hdf5=True,
append_to_log=True):
"""
:arg solver_obj: Thetis :class:`FlowSolver` object
:arg fieldnames: List of fields to extract
:arg x, y: location coordinates in model coordinate system.
:arg location_name: Unique name for this location. This
determines the name of the output h5 file (prefixed with
`diagnostic_`).
:arg int npoints: Number of points along the vertical axis. The 3D
field will be interpolated on these points, ranging from the bottom
to the (time dependent) free surface.
:kwarg str outputdir: Custom directory where hdf5 files will be stored.
By default solver's output directory is used.
:kwarg bool export_to_hdf5: If True, diagnostics will be stored in hdf5
format
:kwarg bool append_to_log: If True, callback output messages will be
printed in log
"""
assert export_to_hdf5 is True
self.fieldnames = fieldnames
self.location_name = location_name
attrs = {'x': x, 'y': y}
attrs['location_name'] = self.location_name
field_short_names = [f.split('_')[0] for f in self.fieldnames]
field_str = '-'.join(field_short_names)
self.variable_names = ['z_coord'] + field_short_names
self.name += '_' + self.location_name
self.name += '_' + field_str
super(VerticalProfileCallback, self).__init__(
solver_obj,
outputdir=outputdir,
array_dim=npoints,
attrs=attrs,
export_to_hdf5=export_to_hdf5,
append_to_log=append_to_log)
self.x = x
self.y = y
self.npoints = npoints
self.xy = numpy.array([self.x, self.y])
self.xyz = numpy.zeros((self.npoints, 3))
self.xyz[:, 0] = self.x
self.xyz[:, 1] = self.y
self.epsilon = 1e-2 # nudge points to avoid libspatialindex errors
self.alpha = numpy.linspace(0, 1, self.npoints)
self._initialized = False
def _initialize(self):
outputdir = self.outputdir
if outputdir is None:
outputdir = self.solver_obj.options.outputdir
self._initialized = True
@PETSc.Log.EventDecorator("thetis.VerticalProfileCallback._construct_z_array")
def _construct_z_array(self):
# construct mesh points for func evaluation
try:
depth = self.solver_obj.fields.bathymetry_2d.at(self.xy)
elev = self.solver_obj.fields.elev_cg_2d.at(self.xy)
except PointNotInDomainError as e:
error('{:}: Station "{:}" out of horizontal domain'.format(self.__class__.__name__, self.location_name))
raise e
z_min = -(depth - self.epsilon)
z_max = elev - self.epsilon
self.xyz[:, 2] = z_max + (z_min - z_max)*self.alpha
@PETSc.Log.EventDecorator("thetis.VerticalProfileCallback.__call__")
def __call__(self):
if not self._initialized:
self._initialize()
# update time-dependent z array
self._construct_z_array()
outvals = [self.xyz[:, 2]]
for fieldname in self.fieldnames:
try:
field = self.solver_obj.fields[fieldname]
arr = numpy.array(field.at(self.xyz))
outvals.append(arr)
except PointNotInDomainError as e:
error('{:}: Cannot evaluate data at station {:}'.format(self.__class__.__name__, self.location_name))
raise e
return tuple(outvals)
[docs]
def message_str(self, *args):
out = ''
for fieldname, prof in zip(self.fieldnames, args[1:]):
minval = prof.min()
maxval = prof.max()
out += 'Evaluated {:} profile at {:}: range {:.3g} - {:.3g}\n'.format(
fieldname, self.location_name, minval, maxval)
out = out[:-1] # suppress last line break
return out
[docs]
class TransectCallback(DiagnosticCallback):
"""
Extract a vertical transect of a 3D field at a given (x,y) locations.
Only for the 3D model.
"""
name = 'transect'
variable_names = ['z_coord', 'value']
@PETSc.Log.EventDecorator("thetis.TransectCallback.__init__")
def __init__(self, solver_obj, fieldnames, x, y,
location_name,
n_points_z=48,
z_min=None, z_max=None,
outputdir=None, export_to_hdf5=True,
append_to_log=True):
"""
:arg solver_obj: Thetis :class:`FlowSolver` object
:arg fieldnames: List of fields to extract
:arg x, y: location coordinates in model coordinate system.
:arg location_name: Unique name for this location. This
determines the name of the output h5 file (prefixed with
`diagnostic_`).
:arg int n_points_z: Number of points along the vertical axis. The 3D
field will be interpolated on these points, ranging from the bottom
to the (time dependent) free surface.
:kwarg float z_min, zmax: Force min/max value of z coordinate extent.
By default, transect will cover entire depth from bed to surface.
:kwarg str outputdir: Custom directory where hdf5 files will be stored.
By default solver's output directory is used.
:kwarg bool export_to_hdf5: If True, diagnostics will be stored in hdf5
format
:kwarg bool append_to_log: If True, will print extracted min/max values
of each field to log
"""
assert export_to_hdf5 is True
self.fieldnames = fieldnames
self.location_name = location_name
field_short_names = [f.split('_')[0] for f in self.fieldnames]
field_str = '-'.join(field_short_names)
self.name += '_' + self.location_name
self.name += '_' + field_str
self.x = numpy.array([x]).ravel()
self.y = numpy.array([y]).ravel()
if len(self.x) == 1:
self.x = numpy.ones_like(self.y) * self.x
if len(self.y) == 1:
self.y = numpy.ones_like(self.x) * self.y
attrs = {'x': self.x, 'y': self.y}
attrs['location_name'] = self.location_name
assert len(self.y) == len(self.x)
self.n_points_xy = len(self.x)
self.n_points_z = n_points_z
self.value_shape = (self.n_points_z, self.n_points_xy)
self.force_z_min = z_min
self.force_z_max = z_max
self.field_dims = {}
for f in self.fieldnames:
func = solver_obj.fields[f]
self.field_dims[f] = func.function_space().value_size
self.variable_names = ['z_coord']
for f, f_short in zip(fieldnames, field_short_names):
if self.field_dims[f] == 1:
self.variable_names.append(f_short)
else:
coords = ['x', 'y', 'z']
for k in range(self.field_dims[f]):
f_comp_name = f_short + '_' + coords[k]
self.variable_names.append(f_comp_name)
super().__init__(
solver_obj,
outputdir=outputdir,
array_dim=self.value_shape,
attrs=attrs,
export_to_hdf5=export_to_hdf5,
append_to_log=append_to_log)
self._initialized = False
@PETSc.Log.EventDecorator("thetis.TransectCallback._initialize")
def _initialize(self):
outputdir = self.outputdir
if outputdir is None:
outputdir = self.solver_obj.options.outputdir
# construct mesh points for evaluation
self.xy = list(zip(self.x, self.y))
self.trans_x = numpy.tile(self.x[numpy.newaxis, :], (self.n_points_z, 1))
self.trans_y = numpy.tile(self.y[numpy.newaxis, :], (self.n_points_z, 1))
@PETSc.Log.EventDecorator("thetis.TransectCallback._update_coords")
def _update_coords(self):
try:
depth = numpy.array(self.solver_obj.fields.bathymetry_2d.at(self.xy))
elev = numpy.array(self.solver_obj.fields.elev_cg_2d.at(self.xy))
except PointNotInDomainError as e:
error('{:}: Transect "{:}" point out of horizontal domain'.format(self.__class__.__name__, self.location_name))
raise e
epsilon = 1e-5 # nudge points to avoid libspatialindex errors
z_min = -(depth - epsilon)
z_max = elev - epsilon
if self.force_z_min is not None:
z_min = numpy.maximum(z_min, self.force_z_min)
if self.force_z_max is not None:
z_max = numpy.minimum(z_max, self.force_z_max)
self.trans_z = numpy.linspace(z_max, z_min, self.n_points_z)
self.trans_z = self.trans_z.reshape(self.value_shape)
self.xyz = numpy.vstack((self.trans_x.ravel(),
self.trans_y.ravel(),
self.trans_z.ravel())).T
@PETSc.Log.EventDecorator("thetis.TransectCallback.__call__")
def __call__(self):
if not self._initialized:
self._initialize()
self._update_coords()
outvals = [self.trans_z]
for fieldname in self.fieldnames:
field = self.solver_obj.fields[fieldname]
field_dim = self.field_dims[fieldname]
try:
vals = field.at(tuple(self.xyz))
arr = numpy.array(vals)
except PointNotInDomainError as e:
error('{:}: Cannot evaluate data on transect {:}'.format(self.__class__.__name__, self.location_name))
raise e
# arr has shape (nxy, nz, ncomponents)
shape = list(self.value_shape) + [field_dim]
arr = numpy.array(vals).reshape(shape)
# convert to list of components [(nxy, nz) , ...]
components = [arr[..., i] for i in range(arr.shape[-1])]
outvals.extend(components)
return tuple(outvals)
[docs]
def message_str(self, *args):
out = 'Evaluated transect "{:}":\n'.format(self.location_name)
lines = []
for fieldname, arr in zip(self.variable_names[1:], args[1:]):
minval = arr.min()
maxval = arr.max()
line = f' {fieldname} range: {minval:.3g} - {maxval:.3g}'
lines.append(line)
out += '\n'.join(lines)
return out