"""Classes and functions to compute the dynamic mode decomposition (DMD) of a data matrix.
"""
# standard library packages
from typing import Tuple, Set, Union
# third party packages
import torch as pt
from numpy import pi
# flowtorch packages
from .svd import SVD
from flowtorch.data.utils import format_byte_size
def _dft_properties(dt: float, n_times: int) -> Tuple[float, float, float]:
"""Compute general properties of a discrete Fourier transformation.
DFT properties like maximum frequency and frequency resolution can
be a helpful guidance for building sensible data matrices used for
modal decomposition.
:param dt: timestep between two samples; assumed constant
:type dt: float
:param n_times: number of timesteps
:type n_times: int
:return: sampling frequency, maximum frequency, frequency resolution
:rtype: Tuple[float, float, float]
"""
fs = 1.0 / dt
return fs, 0.5 * fs, fs / n_times
[docs]
class DMD(object):
"""Class computing the exact DMD of a data matrix.
Examples
>>> from flowtorch import DATASETS
>>> from flowtorch.data import FOAMDataloader
>>> from flowtorch.analysis import DMD
>>> path = DATASETS["of_cavity_binary"]
>>> loader = FOAMDataloader(path)
>>> data_matrix = loader.load_snapshot("p", loader.write_times)
>>> dmd = DMD(data_matrix, dt=0.1, rank=3)
>>> dmd.frequency
tensor([0., 5., 0.])
>>> dmd.growth_rate
tensor([-2.3842e-06, -4.2345e+01, -1.8552e+01])
>>> dmd.amplitude
tensor([10.5635+0.j, -0.0616+0.j, -0.0537+0.j])
>>> dmd = DMD(data_matrix, dt=0.1, rank=3, robust=True)
>>> dmd = DMD(data_matrix, dt=0.1, rank=3, robust={"tol": 1.0e-5, "verbose" : True})
"""
def __init__(self, data_matrix: pt.Tensor, dt: float, rank: int = None,
robust: Union[bool, dict] = False, unitary: bool = False,
optimal: bool = False, tlsq: bool = False, usecols: pt.Tensor = None):
"""Create DMD instance based on data matrix and time step.
:param data_matrix: data matrix whose columns are formed by the individual snapshots
:type data_matrix: pt.Tensor
:param dt: time step between two snapshots
:type dt: float
:param rank: rank for SVD truncation, defaults to None
:type rank: int, optional
:param robust: data_matrix is split into low rank and sparse contributions
if True or if dictionary with options for Inexact ALM algorithm; the SVD
is computed only on the low rank matrix
:type robust: Union[bool,dict]
:param unitary: enforce the linear operator to be unitary; refer to piDMD_
by Peter Baddoo for more information
:type unitary: bool, optional
:param optimal: compute mode amplitudes based on a least-squares problem
as described in spDMD_ article by M. Janovic et al. (2014); in contrast
to the original spDMD implementation, the exact DMD modes are used in
the optimization problem as outlined in an article_ by R. Taylor
:type optimal: bool, optional
:param tlsq: de-biasing of the linear operator by solving a total least-squares
problem instead of a standard least-squares problem; the rank is selected
automatically or specified by the `rank` parameter; more information can be
found in the TDMD_ article by M. Hemati et al.
:type tlsq: bool, optional
.. _piDMD: https://github.com/baddoo/piDMD
.. _spDMD: https://hal-polytechnique.archives-ouvertes.fr/hal-00995141/document
.. _article: http://www.pyrunner.com/weblog/2016/08/03/spdmd-python/
.. _TDMD: http://cwrowley.princeton.edu/papers/Hemati-2017a.pdf
"""
self._dm = data_matrix
self._complex = self._dm.dtype in (pt.complex32, pt.complex64, pt.complex128)
self._rows, self._cols = self._dm.shape
self._dt = dt
self._unitary = unitary
self._optimal = optimal
self._tlsq = tlsq
if self._tlsq:
svd = SVD(pt.vstack((self._dm[:, :-1], self._dm[:, 1:])),
rank, robust)
P = svd.V @ svd.V.conj().T
self._X = self._dm[:, :-1] @ P
self._Y = self._dm[:, 1:] @ P
self._svd = SVD(self._X, svd.rank)
del svd
else:
self._svd = SVD(self._dm[:, :-1], rank, robust)
self._X = self._dm[:, :-1]
self._Y = self._dm[:, 1:]
self._eigvals, self._eigvecs, self._modes = self._compute_mode_decomposition()
self._amplitude = self._compute_amplitudes()
def _compute_operator(self):
"""Compute the approximate linear (DMD) operator.
"""
if self._unitary:
Xp = self._svd.U.conj().T @ self._X
Yp = self._svd.U.conj().T @ self._Y
U, _, VT = pt.linalg.svd(Yp @ Xp.conj().T, full_matrices=False)
return U @ VT
else:
s_inv = pt.diag(1.0 / self._svd.s.type(self._dm.dtype))
return self._svd.U.conj().T @ self._Y @ self._svd.V @ s_inv
def _compute_mode_decomposition(self):
"""Compute reduced operator, eigen-decomposition, and DMD modes.
"""
s_inv = pt.diag(1.0 / self._svd.s)
operator = self._compute_operator()
val, vec = pt.linalg.eig(operator)
phi = (
self._Y.type(val.dtype) @ self._svd.V.type(val.dtype)
@ s_inv.type(val.dtype) @ vec
)
return val, vec, phi
def _compute_amplitudes(self):
"""Compute amplitudes for exact DMD modes.
If *optimal* is False, the amplitudes are computed based on the first
snapshot in the data matrix; otherwise, a least-squares problem as
introduced by Janovic et al. is solved (refer to the documentation
in the constructor for more information).
"""
if self._optimal:
vander = pt.linalg.vander(self.eigvals, N=self._cols - 1)
P = (self._modes.conj().T @ self._modes) * \
(vander @ vander.conj().T).conj()
q = pt.diag(vander @ self._X.type(P.dtype).conj().T @
self._modes).conj()
else:
P = self._modes
q = self._dm[:, 0].type(P.dtype)
return pt.linalg.lstsq(P, q).solution
[docs]
def partial_reconstruction(self, mode_indices: Set[int]) -> pt.Tensor:
"""Reconstruct data matrix with limited number of modes.
:param mode_indices: mode indices to keep
:type mode_indices: Set[int]
:return: reconstructed data matrix
:rtype: pt.Tensor
"""
rows, cols = self._modes.shape
mode_mask = pt.zeros(cols, dtype=pt.complex64)
mode_indices = pt.tensor(list(mode_indices), dtype=pt.int64)
mode_mask[mode_indices] = 1.0
rec = (self.modes * mode_mask) @ self.dynamics
if not self._complex:
rec = rec.real
return rec
[docs]
def top_modes(self, n: int = 10, integral: bool = False,
f_min: float = -float("inf"),
f_max: float = float("inf")) -> pt.Tensor:
"""Get the indices of the first n most important modes.
Note that the conjugate complex modes for real data matrices are
not filtered out by default. However, by setting the lower frequency
threshold to a positive number, only modes with positive imaginary
part are considered.
:param n: number of indices to return; defaults to 10
:type n: int
:param integral: if True, the modes are sorted according to their
integral contribution; defaults to False
:type integral: bool, optional
:param f_min: consider only modes with a frequency larger or equal
to f_min; defaults to -inf
:type f_min: float, optional
:param f_max: consider only modes with a frequency smaller than f_max;
defaults to -inf
:type f_max: float, optional
:return: indices of top n modes sorted by amplitude or integral
contribution
:rtype: pt.Tensor
"""
importance = self.integral_contribution if integral else self.amplitude
modes_in_range = pt.logical_and(self.frequency >= f_min,
self.frequency < f_max)
mode_indices = pt.tensor(range(modes_in_range.shape[0]),
dtype=pt.int64)[modes_in_range]
n = min(n, mode_indices.shape[0])
top_n = importance[mode_indices].abs().topk(n).indices
return mode_indices[top_n]
[docs]
def predict(self, initial_condition: pt.Tensor, n_steps: int) -> pt.Tensor:
"""Predict evolution over N steps starting from used-defined initial conditions.
:param initial_condition: initial state vector
:type initial_condition: pt.Tensor
:param n_steps: number of steps to predict
:type n_steps: int
:return: predicted evolution including the initial state (N+1 states are returned)
:rtype: pt.Tensor
"""
b = pt.linalg.pinv(self._modes) @ initial_condition.type(self._modes.dtype)
prediction = self._modes @ pt.diag(b) @ pt.linalg.vander(self.eigvals, N=n_steps+1)
if not self._complex:
prediction = prediction.real
return prediction
@property
def required_memory(self) -> int:
"""Compute the memory size in bytes of the DMD.
:return: cumulative size of SVD, eigen values/vectors, and
DMD modes in bytes
:rtype: int
"""
return (self._svd.required_memory +
self._eigvals.element_size() * self._eigvals.nelement() +
self._eigvecs.element_size() * self._eigvecs.nelement() +
self._modes.element_size() * self._modes.nelement())
@property
def svd(self) -> SVD:
return self._svd
@property
def operator(self) -> pt.Tensor:
return self._compute_operator()
@property
def modes(self) -> pt.Tensor:
return self._modes
@property
def eigvals(self) -> pt.Tensor:
return self._eigvals
@property
def eigvals_cont(self) -> pt.Tensor:
return pt.log(self._eigvals) / self._dt
@property
def eigvecs(self) -> pt.Tensor:
return self._eigvecs
@property
def frequency(self) -> pt.Tensor:
return pt.log(self._eigvals).imag / (2.0 * pi * self._dt)
@property
def growth_rate(self) -> pt.Tensor:
return (pt.log(self._eigvals) / self._dt).real
@property
def amplitude(self) -> pt.Tensor:
return self._amplitude
@property
def dynamics(self) -> pt.Tensor:
return pt.diag(self.amplitude) @ pt.linalg.vander(self.eigvals, N=self._cols)
@property
def integral_contribution(self) -> pt.Tensor:
"""Integral contribution of individual modes according to J. Kou et al. 2017.
DOI: https://doi.org/10.1016/j.euromechflu.2016.11.015
"""
return self.modes.norm(dim=0)**2 * self.dynamics.abs().sum(dim=1)
@property
def reconstruction(self) -> pt.Tensor:
"""Reconstruct an approximation of the original data matrix.
:return: reconstructed data matrix
:rtype: pt.Tensor
"""
rec = self.modes @ self.dynamics
if not self._complex:
rec = rec.real
return rec
@property
def reconstruction_error(self) -> pt.Tensor:
"""Compute the reconstruction error.
:return: difference between reconstruction and data matrix
:rtype: pt.Tensor
"""
return self.reconstruction - self._dm
@property
def projection_error(self) -> pt.Tensor:
"""Compute the difference between Y and AX.
:return: projection error
:rtype: pt.Tensor
"""
YH = (self._modes @ pt.diag(self.eigvals)) @ \
(pt.linalg.pinv(self._modes) @ self._X.type(self._modes.dtype))
if self._complex:
return YH - self._dm[:, 1:]
else:
return YH.real.type(self._Y.dtype) - self._dm[:, 1:]
@property
def tlsq_error(self) -> Tuple[pt.Tensor, pt.Tensor]:
"""Compute the *noise* in X and Y.
:return: noise in X and Y
:rtype: Tuple[pt.Tensor, pt.Tensor]
"""
if not self._tlsq:
print("Warning: noise is only removed if tlsq=True")
return self._dm[:, :-1] - self._X, self._dm[:, 1:] - self._Y
@property
def dft_properties(self) -> Tuple[float, float, float]:
return _dft_properties(self._dt, self._cols - 1)
def __repr__(self):
return f"{self.__class__.__qualname__}(data_matrix, rank={self._svd.rank})"
def __str__(self):
ms = ["SVD:", str(self.svd), "LSQ:"]
size, unit = format_byte_size(self.required_memory)
ms.append("Overall DMD size: {:1.4f}{:s}".format(size, unit))
ms.append("DFT frequencies (sampling, max., res.):")
ms.append("{:1.4f}Hz, {:1.4f}Hz, {:1.4f}Hz".format(*self.dft_properties))
ms.append("")
return "\n".join(ms)