"""
Methods for interpolating data from structured data sets on Thetis fields.
Simple example of an atmospheric pressure interpolator:
.. code-block:: python
class WRFInterpolator(object):
# Interpolates WRF atmospheric model data on 2D fields
        def __init__(self, function_space, coord_system, atm_pressure_field, ncfile_pattern, init_date):
self.atm_pressure_field = atm_pressure_field
# object that interpolates forcing data from structured grid on the local mesh
self.grid_interpolator = NetCDFLatLonInterpolator2d(function_space, coord_system)
# reader object that can read fields from netCDF files, applies spatial interpolation
self.reader = NetCDFSpatialInterpolator(self.grid_interpolator, ['prmsl'])
# object that can find previous/next time stamps in a collection of netCDF files
self.timesearch_obj = NetCDFTimeSearch(ncfile_pattern, init_date, NetCDFTimeParser)
            # finally a linear interpolator class that performs linear interpolation in time
self.interpolator = LinearTimeInterpolator(self.timesearch_obj, self.reader)
def set_fields(self, time):
# Evaluates forcing fields at the given time
pressure = self.interpolator(time)
self.atm_pressure_field.dat.data_with_halos[:] = pressure
Usage:
.. code-block:: python
atm_pressure_2d = Function(solver_obj.function_spaces.P1_2d, name='atm pressure')
wrf_pattern = 'forcings/atm/wrf/wrf_air.2016_*_*.nc'
    wrf_atm = WRFInterpolator(
        solver_obj.function_spaces.P1_2d, coord_system,
        atm_pressure_2d, wrf_pattern, init_date)
simulation_time = 3600.
wrf_atm.set_fields(simulation_time)
"""
import glob
import os
from .timezone import *
from .log import *
import scipy.spatial as qhull  # provides Delaunay and QhullError
import netCDF4
from abc import ABC, abstractmethod
from firedrake import *
from firedrake.petsc import PETSc
import re
import string
import numpy
import cftime
TIMESEARCH_TOL = 1e-6
def get_ncvar_name(ncfile, standard_name=None, long_name=None, var_name=None):
"""
Look for variables that match either CF standard_name or long_name
attributes.
If both are defined, standard_name takes precedence.
    Note that the attributes in the netCDF file are converted to
    lower case prior to checking.
:arg ncfile: netCDF4 Dataset object
:kwarg standard_name: a target standard_name, or a list of them
:kwarg long_name: a target long_name, or a list of them
:kwarg var_name: a target netCDF variable name, or a list of them
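
    Example (a minimal sketch; the file name is illustrative):

    .. code-block:: python

        with netCDF4.Dataset('forcing.nc') as ncfile:
            name = get_ncvar_name(ncfile, standard_name='air_pressure_at_sea_level',
                                  var_name=['prmsl', 'msl'])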
"""
assert standard_name is not None or long_name is not None, \
'Either standard_name or long_name must be defined'
# convert to list
    def listify(arg):
        if arg is None:
            return []
        if not isinstance(arg, (list, tuple)):
            arg = [arg]
        return arg
standard_name = listify(standard_name)
long_name = listify(long_name)
var_name = listify(var_name)
found = False
for name, var in ncfile.variables.items():
if 'standard_name' in var.ncattrs():
if var.standard_name.lower() in standard_name:
found = True
break
if 'long_name' in var.ncattrs():
if var.long_name.lower() in long_name:
found = True
break
if var.name.lower() in var_name:
found = True
break
if not found:
filter_str = []
        if standard_name:
            filter_str.append(f'standard_name={standard_name}')
        if long_name:
            filter_str.append(f'long_name={long_name}')
filter_str = ' '.join(filter_str)
msg = f'Variable matching {filter_str} not found ' \
f'in {ncfile.filepath()}'
raise ValueError(msg)
return name
class GridInterpolator(object):
"""
    A reusable griddata interpolator object.
Usage:
.. code-block:: python
interpolator = GridInterpolator(source_xyz, target_xyz)
vals = interpolator(source_data)
Example:
.. code-block:: python
x0 = numpy.linspace(0, 10, 10)
y0 = numpy.linspace(5, 10, 10)
        X, Y = numpy.meshgrid(x0, y0)
x = X.ravel(); y = Y.ravel()
data = x + 25.*y
x_target = numpy.linspace(1, 10, 20)
y_target = numpy.linspace(5, 10, 20)
        interpolator = GridInterpolator(numpy.vstack((x, y)).T, numpy.vstack((x_target, y_target)).T)
vals = interpolator(data)
Based on
http://stackoverflow.com/questions/20915502/speedup-scipy-griddata-for-multiple-interpolations-between-two-irregular-grids
"""
@PETSc.Log.EventDecorator("thetis.GridInterpolator.__init__")
def __init__(self, grid_xyz, target_xyz, fill_mode=None, fill_value=numpy.nan,
normalize=False, dont_raise=False):
"""
:arg grid_xyz: Array of source grid coordinates, shape (npoints, 2) or
(npoints, 3)
:arg target_xyz: Array of target grid coordinates, shape (n, 2) or
(n, 3)
:kwarg fill_mode: Determines how points outside the source grid will be
treated. If 'nearest', value of the nearest source point will be
used. Otherwise a constant fill value will be used (default).
:kwarg float fill_value: Set the fill value (default: NaN)
:kwarg bool normalize: If true the data is scaled to unit cube before
interpolation. Default: False.
:kwarg bool dont_raise: Do not raise a Qhull error if triangulation
fails. In this case the data will be set to fill value or nearest
neighbor value.
"""
self.fill_value = fill_value
self.fill_mode = fill_mode
self.normalize = normalize
self.fill_nearest = self.fill_mode == 'nearest'
self.shape = (target_xyz.shape[0], )
ngrid_points = grid_xyz.shape[0]
if self.fill_nearest:
assert ngrid_points > 0, 'at least one source point is needed'
if self.normalize:
def get_norm_params(x, scale=None):
min = x.min()
max = x.max()
if scale is None:
scale = max - min
a = 1./scale
b = -min*a
return a, b
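            # NOTE the scale and offset are derived from the target point
            # extents; the normalization assumes 3D coordinates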
ax, bx = get_norm_params(target_xyz[:, 0])
ay, by = get_norm_params(target_xyz[:, 1])
az, bz = get_norm_params(target_xyz[:, 2])
self.norm_a = numpy.array([ax, ay, az])
self.norm_b = numpy.array([bx, by, bz])
ngrid_xyz = self.norm_a*grid_xyz + self.norm_b
ntarget_xyz = self.norm_a*target_xyz + self.norm_b
else:
ngrid_xyz = grid_xyz
ntarget_xyz = target_xyz
self.cannot_interpolate = False
try:
d = ngrid_xyz.shape[1]
tri = qhull.Delaunay(ngrid_xyz)
# NOTE this becomes expensive in 3D for npoints > 10k
simplex = tri.find_simplex(ntarget_xyz)
vertices = numpy.take(tri.simplices, simplex, axis=0)
temp = numpy.take(tri.transform, simplex, axis=0)
delta = ntarget_xyz - temp[:, d]
bary = numpy.einsum('njk,nk->nj', temp[:, :d, :], delta)
self.vtx = vertices
self.wts = numpy.hstack((bary, 1 - bary.sum(axis=1, keepdims=True)))
self.outside = numpy.any(~numpy.isfinite(self.wts), axis=1)
self.outside += numpy.any(self.wts < 0, axis=1)
self.outside = numpy.nonzero(self.outside)[0]
self.fill_nearest *= len(self.outside) > 0
if self.fill_nearest:
# find nearest neighbor in the data set
from scipy.spatial import cKDTree
dist, ix = cKDTree(ngrid_xyz).query(ntarget_xyz[self.outside])
self.outside_to_nearest = ix
except qhull.QhullError as e:
if not dont_raise:
raise e
self.cannot_interpolate = True
if self.fill_nearest:
# find nearest neighbor in the data set
from scipy.spatial import cKDTree
dist, ix = cKDTree(ngrid_xyz).query(ntarget_xyz)
self.outside_to_nearest = ix
@PETSc.Log.EventDecorator("thetis.GridInterpolator.__call__")
def __call__(self, values):
"""
Interpolate values defined on grid_xyz to target_xyz.
:arg values: Array of source values to interpolate, shape (npoints, )
"""
if self.cannot_interpolate:
if self.fill_nearest:
ret = values[self.outside_to_nearest]
else:
ret = numpy.ones(self.shape)*self.fill_value
else:
ret = numpy.einsum('nj,nj->n', numpy.take(values, self.vtx), self.wts)
if self.fill_nearest:
ret[self.outside] = values[self.outside_to_nearest]
else:
ret[self.outside] = self.fill_value
return ret
class FileTreeReader(ABC):
"""
Abstract base class of file tree reader object
"""
@abstractmethod
def __call__(self, filename, time_index):
"""
        Reads the data for one time step from the file
        :arg str filename: name of the file where the data is stored
:arg int time_index: time index to read
:return: a list of floats or numpy.array_like objects
"""
pass
class NetCDFTimeSeriesReader(FileTreeReader):
"""
A simple netCDF reader that returns a time slice of the given variable.
This class does not interpolate the data in any way. Useful for
interpolating time series.
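
    Example (a minimal sketch; the file name and variable 'flux' are illustrative):

    .. code-block:: python

        reader = NetCDFTimeSeriesReader(['flux'])
        flux, = reader('forcings/river_flux.nc', 0)  # read the first time step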
"""
def __init__(self, variable_list, time_variable_name='time'):
self.variable_list = variable_list
self.time_variable_name = time_variable_name
self.time_dim = None
self.ndims = None
def _detect_time_dim(self, ncfile):
assert self.time_variable_name in ncfile.dimensions
nc_var = ncfile[self.variable_list[0]]
assert self.time_variable_name in nc_var.dimensions
self.time_dim = nc_var.dimensions.index(self.time_variable_name)
self.ndims = len(nc_var.dimensions)
def _get_slice(self, time_index):
"""
Returns a slice object that extracts a single time index
"""
if self.ndims == 1:
return time_index
slice_list = [slice(None, None, None)]*self.ndims
slice_list[self.time_dim] = slice(time_index, time_index+1, None)
return slice_list
def __call__(self, filename, time_index):
"""
        Reads the given time index from the netCDF file
:arg str filename: netcdf file where to find the data
:arg int time_index: time index to read
:return: a float or numpy.array_like value
"""
assert os.path.isfile(filename), 'File not found: {:}'.format(filename)
with netCDF4.Dataset(filename) as ncfile:
if self.time_dim is None:
self._detect_time_dim(ncfile)
output = []
for var in self.variable_list:
values = ncfile[var][self._get_slice(time_index)]
output.append(values)
return output
def _get_subset_nodes(grid_x, grid_y, target_x, target_y):
"""
    Returns the grid nodes that are necessary for interpolating onto target_x, target_y
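
    Example (a minimal sketch with an illustrative structured grid):

    .. code-block:: python

        grid_lon, grid_lat = numpy.meshgrid(numpy.linspace(0, 10, 11),
                                            numpy.linspace(50, 60, 11))
        target_lon = numpy.array([4.2, 4.8])
        target_lat = numpy.array([55.1, 55.3])
        nodes, ind_x, ind_y = _get_subset_nodes(grid_lon, grid_lat,
                                                target_lon, target_lat)
        # ind_x, ind_y are slices that bound the target points on the grid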
"""
orig_shape = grid_x.shape
grid_xy = numpy.array((grid_x.ravel(), grid_y.ravel())).T
target_xy = numpy.array((target_x.ravel(), target_y.ravel())).T
tri = qhull.Delaunay(grid_xy)
simplex = tri.find_simplex(target_xy)
vertices = numpy.take(tri.simplices, simplex, axis=0)
nodes = numpy.unique(vertices.ravel())
nodes_x, nodes_y = numpy.unravel_index(nodes, orig_shape)
# x and y bounds for reading a subset of the netcdf data
ind_x = slice(nodes_x.min(), nodes_x.max() + 1)
ind_y = slice(nodes_y.min(), nodes_y.max() + 1)
return nodes, ind_x, ind_y
class SpatialInterpolator(ABC):
"""
Abstract base class for spatial interpolators that read data from disk
"""
@abstractmethod
def __init__(self, function_space, coord_system):
"""
:arg function_space: target Firedrake FunctionSpace
:arg coord_system: :class:`CoordinateSystem` object
"""
pass
@abstractmethod
def interpolate(self, filename, variable_list, itime):
"""
Interpolates data from the given file at given time step
"""
pass
class SpatialInterpolator2d(SpatialInterpolator, ABC):
"""
Abstract spatial interpolator class that can interpolate onto a 2D Function
"""
@PETSc.Log.EventDecorator("thetis.SpatialInterpolator2d.__init__")
def __init__(self, function_space, coord_system, fill_mode=None,
fill_value=numpy.nan):
"""
:arg function_space: target Firedrake FunctionSpace
:arg coord_system: :class:`CoordinateSystem` object
:kwarg fill_mode: Determines how points outside the source grid will be
treated. If 'nearest', value of the nearest source point will be
used. Otherwise a constant fill value will be used (default).
:kwarg float fill_value: Set the fill value (default: NaN)
"""
assert function_space.ufl_element().value_shape == ()
# construct local coordinates
on_sphere = function_space.mesh().geometric_dimension() == 3
if on_sphere:
x, y, z = SpatialCoordinate(function_space.mesh())
fsx = Function(function_space).interpolate(x).dat.data_with_halos
fsy = Function(function_space).interpolate(y).dat.data_with_halos
fsz = Function(function_space).interpolate(z).dat.data_with_halos
coords = (fsx, fsy, fsz)
else:
x, y = SpatialCoordinate(function_space.mesh())
fsx = Function(function_space).interpolate(x).dat.data_with_halos
fsy = Function(function_space).interpolate(y).dat.data_with_halos
coords = (fsx, fsy)
lon, lat = coord_system.to_lonlat(*coords)
self.mesh_lonlat = numpy.array([lon, lat]).T
self.fill_mode = fill_mode
self.fill_value = fill_value
self._initialized = False
@PETSc.Log.EventDecorator("thetis.SpatialInterpolator2d._create_interpolator")
def _create_interpolator(self, lat_array, lon_array):
"""
Create compact interpolator by finding the minimal necessary support
"""
        assert len(lat_array.shape) == 2, 'Latitude must be a 2D array'
        assert len(lon_array.shape) == 2, 'Longitude must be a 2D array'
self.nodes, self.ind_lon, self.ind_lat = _get_subset_nodes(
lon_array,
lat_array,
self.mesh_lonlat[:, 0],
self.mesh_lonlat[:, 1]
)
subset_lat = lat_array[self.ind_lon, self.ind_lat].ravel()
subset_lon = lon_array[self.ind_lon, self.ind_lat].ravel()
subset_lonlat = numpy.array((subset_lon, subset_lat)).T
self.grid_interpolator = GridInterpolator(
subset_lonlat, self.mesh_lonlat, fill_mode=self.fill_mode,
fill_value=self.fill_value)
self._initialized = True
# debug: plot subsets
# import matplotlib.pyplot as plt
# plt.plot(grid_lon_full, grid_lat_full, 'k.')
# plt.plot(grid_lonlat[:, 0], grid_lonlat[:, 1], 'b.')
# plt.plot(self.mesh_lonlat[:, 0], self.mesh_lonlat[:, 1], 'r.')
# plt.show()
@abstractmethod
def interpolate(self, filename, variable_list, time):
"""
Calls the interpolator object
"""
pass
class NetCDFLatLonInterpolator2d(SpatialInterpolator2d):
"""
Interpolates netCDF data on a local 2D unstructured mesh
    The interpolator is constructed for a single netCDF file that defines the
source grid. Once the interpolator has been constructed, data can be read
from any file that uses the same grid.
This routine returns the data in numpy arrays.
Usage:
.. code-block:: python
fs = FunctionSpace(...)
myfunc = Function(fs, ...)
ncinterp2d = NetCDFLatLonInterpolator2d(fs, coord_system, nc_filename)
val1, val2 = ncinterp2d.interpolate(nc_filename, ['var1', 'var2'], 10)
myfunc.dat.data_with_halos[:] = val1 + val2
"""
@PETSc.Log.EventDecorator("thetis.NetCDFLatLonInterpolator2d.interpolate")
def interpolate(self, nc_filename, variable_list, itime):
"""
Interpolates data from a netCDF file onto Firedrake function space.
:arg str nc_filename: netCDF file to read
:arg variable_list: list of netCDF variable names to read
:arg int itime: time index to read
:returns: list of numpy.arrays corresponding to variable_list
"""
with netCDF4.Dataset(nc_filename, 'r') as ncfile:
if not self._initialized:
name_lat = get_ncvar_name(
ncfile, 'latitude', 'latitude', ['latitude', 'lat'])
name_lon = get_ncvar_name(
ncfile, 'longitude', 'longitude', ['longitude', 'lon'])
grid_lat = ncfile[name_lat][:]
grid_lon = ncfile[name_lon][:]
lat_is_1d = len(grid_lat.shape) == 1
                lon_is_1d = len(grid_lon.shape) == 1
assert lat_is_1d == lon_is_1d, 'Unsupported lat lon grid'
if lat_is_1d and lon_is_1d:
grid_lon, grid_lat = numpy.meshgrid(grid_lon, grid_lat)
self._create_interpolator(grid_lat, grid_lon)
output = []
for var in variable_list:
msg = f'Variable {var} not found: {nc_filename}'
assert var in ncfile.variables, msg
# TODO generalize data dimensions, sniff from netcdf file
grid_data = ncfile[var][itime, self.ind_lon, self.ind_lat].ravel()
data = self.grid_interpolator(grid_data)
output.append(data)
return output
class NetCDFSpatialInterpolator(FileTreeReader):
"""
Wrapper class that provides FileTreeReader API for grid interpolators
"""
def __init__(self, grid_interpolator, variable_list):
self.grid_interpolator = grid_interpolator
self.variable_list = variable_list
def __call__(self, filename, time_index):
return self.grid_interpolator.interpolate(filename, self.variable_list, time_index)
class TimeParser(ABC):
"""
Abstract base class for time definition objects.
Defines the time span that a file (or data set) covers and provides a time
index search routine.
"""
@abstractmethod
def get_start_time(self):
"""Returns the first time stamp in the file/data set"""
pass
@abstractmethod
def get_end_time(self):
"""Returns the last time stamp in the file/data set"""
pass
@abstractmethod
def find_time_stamp(self, t, previous=False):
"""
Given time t, returns index of the next (previous) time stamp
raises IndexError if t is out of range, i.e.
t > self.get_end_time() or t < self.get_start_time()
"""
pass
class NetCDFTimeParser(TimeParser):
"""
Describes the time stamps stored in a netCDF file.
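
    Example (a minimal sketch; the file name is illustrative):

    .. code-block:: python

        timeinfo = NetCDFTimeParser('forcings/atm/wrf_air.2016_05_01.nc')
        print(timeinfo.get_start_time(), timeinfo.get_end_time())
        itime = timeinfo.find_time_stamp(timeinfo.get_start_time(), previous=True)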
"""
def __init__(self, filename, time_variable_name='time', allow_gaps=False,
verbose=False):
"""
Construct a new object by scraping data from the given netcdf file.
:arg str filename: name of the netCDF file to read
:kwarg str time_variable_name: name of the time variable in the netCDF
file (default: 'time')
        :kwarg bool allow_gaps: if False, an error is raised if the time step is
            not constant.
        :kwarg bool verbose: if True, print a summary of the parsed time data
"""
self.filename = filename
self.time_variable_name = time_variable_name
def get_datetime(time, units, calendar):
"""
Convert netcdf time value to datetime.
"""
d = cftime.num2pydate(time, units, calendar)
if d.tzinfo is None:
d = pytz.utc.localize(d) # assume UTC
return d
with netCDF4.Dataset(filename) as d:
time_var = d[self.time_variable_name]
assert 'units' in time_var.ncattrs(), \
f'Time units not defined: {self.filename}'
assert 'calendar' in time_var.ncattrs(), \
f'Time calendar not defined: {self.filename}'
units = time_var.units
calendar = time_var.calendar
dates = [get_datetime(t, units, calendar) for t in time_var[:]]
self.time_array = numpy.array([datetime_to_epoch(d) for d in dates])
self.start_time = epoch_to_datetime(float(self.time_array[0]))
self.end_time = epoch_to_datetime(float(self.time_array[-1]))
            self.time_step = numpy.mean(numpy.diff(self.time_array))
            self.nb_steps = len(self.time_array)
            if not allow_gaps and self.nb_steps > 1:
                # enforce a constant time step, as documented above
                dt_arr = numpy.diff(self.time_array)
                assert numpy.allclose(dt_arr, dt_arr[0]), \
                    f'Time step is not constant: {self.filename}'
if verbose:
print_output('Parsed file {:}'.format(filename))
print_output(' Time span: {:} -> {:}'.format(self.start_time, self.end_time))
print_output(' Number of time steps: {:}'.format(self.nb_steps))
if self.nb_steps > 1:
print_output(' Time step: {:} h'.format(self.time_step/3600.))
def get_start_time(self):
return self.start_time
def get_end_time(self):
return self.end_time
def find_time_stamp(self, t, previous=False):
t_epoch = datetime_to_epoch(t) if isinstance(t, datetime.datetime) else t
itime = numpy.searchsorted(self.time_array, t_epoch + TIMESEARCH_TOL) # next
if previous:
itime -= 1
if itime < 0:
raise IndexError('Requested time out of bounds {:} < {:} in {:}'.format(t_epoch, self.time_array[0], self.filename))
if itime >= len(self.time_array):
            raise IndexError('Requested time out of bounds {:} > {:} in {:}'.format(t_epoch, self.time_array[-1], self.filename))
return itime
class TimeSearch(ABC):
"""
Base class for searching nearest time steps in a file tree or database
"""
@abstractmethod
def find(self, time, previous=False):
"""
Find a next (previous) time stamp from a given time
:arg float time: input time stamp
:arg bool previous: if True, look for last time stamp before requested
time. Otherwise returns next time stamp.
:return: a (filename, time_index, time) tuple
"""
pass
class NetCDFTimeSearch(TimeSearch):
"""
Finds a nearest time stamp in a collection of netCDF files.
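
    Example (a minimal sketch; the file pattern and init_date are illustrative):

    .. code-block:: python

        timesearch = NetCDFTimeSearch('forcings/atm/wrf_air.2016_*_*.nc',
                                      init_date, NetCDFTimeParser)
        filename, itime, time = timesearch.find(3600., previous=False)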
"""
@PETSc.Log.EventDecorator("thetis.NetCDFTimeSearch.__init__")
def __init__(self, file_pattern, init_date, netcdf_class, *args, **kwargs):
all_files = glob.glob(file_pattern)
assert len(all_files) > 0, 'No files found: {:}'.format(file_pattern)
self.netcdf_class = netcdf_class
self.init_date = init_date
self.sim_start_time = datetime_to_epoch(self.init_date)
self.verbose = kwargs.get('verbose', False)
dates = []
ncfiles = []
for fn in all_files:
nc = self.netcdf_class(fn, *args, **kwargs)
ncfiles.append(nc)
dates.append(nc.get_start_time())
sort_ix = numpy.argsort(dates)
self.files = numpy.array(all_files)[sort_ix]
self.ncfiles = numpy.array(ncfiles)[sort_ix]
self.start_datetime = numpy.array(dates)[sort_ix]
self.start_times = [(s - self.init_date).total_seconds() for s in self.start_datetime]
self.start_times = numpy.array(self.start_times)
if self.verbose:
print_output('{:}: Found time index:'.format(self.__class__.__name__))
for i in range(len(self.files)):
print_output('{:} {:} {:}'.format(i, self.files[i], self.start_times[i]))
nc = self.ncfiles[i]
print_output(' {:} -> {:}'.format(nc.start_time, nc.end_time))
if nc.nb_steps > 1:
print_output(' {:} time steps, dt = {:} s'.format(nc.nb_steps, nc.time_step))
else:
print_output(' {:} time steps'.format(nc.nb_steps))
def simulation_time_to_datetime(self, t):
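        """Convert simulation time t (seconds since init_date) to a datetime object"""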
return epoch_to_datetime(datetime_to_epoch(self.init_date) + t).astimezone(self.init_date.tzinfo)
@PETSc.Log.EventDecorator("thetis.NetCDFTimeSearch.find")
def find(self, simulation_time, previous=False):
"""
Find file that contains the given simulation time
:arg float simulation_time: simulation time in seconds
:kwarg bool previous: if True finds previous existing time stamp instead
of next (default False).
:return: (filename, time index, simulation time) of found data
"""
err_msg = 'No file found for time {:}'.format(self.simulation_time_to_datetime(simulation_time))
ix = numpy.searchsorted(self.start_times, simulation_time + TIMESEARCH_TOL)
if ix > 0:
candidates = [ix-1, ix]
else:
candidates = [ix]
if ix + 1 < len(self.start_times):
candidates += [ix + 1]
itime = None
for i in candidates:
try:
nc = self.ncfiles[i]
itime = nc.find_time_stamp(self.sim_start_time + simulation_time, previous=previous)
time = nc.time_array[itime] - self.sim_start_time
break
except IndexError:
pass
if itime is None:
raise Exception(err_msg)
return self.files[i], itime, time
class DailyFileTimeSearch(TimeSearch):
"""
Treats a list of daily files as a time series.
File name pattern must be given as a string where the 4-digit year is
tagged with "{year:04d}", and 2-digit zero-padded month and year are tagged
with "{month:02d}" and "{day:02d}", respectively. The tags can be used
multiple times.
Example pattern:
'ncom/{year:04d}/s3d.glb8_2f_{year:04d}{month:02d}{day:02d}00.nc'
In this time search method the time stamps are parsed solely from the
filename, no other metadata is used. By default the data is assumed to be
centered at 12:00 UTC every day.
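
    Example (a minimal sketch; the pattern and init_date are illustrative):

    .. code-block:: python

        timesearch = DailyFileTimeSearch(
            'ncom/{year:04d}/s3d.glb8_2f_{year:04d}{month:02d}{day:02d}00.nc',
            init_date)
        filename, itime, time = timesearch.find(86400., previous=True)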
"""
@PETSc.Log.EventDecorator("thetis.DailyFileTimeSearch.__init__")
def __init__(self, file_pattern, init_date, verbose=False,
center_hour=12, center_timezone=pytz.utc):
self.file_pattern = file_pattern
self.init_date = init_date
self.sim_start_time = datetime_to_epoch(self.init_date)
self.verbose = verbose
all_files = self._find_files()
dates = []
for fn in all_files:
d = self._parse_date(fn)
timestamp = datetime.datetime(d['year'], d['month'], d['day'],
center_hour, tzinfo=center_timezone)
dates.append(timestamp)
sort_ix = numpy.argsort(dates)
self.files = numpy.array(all_files)[sort_ix]
self.start_datetime = numpy.array(dates)[sort_ix]
self.start_times = [(s - self.init_date).total_seconds() for s in self.start_datetime]
self.start_times = numpy.array(self.start_times)
if self.verbose:
print_output('{:}: Found time index:'.format(self.__class__.__name__))
for i in range(len(self.files)):
print_output('{:} {:} {:}'.format(i, self.files[i], self.start_times[i]))
print_output(' {:}'.format(self.start_datetime[i]))
def _find_files(self):
"""Finds all files that match the given pattern."""
search_pattern = str(self.file_pattern)
search_pattern = search_pattern.replace(':02d}', ':}')
search_pattern = search_pattern.replace(':04d}', ':}')
search_pattern = search_pattern.format(year='*', month='*', day='*')
all_files = glob.glob(search_pattern)
assert len(all_files) > 0, 'No files found: {:}'.format(search_pattern)
return all_files
def _parse_date(self, filename):
"""
Parse year, month, day from filename using the given pattern.
"""
re_pattern = str(self.file_pattern)
re_pattern = re_pattern.replace('{year:04d}', r'(\d{4,4})')
re_pattern = re_pattern.replace('{month:02d}', r'(\d{2,2})')
re_pattern = re_pattern.replace('{day:02d}', r'(\d{2,2})')
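        # e.g. 'ncom/{year:04d}/file_{year:04d}{month:02d}{day:02d}.nc' becomes
        # r'ncom/(\d{4,4})/file_(\d{4,4})(\d{2,2})(\d{2,2}).nc'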
o = re.findall(re_pattern, filename)
assert len(o) == 1, 'parsing date from filename failed\n {:}'.format(filename)
values = [int(v) for v in o[0]]
fmt = string.Formatter()
labels = [s[1] for s in fmt.parse(self.file_pattern) if s[1] is not None]
return dict(zip(labels, values))
def simulation_time_to_datetime(self, t):
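        """Convert simulation time t (seconds since init_date) to a datetime object"""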
return epoch_to_datetime(datetime_to_epoch(self.init_date) + t).astimezone(self.init_date.tzinfo)
@PETSc.Log.EventDecorator("thetis.DailyFileTimeSearch.find")
def find(self, simulation_time, previous=False):
"""
Find file that contains the given simulation time
:arg float simulation_time: simulation time in seconds
:kwarg bool previous: if True finds previous existing time stamp instead
of next (default False).
:return: (filename, time index, simulation time) of found data
"""
err_msg = 'No file found for time {:}'.format(self.simulation_time_to_datetime(simulation_time))
ix = numpy.searchsorted(self.start_times, simulation_time + TIMESEARCH_TOL)
i = ix - 1 if previous else ix
assert i >= 0, err_msg
assert i < len(self.start_times), err_msg
itime = 0
time = self.start_times[i]
return self.files[i], itime, time
class LinearTimeInterpolator(object):
"""
Interpolates time series in time
User must provide timesearch_obj that finds time stamps from
a file tree, and a reader that can read those time stamps into numpy arrays.
Previous/next data sets are cached in memory to avoid hitting disk every
time.
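
    Example (a minimal sketch; timesearch_obj and reader as in the module docstring):

    .. code-block:: python

        interpolator = LinearTimeInterpolator(timesearch_obj, reader)
        vals = interpolator(3600.)  # list of arrays evaluated at t=3600 s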
"""
def __init__(self, timesearch_obj, reader):
"""
:arg timesearch_obj: TimeSearch object
:arg reader: FileTreeReader object
"""
self.timesearch = timesearch_obj
self.reader = reader
self.cache = {}
def _get_from_cache(self, key):
"""
Fetch data set from cache, read if not present
"""
if key not in self.cache:
self.cache[key] = self.reader(key[0], key[1])
return self.cache[key]
def _clean_cache(self, keys_to_keep):
"""
Remove cached data sets that are no longer needed
"""
for key in list(self.cache.keys()):
if key not in keys_to_keep:
self.cache.pop(key)
def __call__(self, t):
"""
Interpolate at time t
        :returns: list of numpy arrays
"""
prev_id = self.timesearch.find(t, previous=True)
next_id = self.timesearch.find(t, previous=False)
prev = self._get_from_cache(prev_id)
next = self._get_from_cache(next_id)
self._clean_cache([prev_id, next_id])
# interpolate
t_prev = prev_id[2]
t_next = next_id[2]
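        # alpha is the linear interpolation weight: 0 at t_prev, 1 at t_next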
alpha = (t - t_prev)/(t_next - t_prev)
RELTOL = 1e-6
assert alpha >= 0.0 - RELTOL and alpha <= 1.0 + RELTOL, \
'Value {:} out of range {:} .. {:}'.format(t, t_prev, t_next)
val = [(1.0 - alpha)*p + alpha*n for p, n in zip(prev, next)]
return val
class NetCDFTimeSeriesInterpolator(object):
"""
Reads and interpolates scalar time series from a sequence of netCDF files.
"""
@PETSc.Log.EventDecorator("thetis.NetCDFTimeSeriesInterpolator.__init__")
def __init__(self, ncfile_pattern, variable_list, init_date,
time_variable_name='time', scalars=None, allow_gaps=False):
"""
:arg str ncfile_pattern: file search pattern, e.g. "mydir/foo_*.nc"
        :arg variable_list: list of netCDF variable names to read
        :arg datetime.datetime init_date: simulation start time
        :kwarg str time_variable_name: name of the time variable in the netCDF
            files (default: 'time')
        :kwarg scalars: (optional) list of scalars; scale output variables by
            a factor.
        :kwarg bool allow_gaps: if False, an error is raised if the time step is
            not constant.
.. note::
All the variables must have the same dimensions in the netCDF files.
If the shapes differ, create separate interpolator instances.
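
        Example (a minimal sketch; the file pattern, variable name and scalar
        are illustrative):

        .. code-block:: python

            interp = NetCDFTimeSeriesInterpolator(
                'forcings/river_flux_*.nc', ['flux'], init_date, scalars=[-1.0])
            flux, = interp(3600.)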
"""
self.reader = NetCDFTimeSeriesReader(
variable_list, time_variable_name=time_variable_name)
self.timesearch_obj = NetCDFTimeSearch(
ncfile_pattern, init_date, NetCDFTimeParser,
time_variable_name=time_variable_name, allow_gaps=allow_gaps)
self.time_interpolator = LinearTimeInterpolator(self.timesearch_obj, self.reader)
        if scalars is not None:
            assert len(scalars) == len(variable_list)
        self.scalars = scalars
@PETSc.Log.EventDecorator("thetis.NetCDFTimeSeriesInterpolator.__call__")
def __call__(self, time):
"""
        Evaluates the time series at the given time
:returns: list of scalars or numpy.arrays
"""
vals = self.time_interpolator(time)
if self.scalars is not None:
for i in range(len(vals)):
vals[i] *= self.scalars[i]
return vals