Source code for thetis.timeintegrator

"""
Generic time integration schemes to advance equations in time.
"""
from .utility import *
from abc import ABC, abstractmethod
import numpy
from pyop2.profiling import timed_region, timed_stage

CFL_UNCONDITIONALLY_STABLE = numpy.inf
# CFL coefficient for unconditionally stable methods


[docs] class TimeIntegratorBase(ABC): """ Abstract class that defines the API for all time integrators Both :class:`TimeIntegrator` and :class:`CoupledTimeIntegrator` inherit from this class. """
[docs] @abstractmethod def advance(self, t, update_forcings=None): """ Advances equations for one time step :arg t: simulation time :type t: float :arg update_forcings: user-defined function that takes the simulation time and updates any time-dependent boundary conditions """ pass
[docs] @abstractmethod def initialize(self, init_solution): """ Initialize the time integrator :arg init_solution: initial solution """ pass
[docs] class TimeIntegrator(TimeIntegratorBase): """ Base class for all time integrator objects that march a single equation """ def __init__(self, equation, solution, fields, dt, options): """ :arg equation: the equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values """ super(TimeIntegrator, self).__init__() self.equation = equation self.solution = solution self.fields = fields self.dt = dt # FIXME this might not be correctly updated ... self.dt_const = Constant(dt) # unique identifier for solver self.name = '-'.join([self.__class__.__name__, self.equation.__class__.__name__]) self.ad_block_tag = options.ad_block_tag or self.name self.solver_parameters = options.solver_parameters
[docs] def set_dt(self, dt): """Update time step""" self.dt = dt self.dt_const.assign(dt)
[docs] def advance_picard(self, t, update_forcings=None, update_lagged=True, update_fields=True): """ Advances equations for one time step within a Picard iteration :arg t: simulation time :type t: float :arg update_forcings: user-defined function that takes the simulation time and updates any time-dependent boundary conditions :kwarg update_lagged: should the old solution be updated? :kwarg update_fields: should the fields be updated? """ raise NotImplementedError(f"Picard iterations are not supported for {self} time integrators.")
[docs] def create_fields_old(self): """ Create a copy of fields dictionary to store beginning of timestep values """ # create functions to hold the values of previous time step self.fields_old = {} for k in sorted(self.fields): if self.fields[k] is not None: if isinstance(self.fields[k], Function): self.fields_old[k] = Function( self.fields[k].function_space()) elif isinstance(self.fields[k], Constant): # Although Constants may be changed by the user, we just # use the same value here, as assigning to domain-less Constants # causes issues in the adjoint. Constants with a domain, created by Constaint(.., domain=...), # are now just Functions on the Real space, so are dealt with under isinstance(..., Function) self.fields_old[k] = self.fields[k]
[docs] def update_fields_old(self): """ Update the values of fields_old with those of fields """ for k in sorted(self.fields): if isinstance(self.fields[k], Function): self.fields_old[k].assign(self.fields[k])
[docs] class ForwardEuler(TimeIntegrator): """Standard forward Euler time integration scheme.""" cfl_coeff = 1.0 @PETSc.Log.EventDecorator("thetis.ForwardEuler.__init__") def __init__(self, equation, solution, fields, dt, options, bnd_conditions): """ :arg equation: the equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation """ super(ForwardEuler, self).__init__(equation, solution, fields, dt, options) self.solution_old = Function(self.equation.function_space) self.create_fields_old() u_old = self.solution_old u_tri = self.equation.trial self.A = self.equation.mass_term(u_tri) self.L = (self.equation.mass_term(u_old) + self.dt_const*self.equation.residual('all', u_old, u_old, self.fields_old, self.fields_old, bnd_conditions) ) self.update_solver()
[docs] @PETSc.Log.EventDecorator("thetis.ForwardEuler.update_solver") def update_solver(self): prob = LinearVariationalProblem(self.A, self.L, self.solution) self.solver = LinearVariationalSolver(prob, options_prefix=self.name, solver_parameters=self.solver_parameters, ad_block_tag=self.ad_block_tag)
[docs] @PETSc.Log.EventDecorator("thetis.ForwardEuler.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" self.solution_old.assign(solution) self.update_fields_old()
[docs] @PETSc.Log.EventDecorator("thetis.ForwardEuler.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if update_forcings is not None: update_forcings(t + self.dt) self.solution_old.assign(self.solution) self.solver.solve() self.update_fields_old()
[docs] class CrankNicolson(TimeIntegrator): """Standard Crank-Nicolson time integration scheme.""" cfl_coeff = CFL_UNCONDITIONALLY_STABLE @PETSc.Log.EventDecorator("thetis.CrankNicolson.__init__") def __init__(self, equation, solution, fields, dt, options, bnd_conditions): """ :arg equation: the equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation """ super(CrankNicolson, self).__init__(equation, solution, fields, dt, options) theta = options.implicitness_theta semi_implicit = options.use_semi_implicit_linearization if semi_implicit: self.solver_parameters.setdefault('snes_type', 'ksponly') else: self.solver_parameters.setdefault('snes_type', 'newtonls') self.solution_old = Function(self.equation.function_space, name='solution_old') self.create_fields_old() u = self.solution u_old = self.solution_old if semi_implicit: # linearize around last timestep using the fact that all terms are # written in the form A(u_nl) u u_nl = u_old else: # solve the full nonlinear residual form u_nl = u bnd = bnd_conditions f = self.fields f_old = self.fields_old # Crank-Nicolson theta_const = Constant(theta) self.F = (self.equation.mass_term(u) - self.equation.mass_term(u_old) - self.dt_const*(theta_const*self.equation.residual('all', u, u_nl, f, f, bnd) + (1-theta_const)*self.equation.residual('all', u_old, u_old, f_old, f_old, bnd)) ) self.update_solver()
[docs] @PETSc.Log.EventDecorator("thetis.CrankNicolson.update_solver") def update_solver(self): """Create solver objects""" # Ensure LU assembles monolithic matrices if self.solver_parameters.get('pc_type') == 'lu': self.solver_parameters['mat_type'] = 'aij' prob = NonlinearVariationalProblem(self.F, self.solution) self.solver = NonlinearVariationalSolver(prob, solver_parameters=self.solver_parameters, options_prefix=self.name, ad_block_tag=self.ad_block_tag)
[docs] @PETSc.Log.EventDecorator("thetis.CrankNicolson.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" self.solution_old.assign(solution) self.update_fields_old()
[docs] @PETSc.Log.EventDecorator("thetis.CrankNicolson.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if update_forcings is not None: update_forcings(t + self.dt) self.solution_old.assign(self.solution) self.solver.solve() self.update_fields_old()
[docs] @PETSc.Log.EventDecorator("thetis.CrankNicolson.advance_picard") def advance_picard(self, t, update_forcings=None, update_lagged=True, update_fields=True): """Advances equations for one time step in a Picard iteration.""" if update_forcings is not None: update_forcings(t + self.dt) if update_lagged: self.solution_old.assign(self.solution) self.solver.solve() if update_fields: self.update_fields_old()
[docs] class SteadyState(TimeIntegrator): """ Time integrator that solves the steady state equations, leaving out the mass terms """ cfl_coeff = CFL_UNCONDITIONALLY_STABLE @PETSc.Log.EventDecorator("thetis.SteadyState.__init__") def __init__(self, equation, solution, fields, dt, options, bnd_conditions): """ :arg equation: the equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation """ super(SteadyState, self).__init__(equation, solution, fields, dt, options) self.solver_parameters.setdefault('snes_type', 'newtonls') self.F = self.equation.residual('all', solution, solution, fields, fields, bnd_conditions) self.update_solver()
[docs] @PETSc.Log.EventDecorator("thetis.SteadyState.update_solver") def update_solver(self): """Create solver objects""" # Ensure LU assembles monolithic matrices if self.solver_parameters.get('pc_type') == 'lu': self.solver_parameters['mat_type'] = 'aij' prob = NonlinearVariationalProblem(self.F, self.solution) self.solver = NonlinearVariationalSolver(prob, solver_parameters=self.solver_parameters, options_prefix=self.name, ad_block_tag=self.ad_block_tag)
[docs] @PETSc.Log.EventDecorator("thetis.SteadyState.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" # nothing to do here as the initial condition is passed in via solution return
[docs] @PETSc.Log.EventDecorator("thetis.SteadyState.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if update_forcings is not None: update_forcings(t + self.dt) self.solver.solve()
[docs] class PressureProjectionPicard(TimeIntegrator): """ Pressure projection scheme with Picard iteration for shallow water equations """ cfl_coeff = 1.0 # FIXME what is the right value? # TODO add more documentation @PETSc.Log.EventDecorator("thetis.PressureProjectionPicard.__init__") def __init__(self, equation, equation_mom, solution, fields, dt, options, bnd_conditions): """ :arg equation: free surface equation :type equation: :class:`Equation` object :arg equation_mom: momentum equation :type equation_mom: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation """ super(PressureProjectionPicard, self).__init__(equation, solution, fields, dt, options) theta = options.implicitness_theta semi_implicit = options.use_semi_implicit_linearization solver_parameters = options.solver_parameters_pressure solver_parameters_mom = options.solver_parameters_momentum iterations = options.picard_iterations self.equation_mom = equation_mom self.solver_parameters_mom = {} if solver_parameters_mom is not None: self.solver_parameters_mom.update(solver_parameters_mom) if semi_implicit: # solve a preliminary linearized momentum equation before # solving the linearized wave equation terms in a coupled system self.solver_parameters.setdefault('snes_type', 'ksponly') self.solver_parameters_mom.setdefault('snes_type', 'ksponly') else: # not sure this combination makes much sense: keep both systems nonlinear self.solver_parameters.setdefault('snes_type', 'newtonls') self.solver_parameters_mom.setdefault('snes_type', 'newtonls') self.iterations = iterations self.solution_old = Function(self.equation.function_space) if iterations > 1: self.solution_lagged = Function(self.equation.function_space) else: self.solution_lagged = self.solution_old uv_lagged, eta_lagged = self.solution_lagged.subfunctions uv_old, eta_old = self.solution_old.subfunctions if (solver_parameters['ksp_type'] == 'preonly' and 'fieldsplit_H_2d' in solver_parameters and solver_parameters['fieldsplit_H_2d']['ksp_type'] == 'preonly' and solver_parameters['fieldsplit_H_2d']['pc_python_type'] == 'thetis.AssembledSchurPC' and element_continuity(eta_old.function_space().ufl_element()).horizontal != 'cg'): # the default settings use AssembledSchurPC which assumes that the velocity block is only a dg mass matrix # Under these assumptions this preconditioner assembles the exact Schur complement. If this is not true, we either need # iterations with the unassembled Schur complement (fieldsplit_H_2d_ksp_type), or alternatively iterations outside # the fieldsplit to deal with the fact that we haven't solved the Schur complement exactly. # Currently only the dg-cg element pair, gives a pure DG mass matrix velocity block: for dg-dg the pressure gradient adds a # Riemann term in the velocity block, for rt-dg the velocity block cannot be explicitly inverted either. raise Exception("The timestepper PressureProjectionPicard is only recommended in combination with the " "dg-cg element_family. If you want to use it in combination with dg-dg or rt-dg you need to adjust the solver_parameters_pressure option.") self.create_fields_old() # for the mom. eqn. the 'eta' field is just one of the 'other' fields fields_mom = self.fields.copy() fields_mom_old = self.fields_old.copy() fields_mom['eta'] = eta_lagged fields_mom_old['eta'] = eta_old # the velocity solved for in the preliminary mom. solve: self.uv_star = Function(self.equation_mom.function_space) if semi_implicit: uv_star_nl = uv_lagged solution_nl = self.solution_lagged else: uv_star_nl = self.uv_star solution_nl = self.solution # form for mom. eqn.: theta_const = Constant(theta) self.F_mom = ( self.equation_mom.mass_term(self.uv_star)-self.equation_mom.mass_term(uv_old) - self.dt_const*( theta_const*self.equation_mom.residual('all', self.uv_star, uv_star_nl, fields_mom, fields_mom, bnd_conditions) + (1-theta_const)*self.equation_mom.residual('all', uv_old, uv_old, fields_mom_old, fields_mom_old, bnd_conditions) ) ) # form for wave eqn. system: # M (u^n+1 - u^*) + G (eta^n+theta - eta_lagged) = 0 # M (eta^n+1 - eta^n) + C (u^n+theta) = 0 # the 'implicit' terms are the gradient (G) and divergence term (C) in the mom. and continuity eqn. resp. # where u^* is the velocity solved for in the mom. eqn., and G eta_lagged the gradient term in that eqn. uv_test, eta_test = split(self.equation.test) mass_term_star = inner(uv_test, self.uv_star)*dx + inner(eta_test, eta_old)*dx self.F = ( self.equation.mass_term(self.solution) - mass_term_star - self.dt_const*( theta_const*self.equation.residual('implicit', self.solution, solution_nl, self.fields, self.fields, bnd_conditions) + (1-theta_const)*self.equation.residual('implicit', self.solution_old, self.solution_old, self.fields_old, self.fields_old, bnd_conditions) ) ) # subtract G eta_lagged: G is the implicit term in the mom. eqn. for key in self.equation_mom.terms: if self.equation_mom.labels[key] == 'implicit': term = self.equation.terms[key] self.F += -self.dt_const*( - theta_const*term.residual(self.uv_star, eta_lagged, uv_star_nl, eta_lagged, self.fields, self.fields, bnd_conditions) - (1-theta_const)*term.residual(uv_old, eta_old, uv_old, eta_old, self.fields_old, self.fields_old, bnd_conditions) ) self.update_solver()
[docs] @PETSc.Log.EventDecorator("thetis.PressureProjectionPicard.update_solver") def update_solver(self): """Create solver objects""" prob = NonlinearVariationalProblem(self.F_mom, self.uv_star) self.solver_mom = NonlinearVariationalSolver(prob, solver_parameters=self.solver_parameters_mom, options_prefix=self.name+'_mom', ad_block_tag=self.ad_block_tag + '_mom') # Ensure LU assembles monolithic matrices if self.solver_parameters.get('pc_type') == 'lu': self.solver_parameters['mat_type'] = 'aij' prob = NonlinearVariationalProblem(self.F, self.solution) self.solver = NonlinearVariationalSolver(prob, appctx={'a': derivative(self.F, self.solution)}, solver_parameters=self.solver_parameters, options_prefix=self.name, ad_block_tag=self.ad_block_tag)
[docs] @PETSc.Log.EventDecorator("thetis.PressureProjectionPicard.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" self.solution_old.assign(solution) self.solution_lagged.assign(solution) self.update_fields_old()
[docs] @PETSc.Log.EventDecorator("thetis.PressureProjectionPicard.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if update_forcings is not None: update_forcings(t + self.dt) self.solution_old.assign(self.solution) for it in range(self.iterations): if self.iterations > 1: self.solution_lagged.assign(self.solution) with timed_stage("Momentum solve"): self.solver_mom.solve() with timed_stage("Pressure solve"): self.solver.solve() self.update_fields_old()
[docs] class LeapFrogAM3(TimeIntegrator): """ Leap-Frog Adams-Moulton 3 ALE time integrator Defined in (2.27)-(2.30) in [1]; (2.21)-(2.22) in [2] [1] Shchepetkin and McWilliams (2005). The regional oceanic modeling system (ROMS): a split-explicit, free-surface, topography-following-coordinate oceanic model. Ocean Modelling, 9(4):347-404. http://dx.doi.org/10.1016/j.ocemod.2013.04.010 [2] Shchepetkin and McWilliams (2009). Computational Kernel Algorithms for Fine-Scale, Multiprocess, Longtime Oceanic Simulations, 14:121-183. http://dx.doi.org/10.1016/S1570-8659(08)01202-0 """ cfl_coeff = 1.5874 @PETSc.Log.EventDecorator("thetis.LeapFrogAM3.__init__") def __init__(self, equation, solution, fields, dt, options, bnd_conditions, terms_to_add='all'): """ :arg equation: equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation :kwarg terms_to_add: Defines which terms of the equation are to be added to this solver. Default 'all' implies ['implicit', 'explicit', 'source']. :type terms_to_add: 'all' or list of 'implicit', 'explicit', 'source'. """ super(LeapFrogAM3, self).__init__(equation, solution, fields, dt, options) self.gamma = 1./12. self.gamma_const = Constant(self.gamma) fs = self.equation.function_space self.solution_old = Function(fs, name='old solution') self.msolution_old = Function(fs, name='dual solution') self.rhs_func = Function(fs, name='rhs linear form') # fully explicit evaluation self.a = self.equation.mass_term(self.equation.trial) self.l = self.dt_const*self.equation.residual(terms_to_add, self.solution, self.solution, self.fields, self.fields, bnd_conditions) self.mass_new = inner(self.solution, self.equation.test)*dx self.mass_old = inner(self.solution_old, self.equation.test)*dx a = 0.5 - 2*self.gamma b = 0.5 + 2*self.gamma c = 1.0 - 2*self.gamma self.l_prediction = a*self.mass_old + b*self.mass_new + c*self.l self._nontrivial = self.l != 0
[docs] @PETSc.Log.EventDecorator("thetis.LeapFrogAM3.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" self.mass_matrix = assemble(self.a) self.solution.assign(solution) self.solution_old.assign(solution) assemble(self.mass_new, self.msolution_old) self.lin_solver = LinearSolver(self.mass_matrix)
# TODO: Linear solver is not annotated and does not accept ad_block_tag def _solve_system(self): """ Solves system mass_matrix*solution = rhs_func If the function space is fully discontinuous, the mass matrix is inverted in place for efficiency. """ self.lin_solver.solve(self.solution, self.rhs_func)
[docs] @PETSc.Log.EventDecorator("thetis.LeapFrogAM3.predice") def predict(self): r""" Prediction step from :math:`t_{n-1/2}` to :math:`t_{n+1/2}` Let :math:`M_n` denote the mass matrix at time :math:`t_{n}`. The prediction step is .. math:: T_{n-1/2} &= (1/2 - 2\gamma) T_{n-1} + (1/2 + 2 \gamma) T_{n} \\ M_n T_{n+1/2} &= M_n T_{n-1/2} + \Delta t (1 - 2\gamma) M_n L_{n} This is computed in a fixed mesh: all terms are evaluated in :math:`\Omega_n`. """ if self._nontrivial: with timed_region('lf_pre_asmb_sol'): assemble(self.mass_new, self.msolution_old) # store current solution with timed_region('lf_pre_asmb_rhs'): assemble(self.l_prediction, self.rhs_func) with timed_region('lf_pre_asgn_sol'): self.solution_old.assign(self.solution) # time shift with timed_region('lf_pre_solve'): self._solve_system()
[docs] def eval_rhs(self): if self._nontrivial: with timed_region('lf_cor_asmb_rhs'): assemble(self.l, self.rhs_func)
[docs] @PETSc.Log.EventDecorator("thetis.LeapFrogAM3.correct") def correct(self): r""" Correction step from :math:`t_{n}` to :math:`t_{n+1}` Let :math:`M_n` denote the mass matrix at time :math:`t_{n}`. The correction step is .. math:: M_{n+1} T_{n+1} = M_{n} T_{n} + \Delta t L_{n+1/2} This is Euler ALE step: LHS is evaluated in :math:`\Omega_{n+1}`, RHS in :math:`\Omega_n`. """ if self._nontrivial: # NOTE must call eval_rhs in the old mesh first with timed_region('lf_cor_incr_rhs'): self.rhs_func += self.msolution_old with timed_region('lf_cor_asmb_mat'): assemble(self.a, self.mass_matrix) with timed_region('lf_cor_solve'): self._solve_system()
[docs] @PETSc.Log.EventDecorator("thetis.LeapFrogAM3.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if self._nontrivial: if update_forcings is not None: update_forcings(t + self.dt) self.predict() self.eval_rhs() self.correct()
[docs] class SSPRK22ALE(TimeIntegrator): r""" SSPRK(2,2) ALE time integrator for 3D fields The scheme is .. math:: u^{(1)} &= u^{n} + \Delta t F(u^{n}) \\ u^{n+1} &= u^{n} + \frac{\Delta t}{2}(F(u^{n}) + F(u^{(1)})) Both stages are implemented as ALE updates from geometry :math:`\Omega_n` to :math:`\Omega_{(1)}`, and :math:`\Omega_{n+1}`. """ cfl_coeff = 1.0 @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.__init__") def __init__(self, equation, solution, fields, dt, options, bnd_conditions, terms_to_add='all'): """ :arg equation: equation to solve :type equation: :class:`Equation` object :arg solution: :class:`Function` where solution will be stored :arg fields: Dictionary of fields that are passed to the equation :type fields: dict of :class:`Function` or :class:`Constant` objects :arg float dt: time step in seconds :arg options: :class:`TimeStepperOptions` instance containing parameter values. :arg dict bnd_conditions: Dictionary of boundary conditions passed to the equation :kwarg terms_to_add: Defines which terms of the equation are to be added to this solver. Default 'all' implies ['implicit', 'explicit', 'source']. :type terms_to_add: 'all' or list of 'implicit', 'explicit', 'source'. """ super(SSPRK22ALE, self).__init__(equation, solution, fields, dt, options) fs = self.equation.function_space self.mu = Function(fs, name='dual solution') self.mu_old = Function(fs, name='dual solution') self.tendency = Function(fs, name='tendency') # fully explicit evaluation self.a = self.equation.mass_term(self.equation.trial) self.l = self.dt_const*self.equation.residual(terms_to_add, self.solution, self.solution, self.fields, self.fields, bnd_conditions) self.mu_form = inner(self.solution, self.equation.test)*dx self._nontrivial = self.l != 0 self._initialized = False self.n_stages = 2 self.c = [0, 1]
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.initialize") def initialize(self, solution): """Assigns initial conditions to all required fields.""" self.solution.assign(solution) mass_matrix = assemble(self.a) self.lin_solver = LinearSolver(mass_matrix, solver_parameters=self.solver_parameters) # TODO: Linear solver is not annotated and does not accept ad_block_tag self._initialized = True
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.stage_one_prep") def stage_one_prep(self): """ Preprocess first stage: compute all forms on the old geometry """ if self._nontrivial: # Compute $Mu$ and assign $q_{old} = Mu$ with timed_region('pre1_asseble_mu'): assemble(self.mu_form, self.mu_old) # Evaluate $k = \Delta t F(u)$ with timed_region('pre1_asseble_f'): assemble(self.l, self.tendency) # $q = q_{old} + k$ with timed_region('pre1_incr_rhs'): self.mu.assign(self.mu_old + self.tendency)
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.stage_one_solve") def stage_one_solve(self): r""" First stage: solve :math:`u^{(1)}` given previous solution :math:`u^n`. This is a forward Euler ALE step between domains :math:`\Omega^n` and :math:`\Omega^{(1)}`: .. math:: \int_{\Omega^{(1)}} u^{(1)} \psi dx = \int_{\Omega^n} u^n \psi dx + \Delta t \int_{\Omega^n} F(u^n) \psi dx """ if self._nontrivial: # Solve $u = M^{-1}q$ with timed_region('sol1_assemble_A'): assemble(self.a, self.lin_solver.A) with timed_region('sol1_solve'): self.lin_solver.solve(self.solution, self.mu)
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.stage_two_prep") def stage_two_prep(self): """ Preprocess 2nd stage: compute all forms on the old geometry """ if self._nontrivial: # Evaluate $k = \Delta t F(u)$ with timed_region('pre2_asseble_f'): assemble(self.l, self.tendency) # $q = \frac{1}{2}q + \frac{1}{2}q_{old} + \frac{1}{2}k$ with timed_region('pre2_incr_rhs'): self.mu.assign(0.5*self.mu + 0.5*self.mu_old + 0.5*self.tendency)
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.stage_two_solve") def stage_two_solve(self): r""" 2nd stage: solve :math:`u^{n+1}` given previous solutions :math:`u^n, u^{(1)}`. This is an ALE step: .. math:: \int_{\Omega^{n+1}} u^{n+1} \psi dx &= \int_{\Omega^n} u^n \psi dx \\ &+ \frac{\Delta t}{2} \int_{\Omega^n} F(u^n) \psi dx \\ &+ \frac{\Delta t}{2} \int_{\Omega^{(1)}} F(u^{(1)}) \psi dx """ if self._nontrivial: # Solve $u = M^{-1}q$ with timed_region('sol2_assemble_A'): assemble(self.a, self.lin_solver.A) with timed_region('sol2_solve'): self.lin_solver.solve(self.solution, self.mu)
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.solve_stage") def solve_stage(self, i_stage): """Solves i-th stage""" if i_stage == 0: self.stage_one_solve() else: self.stage_two_solve()
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.prepare_stage") def prepare_stage(self, i_stage, t, update_forcings=None): """ Preprocess stage i_stage. This must be called prior to updating mesh geometry. """ if update_forcings is not None: update_forcings(t + self.c[i_stage]*self.dt) if i_stage == 0: self.stage_one_prep() else: self.stage_two_prep()
[docs] @PETSc.Log.EventDecorator("thetis.SSPRK22ALE.advance") def advance(self, t, update_forcings=None): """Advances equations for one time step.""" if not self._initialized: self.initialize(self.solution) for i_stage in range(self.n_stages): self.prepare_stage(i_stage, t, update_forcings) self.solve_stage(i_stage)