"""
Classes related to tidal turbine farms in Thetis.
"""
from firedrake import *
from firedrake.petsc import PETSc
from .log import *
from .callback import DiagnosticCallback
from .optimisation import DiagnosticOptimisationCallback
import pyadjoint
import numpy
[docs]
class TidalTurbine:
def __init__(self, options, upwind_correction=False):
self.diameter = options.diameter
self.C_support = options.C_support
self.A_support = options.A_support
self.upwind_correction = upwind_correction
def _thrust_area(self, uv):
C_T = self.thrust_coefficient(uv)
A_T = pi * self.diameter**2 / 4
fric = C_T * A_T
if self.C_support:
fric += self.C_support * self.A_support
return fric
[docs]
def velocity_correction(self, uv, depth):
fric = self._thrust_area(uv)
if self.upwind_correction:
return 0.5*(1+sqrt(1-fric/(self.diameter*depth)))
else:
return 1
[docs]
def friction_coefficient(self, uv, depth):
thrust_area = self._thrust_area(uv)
alpha = self.velocity_correction(uv, depth)
return thrust_area/2./alpha**2
[docs]
def power(self, uv, depth):
# ratio of discrete to upstream velocity (NOTE: should include support drag!)
alpha = self.velocity_correction(uv, depth)
A_T = pi * self.diameter**2 / 4
uv3 = dot(uv, uv)**1.5 / alpha**3 # upwind cubed velocity
C_T = self.thrust_coefficient(uv3**(1/3))
# this assumes the velocity through the turbine does not change due to the support (is this correct?)
return 0.25*C_T*A_T*(1+sqrt(1-C_T))*uv3
[docs]
class ConstantThrustTurbine(TidalTurbine):
def __init__(self, options, upwind_correction=False):
super().__init__(options, upwind_correction=upwind_correction)
self.C_T = options.thrust_coefficient
[docs]
def thrust_coefficient(self, uv):
return self.C_T
[docs]
def linearly_interpolate_table(x_list, y_list, y_final, x):
"""Return UFL expression that linearly interpolates between y-values in x-points
:param x_list: (1D) x-points
:param y_list: y-values in those points
:param y_final: value for x>x_list[-1]
:param x: point to interpolate (assumed x>x_list[0])
"""
# below x1, interpolate between x0 and x1:
below_x1 = ((x_list[1]-x)*y_list[0] + (x-x_list[0])*y_list[1])/(x_list[1]-x_list[0])
# above x1, interpolate from rest of the table, or take final value:
if len(x_list) > 2:
above_x1 = linearly_interpolate_table(x_list[1:], y_list[1:], y_final, x)
else:
above_x1 = y_final
return conditional(x < x_list[1], below_x1, above_x1)
[docs]
class TabulatedThrustTurbine(TidalTurbine):
def __init__(self, options, upwind_correction=False):
super().__init__(options, upwind_correction=upwind_correction)
self.C_T = options.thrust_coefficients
self.speeds = options.thrust_speeds
if not len(self.C_T) == len(self.speeds):
raise ValueError("In tabulated thrust curve the number of thrust coefficients and speed values should be the same.")
[docs]
def thrust_coefficient(self, uv):
umag = dot(uv, uv)**0.5
return conditional(umag < self.speeds[0], 0, linearly_interpolate_table(self.speeds, self.C_T, 0, umag))
[docs]
class TidalTurbineFarm:
def __init__(self, turbine_density, dx, options):
"""
:arg turbine_density: turbine distribution density field or expression
:arg dx: measure to integrate power output, n/o turbines
:arg options: a :class:`TidalTurbineFarmOptions` options dictionary
"""
upwind_correction = getattr(options, 'upwind_correction', False)
if options.turbine_type == 'constant':
self.turbine = ConstantThrustTurbine(options.turbine_options, upwind_correction=upwind_correction)
elif options.turbine_type == 'table':
self.turbine = TabulatedThrustTurbine(options.turbine_options, upwind_correction=upwind_correction)
self.dx = dx
self.turbine_density = turbine_density
self.break_even_wattage = options.break_even_wattage
[docs]
def number_of_turbines(self):
return assemble(self.turbine_density * self.dx)
[docs]
def power_output(self, uv, depth):
return assemble(self.turbine.power(uv, depth) * self.turbine_density * self.dx)
[docs]
def friction_coefficient(self, uv, depth):
return self.turbine.friction_coefficient(uv, depth)
[docs]
class DiscreteTidalTurbineFarm(TidalTurbineFarm):
"""
Class that can be used for the addition of turbines in the turbine density field
"""
def __init__(self, mesh, dx, options):
"""
:arg mesh: mesh domain
:arg dx: measure to integrate power output, n/o turbines
:arg options: a :class:`TidalTurbineFarmOptions` options dictionary
"""
# Preliminaries
self.mesh = mesh
# this sets self.turbine_expr=0
super().__init__(0, dx, options)
# Adding turbine distribution in the domain
self.add_turbines(options.turbine_coordinates)
[docs]
def add_turbines(self, coordinates):
"""
:param coords: Array with turbine coordinates to be positioned
:param function: turbine density function to be adapted
:param mesh: computational mesh domain
:param radius: radius where the bump will be applied
:return: updated turbine density field
"""
x = SpatialCoordinate(self.mesh)
radius = self.turbine.diameter * 0.5
for coord in coordinates:
dx0 = (x[0] - coord[0])/radius
dx1 = (x[1] - coord[1])/radius
psi_x = conditional(lt(abs(dx0), 1), exp(1-1/(1-dx0**2)), 0)
psi_y = conditional(lt(abs(dx1), 1), exp(1-1/(1-dx1**2)), 0)
bump = psi_x * psi_y
unit_bump_integral = 1.45661 # integral of bump function for radius=1 (copied from OpenTidalFarm who used Wolfram)
self.turbine_density = self.turbine_density + bump/(radius**2 * unit_bump_integral)
[docs]
class TurbineFunctionalCallback(DiagnosticCallback):
"""
:class:`.DiagnosticCallback` that evaluates the performance of each tidal turbine farm."""
name = 'turbine' # this name will be used in the hdf5 file
variable_names = ['current_power', 'average_power', 'average_profit']
@PETSc.Log.EventDecorator("thetis.TurbineFunctionalCallback.__init__")
def __init__(self, solver_obj, **kwargs):
"""
:arg solver_obj: a :class:`.FlowSolver2d` object containing the tidal_turbine_farms
:arg kwargs: see :class:`DiagnosticCallback`"""
if not hasattr(solver_obj, 'tidal_farms'):
solver_obj.create_equations()
self.farms = solver_obj.tidal_farms
nfarms = len(self.farms)
super().__init__(solver_obj, array_dim=nfarms, **kwargs)
self.uv, eta = split(solver_obj.fields.solution_2d)
self.dt = solver_obj.options.timestep
self.depth = solver_obj.fields.bathymetry_2d
self.cost = [farm.number_of_turbines() for farm in self.farms]
if self.append_to_log:
print_output('Number of turbines = {}'.format(sum(self.cost)))
self.break_even_wattage = [farm.break_even_wattage for farm in self.farms]
# time-integrated quantities:
self.integrated_power = [0] * nfarms
self.average_power = [0] * nfarms
self.average_profit = [0] * nfarms
self.time_period = 0.
def _evaluate_timestep(self):
"""Perform time integration and return current power and time-averaged power and profit."""
self.time_period = self.time_period + self.dt
current_power = []
for i, farm in enumerate(self.farms):
power = farm.power_output(self.uv, self.depth)
current_power.append(power)
self.integrated_power[i] += power * self.dt
self.average_power[i] = self.integrated_power[i] / self.time_period
self.average_profit[i] = self.average_power[i] - self.break_even_wattage[i] * self.cost[i]
return current_power, self.average_power, self.average_profit
def __call__(self):
return self._evaluate_timestep()
[docs]
def message_str(self, current_power, average_power, average_profit):
return 'Current power, average power and profit for each farm: {}, {}, {}'.format(current_power, average_power, average_profit)
[docs]
class TurbineOptimisationCallback(DiagnosticOptimisationCallback):
"""
:class:`DiagnosticOptimisationCallback` that evaluates the performance of each tidal turbine farm during an optimisation.
See the :py:mod:`optimisation` module for more info about the use of OptimisationCallbacks."""
name = 'farm_optimisation'
variable_names = ['cost', 'average_power', 'average_profit']
def __init__(self, solver_obj, turbine_functional_callback, **kwargs):
"""
:arg solver_obj: a :class:`.FlowSolver2d` object
:arg turbine_functional_callback: a :class:`.TurbineFunctionalCallback` used in the forward model
:args kwargs: see :class:`.DiagnosticOptimisationCallback`"""
self.tfc = turbine_functional_callback
super().__init__(solver_obj, **kwargs)
[docs]
def compute_values(self, *args):
costs = [x.block_variable.saved_output for x in self.tfc.cost]
powers = [x.block_variable.saved_output for x in self.tfc.average_power]
profits = [x.block_variable.saved_output for x in self.tfc.average_profit]
return costs, powers, profits
[docs]
def message_str(self, cost, average_power, average_profit):
return 'Costs, average power and profit for each farm: {}, {}, {}'.format(cost, average_power, average_profit)
[docs]
class MinimumDistanceConstraints(pyadjoint.InequalityConstraint):
"""This class implements minimum distance constraints between turbines.
.. note:: This class subclasses ``pyadjoint.InequalityConstraint``. The
following methods must be implemented:
* ``length(self)``
* ``function(self, m)``
* ``jacobian(self, m)``
"""
def __init__(self, turbine_positions, minimum_distance):
"""Create MinimumDistanceConstraints
:param turbine_positions: list of [x,y] where x and y are either float or Constant
:param minimum_distance: The minimum distance allowed between turbines.
"""
self._turbines = [float(xi) for xy in turbine_positions for xi in xy]
self._minimum_distance = minimum_distance
self._nturbines = len(turbine_positions)
[docs]
def length(self):
"""Returns the number of constraints ``len(function(m))``."""
return int(self._nturbines*(self._nturbines-1)/2)
[docs]
def function(self, m):
"""Return an object which must be positive for the point to be feasible.
:param m: The serialized paramaterisation of the turbines.
:type m: numpy.ndarray.
:returns: numpy.ndarray -- each entry must be positive for the positions to be
feasible.
"""
print_output("Calculating minimum distance constraints.")
inequality_constraints = []
for i in range(self._nturbines):
for j in range(self._nturbines):
if i <= j:
continue
inequality_constraints.append((m[2*i]-m[2*j])**2 + (m[2*i+1]-m[2*j+1])**2 - self._minimum_distance**2)
inequality_constraints = numpy.array(inequality_constraints)
if any(inequality_constraints <= 0):
print_output(
"Minimum distance inequality constraints (should all "
"be > 0): %s" % inequality_constraints)
return inequality_constraints
[docs]
def jacobian(self, m):
"""Returns the gradient of the constraint function.
Return a list of vector-like objects representing the gradient of the
constraint function with respect to the parameter m.
:param m: The serialized paramaterisation of the turbines.
:type m: numpy.ndarray.
:returns: numpy.ndarray -- the gradient of the constraint function with
respect to each input parameter m.
"""
print_output("Calculating gradient of equality constraint")
grad_h = numpy.zeros((self.length(), self._nturbines*2))
row = 0
for i in range(self._nturbines):
for j in range(self._nturbines):
if i <= j:
continue
grad_h[row, 2*i] = 2*(m[2*i] - m[2*j])
grad_h[row, 2*j] = -2*(m[2*i] - m[2*j])
grad_h[row, 2*i+1] = 2*(m[2*i+1] - m[2*j+1])
grad_h[row, 2*j+1] = -2*(m[2*i+1] - m[2*j+1])
row += 1
return grad_h