from .utility import *
from .log import warning
[docs]
class CorrectiveVelocityFactor:
def __init__(self, depth, ksp, settling_velocity, ustar, a):
"""
Set up advective velocity factor `self.velocity_correction_factor`
which accounts for mismatch between depth-averaged product of
velocity with sediment and product of depth-averaged velocity
with depth-averaged sediment.
:arg depth: Depth of fluid
:type depth: :class:`Function`
:arg ksp: Grain roughness coefficient
:type ksp: :class:`Constant`
:arg settling_velocity: Settling velocity of the sediment particles
:type settling_velocity: :class:`Constant`
:arg ustar: Shear velocity
:type ustar: :class:`Expression of functions`
:arg a: Factor of bottom bed reference height
:type a: :class:`Constant`
"""
kappa = physical_constants['von_karman']
# correction factor to advection velocity in sediment concentration equation
Bconv = conditional(depth > Constant(1.1)*ksp, ksp/depth, Constant(1/1.1))
Aconv = conditional(depth > Constant(1.1)*a, a/depth, Constant(1/1.1))
# take max of value calculated either by ksp or depth
Amax = conditional(Aconv > Bconv, Aconv, Bconv)
r1conv = Constant(1) - (1/kappa)*conditional(settling_velocity/ustar < Constant(1),
settling_velocity/ustar, Constant(1))
Ione = conditional(r1conv > Constant(1e-8), (Constant(1) - Amax**r1conv)/r1conv,
conditional(r1conv < Constant(- 1e-8), (Constant(1) - Amax**r1conv)/r1conv, ln(Amax)))
Itwo = conditional(r1conv > Constant(1e-8), -(Ione + (ln(Amax)*(Amax**r1conv)))/r1conv,
conditional(r1conv < Constant(- 1e-8), -(Ione + (ln(Amax)*(Amax**r1conv)))/r1conv,
Constant(-0.5)*ln(Amax)**2))
self.alpha = -(Itwo - (ln(Amax) - ln(30))*Ione)/(Ione * ((ln(Amax) - ln(30)) + Constant(1)))
# final correction factor
self.velocity_correction_factor = Function(depth.function_space(), name='velocity correction factor')
self.velocity_correction_factor_expr = max_value(min_value(self.alpha, Constant(1)), Constant(0.))
self.update()
[docs]
def update(self):
"""
Update `self.velocity_correction_factor` using the updated values for velocity
"""
# final correction factor
self.velocity_correction_factor.interpolate(self.velocity_correction_factor_expr)
[docs]
class SedimentModel(object):
def __init__(self, options, mesh2d, uv, elev, depth):
"""
Set up a full morphological model simulation based on provided velocity and elevation functions.
:arg options: Model options.
:type options: :class:`.ModelOptions2d` instance
:arg mesh2d: :class:`Mesh` object of the 2D mesh
:arg uv: the velocity solution during the simulation
:type uv: :class:`Function`
:arg elev: the elevation solution during the simulation
:type elev: :class:`Function`
:arg depth: a :class:`DepthExpression` instance to evaluate the current depth
The :class:`.SedimentModel` provides various expressions to be used in a suspended sediment and/or
the Exner equation. NOTE that the functions used in these expressions need to be updated
with the current values of the uv and elev fields by calling :func:`.SedimentModel.update`. This is not done
in the initialisation of the sediment model, so that the :class:`.SedimentModel` can be created before
initial conditions have been assigned to uv and elev. After the initial conditions have been
assigned a call to update is required to ensure that the initial values are reflected in
the :class:`.SedimentModel` terms.
"""
self.uv = uv
self.elev = elev
self.depth = depth
self.options = options
self.solve_suspended_sediment = options.sediment_model_options.solve_suspended_sediment
self.use_bedload = options.sediment_model_options.use_bedload
self.use_sediment_slide = options.sediment_model_options.use_sediment_slide
self.use_angle_correction = options.sediment_model_options.use_angle_correction
self.use_slope_mag_correction = options.sediment_model_options.use_slope_mag_correction
self.use_advective_velocity_correction = options.sediment_model_options.use_advective_velocity_correction
self.use_secondary_current = options.sediment_model_options.use_secondary_current
self.mesh2d = mesh2d
if not self.use_bedload:
if self.use_angle_correction:
warning('Slope effect angle correction only applies to bedload transport which is not used in this simulation')
if self.use_slope_mag_correction:
warning('Slope effect magnitude correction only applies to bedload transport which is not used in this simulation')
if self.use_secondary_current:
warning('Secondary current only applies to bedload transport which is not used in this simulation')
self.average_size = options.sediment_model_options.average_sediment_size
self.bed_reference_height = options.sediment_model_options.bed_reference_height
self.rhos = options.sediment_model_options.sediment_density
# define function spaces
self.P1DG_2d = get_functionspace(mesh2d, "DG", 1)
self.P1_2d = get_functionspace(mesh2d, "CG", 1)
self.R_1d = get_functionspace(mesh2d, "R", 0)
self.P1v_2d = VectorFunctionSpace(mesh2d, "CG", 1)
self.n = FacetNormal(mesh2d)
# define parameters
self.g = physical_constants['g_grav']
self.rhow = physical_constants['rho0']
kappa = physical_constants['von_karman']
ksp = Function(self.P1_2d).interpolate(3*self.average_size)
self.a = Function(self.P1_2d).interpolate(self.bed_reference_height/2)
if self.options.sediment_model_options.morphological_viscosity is None:
self.viscosity = self.options.horizontal_viscosity
else:
self.viscosity = self.options.sediment_model_options.morphological_viscosity
# magnitude slope effect parameter
self.beta = self.options.sediment_model_options.slope_effect_parameter
# angle correction slope effect parameters
self.surbeta2 = self.options.sediment_model_options.slope_effect_angle_parameter
# secondary current parameter
self.alpha_secc = self.options.sediment_model_options.secondary_current_parameter
# calculate critical shields parameter thetacr
self.R = self.rhos/self.rhow - Constant(1)
self.dstar = Function(self.P1_2d).interpolate(self.average_size*((self.g*self.R)/(self.viscosity**2))**(1/3))
if float(max(self.dstar.dat.data[:])) < 1:
raise ValueError('dstar value less than 1')
self.thetacr = Function(self.P1_2d).interpolate(conditional(self.dstar < 4, 0.24*(self.dstar**(-1)),
conditional(self.dstar < 10, 0.14*(self.dstar**(-0.64)),
conditional(self.dstar < 20, 0.04*(self.dstar**(-0.1)),
conditional(self.dstar < 150, 0.013*(self.dstar**(0.29)), 0.055)))))
# critical bed shear stress
self.taucr = Function(self.P1_2d).interpolate((self.rhos-self.rhow)*self.g*self.average_size*self.thetacr)
# calculate settling velocity
self.settling_velocity = Function(self.P1_2d).interpolate(conditional(self.average_size <= 1e-04,
self.g*(self.average_size**2)*self.R/(18*self.viscosity),
conditional(self.average_size <= 1e-03, (10*self.viscosity/self.average_size)
* (sqrt(1 + 0.01*((self.R*self.g*(self.average_size**3))
/ (self.viscosity**2)))-1), 1.1*sqrt(self.g*self.average_size*self.R))))
# first step: project velocity to CG
self.uv_cg = Function(self.P1v_2d).project(self.uv)
self.old_bathymetry_2d = Function(self.P1_2d).interpolate(self.depth.bathymetry_2d)
self.depth_tot = Function(self.P1_2d).project(self.depth.get_total_depth(self.elev))
self.u = self.uv_cg[0]
self.v = self.uv_cg[1]
# define bed friction
hc = conditional(self.depth_tot > Constant(0.001), self.depth_tot, Constant(0.001))
aux = conditional(11.036*hc/self.bed_reference_height > Constant(1.001),
11.036*hc/self.bed_reference_height, Constant(1.001))
self.qfc = Constant(2)/(ln(aux)/kappa)**2
# skin friction coefficient
cfactor = conditional(self.depth_tot > ksp, Constant(2)
* (((1/kappa)*ln(11.036*self.depth_tot/ksp))**(-2)), Constant(0.0))
# mu - ratio between skin friction and normal friction
self.mu = conditional(self.qfc > Constant(0), cfactor/self.qfc, Constant(0))
# calculate bed shear stress
self.unorm = (self.u**2) + (self.v**2)
self.bed_stress = Function(self.P1_2d).interpolate(self.rhow*Constant(0.5)*self.qfc*self.unorm)
if self.solve_suspended_sediment:
# deposition flux - calculating coefficient to account for stronger conc at bed
self.B = conditional(self.a > self.depth_tot, Constant(1.0), self.a/self.depth_tot)
ustar = sqrt(Constant(0.5)*self.qfc*self.unorm)
self.rouse_number = (self.settling_velocity/(kappa*ustar)) - Constant(1)
self.intermediate_step = conditional(abs(self.rouse_number) > Constant(1e-04),
self.B*(Constant(1)-self.B**min_value(self.rouse_number, Constant(3)))/min_value(self.rouse_number,
Constant(3)), -self.B*ln(self.B))
self.integrated_rouse = max_value(conditional(self.intermediate_step > Constant(1e-12), Constant(1)/self.intermediate_step,
Constant(1e12)), Constant(1))
# erosion flux - above critical velocity bed is eroded
self.transport_stage_param = conditional(self.rhow*Constant(0.5)*self.qfc*self.unorm*self.mu > Constant(0),
(self.rhow*Constant(0.5)*self.qfc*self.unorm*self.mu - self.taucr)/self.taucr,
Constant(-1))
self.erosion_concentration = Function(self.P1DG_2d).project(Constant(0.015)*(self.average_size/self.a)
* ((max_value(self.transport_stage_param, Constant(0)))**1.5)
/ (self.dstar**0.3))
if self.use_advective_velocity_correction:
self.correction_factor_model = CorrectiveVelocityFactor(self.depth_tot, ksp,
self.settling_velocity, ustar, self.a)
self.velocity_correction_factor = self.correction_factor_model.velocity_correction_factor
self.equilibrium_tracer = Function(self.P1DG_2d).interpolate(self.erosion_concentration/self.integrated_rouse)
# get individual terms
self._deposition = self.settling_velocity*self.integrated_rouse
self._erosion = self.settling_velocity*self.erosion_concentration
if self.use_bedload:
# calculate angle of flow
self.calfa = Function(self.P1_2d).interpolate(self.uv_cg[0]/sqrt(self.unorm))
self.salfa = Function(self.P1_2d).interpolate(self.uv_cg[1]/sqrt(self.unorm))
if self.use_angle_correction:
# slope effect angle correction due to gravity
self.stress = Function(self.P1DG_2d).interpolate(self.rhow*Constant(0.5)*self.qfc*self.unorm)
[docs]
def get_bedload_term(self, bathymetry):
"""
Returns expression for bedload transport :math:`(qbx, qby)` to be used in the Exner equation.
Note bathymetry is the function which is solved for in the exner equation.
:arg bathymetry: Bathymetry of the domain. Bathymetry stands for
the bedlevel (positive downwards).
"""
# define bed gradient
dzdx = self.old_bathymetry_2d.dx(0)
dzdy = self.old_bathymetry_2d.dx(1)
if self.use_slope_mag_correction:
# slope effect magnitude correction due to gravity where beta is a parameter normally set to 1.3
# we use z_n1 and equals so that we can use an implicit method in Exner
slopecoef = Constant(1) + self.beta*(bathymetry.dx(0)*self.calfa + bathymetry.dx(1)*self.salfa)
else:
slopecoef = Constant(1.0)
if self.use_angle_correction:
# slope effect angle correction due to gravity
cparam = Function(self.P1_2d).interpolate((self.rhos-self.rhow)*self.g*self.average_size*(self.surbeta2**2))
tt1 = conditional(self.stress > Constant(1e-10), sqrt(cparam/self.stress), sqrt(cparam/Constant(1e-10)))
# add on a factor of the bed gradient to the normal
aa = self.salfa + tt1*dzdy
bb = self.calfa + tt1*dzdx
comb = sqrt(aa**2 + bb**2)
angle_norm = conditional(comb > Constant(1e-10), comb, Constant(1e-10))
# we use z_n1 and equals so that we can use an implicit method in Exner
calfamod = (self.calfa + (tt1*bathymetry.dx(0)))/angle_norm
salfamod = (self.salfa + (tt1*bathymetry.dx(1)))/angle_norm
if self.use_secondary_current:
# accounts for helical flow effect in a curver channel
# use z_n1 and equals so can use an implicit method in Exner
free_surface_dx = self.depth_tot.dx(0) - bathymetry.dx(0)
free_surface_dy = self.depth_tot.dx(1) - bathymetry.dx(1)
velocity_slide = (self.u*free_surface_dy)-(self.v*free_surface_dx)
tandelta_factor = Constant(7)*self.g*self.rhow*self.depth_tot*self.qfc\
/ (Constant(2)*self.alpha_secc*((self.u**2) + (self.v**2)))
# accounts for helical flow effect in a curver channel
if self.use_angle_correction:
# if angle has already been corrected we must alter the corrected angle to obtain the corrected secondary current angle
t_1 = (self.bed_stress*slopecoef*calfamod) + (self.v*tandelta_factor*velocity_slide)
t_2 = (self.bed_stress*slopecoef*salfamod) - (self.u*tandelta_factor*velocity_slide)
else:
t_1 = (self.bed_stress*slopecoef*self.calfa) + (self.v*tandelta_factor*velocity_slide)
t_2 = ((self.bed_stress*slopecoef*self.salfa) - (self.u*tandelta_factor*velocity_slide))
# calculated to normalise the new angles
t4 = sqrt((t_1**2) + (t_2**2))
# updated magnitude correction and angle corrections
slopecoef_secc = t4/self.bed_stress
calfanew = t_1/t4
salfanew = t_2/t4
# implement meyer-peter-muller bedload transport formula
thetaprime = self.mu*(self.rhow*Constant(0.5)*self.qfc*self.unorm)/((self.rhos-self.rhow)*self.g*self.average_size)
# if velocity above a certain critical value then transport occurs
phi = conditional(thetaprime < self.thetacr, Constant(0), Constant(8)*(thetaprime-self.thetacr)**1.5)
# bedload transport flux with magnitude correction
if self.use_secondary_current:
qb_total = slopecoef_secc*phi*sqrt(self.g*self.R*self.average_size**3)
else:
qb_total = slopecoef*phi*sqrt(self.g*self.R*self.average_size**3)
# formulate bedload transport flux with correct angle depending on corrections implemented
if self.use_angle_correction and self.use_secondary_current is False:
qbx = qb_total*calfamod
qby = qb_total*salfamod
elif self.use_secondary_current:
qbx = qb_total*calfanew
qby = qb_total*salfanew
else:
qbx = qb_total*self.calfa
qby = qb_total*self.salfa
return qbx, qby
[docs]
def get_sediment_slide_term(self, bathymetry):
# add component to bedload transport to ensure the slope angle does not exceed a certain value
# maximum gradient allowed by sediment slide mechanism
self.tanphi = tan(self.options.sediment_model_options.max_angle*pi/180)
# approximate mesh step size for sediment slide mechanism
L = self.options.sediment_model_options.sed_slide_length_scale
degree_h = self.P1_2d.ufl_element().degree()
if degree_h == 0:
self.sigma = 1.5 / CellSize(self.mesh2d)
else:
self.sigma = 5.0*degree_h*(degree_h + 1)/CellSize(self.mesh2d)
# define bed gradient
x, y = SpatialCoordinate(self.mesh2d)
if self.options.sediment_model_options.slide_region is not None:
dzdx = self.options.sediment_model_options.slide_region*bathymetry.dx(0)
dzdy = self.options.sediment_model_options.slide_region*bathymetry.dx(1)
else:
dzdx = bathymetry.dx(0)
dzdy = bathymetry.dx(1)
# calculate normal to the bed
nz = 1/sqrt(1 + (dzdx**2 + dzdy**2))
self.betaangle = asin(sqrt(1 - (nz**2)))
self.tanbeta = sqrt(1 - (nz**2))/nz
morfac = self.options.sediment_model_options.morphological_acceleration_factor
# calculating magnitude of added component
qaval = conditional(self.tanbeta - self.tanphi > 0, (1-self.options.sediment_model_options.porosity)
* 0.5*(L**2)*(self.tanbeta - self.tanphi)/(cos(self.betaangle*self.options.timestep
* morfac)), 0)
# multiplying by direction
alphaconst = conditional(sqrt(1 - (nz**2)) > 0, - qaval*(nz**2)/sqrt(1 - (nz**2)), 0)
diff_tensor = as_matrix([[alphaconst, 0, ], [0, alphaconst, ]])
return diff_tensor
[docs]
def get_deposition_coefficient(self):
"""Returns coefficient :math:`C` such that :math:`C/H*sediment` is deposition term in sediment equation
If sediment field is depth-averaged, :math:`C*sediment` is (total) deposition (over the column)
as it appears in the Exner equation, but deposition term in sediment equation needs
averaging: :math:`C*sediment/H`
If sediment field is depth-integrated, :math:`C*sediment/H` is (total) deposition (over the column)
as it appears in the Exner equation, and is the same in the sediment equation."""
return self._deposition
[docs]
def get_erosion_term(self):
"""Returns expression for (depth-integrated) erosion."""
return self._erosion
[docs]
def get_equilibrium_tracer(self):
"""Returns expression for (depth-averaged) equilibrium tracer."""
return self.equilibrium_tracer
[docs]
def get_advective_velocity_correction_factor(self):
"""Returns correction factor for the advective velocity in the sediment equations
With :attr:`.SedimentModelOptions.use_advective_velocity_correction`, this applies a correction to
the supplied velocity solution `uv` to take into account the mismatch between
depth-averaged product of velocity with sediment and product of depth-averaged
velocity with depth-averaged sediment.
"""
if self.use_advective_velocity_correction:
return self.velocity_correction_factor
else:
return 1
[docs]
def update(self):
"""Update all functions used by :class:`.SedimentModel`
This repeats all projection and interpolations steps based on the current values
of the `uv` and `elev` functions, provided in __init__."""
self.uv_cg.project(self.uv)
self.old_bathymetry_2d.interpolate(self.depth.bathymetry_2d)
self.depth_tot.project(self.depth.get_total_depth(self.elev))
self.bed_stress.interpolate(self.rhow*Constant(0.5)*self.qfc*self.unorm)
if self.solve_suspended_sediment:
self.erosion_concentration.project(Constant(0.015)*(self.average_size/self.a)
* ((max_value(self.transport_stage_param, Constant(0)))**1.5)
/ (self.dstar**0.3))
self.equilibrium_tracer.interpolate(self.erosion_concentration/self.integrated_rouse)
if self.use_bedload:
# calculate angle of flow
self.calfa.interpolate(self.uv_cg[0]/sqrt(self.unorm))
self.salfa.interpolate(self.uv_cg[1]/sqrt(self.unorm))
if self.use_angle_correction:
# slope effect angle correction due to gravity
self.stress.interpolate(self.rhow*Constant(0.5)*self.qfc*self.unorm)
if self.use_advective_velocity_correction:
self.correction_factor_model.update()