"""Implementation of cluster-based network modeling (CNM).

A reference implementation by Daniel Fernex is available on
GitHub_. Theoretical concepts are covered in the accompanying
publication_.

.. _GitHub: https://github.com/fernexda/cnm
.. _publication: https://advances.sciencemag.org/content/7/25/eabf5006
"""

# standard library packages
from typing import Dict, Optional, Tuple
from collections import defaultdict, deque
from itertools import groupby
# third party packages
import numpy as np
import torch as pt
from sklearn.cluster import KMeans
from sklearn.neighbors import KDTree
from scipy.interpolate import InterpolatedUnivariateSpline
# flowtorch packages
from .base import ROM, Encoder
from .utils import (check_larger_than, check_int_larger_than,
                    remove_sequential_duplicates)


class CNM(ROM):
    """Cluster-based network modeling implementation.

    In contrast to the original implementation_, the clustering, the
    computation of transition probabilities, and the propagation in time
    are condensed in a single class. The class also relies on NumPy,
    SciPy, and Scikit-Learn for K-means, KD-tree, and interpolation.
    These components might be replaced with PyTorch implementations in
    future releases.

    The cluster attribute has a member called labels. Each label
    corresponds to the cluster id associated with each sample of the
    input data. Since the input data are sorted according to the time at
    which they were sampled, we can interpret the labels as a sequence of
    visited clusters over time. However, if two or more consecutive
    snapshots belong to the same cluster, we are only interested in the
    next state in the sequence that is different from the current cluster
    because we want to model the temporal behavior as a transition
    between cluster centroids. Therefore, sequential duplicates must be
    removed.

    .. _implementation: https://github.com/fernexda/cnm/tree/main/cnm

    :param dt: time step between two snapshots
    :type dt: float
    :param n_clusters: number of clusters; must be larger than one
    :type n_clusters: int, optional
    :param model_order: number of past clusters to consider when
        predicting the next cluster
    :type model_order: int, optional
    :param Q: transition probabilities
    :type Q: Dict[str, np.ndarray]
    :param T: transition times
    :type T: Dict[str, float]
    :param times: times at which cluster centroids were visited
    :type times: List[float]
    :param visited_clusters: clusters visited during the temporal evolution
    :type visited_clusters: List[int]
    :param cluster_centers: centroids of clusters
    :type cluster_centers: np.ndarray

    Examples

    >>> from flowtorch.rom import SVDEncoder, CNM
    >>> # ... assemble data matrix
    >>> encoder = SVDEncoder(rank=20)
    >>> info = encoder.train(data_matrix)
    >>> reduced_state = encoder.encode(data_matrix)
    >>> cnm = CNM(reduced_state, encoder, dt=0.1, n_clusters=20, model_order=4)
    >>> prediction = cnm.predict(data_matrix[:, :5], end_time=1.0, step_size=0.1)
    >>> reduced_prediction = cnm.predict_reduced(reduced_state[:, :5], 1.0, 0.1)
    >>> full_prediction = encoder.decode(reduced_prediction)

    """

    def __init__(self, reduced_state: pt.Tensor, encoder: Encoder,
                 dt: float, n_clusters: int = 10, model_order: int = 1,
                 cluster_config: Optional[dict] = None):
        # the base class performs compatibility checks of the input data
        super(CNM, self).__init__(reduced_state, encoder)
        self.dt = dt
        self.n_clusters = n_clusters
        self.model_order = model_order
        self._dtype = reduced_state.dtype
        # avoid a mutable default argument for the cluster configuration
        cluster_config = cluster_config if cluster_config is not None else {}
        self._cluster = KMeans(n_clusters, **cluster_config).fit(
            reduced_state.T.numpy()  # batch dimension comes first in Scikit-Learn
        )
        self._sequence = remove_sequential_duplicates(self._cluster.labels_)
        self._transition_prob = self._compute_transition_prob()
        self._transition_time = self._compute_transition_time()
        self._times = None
        self._visited_clusters = None
        self._tree = None

    def _compute_transition_prob(self) -> Dict[str, np.ndarray]:
        """Compute the transition probabilities between clusters.

        :raises Exception: if the cluster sequence found in the initial
            dataset is shorter than or equal to the model order
        :return: cluster sequence of length `self.model_order` as key and
            transition probabilities for each potential next cluster as
            value; the probabilities are stored as 2D arrays, where the
            first column corresponds to the id of the next cluster and
            the second column contains the associated probability
        :rtype: Dict[str, np.ndarray]
        """
        if self.model_order >= self._sequence.size:
            raise Exception("Could not compute transition probabilities: " +
                            f"length of cluster sequence ({self._sequence.size}) must be higher " +
                            f"than chosen model order ({self.model_order})")
        visited_clusters = deque(
            self._sequence[:self.model_order], self.model_order)
        prob = defaultdict(list)
        for next_cluster in self._sequence[self.model_order:]:
            key = ",".join(map(str, visited_clusters))
            prob[key].append(next_cluster)
            visited_clusters.append(next_cluster)
        for key, next_clusters in prob.items():
            unique, counts = np.unique(next_clusters, return_counts=True)
            prob[key] = np.stack((unique, counts/counts.sum())).T
        return prob

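    # Worked example (an illustrative sketch, not part of the class): for a
    # model order of 2 and the deduplicated sequence [0, 1, 3, 0, 1, 2], the
    # first loop in _compute_transition_prob collects
    #   prob["0,1"] -> [3, 2]   (being in 0 then 1 was followed by 3 and by 2)
    #   prob["1,3"] -> [0]
    #   prob["3,0"] -> [1]
    # and the second loop converts each list into (next cluster, probability)
    # pairs, e.g., prob["0,1"] -> [[2, 0.5], [3, 0.5]].
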
    def _compute_transition_time(self) -> Dict[str, float]:
        """Compute transition time between cluster centroids.

        This function first checks the list of cluster labels for
        sequential duplicates as explained in this post_. Then the
        transition time from the past clusters to the next cluster is
        computed. The main idea is that if a cluster in the training data
        sequence is duplicated, possibly several times, then the
        transition time from and to that cluster must be higher. For
        example, if the labels are [0, 0, 1, 3, 0, ...], then the sequence
        of unique clusters is [0, 1, 3, 0, ...] and the number of
        sequential occurrences is [2, 1, 1, ...]. For a model order of two
        (considering two clusters to compute the next one), the transition
        time for the sequence '0,1,3' (having been in 0 then 1, and now
        going to 3) is half of the time spent in cluster 1 plus half of
        the time spent in cluster 3, which is 0.5 * (1 + 1) * dt. It might
        happen that certain transitions occur multiple times with
        different transition times. Therefore, the final transition time
        is computed as the average of all observed transition times for a
        given sequence.

        .. _post: https://stackoverflow.com/questions/39340345/how-to-count-consecutive-duplicates-in-a-python-list

        :return: dictionary with the sequence of past clusters plus next
            cluster as key and the average transition time as value
        :rtype: Dict[str, float]
        """
        seq_duplicates = np.array(
            [sum(1 for _ in group) for _, group in groupby(self._cluster.labels_)]
        )
        transition = defaultdict(list)
        for i in range(self._sequence.size - self.model_order):
            ip_order = i + self.model_order
            key = ",".join(map(str, self._sequence[i:ip_order + 1]))
            transition[key].append(
                0.5 * self.dt * np.sum(seq_duplicates[ip_order - 1:ip_order + 1])
            )
        return {key: np.mean(value) for key, value in transition.items()}

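    # Worked example (an illustrative sketch, not part of the class): for the
    # labels [0, 0, 1, 3, 0] and a model order of 2, the deduplicated sequence
    # is [0, 1, 3, 0] and seq_duplicates is [2, 1, 1, 1]. For i = 0, the key
    # becomes "0,1,3" and the associated transition time is
    #   0.5 * dt * (seq_duplicates[1] + seq_duplicates[2]) = 0.5 * (1 + 1) * dt,
    # i.e., half the time spent in cluster 1 plus half the time spent in 3.
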
    def _find_closest_cluster(self, label: int) -> int:
        """Find the label of the nearest cluster.

        If there are two nearest clusters with the same distance, the
        KDTree returns the index of the first cluster that was found.

        :param label: label of the cluster from which to compute the distance
        :type label: int
        :return: label of the nearest cluster
        :rtype: int
        """
        if self._tree is None:
            self._tree = KDTree(self.cluster_centers)
        _, ind = self._tree.query(
            np.expand_dims(self.cluster_centers[label, :], axis=0), 2
        )
        return ind[0, -1]

    def _find_history(self, history: deque) -> deque:
        """Find an alternative history based on a suggested history.

        This method is used in two places:

        1) to create an initial history if the initial reduced state
           provides fewer cluster labels than needed for the given model
           order
        2) to create an alternative history if the trajectory ended up in
           a dead-end cluster (a cluster for which no transition
           probabilities are known); it is assumed that the last cluster
           of the trajectory was replaced with the label of the nearest
           neighboring cluster

        The following strategy is used to find the most similar trajectory
        to the one provided as input:

        - use as much of the given trajectory as possible and find
          matching histories in the list of available histories
        - if no trajectory is found, the process is repeated with a
          shorter segment of the trajectory; this process continues until
          at least one potential history is found
        - if multiple possible histories are found that are equally
          similar to the input trajectory, the final history is sampled
          at random

        :param history: trajectory to replace or complete
        :type history: deque
        :return: a trajectory as similar as possible to the input for
            which a transition probability is available
        :rtype: deque
        """
        count = 0
        while count < self.model_order:
            test_history = ",".join(map(str, list(history)[count:]))
            possible_histories = [key for key in self._transition_prob.keys()
                                  if key.endswith(test_history)]
            if len(possible_histories) > 0:
                select_key = np.random.choice(possible_histories)
                return deque(map(int, select_key.split(",")), self.model_order)
            count += 1

    def _find_initial_history(self, initial_reduced_state: np.ndarray) -> deque:
        """Find a sensible initial history for a given reduced state.

        :param initial_reduced_state: initial reduced state or initial
            sequence of reduced state vectors; if a sequence is given,
            each column must form one reduced state vector
        :type initial_reduced_state: np.ndarray
        :return: initial trajectory of cluster labels with as many
            elements as required by the model order
        :rtype: deque
        """
        if initial_reduced_state.ndim == 1:
            label = self._cluster.predict(
                np.expand_dims(initial_reduced_state, axis=0))
        else:
            label = self._cluster.predict(initial_reduced_state.T)
        history = deque(remove_sequential_duplicates(label), self.model_order)
        if len(history) < self.model_order:
            return self._find_history(history)
        else:
            return history

    def _sample_next_cluster(self, history: deque) -> Tuple[deque, float]:
        """Sample next cluster and corresponding transition time.

        If the key resulting from the current history is not present in
        the list of available transition probabilities, the last cluster
        label in the given history is replaced with the label of the
        nearest cluster, and a new history is searched.

        :param history: current history/past/trajectory
        :type history: deque
        :return: the history with the new cluster appended and the
            transition time
        :rtype: Tuple[deque, float]
        """
        key = ",".join(map(str, list(history)))
        if key not in self._transition_prob:
            last_cluster = history.pop()
            history.append(self._find_closest_cluster(last_cluster))
            history = self._find_history(history)
            key = ",".join(map(str, list(history)))
        next_cluster = int(np.random.choice(
            self._transition_prob[key][:, 0],
            p=self._transition_prob[key][:, 1])
        )
        key += ",{:d}".format(next_cluster)
        history.append(next_cluster)
        return history, self._transition_time[key]

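    # Illustrative sketch (the cluster labels are hypothetical): with model
    # order 2 and history deque([4, 7]), the lookup key is "4,7". If "4,7"
    # was never observed during training, 7 is replaced by its nearest
    # cluster, say 2, and _find_history searches for known keys ending in
    # "4,2" (then, if necessary, in "2") and picks one of the matches at
    # random. The next cluster is finally drawn from Q["4,2"][:, 0] with the
    # probabilities in Q["4,2"][:, 1], and T["4,2,<next>"] yields the
    # transition time.
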
    def _interpolate_trajectory(self, step_size: float) -> pt.Tensor:
        """Add interpolated clusters to overall trajectory.

        :param step_size: time step size at which to place clusters
        :type step_size: float
        :raises ValueError: if the trajectory has fewer than two clusters
        :return: trajectory with interpolated clusters
        :rtype: pt.Tensor
        """
        if len(self.times) < 2:
            raise ValueError("At least two predictions must be available " +
                             "to interpolate a trajectory")
        times = np.arange(
            self.times[0], self.times[-1] + 0.5*step_size, step_size)
        prediction = pt.empty(
            (self.encoder.reduced_state_size, times.size), dtype=self._dtype)
        for dim in range(self.encoder.reduced_state_size):
            spline = InterpolatedUnivariateSpline(
                self._times,
                self.cluster_centers[self.visited_clusters][:, dim],
                k=min(3, len(self.times)-1)
            )
            prediction[dim, :] = pt.from_numpy(spline(times))
        return prediction

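    # Note on the time grid above (illustrative values): with visited times
    # [0.0, 0.35, 0.8] and a step size of 0.1, np.arange(0.0, 0.85, 0.1)
    # yields [0.0, 0.1, ..., 0.8]; the half-step offset makes the last
    # visited time part of the grid despite arange's exclusive upper bound.
    # The spline order k falls back to 1 (linear) if only two clusters were
    # visited.
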
    def predict_reduced(self, initial_state: pt.Tensor, end_time: float,
                        step_size: float) -> pt.Tensor:
        """Advance given reduced state in time.

        :param initial_state: initial reduced state vector or sequence of
            reduced state vectors; the state vectors must form the columns
            of the input tensor if a sequence is given
        :type initial_state: pt.Tensor
        :param end_time: time at which to stop the simulation; the
            simulation time always starts at zero
        :type end_time: float
        :param step_size: time step size to advance simulation time; note
            that the sampling of the next cluster is independent of the
            step size; the step size is only used to interpolate states
            in between the sampled clusters
        :type step_size: float
        :return: temporal evolution of the given reduced state vector;
            each column forms a temporal snapshot
        :rtype: pt.Tensor
        """
        history = self._find_initial_history(initial_state.numpy())
        self._visited_clusters = [history[-1]]
        self._times = [0.0]
        while self._times[-1] < end_time:
            history, trans_time = self._sample_next_cluster(history)
            self._times.append(self._times[-1] + trans_time)
            self._visited_clusters.append(history[-1])
        return self._interpolate_trajectory(step_size)

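    # Sketch of the sampling loop above (n1, n2 are hypothetical labels):
    # starting from history deque([0, 1]) with model order 2, _times grows
    # as [0.0, T["0,1,n1"], T["0,1,n1"] + T["1,n1,n2"], ...] and
    # _visited_clusters as [1, n1, n2, ...] until end_time is passed; the
    # centroids of the visited clusters are then interpolated onto a uniform
    # grid with the requested step size.
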
    @property
    def dt(self) -> float:
        return self._dt

    @dt.setter
    def dt(self, value):
        check_larger_than(value, 0.0, "dt")
        self._dt = value

    @property
    def n_clusters(self) -> int:
        return self._n_clusters

    @n_clusters.setter
    def n_clusters(self, value: int):
        check_int_larger_than(value, 1, "n_clusters")
        self._n_clusters = value

    @property
    def model_order(self) -> int:
        return self._model_order

    @model_order.setter
    def model_order(self, value):
        check_int_larger_than(value, 0, "model_order")
        self._model_order = value

    @property
    def cluster_centers(self):
        return self._cluster.cluster_centers_

    @property
    def visited_clusters(self) -> list:
        return self._visited_clusters

    @property
    def times(self) -> list:
        return self._times

    @property
    def Q(self):
        return self._transition_prob

    @property
    def T(self):
        return self._transition_time
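
# What follows is a minimal usage sketch, not part of the flowtorch API. It
# mirrors the doctest in the class docstring; the synthetic random data and
# the chosen parameters (rank, dt, n_clusters, model_order) are assumptions
# that stand in for a real flow dataset.
if __name__ == "__main__":
    from flowtorch.rom import SVDEncoder

    pt.manual_seed(0)
    data_matrix = pt.randn(200, 100)  # 200 spatial points, 100 snapshots
    encoder = SVDEncoder(rank=20)
    info = encoder.train(data_matrix)
    reduced_state = encoder.encode(data_matrix)
    cnm = CNM(reduced_state, encoder, dt=0.1, n_clusters=5, model_order=2)
    # advance the first five snapshots in time and map back to the full space
    reduced_prediction = cnm.predict_reduced(reduced_state[:, :5], 1.0, 0.1)
    full_prediction = encoder.decode(reduced_prediction)
    print(full_prediction.shape)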