Skip to content

otf.syncd.utils

Utilities for running synchronized simulations and parameter updates.

This module provides helpers to run a System forward in time using a time_integration.BaseSolver and to perform periodic parameter updates (for example via a callable optimizer or an instance of optim.base.BaseOptimizer). The primary public API is run_update which returns the parameter history, relative errors, the times of updates, and the simulated true and assimilated trajectories (either for the final relaxation window or for the whole simulation when return_all=True).

Functions:

Name Description
run_update

Run system forward and perform periodic parameter updates.

run_update

run_update(
    system: System_ModelKnown,
    solver: BaseSolver,
    dt: float,
    T0: float,
    Tf: float,
    t_relax: float,
    true0: jndarray,
    assimilated0: jndarray,
    optimizer: Callable[[jndarray, jndarray], jndarray]
    | BaseOptimizer
    | None = None,
    lr_scheduler: LRScheduler = lr_scheduler.DummyLRScheduler(),
    t_begin_updates: float | None = None,
    return_all: bool = False,
) -> tuple[
    jndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray
]

Run system forward and perform periodic parameter updates.

The function advances system from time T0 toward Tf in blocks of approximately t_relax (rounded to multiples of dt) using solver. After each block it updates system.cs using optimizer (a callable or a optim.base.BaseOptimizer), optionally adjusting the learning rate via lr_scheduler.

Parameters:

Name Type Description Default
system System_ModelKnown

The system to simulate.

required
solver BaseSolver

A time_integration.BaseSolver instance for stepping the dynamics.

required
dt float

Time-step size passed to solver.

required
T0 float

Initial and (approximate) final times for the simulation.

required
Tf float

Initial and (approximate) final times for the simulation.

required
t_relax float

Approximate duration between parameter updates.

required
true0 jndarray

Initial states for the true and assimilated systems (arrays compatible with solver).

required
assimilated0 jndarray

Initial states for the true and assimilated systems (arrays compatible with solver).

required
optimizer Callable[[jndarray, jndarray], jndarray] | BaseOptimizer | None

Callable or optim.base.BaseOptimizer used to compute the next system.cs from observed portions of the trajectories. If None a default opt.LevenbergMarquardt is used.

None
lr_scheduler LRScheduler

An lr_scheduler.LRScheduler instance to step each update (defaults to a dummy no-op scheduler).

DummyLRScheduler()
t_begin_updates float | None

If provided, updates are skipped until simulation time exceeds this value.

None
return_all bool

When True, return the full simulated trajectories for all time blocks; otherwise return only the last block's trajectories.

False

Returns:

Type Description
tuple

(cs, errors, tls, true, assimilated) where - cs is an array of parameter vectors, shape (N+1, d); - errors is a 1-D array of relative errors, shape (N,); - tls is the time array for update times, shape (N+1,); - true and assimilated are the final true assimilated states for the last interval or the full concatenated states if return_all is True.

Source code in src/otf/syncd/utils.py
def run_update(
    system: System_ModelKnown,
    solver: ti_base.BaseSolver,
    dt: float,
    T0: float,
    Tf: float,
    t_relax: float,
    true0: jndarray,
    assimilated0: jndarray,
    optimizer: Callable[[jndarray, jndarray], jndarray]
    | optim_base.BaseOptimizer
    | None = None,
    lr_scheduler: lr_scheduler.LRScheduler = lr_scheduler.DummyLRScheduler(),
    t_begin_updates: float | None = None,
    return_all: bool = False,
) -> tuple[jndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Run `system` forward and perform periodic parameter updates.

    The function advances `system` from time `T0` toward `Tf` in blocks of
    approximately `t_relax` (rounded to multiples of `dt`) using `solver`. After
    each block it updates `system.cs` using `optimizer` (a callable or a
    `optim.base.BaseOptimizer`), optionally adjusting the learning rate via
    `lr_scheduler`.

    Parameters
    ----------
    system
        The system to simulate.
    solver
        A `time_integration.BaseSolver` instance for stepping the dynamics.
    dt
        Time-step size passed to `solver`.
    T0, Tf
        Initial and (approximate) final times for the simulation.
    t_relax
        Approximate duration between parameter updates.
    true0, assimilated0
        Initial states for the true and assimilated systems (arrays compatible
        with `solver`).
    optimizer
        Callable or `optim.base.BaseOptimizer` used to compute the next
        `system.cs` from observed portions of the trajectories. If None a
        default `opt.LevenbergMarquardt` is used.
    lr_scheduler
        An `lr_scheduler.LRScheduler` instance to step each update (defaults to
        a dummy no-op scheduler).
    t_begin_updates
        If provided, updates are skipped until simulation time exceeds this
        value.
    return_all
        When True, return the full simulated trajectories for all time blocks;
        otherwise return only the last block's trajectories.

    Returns
    -------
    tuple
        `(cs, errors, tls, true, assimilated)` where
            - `cs` is an array of parameter vectors, shape `(N+1, d)`;
            - `errors` is a 1-D array of relative errors, shape `(N,)`;
            - `tls` is the time array for update times, shape `(N+1,)`;
            - `true` and `assimilated` are the final true assimilated states for
              the last interval or the full concatenated states if `return_all`
              is True.
    """
    if optimizer is None:
        optimizer = opt.LevenbergMarquardt(system)

    if isinstance(solver, (ti_base.SinglestepSolver, ti_base.MultistageSolver)):
        return _run_update_not_multistep(
            system,
            solver,
            dt,
            T0,
            Tf,
            t_relax,
            true0,
            assimilated0,
            optimizer,
            lr_scheduler,
            t_begin_updates,
            return_all,
        )
    elif isinstance(solver, ti_base.MultistepSolver):
        return _run_update_multistep(
            system,
            solver,
            dt,
            T0,
            Tf,
            t_relax,
            true0,
            assimilated0,
            optimizer,
            lr_scheduler,
            t_begin_updates,
            return_all,
        )
    else:
        raise NotImplementedError(
            "`solver` should be instance of subclass of "
            "`SinglestepSolver`, `MultistageSolver` or `MultistepSolver`"
        )