"""Higher-order optimized DMD version.
"""
# standard library packages
from typing import Set
# third party packages
import torch as pt
# flowtorch packages
from .svd import SVD
from .optdmd import OptDMD
from .hodmd import _create_time_delays
[docs]
class HOOptDMD(OptDMD):
"""Optimized DMD extended with data projection and time delay embedding.
:param OptDMD: Optimized DMD base class
:type OptDMD: `flowtorch.analysis.OptDMD`
"""
def __init__(self, data_matrix: pt.Tensor, dt: float, delay: int = 1,
rank_dr: int = None, **dmd_options: dict):
"""Project data on POD basis and create time delay embedding.
:param data_matrix: data matrix with snapshots organized as column vectors
:type data_matrix: pt.Tensor
:param dt: time step between two consecutive snapshots
:type dt: float
:param delay: number of time delay (history), defaults to 1 (no time delays)
:type delay: int, optional
:param rank_dr: number of POD modes to project the data on; an optimal value
is determined by the `SVD` instance if None; defaults to None
:type rank_dr: int, optional
"""
self._dm_org = data_matrix
self._rows_org, self._cols_org = data_matrix.shape
self._svd_dr = SVD(data_matrix, rank_dr)
self._delay = delay
super(HOOptDMD, self).__init__(
_create_time_delays(self._svd_dr.U.T @ self._dm_org, delay),
dt, **dmd_options
)
[docs]
def partial_reconstruction(self, mode_indices: Set[int]) -> pt.Tensor:
modes = self.modes
mode_mask = pt.zeros(modes.shape[1], dtype=modes.dtype)
mode_indices = pt.tensor(list(mode_indices), dtype=pt.int64)
mode_mask[mode_indices] = 1.0
rec = (modes * mode_mask) @ self.dynamics
if not self._dmd._complex:
rec = rec.real
return rec
[docs]
def predict(self, initial_condition: pt.Tensor, n_steps: int) -> pt.Tensor:
"""Predict evolution over N steps starting from used-defined initial conditions.
The prediction is performed as follows:
1) the initial conditions are projected on the first r POD modes
2) the time delay embedding is computed in the reduced space
3) the prediction is computed in the reduced space
4) the first r rows of the prediction are reconstructed
:param initial_condition: initial sequence of state vectors without time delay
embedding; for d delays and a state vector of size M, the initial conditions
should be given as a M x d matrix sorted such that the most recent state
is contained in the last column
:type initial_condition: pt.Tensor
:param n_steps: number of future steps to predict
:type n_steps: int
:return: predicted states; due to the structure of the linear operator
some states are predicted multiple times; only the first M rows of the
reconstruction are returned
:rtype: pt.Tensor
"""
ic = self.svd_dr.U.T @ initial_condition
ic = _create_time_delays(ic, self._delay, 1).squeeze()
prediction = super().predict(ic, n_steps)
r = self.svd_dr.rank
return self.svd_dr.U @ prediction[-r:]
@property
def dynamics(self) -> pt.Tensor:
vander = pt.linalg.vander(self.eigvals, N=self._cols_org)
return pt.diag(self.amplitude.type(vander.dtype)) @ vander
@property
def modes(self):
modes = self.eigvecs / self.amplitude
r = self._svd_dr.rank
return self._svd_dr.U.type(modes.dtype) @ modes[:r]
@property
def reconstruction(self) -> pt.Tensor:
rec = self.modes @ self.dynamics
if not self._dmd._complex:
rec = rec.real
return rec
@property
def reconstruction_error(self) -> pt.Tensor:
return self._dm_org - self.reconstruction
@property
def svd_dr(self) -> SVD:
return self._svd_dr