Source code for thetis.forcing

"""
Routines for interpolating forcing fields for the 3D solver.
"""
from firedrake import *
from firedrake.petsc import PETSc
import scipy.spatial.qhull as qhull
import thetis.timezone as timezone
import thetis.interpolation as interpolation
from .log import *
import netCDF4
import thetis.physical_constants as physical_constants
import uptide
import uptide.tidal_netcdf
from abc import ABC, abstractmethod, abstractproperty
import os
import numpy


[docs] def compute_wind_stress(wind_u, wind_v, method='LargeYeager2009'): r""" Compute wind stress from atmospheric 10 m wind. wind stress is defined as .. math: tau_w = C_D \rho_{air} \|U_{10}\| U_{10} where :math:`C_D` is the drag coefficient, :math:`\rho_{air}` is the density of air, and :math:`U_{10}` is wind speed 10 m above the sea surface. In practice `C_D` depends on the wind speed. Two formulation are currently implemented: - "LargePond1981": Wind stress formulation by [1] - "SmithBanke1975": Wind stress formulation by [2] - "LargeYeager2009": Wind stress formulation by [3] [1] Large and Pond (1981). Open Ocean Momentum Flux Measurements in Moderate to Strong Winds. Journal of Physical Oceanography, 11(3):324-336. https://doi.org/10.1175/1520-0485(1981)011%3C0324:OOMFMI%3E2.0.CO;2 [2] Smith and Banke (1975). Variation of the sea surface drag coefficient with wind speed. Q J R Meteorol Soc., 101(429):665-673. https://doi.org/10.1002/qj.49710142920 [3] Large and Yeager (2009). The global climatology of an interannually varying air–sea flux data set. Climate Dynamics, 33:341–364. http://dx.doi.org/10.1007/s00382-008-0441-3 :arg wind_u, wind_v: Wind u and v components as numpy arrays :kwarg method: Choose the stress formulation. Currently supports: 'LargeYeager2009' (default), 'LargePond1981', or 'SmithBanke1975'. :returns: (tau_x, tau_y) wind stress x and y components as numpy arrays """ rho_air = float(physical_constants['rho_air']) wind_mag = numpy.hypot(wind_u, wind_v) if method == 'LargePond1981': CD_LOW = 1.2e-3 C_D = numpy.ones_like(wind_u)*CD_LOW high_wind = wind_mag > 11.0 C_D[high_wind] = 1.0e-3*(0.49 + 0.065*wind_mag[high_wind]) elif method == 'SmithBanke1975': C_D = (0.63 + 0.066 * wind_mag)/1000. elif method == 'LargeYeager2009': # NOTE wind velocity should be shifted to 10 m neutral equivalent # but it requires air temperature, humidity and iteration, see [3] high_wind = wind_mag > 33.0 eps = 1e-3 C_D = 1.e-3 * (2.7 / (wind_mag + eps) + 0.142 + wind_mag / 13.09 - 3.14807e-10*wind_mag**6) C_D[high_wind] = 2.34e-3 tau = C_D*rho_air*wind_mag tau_x = tau*wind_u tau_y = tau*wind_v return tau_x, tau_y
[docs] class ATMNetCDFTime(interpolation.NetCDFTimeParser): """ A TimeParser class for reading atmosphere model output files. """ @PETSc.Log.EventDecorator("thetis.ATMNetCDFTime.__init__") def __init__(self, filename, max_duration=None, verbose=False): """ :arg filename: :kwarg max_duration: Time span to read from each file (in seconds). E.g. forecast files can consist of daily files with > 1 day of data. In this case max_duration should be set to 24 h. If None, all time steps are loaded. Default: None. :kwarg bool verbose: Se True to print debug information. """ super(ATMNetCDFTime, self).__init__(filename, time_variable_name='time') # NOTE these are daily forecast files, limit time steps to one day self.start_time = timezone.epoch_to_datetime(float(self.time_array[0])) self.end_time_raw = timezone.epoch_to_datetime(float(self.time_array[-1])) self.time_step = numpy.mean(numpy.diff(self.time_array)) if max_duration is not None: self.max_steps = int(max_duration / self.time_step) else: self.max_steps = self.nb_steps self.time_array = self.time_array[:self.max_steps] self.end_time = timezone.epoch_to_datetime(float(self.time_array[-1])) if verbose: print_output('Parsed file {:}'.format(filename)) print_output(' Time span: {:} -> {:}'.format(self.start_time, self.end_time_raw)) print_output(' Time step: {:} h'.format(self.time_step/3600.)) if max_duration is not None: print_output(' Restricting duration to {:} h -> keeping {:} steps'.format(max_duration/3600., self.max_steps)) print_output(' New time span: {:} -> {:}'.format(self.start_time, self.end_time))
[docs] class ATMInterpolator(object): """ Interpolates WRF/NAM atmospheric model data on 2D fields. """ @PETSc.Log.EventDecorator("thetis.ATMInterpolator.__init__") def __init__(self, function_space, wind_stress_field, atm_pressure_field, coord_system, ncfile_pattern, init_date, vect_rotator=None, east_wind_var_name='uwind', north_wind_var_name='vwind', pressure_var_name='prmsl', fill_mode=None, fill_value=numpy.nan, verbose=False): """ :arg function_space: Target (scalar) :class:`FunctionSpace` object onto which data will be interpolated. :arg wind_stress_field: A 2D vector :class:`Function` where the output wind stress will be stored. :arg atm_pressure_field: A 2D scalar :class:`Function` where the output atmospheric pressure will be stored. :arg coord_system: :class:`CoordinateSystem` object :arg ncfile_pattern: A file name pattern for reading the atmospheric model output files. E.g. 'forcings/nam_air.local.2006_*.nc' :arg init_date: A :class:`datetime` object that indicates the start date/time of the Thetis simulation. Must contain time zone. E.g. 'datetime(2006, 5, 1, tzinfo=pytz.utc)' :kwarg vect_rotator: function that rotates vectors from ENU coordinates to target function space (optional). :kwarg east_wind_var_name, north_wind_var_name, pressure_var_name: wind component and pressure field names in netCDF file. :kwarg fill_mode: Determines how points outside the source grid will be treated. If 'nearest', value of the nearest source point will be used. Otherwise a constant fill value will be used (default). :kwarg float fill_value: Set the fill value (default: NaN) :kwarg bool verbose: Se True to print debug information. """ self.function_space = function_space self.wind_stress_field = wind_stress_field self.atm_pressure_field = atm_pressure_field # construct interpolators self.grid_interpolator = interpolation.NetCDFLatLonInterpolator2d( self.function_space, coord_system, fill_mode=fill_mode, fill_value=fill_value) var_list = [east_wind_var_name, north_wind_var_name, pressure_var_name] self.reader = interpolation.NetCDFSpatialInterpolator( self.grid_interpolator, var_list) self.timesearch_obj = interpolation.NetCDFTimeSearch(ncfile_pattern, init_date, ATMNetCDFTime, verbose=verbose) self.time_interpolator = interpolation.LinearTimeInterpolator(self.timesearch_obj, self.reader) lon = self.grid_interpolator.mesh_lonlat[:, 0] lat = self.grid_interpolator.mesh_lonlat[:, 1] if vect_rotator is None: self.vect_rotator = coord_system.get_vector_rotator(lon, lat) else: self.vect_rotator = vect_rotator
[docs] @PETSc.Log.EventDecorator("thetis.ATMInterpolator.set_fields") def set_fields(self, time): """ Evaluates forcing fields at the given time. Performs interpolation and updates the output wind stress and atmospheric pressure fields in place. :arg float time: Thetis simulation time in seconds. """ east_wind, north_wind, prmsl = self.time_interpolator(time) east_strs, north_strs = compute_wind_stress(east_wind, north_wind) if self.wind_stress_field.geometric_dimension() == 3: u_strs, v_strs, z_strs = self.vect_rotator(east_strs, north_strs) self.wind_stress_field.dat.data_with_halos[:, 0] = u_strs self.wind_stress_field.dat.data_with_halos[:, 1] = v_strs self.wind_stress_field.dat.data_with_halos[:, 2] = z_strs else: u_strs, v_strs = self.vect_rotator(east_strs, north_strs) self.wind_stress_field.dat.data_with_halos[:, 0] = u_strs self.wind_stress_field.dat.data_with_halos[:, 1] = v_strs self.atm_pressure_field.dat.data_with_halos[:] = prmsl
[docs] class SpatialInterpolatorNCOMBase(interpolation.SpatialInterpolator): """ Base class for 2D and 3D NCOM spatial interpolators. """ @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOMBase.__init__") def __init__(self, function_space, coord_system, grid_path): """ :arg function_space: Target (scalar) :class:`FunctionSpace` object onto which data will be interpolated. :arg coord_system: :class:`CoordinateSystem` object :arg grid_path: File path where the NCOM model grid files ('model_lat.nc', 'model_lon.nc', 'model_zm.nc') are located. """ self.function_space = function_space self.grid_path = grid_path self._initialized = False @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOMBase._create_2d_mapping") def _create_2d_mapping(self, ncfile): """ Create map for 2D nodes. """ # read source lat lon grid lat_full = self._get_forcing_grid('model_lat.nc', 'Lat') lon_full = self._get_forcing_grid('model_lon.nc', 'Long') x_ind = ncfile['X_Index'][:].astype(int) y_ind = ncfile['Y_Index'][:].astype(int) lon = lon_full[y_ind, :][:, x_ind] lat = lat_full[y_ind, :][:, x_ind] # find where data values are not defined varkey = None for k in ncfile.variables.keys(): if k not in ['X_Index', 'Y_Index', 'level']: varkey = k break assert varkey is not None, 'Could not find variable in file' vals = ncfile[varkey][:] # shape (nz, lat, lon) or (lat, lon) is3d = len(vals.shape) == 3 land_mask = numpy.all(vals.mask, axis=0) if is3d else vals.mask # build 2d mask mask_good_values = ~land_mask # neighborhood mask with bounding box mask_cover = numpy.zeros_like(mask_good_values) buffer = 0.2 lat_min = self.latlonz_array[:, 0].min() - buffer lat_max = self.latlonz_array[:, 0].max() + buffer lon_min = self.latlonz_array[:, 1].min() - buffer lon_max = self.latlonz_array[:, 1].max() + buffer mask_cover[(lat >= lat_min) * (lat <= lat_max) * (lon >= lon_min) * (lon <= lon_max)] = True mask_cover *= mask_good_values # include nearest valid neighbors # needed for nearest neighbor filling from scipy.spatial import cKDTree good_lat = lat[mask_good_values] good_lon = lon[mask_good_values] ll = numpy.vstack([good_lat.ravel(), good_lon.ravel()]).T dist, ix = cKDTree(ll).query(self.latlonz_array[:, :2]) ix = numpy.unique(ix) ix = numpy.nonzero(mask_good_values.ravel())[0][ix] a, b = numpy.unravel_index(ix, lat.shape) mask_nn = numpy.zeros_like(mask_good_values) mask_nn[a, b] = True # final mask mask = mask_cover + mask_nn self.nodes = numpy.nonzero(mask.ravel())[0] self.ind_lat, self.ind_lon = numpy.unravel_index(self.nodes, lat.shape) lat_subset = lat[self.ind_lat, self.ind_lon] lon_subset = lon[self.ind_lat, self.ind_lon] assert len(lat_subset) > 0, 'rank {:} has no source lat points' assert len(lon_subset) > 0, 'rank {:} has no source lon points' return lon_subset, lat_subset, x_ind, y_ind, vals def _get_forcing_grid(self, filename, varname): """ Helper function to load NCOM grid files. """ v = None with netCDF4.Dataset(os.path.join(self.grid_path, filename), 'r') as ncfile: v = ncfile[varname][:] return v
[docs] class SpatialInterpolatorNCOM3d(SpatialInterpolatorNCOMBase): """ Spatial interpolator class for interpolating NCOM ocean model 3D fields. """ @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM3d.__init__") def __init__(self, function_space, coord_system, grid_path): """ :arg function_space: Target (scalar) :class:`FunctionSpace` object onto which data will be interpolated. :arg coord_system: :class:`CoordinateSystem` object :arg grid_path: File path where the NCOM model grid files ('model_lat.nc', 'model_lon.nc', 'model_zm.nc') are located. """ super().__init__(function_space, coord_system, grid_path) # construct local coordinates xyz = SpatialCoordinate(self.function_space.mesh()) tmp_func = self.function_space.get_work_function() xyz_array = numpy.zeros((tmp_func.dat.data_with_halos.shape[0], 3)) for i in range(3): tmp_func.interpolate(xyz[i]) xyz_array[:, i] = tmp_func.dat.data_with_halos[:] self.function_space.restore_work_function(tmp_func) self.latlonz_array = numpy.zeros_like(xyz_array) lon, lat = coord_system.to_lonlat( xyz_array[:, 0], xyz_array[:, 1], positive_lon=True ) self.latlonz_array[:, 0] = lat self.latlonz_array[:, 1] = lon self.latlonz_array[:, 2] = xyz_array[:, 2] @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM3d._create_interpolator") def _create_interpolator(self, ncfile): """ Create a compact interpolator by finding the minimal necessary support """ lon_subset, lat_subset, x_ind, y_ind, vals = self._create_2d_mapping(ncfile) # find 3d mask where data is not defined vals = vals[:, self.ind_lat, self.ind_lon] self.good_mask_3d = ~vals.mask # construct vertical grid zm = self._get_forcing_grid('model_zm.nc', 'zm') zm = zm[:, y_ind, :][:, :, x_ind] grid_z = zm[:, self.ind_lat, self.ind_lon] # shape (nz, nlatlon) grid_z = grid_z.filled(-5000.) # nudge water surface higher for interpolation grid_z[0, :] = 1.5 nz = grid_z.shape[0] # data shape is [nz, neta*nxi] grid_lat = numpy.tile(lat_subset, (nz, 1))[self.good_mask_3d] grid_lon = numpy.tile(lon_subset, (nz, 1))[self.good_mask_3d] grid_z = grid_z[self.good_mask_3d] if numpy.ma.isMaskedArray(grid_lat): grid_lat = grid_lat.filled(0.0) if numpy.ma.isMaskedArray(grid_lon): grid_lon = grid_lon.filled(0.0) if numpy.ma.isMaskedArray(grid_z): grid_z = grid_z.filled(0.0) grid_latlonz = numpy.vstack((grid_lat, grid_lon, grid_z)).T # building 3D interpolator, this can take a long time (minutes) print_output('Constructing 3D GridInterpolator...') self.interpolator = interpolation.GridInterpolator( grid_latlonz, self.latlonz_array, normalize=True, fill_mode='nearest', dont_raise=True ) print_output('done.') self._initialized = True
[docs] @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM3d.interpolate") def interpolate(self, nc_filename, variable_list, itime): """ Calls the interpolator object """ with netCDF4.Dataset(nc_filename, 'r') as ncfile: if not self._initialized: self._create_interpolator(ncfile) output = [] for var in variable_list: assert var in ncfile.variables grid_data = ncfile[var][:][:, self.ind_lat, self.ind_lon][self.good_mask_3d] data = self.interpolator(grid_data) output.append(data) return output
[docs] class SpatialInterpolatorNCOM2d(SpatialInterpolatorNCOMBase): """ Spatial interpolator class for interpolating NCOM ocean model 2D fields. """ @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM2d.__init__") def __init__(self, function_space, coord_system, grid_path): """ :arg function_space: Target (scalar) :class:`FunctionSpace` object onto which data will be interpolated. :arg coord_system: :class:`CoordinateSystem` object :arg grid_path: File path where the NCOM model grid files ('model_lat.nc', 'model_lon.nc', 'model_zm.nc') are located. """ super().__init__(function_space, coord_system, grid_path) # construct local coordinates xyz = SpatialCoordinate(self.function_space.mesh()) tmp_func = self.function_space.get_work_function() xy_array = numpy.zeros((tmp_func.dat.data_with_halos.shape[0], 2)) for i in range(2): tmp_func.interpolate(xyz[i]) xy_array[:, i] = tmp_func.dat.data_with_halos[:] self.function_space.restore_work_function(tmp_func) self.latlonz_array = numpy.zeros_like(xy_array) lon, lat = coord_system.to_lonlat( xy_array[:, 0], xy_array[:, 1], positive_lon=True ) self.latlonz_array[:, 0] = lat self.latlonz_array[:, 1] = lon @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM2d._create_interpolator") def _create_interpolator(self, ncfile): """ Create a compact interpolator by finding the minimal necessary support """ lon_subset, lat_subset, x_ind, y_ind, vals = self._create_2d_mapping(ncfile) grid_lat = lat_subset grid_lon = lon_subset if numpy.ma.isMaskedArray(grid_lat): grid_lat = grid_lat.filled(0.0) if numpy.ma.isMaskedArray(grid_lon): grid_lon = grid_lon.filled(0.0) grid_latlon = numpy.vstack((grid_lat, grid_lon)).T # building 3D interpolator, this can take a long time (minutes) self.interpolator = interpolation.GridInterpolator( grid_latlon, self.latlonz_array, normalize=False, fill_mode='nearest', dont_raise=True ) self._initialized = True
[docs] @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorNCOM2d.interpolate") def interpolate(self, nc_filename, variable_list, itime): """ Calls the interpolator object """ with netCDF4.Dataset(nc_filename, 'r') as ncfile: if not self._initialized: self._create_interpolator(ncfile) output = [] for var in variable_list: assert var in ncfile.variables grid_data = ncfile[var][:][self.ind_lat, self.ind_lon] data = self.interpolator(grid_data) output.append(data) return output
[docs] class NCOMInterpolator(object): """ Interpolates NCOM model data on 3D fields. .. note:: The following NCOM output files must be present: ./forcings/ncom/model_h.nc ./forcings/ncom/model_lat.nc ./forcings/ncom/model_ang.nc ./forcings/ncom/model_lon.nc ./forcings/ncom/model_zm.nc ./forcings/ncom/2006/s3d/s3d.glb8_2f_2006041900.nc ./forcings/ncom/2006/s3d/s3d.glb8_2f_2006042000.nc ./forcings/ncom/2006/t3d/t3d.glb8_2f_2006041900.nc ./forcings/ncom/2006/t3d/t3d.glb8_2f_2006042000.nc ./forcings/ncom/2006/u3d/u3d.glb8_2f_2006041900.nc ./forcings/ncom/2006/u3d/u3d.glb8_2f_2006042000.nc ./forcings/ncom/2006/v3d/v3d.glb8_2f_2006041900.nc ./forcings/ncom/2006/v3d/v3d.glb8_2f_2006042000.nc ./forcings/ncom/2006/ssh/ssh.glb8_2f_2006041900.nc ./forcings/ncom/2006/ssh/ssh.glb8_2f_2006042000.nc """ @PETSc.Log.EventDecorator("thetis.NCOMInterpolator.__init__") def __init__(self, function_space_2d, function_space_3d, fields, field_names, field_fnstr, coord_system, basedir, file_pattern, init_date, verbose=False): """ :arg function_space_2d: Target (scalar) :class:`FunctionSpace` object onto which 2D data will be interpolated. :arg function_space_3d: Target (scalar) :class:`FunctionSpace` object onto which 3D data will be interpolated. :arg fields: list of :class:`Function` objects where data will be stored. :arg field_names: List of netCDF variable names for the fields. E.g. ['Salinity', 'Temperature']. :arg field_fnstr: List of variables in netCDF file names. E.g. ['s3d', 't3d']. :arg coord_system: :class:`CoordinateSystem` object :arg basedir: Root dir where NCOM files are stored. E.g. '/forcings/ncom'. :arg file_pattern: A file name pattern for reading the NCOM output files (excluding the basedir). E.g. {year:04d}/{fieldstr:}/{fieldstr:}.glb8_2f_{year:04d}{month:02d}{day:02d}00.nc'. :arg init_date: A :class:`datetime` object that indicates the start date/time of the Thetis simulation. Must contain time zone. E.g. 'datetime(2006, 5, 1, tzinfo=pytz.utc)' :kwarg bool verbose: Se True to print debug information. """ self.function_space_2d = function_space_2d self.function_space_3d = function_space_3d for f in fields: assert f.function_space() in [self.function_space_2d, self.function_space_3d], 'field \'{:}\' does not belong to given function space.'.format(f.name()) assert len(fields) == len(field_names) assert len(fields) == len(field_fnstr) self.field_names = field_names self.fields = dict(zip(self.field_names, fields)) # construct interpolators self.grid_interpolator_2d = SpatialInterpolatorNCOM2d(self.function_space_2d, coord_system, basedir) self.grid_interpolator_3d = SpatialInterpolatorNCOM3d(self.function_space_3d, coord_system, basedir) # each field is in different file # construct time search and interp objects separately for each self.time_interpolator = {} for ncvarname, fnstr in zip(field_names, field_fnstr): gi = self.grid_interpolator_2d if fnstr == 'ssh' else self.grid_interpolator_3d r = interpolation.NetCDFSpatialInterpolator(gi, [ncvarname]) pat = file_pattern.replace('{fieldstr:}', fnstr) pat = os.path.join(basedir, pat) ts = interpolation.DailyFileTimeSearch(pat, init_date, verbose=verbose) ti = interpolation.LinearTimeInterpolator(ts, r) self.time_interpolator[ncvarname] = ti # construct velocity rotation object self.rotate_velocity = ('U_Velocity' in field_names and 'V_Velocity' in field_names) self.scalar_field_names = list(self.field_names) if self.rotate_velocity: self.scalar_field_names.remove('U_Velocity') self.scalar_field_names.remove('V_Velocity') lat = self.grid_interpolator_3d.latlonz_array[:, 0] lon = self.grid_interpolator_3d.latlonz_array[:, 1] self.vect_rotator = coord_system.get_vector_rotator(lon, lat)
[docs] @PETSc.Log.EventDecorator("thetis.NCOMInterpolator.set_fields") def set_fields(self, time): """ Evaluates forcing fields at the given time """ if self.rotate_velocity: # water_u (meter/sec) = Eastward Water Velocity # water_v (meter/sec) = Northward Water Velocity lon_vel = self.time_interpolator['U_Velocity'](time)[0] lat_vel = self.time_interpolator['V_Velocity'](time)[0] u, v = self.vect_rotator(lon_vel, lat_vel) self.fields['U_Velocity'].dat.data_with_halos[:] = u self.fields['V_Velocity'].dat.data_with_halos[:] = v for fname in self.scalar_field_names: vals = self.time_interpolator[fname](time)[0] self.fields[fname].dat.data_with_halos[:] = vals
[docs] class SpatialInterpolatorROMS3d(interpolation.SpatialInterpolator): """ Abstract spatial interpolator class that can interpolate onto a Function """ @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorROMS3d.__init__") def __init__(self, function_space, coord_system): """ :arg function_space: target Firedrake FunctionSpace :arg coord_system: :class:`CoordinateSystem` object """ self.function_space = function_space # construct local coordinates xyz = SpatialCoordinate(self.function_space.mesh()) tmp_func = self.function_space.get_work_function() xyz_array = numpy.zeros((tmp_func.dat.data_with_halos.shape[0], 3)) for i in range(3): tmp_func.interpolate(xyz[i]) xyz_array[:, i] = tmp_func.dat.data_with_halos[:] self.function_space.restore_work_function(tmp_func) self.latlonz_array = numpy.zeros_like(xyz_array) lon, lat = coord_system.to_lonlat( xyz_array[:, 0], xyz_array[:, 1] ) self.latlonz_array[:, 0] = lat self.latlonz_array[:, 1] = lon self.latlonz_array[:, 2] = xyz_array[:, 2] self._initialized = False def _get_subset_nodes(self, grid_x, grid_y, target_x, target_y): """ Returns grid nodes that are necessary for intepolating onto target_x,y """ orig_shape = grid_x.shape grid_xy = numpy.array((grid_x.ravel(), grid_y.ravel())).T target_xy = numpy.array((target_x.ravel(), target_y.ravel())).T tri = qhull.Delaunay(grid_xy) simplex = tri.find_simplex(target_xy) vertices = numpy.take(tri.simplices, simplex, axis=0) nodes = numpy.unique(vertices.ravel()) nodes_x, nodes_y = numpy.unravel_index(nodes, orig_shape) return nodes, nodes_x, nodes_y @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorROMS3d._compute_roms_z_coord") def _compute_roms_z_coord(self, ncfile, constant_zeta=None): zeta = ncfile['zeta'][0, :, :] bath = ncfile['h'][:] # NOTE compute z coordinates for full levels (w) cs = ncfile['Cs_w'][:] s = ncfile['s_w'][:] hc = ncfile['hc'][:] # ROMS transformation ver. 2: # z(x, y, sigma, t) = zeta(x, y, t) + (zeta(x, y, t) + h(x, y))*S(x, y, sigma) zeta = zeta[self.ind_lat, self.ind_lon][self.mask].filled(0.0) bath = bath[self.ind_lat, self.ind_lon][self.mask] if constant_zeta: zeta = numpy.ones_like(bath)*constant_zeta ss = (hc*s[:, numpy.newaxis] + bath[numpy.newaxis, :]*cs[:, numpy.newaxis])/(hc + bath[numpy.newaxis, :]) grid_z_w = zeta[numpy.newaxis, :]*(1 + ss) + bath[numpy.newaxis, :]*ss grid_z = 0.5*(grid_z_w[1:, :] + grid_z_w[:-1, :]) grid_z[0, :] = grid_z_w[0, :] grid_z[-1, :] = grid_z_w[-1, :] return grid_z @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorROMS3d._create_interpolator") def _create_interpolator(self, ncfile): """ Create compact interpolator by finding the minimal necessary support """ lat = ncfile['lat_rho'][:] lon = ncfile['lon_rho'][:] self.mask = ncfile['mask_rho'][:].astype(bool) self.nodes, self.ind_lat, self.ind_lon = self._get_subset_nodes(lat, lon, self.latlonz_array[:, 0], self.latlonz_array[:, 1]) lat_subset = lat[self.ind_lat, self.ind_lon] lon_subset = lon[self.ind_lat, self.ind_lon] self.mask = self.mask[self.ind_lat, self.ind_lon] # COMPUTE z coords for constant elevation=0.1 grid_z = self._compute_roms_z_coord(ncfile, constant_zeta=0.1) # omit land mask lat_subset = lat_subset[self.mask] lon_subset = lon_subset[self.mask] nz = grid_z.shape[0] # data shape is [nz, neta, nxi] grid_lat = numpy.tile(lat_subset, (nz, 1, 1)).ravel() grid_lon = numpy.tile(lon_subset, (nz, 1, 1)).ravel() grid_z = grid_z.ravel() if numpy.ma.isMaskedArray(grid_lat): grid_lat = grid_lat.filled(0.0) if numpy.ma.isMaskedArray(grid_lon): grid_lon = grid_lon.filled(0.0) if numpy.ma.isMaskedArray(grid_z): grid_z = grid_z.filled(0.0) grid_latlonz = numpy.vstack((grid_lat, grid_lon, grid_z)).T # building 3D interpolator, this can take a long time (minutes) print_output('Constructing 3D GridInterpolator...') self.interpolator = interpolation.GridInterpolator( grid_latlonz, self.latlonz_array, normalize=True, fill_mode='nearest' ) print_output('done.') self._initialized = True
[docs] @PETSc.Log.EventDecorator("thetis.SpatialInterpolatorROMS3d.interpolate") def interpolate(self, nc_filename, variable_list, itime): """ Calls the interpolator object """ with netCDF4.Dataset(nc_filename, 'r') as ncfile: if not self._initialized: self._create_interpolator(ncfile) output = [] for var in variable_list: assert var in ncfile.variables grid_data = ncfile[var][itime, :, :, :][:, self.ind_lat, self.ind_lon][:, self.mask].filled(numpy.nan).ravel() data = self.interpolator(grid_data) output.append(data) return output
[docs] class LiveOceanInterpolator(object): """ Interpolates LiveOcean (ROMS) model data on 3D fields """ @PETSc.Log.EventDecorator("thetis.LiveOceanInterpolator.__init__") def __init__(self, function_space, fields, field_names, ncfile_pattern, init_date, coord_system): self.function_space = function_space for f in fields: assert f.function_space() == self.function_space, 'field \'{:}\' does not belong to given function space {:}.'.format(f.name(), self.function_space.name) assert len(fields) == len(field_names) self.fields = fields self.field_names = field_names # construct interpolators self.grid_interpolator = SpatialInterpolatorROMS3d(self.function_space, coord_system) self.reader = interpolation.NetCDFSpatialInterpolator(self.grid_interpolator, field_names) self.timesearch_obj = interpolation.NetCDFTimeSearch(ncfile_pattern, init_date, interpolation.NetCDFTimeParser, time_variable_name='ocean_time', verbose=False) self.time_interpolator = interpolation.LinearTimeInterpolator(self.timesearch_obj, self.reader)
[docs] @PETSc.Log.EventDecorator("thetis.LiveOceanInterpolator.set_fields") def set_fields(self, time): """ Evaluates forcing fields at the given time """ vals = self.time_interpolator(time) for i in range(len(self.fields)): self.fields[i].dat.data_with_halos[:] = vals[i]
[docs] class GenericSpatialInterpolator2D(interpolation.SpatialInterpolator2d): """ Spatial interpolator class for interpolating netCDF 2D fields. """ def _get_nc_var_name(self, ncfile, standard_name): """ Find netCDF variable name that matches CF standard_name. Raises an AssertionError if standard_name is not found. :arg ncfile: netCDF Dataset object :arg standard_name: standard_name to look for :returns: name of the netCDF variable """ name = None for var in ncfile.variables.values(): if hasattr(var, 'standard_name') and var.standard_name == standard_name: name = var.name break msg = f'Variable {standard_name} not found in {ncfile.filepath()}' assert name is not None, msg return name def _get_subset_nodes(self, grid_x, grid_y, target_x, target_y): """ Returns grid nodes that are necessary for intepolating onto target_x,y """ orig_shape = grid_x.shape grid_xy = numpy.array((grid_x.ravel(), grid_y.ravel())).T target_xy = numpy.array((target_x.ravel(), target_y.ravel())).T tri = qhull.Delaunay(grid_xy) simplex = tri.find_simplex(target_xy) vertices = numpy.take(tri.simplices, simplex, axis=0) nodes = numpy.unique(vertices.ravel()) nodes_x, nodes_y = numpy.unravel_index(nodes, orig_shape) return nodes, nodes_x, nodes_y @PETSc.Log.EventDecorator("thetis.GenericSpatialInterpolator2D._create_interpolator") def _create_interpolator(self, ncfile): """ Create compact interpolator by finding the minimal necessary support """ lat_name = self._get_nc_var_name(ncfile, 'latitude') lon_name = self._get_nc_var_name(ncfile, 'longitude') lat1d = ncfile[lat_name][:] lon1d = ncfile[lon_name][:] lat, lon = numpy.meshgrid(lat1d, lon1d, indexing='ij') # find valid mask self.valid_mask = None # read a variable and take inverse of its mask for name, var in ncfile.variables.items(): if len(var.shape) == 3: v = var[0, ...] # read first time index self.valid_mask = ~v.mask break assert self.valid_mask is not None, 'could not determine mask' assert self.valid_mask.shape == lat.shape, 'mask has wrong shape {self.mask.shape} {lat.shape}' self.nodes, self.ind_lat, self.ind_lon = self._get_subset_nodes(lat, lon, self.mesh_lonlat[:, 1], self.mesh_lonlat[:, 0]) lat_subset = lat[self.ind_lat, self.ind_lon] lon_subset = lon[self.ind_lat, self.ind_lon] self.valid_mask = self.valid_mask[self.ind_lat, self.ind_lon] # omit land mask lat_subset = lat_subset[self.valid_mask] lon_subset = lon_subset[self.valid_mask] grid_lat = lat_subset.ravel() grid_lon = lon_subset.ravel() if numpy.ma.isMaskedArray(grid_lat): grid_lat = grid_lat.filled(0.0) if numpy.ma.isMaskedArray(grid_lon): grid_lon = grid_lon.filled(0.0) grid_latlon = numpy.vstack((grid_lat, grid_lon)).T # building 2D interpolator, this can take a long time (minutes) print_output('Constructing 2D GridInterpolator...') mesh_latlon = self.mesh_lonlat[:, [1, 0]] self.interpolator = interpolation.GridInterpolator( grid_latlon, mesh_latlon, normalize=False, fill_mode='nearest' ) print_output('done.') self._initialized = True
[docs] @PETSc.Log.EventDecorator("thetis.GenericSpatialInterpolator2D.interpolate") def interpolate(self, nc_filename, variable_list, itime): """ Calls the interpolator object """ with netCDF4.Dataset(nc_filename, 'r') as ncfile: if not self._initialized: self._create_interpolator(ncfile) output = [] for var in variable_list: assert var in ncfile.variables grid_data = ncfile[var][itime, ...][self.ind_lat, self.ind_lon][self.valid_mask] data = self.interpolator(grid_data) output.append(data) return output
[docs] class GenericInterpolator2D(object): """ Interpolates 2D fields from netCDF files. The grid latitude, longitude coordinates must be defined in 1D arrays with CF standard_name attributes "latitude" and "longitude". Time must be defined with cftime compliant units and metadata. """ @PETSc.Log.EventDecorator("thetis.GenericInterpolator2D.__init__") def __init__(self, function_space, fields, field_names, ncfile_pattern, init_date, coord_system, vector_field=None, vector_components=None, vector_rotator=None): self.function_space = function_space for f in fields: assert f.function_space() == self.function_space, 'field \'{:}\' does not belong to given function space {:}.'.format(f.name(), self.function_space.name) assert len(fields) == len(field_names) self.fields = fields self.field_names = list(field_names) self.scalar_field_index = list(range(len(field_names))) # construct interpolators self.grid_interpolator = GenericSpatialInterpolator2D(self.function_space, coord_system) self.reader = interpolation.NetCDFSpatialInterpolator(self.grid_interpolator, self.field_names) # TODO generalize _get_nc_var_name and use it for time dimension as well self.timesearch_obj = interpolation.NetCDFTimeSearch(ncfile_pattern, init_date, interpolation.NetCDFTimeParser, time_variable_name='time', verbose=False) self.time_interpolator = interpolation.LinearTimeInterpolator(self.timesearch_obj, self.reader) self.rotate_velocity = vector_components is not None if self.rotate_velocity: assert vector_field is not None, 'vector_field must be provided' self.field_names += list(vector_components) self.vector_field_index = [self.field_names.index(c) for c in vector_components] self.vector_field = vector_field lon = self.grid_interpolator.mesh_lonlat[:, 0] lat = self.grid_interpolator.mesh_lonlat[:, 1] if vector_rotator is None: self.vect_rotator = coord_system.get_vector_rotator(lon, lat) else: self.vect_rotator = vector_rotator
[docs] @PETSc.Log.EventDecorator("thetis.GenericInterpolator2D.set_fields") def set_fields(self, time): """ Evaluates forcing fields at the given time """ vals = self.time_interpolator(time) if self.rotate_velocity: i, j = self.vector_field_index east_comp = vals[i] north_comp = vals[j] if self.vector_field.geometric_dimension() == 3: u, v, w = self.vect_rotator(east_comp, north_comp) self.vector_field.dat.data_with_halos[:, 0] = u self.vector_field.dat.data_with_halos[:, 1] = v self.vector_field.dat.data_with_halos[:, 2] = w else: u, v = self.vect_rotator(east_comp, north_comp) self.vector_field.dat.data_with_halos[:, 0] = u self.vector_field.dat.data_with_halos[:, 1] = v for i in self.scalar_field_index: self.fields[i].dat.data_with_halos[:] = vals[i]
[docs] class TidalBoundaryForcing(ABC): """Base class for tidal boundary interpolators.""" @abstractproperty def coord_layout(): """ Data layout in the netcdf files. Either 'lon,lat' or 'lat,lon'. """ return 'lon,lat' @abstractproperty def compute_velocity(): """If True, compute tidal currents as well.""" return False @PETSc.Log.EventDecorator("thetis.TidalBoundaryForcing.__init__") def __init__(self, elev_field, init_date, coord_system, vect_rotator=None, uv_field=None, constituents=None, boundary_ids=None, data_dir=None): """ :arg elev_field: Function where tidal elevation will be interpolated. :arg init_date: Datetime object defining the simulation init time. :arg coord_system: :class:`CoordinateSystem` object :kwarg vect_rotator: User-defined vector rotator function :kwarg uv_field: Function where tidal transport will be interpolated. :kwarg constituents: list of tidal constituents, e.g. ['M2', 'K1'] :kwarg boundary_ids: list of boundary_ids where tidal data will be evaluated. If not defined, tides will be in evaluated in the entire domain. :kwarg data_dir: path to directory where tidal model netCDF files are located. """ assert init_date.tzinfo is not None, 'init_date must have time zone information' if constituents is None: constituents = ['Q1', 'O1', 'P1', 'K1', 'N2', 'M2', 'S2', 'K2'] self.data_dir = data_dir if data_dir is not None else '' if not self.compute_velocity and uv_field is not None: warning('{:}: uv_field is defined but velocity computation is not supported. uv_field will be ignored.'.format(__class__.__name__)) self.compute_velocity = self.compute_velocity and uv_field is not None # determine nodes at the boundary self.elev_field = elev_field self.uv_field = uv_field function_space = elev_field.function_space() if boundary_ids is None: # interpolate in the whole domain self.nodes = numpy.arange(self.elev_field.dat.data_with_halos.shape[0]) else: bc = DirichletBC(function_space, 0., boundary_ids) self.nodes = bc.nodes self._empty_set = self.nodes.size == 0 # construct local coordinates on_sphere = function_space.mesh().geometric_dimension() == 3 if on_sphere: x, y, z = SpatialCoordinate(function_space.mesh()) fsx = Function(function_space).interpolate(x).dat.data_with_halos fsy = Function(function_space).interpolate(y).dat.data_with_halos fsz = Function(function_space).interpolate(z).dat.data_with_halos coords = (fsx, fsy, fsz) else: x, y = SpatialCoordinate(function_space.mesh()) fsx = Function(function_space).interpolate(x).dat.data_with_halos fsy = Function(function_space).interpolate(y).dat.data_with_halos coords = (fsx, fsy) lon, lat = coord_system.to_lonlat(*coords, positive_lon=True) self.latlon = numpy.array([lat, lon]).T if not self._empty_set: # compute bounding box bounds_lat = [self.latlon[:, 0].min(), self.latlon[:, 0].max()] bounds_lon = [self.latlon[:, 1].min(), self.latlon[:, 1].max()] if self.coord_layout == 'lon,lat': self.ranges = (bounds_lon, bounds_lat) else: self.ranges = (bounds_lat, bounds_lon) self.tide = uptide.Tides(constituents) self.tide.set_initial_time(init_date) self._create_readers() if self.compute_velocity: lat = self.latlon[:, 0] lon = self.latlon[:, 1] if vect_rotator is None: self.vect_rotator = coord_system.get_vector_rotator(lon, lat) else: self.vect_rotator = vect_rotator @abstractmethod def _create_readers(self, ): """Create uptide netcdf reader objects.""" pass
[docs] @PETSc.Log.EventDecorator("thetis.TidalBoundaryForcing.set_tidal_field") def set_tidal_field(self, t): elev_data = self.elev_field.dat.data_with_halos if self.compute_velocity: uv_data = self.uv_field.dat.data_with_halos if self._empty_set: return self.tnci.set_time(t) if self.compute_velocity: self.tnciu.set_time(t) self.tnciv.set_time(t) if self.compute_velocity: lat_vel = numpy.zeros_like(elev_data) lon_vel = numpy.zeros_like(elev_data) for i, node in enumerate(self.nodes): lat, lon = self.latlon[node, :] point = (lon, lat) if self.coord_layout == 'lon,lat' else (lat, lon) try: elev = self.tnci.get_val(point, allow_extrapolation=True) elev_data[node] = elev except uptide.netcdf_reader.CoordinateError: elev_data[node] = 0. if self.compute_velocity: try: lon_vel[node] = self.tnciu.get_val(point, allow_extrapolation=True) lat_vel[node] = self.tnciv.get_val(point, allow_extrapolation=True) except uptide.netcdf_reader.CoordinateError: uv_data[node, 0] = 0 uv_data[node, 1] = 0 if self.compute_velocity: uv = self.vect_rotator(lon_vel, lat_vel) uv_data[:, 0] = uv[0] uv_data[:, 1] = uv[1]
[docs] class TPXOTidalBoundaryForcing(TidalBoundaryForcing): """ Interpolator for TPXO global tidal model data. See the `TPXO website <https://www.tpxo.net/global/tpxo9-atlas>`_ for data access. By default, this routine assumes the `tpxo9v5a` data set in NetCDF format. Other versions can be used by setting correct `elev_file`, `uv_file`, and `grid_file` arguments. """ coord_layout = 'lon,lat' compute_velocity = True @PETSc.Log.EventDecorator("thetis.TPXOTidalBoundaryForcing.__init__") def __init__(self, elev_field, init_date, coord_system, vect_rotator=None, uv_field=None, constituents=None, boundary_ids=None, data_dir=None, elev_file='h_tpxo9.v5a.nc', uv_file='u_tpxo9.v5a.nc', grid_file='gridtpxo9v5a.nc'): """ :arg elev_field: Function where tidal elevation will be interpolated. :arg init_date: Datetime object defining the simulation init time. :arg coord_system: :class:`CoordinateSystem` object :kwarg vect_rotator: User-defined vector rotator function :kwarg uv_field: Function where tidal transport will be interpolated. :kwarg constituents: list of tidal constituents, e.g. ['M2', 'K1'] :kwarg boundary_ids: list of boundary_ids where tidal data will be evaluated. If not defined, tides will be in evaluated in the entire domain. :kwarg data_dir: path to directory where tidal model netCDF files are located. :kwarg elev_file: TPXO tidal elevation file :kwarg uv_file: TPXO transport/velocity file :kwarg grid_file: TPXO grid file """ self.compute_velocity = uv_field is not None self.elev_nc_file = elev_file self.uv_nc_file = uv_file self.grid_nc_file = grid_file super().__init__( elev_field, init_date, coord_system, vect_rotator=vect_rotator, uv_field=uv_field, constituents=constituents, boundary_ids=boundary_ids, data_dir=data_dir ) @PETSc.Log.EventDecorator("thetis.TPXOTidalBoundaryForcing._create_readers") def _create_readers(self, ): """Create uptide netcdf reader objects.""" msg = 'File {:} not found.' f_grid = os.path.join(self.data_dir, self.grid_nc_file) assert os.path.exists(f_grid), msg.format(f_grid) f_elev = os.path.join(self.data_dir, self.elev_nc_file) assert os.path.exists(f_elev), msg.format(f_elev) self.tnci = uptide.tidal_netcdf.OTPSncTidalInterpolator(self.tide, f_grid, f_elev, ranges=self.ranges) if self.uv_field is not None: f_uv = os.path.join(self.data_dir, self.uv_nc_file) assert os.path.exists(f_uv), msg.format(f_uv) self.tnciu = uptide.tidal_netcdf.OTPSncTidalComponentInterpolator(self.tide, f_grid, f_uv, 'u', 'u', ranges=self.ranges) self.tnciv = uptide.tidal_netcdf.OTPSncTidalComponentInterpolator(self.tide, f_grid, f_uv, 'v', 'v', ranges=self.ranges)
[docs] class FES2004TidalBoundaryForcing(TidalBoundaryForcing): """Tidal boundary interpolator for FES2004 tidal model.""" elev_nc_file = 'tide.fes2004.nc' uv_nc_file = None grid_nc_file = None coord_layout = 'lat,lon' compute_velocity = False @PETSc.Log.EventDecorator("thetis.FES2004TidalBoundaryForcing._create_readers") def _create_readers(self, ): """Create uptide netcdf reader objects.""" f_elev = os.path.join(self.data_dir, self.elev_nc_file) msg = 'File {:} not found, download it from \nftp://ftp.legos.obs-mip.fr/pub/soa/maree/tide_model/global_solution/fes2004/'.format(f_elev) assert os.path.exists(f_elev), msg self.tnci = uptide.tidal_netcdf.FESTidalInterpolator(self.tide, f_elev, ranges=self.ranges)