Source code for flowtorch.data.tau_dataloader

r"""Direct access to TAU simulation data.

The DLR (Deutsches Zentrum für Luft- und Raumfahrt) TAU_ code saves
snapshots in the NetCDF format. The :class:`TAUDataloader` and
:class:`TAUSurfaceDataloader` classes are wrappers around the NetCDF
Python bindings that simplify access to volume and surface snapshot data.

.. _TAU: https://www.dlr.de/as/desktopdefault.aspx/tabid-395/526_read-694/

"""
# standard library packages
from abc import abstractmethod
from os.path import join, split
from glob import glob
from collections import defaultdict
from typing import List, Dict, Tuple, Union, Set
# third party packages
from netCDF4 import Dataset
import torch as pt
# flowtorch packages
from flowtorch import DEFAULT_DTYPE
from .dataloader import Dataloader
from .utils import check_list_or_str, check_and_standardize_path

VOL_SOLUTION_NAME = ".pval.unsteady_"
SURF_SOLUTION_NAME = ".surface.pval.unsteady_"
PSOLUTION_POSTFIX = ".domain_"
PMESH_NAME = "domain_{:s}_grid_1"
PVERTEX_KEY = "pcoord"
PWEIGHT_KEY = "pvolume"
PADD_POINTS_KEY = "addpoint_idx"
PGLOBAL_ID_KEY = "globalidx"
VERTEX_KEYS = ("points_xc", "points_yc", "points_zc")
WEIGHT_KEY = "volume"

COMMENT_CHAR = "#"
CONFIG_SEP = ":"
SOLUTION_PREFIX_KEY = "solution_prefix"
GRID_FILE_KEY = "primary_grid"
GRID_PREFIX_KEY = "grid_prefix"
N_DOMAINS_KEY = "n_domains"
BMAP_FILE_KEY = "bmap_file"
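
# Illustrative sketch (the values below are made up, not taken from a real
# case) of the "parameter : value" lines that TAUConfig below extracts from
# a TAU parameter file:
#
#   Output files prefix: sol/channel          -> SOLUTION_PREFIX_KEY
#   Primary grid filename: grid/channel.grid  -> GRID_FILE_KEY
#   Grid prefix: (none)                       -> GRID_PREFIX_KEY
#   Number of domains: 4                      -> N_DOMAINS_KEY
#   Boundary mapping filename: (thisfile)     -> parsed into BMAP_FILE_KEY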


class TAUConfig(object):
    """Load and parse TAU parameter files.

    The class does not parse the full content of the parameter file
    but only the content that is absolutely needed to load snapshot data.
    """

    def __init__(self, file_path: str):
        """Create a `TAUConfig` instance from the file path.

        :param file_path: path to the parameter file
        :type file_path: str
        """
        self._path, self._file_name = split(file_path)
        with open(join(self._path, self._file_name), "r") as config:
            self._file_content = config.readlines()
        self._config = None

    def _parse_config(self, parameter: str) -> str:
        """Extract a value based on a given parameter name.

        Every line of the parameter file follows the structure:
        parameter : value
        This function extracts the value as a string and removes potential
        white spaces or comments (#). The separator is expected to be a
        colon. Note: if the parameter is found multiple times, the value
        of the last occurrence is returned.

        :param parameter: the parameter of which to extract the value
        :type parameter: str
        :return: extracted value or empty string
        :rtype: str
        """
        value = ""
        for line in self._file_content:
            if parameter in line:
                value = line.split(CONFIG_SEP)[-1].split(COMMENT_CHAR)[0].strip()
        return value

    def _parse_bmap(self) -> dict:
        """Load and/or parse the boundary mapping.

        The boundary mapping is required to load TAU surface data; the
        parameter file either contains the boundary mapping or it points
        to an external file containing the mapping. The mapping associates
        a boundary name, e.g., 'farfield', with one or more integer values
        (markers), which are needed to locate the boundary field in the
        NetCDF output file.

        Note: this function was first implemented by Sebastian Spinner
        (DLR) and then refactored and merged into flowTorch.

        :return: dictionary with the keys being the zone (patch) names
            and the values being lists of markers
        :rtype: dict
        """
        filename = self._parse_config("Boundary mapping filename")
        if filename == "(thisfile)":
            content = self._file_content
        else:
            with open(join(self._path, filename), "r") as bfile:
                content = bfile.readlines()
        bmap = {}
        for i, line in enumerate(content):
            if "Markers" in line:
                markers = line.split(
                    CONFIG_SEP)[-1].split(COMMENT_CHAR)[0].split(",")
                markers = [int(m) for m in markers]
                block_end_found = False
                write_surface_data = False
                j = i
                while not block_end_found:
                    j += 1
                    if "Write surface data (0/1)" in content[j]:
                        write_surface_data = True
                    elif "Name" in content[j]:
                        name = content[j].split(
                            CONFIG_SEP)[-1].split(COMMENT_CHAR)[0].strip()
                    elif "block end" in content[j]:
                        block_end_found = True
                    else:
                        continue
                if block_end_found and write_surface_data:
                    bmap[name] = markers
        return bmap

    def _gather_config(self):
        """Gather all required configuration values."""
        config = {}
        config[SOLUTION_PREFIX_KEY] = self._parse_config("Output files prefix")
        config[GRID_FILE_KEY] = self._parse_config("Primary grid filename")
        config[GRID_PREFIX_KEY] = self._parse_config("Grid prefix")
        config[N_DOMAINS_KEY] = int(self._parse_config("Number of domains"))
        config[BMAP_FILE_KEY] = self._parse_bmap()
        self._config = config

    @property
    def path(self) -> str:
        return self._path

    @property
    def config(self) -> dict:
        if self._config is None:
            self._gather_config()
        return self._config
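

# Minimal sketch of a boundary-mapping block as `TAUConfig._parse_bmap`
# expects it; only the keywords "Markers", "Name", "Write surface data (0/1)",
# and "block end" are searched for by the parser, the values and the zone
# name below are made up:
#
#   Markers: 5, 6                 # one or more comma-separated integer markers
#   Write surface data (0/1): 1   # zone is only added if this line is present
#   Name: WingUpper
#   block end
#
# Such a block would produce the dictionary entry {"WingUpper": [5, 6]}.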


class TAUBase(Dataloader):
    """Base class with shared functionality of TAU dataloaders.
    """

    def __init__(self, parameter_file: str, distributed: bool = False,
                 dtype: str = DEFAULT_DTYPE):
        self._para = TAUConfig(parameter_file)
        self._distributed = distributed
        self._dtype = dtype
        self._mesh_data = None
        self._solution_name = None

    def _decompose_file_name(self) -> Dict[str, str]:
        """Extract write time and iteration from file name.

        :raises FileNotFoundError: if no solution files are found
        :return: dictionary with write times as keys and the
            corresponding iterations as values
        :rtype: Dict[str, str]
        """
        base = join(self._para.path, self._para.config[SOLUTION_PREFIX_KEY])
        base += self._solution_name
        suffix = f"{PSOLUTION_POSTFIX}0" if self._distributed else "e???"
        files = glob(f"{base}i=*t=*{suffix}")
        if len(files) < 1:
            raise FileNotFoundError(
                f"Could not find solution files in {self._para.path}/")
        time_iter = {}
        split_at = PSOLUTION_POSTFIX if self._distributed else " "
        for f in files:
            t = f.split("t=")[-1].split(split_at)[0]
            i = f.split("i=")[-1].split("_t=")[0]
            time_iter[t] = i
        return time_iter

    def _file_name(self, time: str, suffix: str = "") -> str:
        """Create solution file name from write time.

        :param time: snapshot write time
        :type time: str
        :param suffix: suffix to append to the file name; used for
            decomposed simulations
        :type suffix: str, optional
        :return: name of solution file
        :rtype: str
        """
        itr = self._time_iter[time]
        path = join(self._para.path, self._para.config[SOLUTION_PREFIX_KEY])
        return f"{path}{self._solution_name}i={itr}_t={time}{suffix}"

    @abstractmethod
    def _load_single_snapshot(self, field_name: str, time: str) -> pt.Tensor:
        """Load a single snapshot of a single field from the netCDF4 file(s).

        :param field_name: name of the field
        :type field_name: str
        :param time: snapshot write time
        :type time: str
        :return: tensor holding the field values
        :rtype: pt.Tensor
        """
        pass

    @abstractmethod
    def _load_mesh_data(self):
        """Load mesh vertices and cell volumes/areas.

        The mesh data is saved as class member `_mesh_data`. The tensor
        has the dimension n_points x 4; the first three columns correspond
        to the x/y/z coordinates, and the 4th column contains the
        volumes/areas.
        """
        pass

    def load_snapshot(self, field_name: Union[List[str], str],
                      time: Union[List[str], str]) -> Union[List[pt.Tensor], pt.Tensor]:
        check_list_or_str(field_name, "field_name")
        check_list_or_str(time, "time")
        # load multiple fields
        if isinstance(field_name, list):
            if isinstance(time, list):
                return [
                    pt.stack([self._load_single_snapshot(field, t)
                              for t in time], dim=-1)
                    for field in field_name
                ]
            else:
                return [
                    self._load_single_snapshot(field, time)
                    for field in field_name
                ]
        # load single field
        else:
            if isinstance(time, list):
                return pt.stack(
                    [self._load_single_snapshot(field_name, t) for t in time],
                    dim=-1
                )
            else:
                return self._load_single_snapshot(field_name, time)

    @property
    def write_times(self) -> List[str]:
        return sorted(list(self._time_iter.keys()), key=float)
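

# Sketch of the file naming convention assumed by _decompose_file_name and
# _file_name above (the prefix "sol", the iteration 500, and the write time
# are hypothetical values):
#
#   serial volume solution:      sol.pval.unsteady_i=500_t=1.2500e-02
#   distributed volume solution: sol.pval.unsteady_i=500_t=1.2500e-02.domain_0
#
# Either name is decomposed into the mapping {"1.2500e-02": "500"}, i.e.,
# write time "1.2500e-02" was written at iteration 500.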


class TAUDataloader(TAUBase):
    """Load TAU simulation data.

    The loader is currently limited to read:
    - internal field solution, serial/reconstructed and distributed
    - mesh vertices, serial and distributed
    - cell volumes, serial (if present) and distributed

    Examples

    >>> from os.path import join
    >>> from flowtorch import DATASETS
    >>> from flowtorch.data import TAUDataloader
    >>> path = DATASETS["tau_backward_facing_step"]
    >>> loader = TAUDataloader(join(path, "simulation.para"))
    >>> times = loader.write_times
    >>> fields = loader.field_names[times[0]]
    >>> fields
    ['density', 'x_velocity', 'y_velocity', ...]
    >>> density = loader.load_snapshot("density", times)

    To load distributed simulation data, set `distributed=True`

    >>> path = DATASETS["tau_cylinder_2D"]
    >>> loader = TAUDataloader(join(path, "simulation.para"), distributed=True)
    >>> vertices = loader.vertices

    """

    def __init__(self, parameter_file: str, distributed: bool = False,
                 dtype: str = DEFAULT_DTYPE):
        """Create loader instance from TAU parameter file.

        :param parameter_file: path to TAU simulation parameter file
        :type parameter_file: str
        :param distributed: True if mesh and solution are distributed in
            domain files; defaults to False
        :type distributed: bool, optional
        :param dtype: tensor type, defaults to DEFAULT_DTYPE
        :type dtype: str, optional
        """
        super(TAUDataloader, self).__init__(parameter_file, distributed, dtype)
        self._solution_name = VOL_SOLUTION_NAME
        self._time_iter = self._decompose_file_name()

    def _load_domain_mesh_data(self, pid: str) -> pt.Tensor:
        """Load vertices and volumes for a single processor domain.

        :param pid: domain id
        :type pid: str
        :return: tensor of size n_points x 4, where n_points is the number
            of unique cells in the domain, and the 4 columns contain the
            coordinates of the vertices (x, y, z) and the cell volumes
        :rtype: pt.Tensor
        """
        prefix = self._para.config[GRID_PREFIX_KEY]
        name = PMESH_NAME.format(pid)
        if not (prefix == "(none)"):
            name = f"{prefix}_{name}"
        path = join(self._para.path, name)
        with Dataset(path) as data:
            vertices = pt.tensor(data[PVERTEX_KEY][:], dtype=self._dtype)
            volumes = pt.tensor(data[PWEIGHT_KEY][:], dtype=self._dtype)
            global_ids = pt.tensor(data[PGLOBAL_ID_KEY][:], dtype=pt.int64)
            n_add_points = data[PADD_POINTS_KEY].shape[0]
            n_points = volumes.shape[0] - n_add_points
            data = pt.zeros((n_points, 4), dtype=self._dtype)
            sorting = pt.argsort(global_ids[:n_points])
            data[:, 0] = vertices[:n_points, 0][sorting]
            data[:, 1] = vertices[:n_points, 1][sorting]
            data[:, 2] = vertices[:n_points, 2][sorting]
            data[:, 3] = volumes[:n_points][sorting]
        return data

    def _load_mesh_data(self):
        """Load mesh vertices and cell volumes.

        The mesh data is saved as class member `_mesh_data`. The tensor
        has the dimension n_points x 4; the first three columns correspond
        to the x/y/z coordinates, and the 4th column contains the volumes.
        """
        if self._distributed:
            n = self._para.config[N_DOMAINS_KEY]
            self._mesh_data = pt.cat(
                [self._load_domain_mesh_data(str(pid)) for pid in range(n)],
                dim=0
            )
        else:
            path = join(self._para.path, self._para.config[GRID_FILE_KEY])
            with Dataset(path) as data:
                vertices = pt.stack(
                    [pt.tensor(data[key][:], dtype=self._dtype)
                     for key in VERTEX_KEYS],
                    dim=-1
                )
                if WEIGHT_KEY in data.variables.keys():
                    weights = pt.tensor(
                        data.variables[WEIGHT_KEY][:], dtype=self._dtype)
                else:
                    print(
                        f"Warning: could not find cell volumes in file {path}")
                    weights = pt.ones(vertices.shape[0], dtype=self._dtype)
            self._mesh_data = pt.cat((vertices, weights.unsqueeze(-1)), dim=-1)

    def _load_single_snapshot(self, field_name: str, time: str) -> pt.Tensor:
        """Load a single snapshot of a single field from the netCDF4 file(s).

        :param field_name: name of the field
        :type field_name: str
        :param time: snapshot write time
        :type time: str
        :return: tensor holding the field values
        :rtype: pt.Tensor
        """
        if self._distributed:
            field = []
            for pid in range(self._para.config[N_DOMAINS_KEY]):
                path = self._file_name(time, f".domain_{pid}")
                with Dataset(path) as data:
                    field.append(
                        pt.tensor(
                            data.variables[field_name][:], dtype=self._dtype)
                    )
            return pt.cat(field, dim=0)
        else:
            path = self._file_name(time)
            with Dataset(path) as data:
                field = pt.tensor(
                    data.variables[field_name][:], dtype=self._dtype)
            return field

    @property
    def field_names(self) -> Dict[str, List[str]]:
        """Find available fields in solution files.

        Available fields are determined by matching the number of weights
        with the length of datasets in the available solution files; for
        distributed cases, the fields are only determined based on
        *domain_0*.

        :return: dictionary with time as key and list of available
            solution fields as value
        :rtype: Dict[str, List[str]]
        """
        self._field_names = {}
        if self._distributed:
            n_points = self._load_domain_mesh_data("0").shape[0]
            suffix = ".domain_0"
        else:
            n_points = self.vertices.shape[0]
            suffix = ""
        for time in self.write_times:
            self._field_names[time] = []
            with Dataset(self._file_name(time, suffix)) as data:
                for key in data.variables.keys():
                    if data[key].shape[0] == n_points:
                        self._field_names[time].append(key)
        return self._field_names

    @property
    def vertices(self) -> pt.Tensor:
        if self._mesh_data is None:
            self._load_mesh_data()
        return self._mesh_data[:, :3]

    @property
    def weights(self) -> pt.Tensor:
        if self._mesh_data is None:
            self._load_mesh_data()
        return self._mesh_data[:, 3]


class TAUSurfaceDataloader(TAUBase):
    """Load TAU surface data.

    The loader is currently limited to read and parse reconstructed/
    'gathered' surface data from a NetCDF4 container.

    Note: the functionality of this class was first implemented by
    Sebastian Spinner (DLR) and then refactored and merged into flowTorch.

    Examples

    >>> from os.path import join
    >>> from flowtorch import DATASETS
    >>> from flowtorch.data import TAUSurfaceDataloader
    >>> path = DATASETS["tau_wing_surface"]
    >>> loader = TAUSurfaceDataloader(join(path, "simulation.para"))
    >>> loader.zone_names
    ['WingUpper', 'WingLower', 'WingTE', 'WingTipRight', 'WingTipLeft']
    >>> loader.zone = 'WingLower'
    >>> times = loader.write_times
    >>> fields = loader.field_names[times[0]]
    >>> fields
    ['x_velocity', 'y_velocity', ...]
    >>> cp_lower = loader.load_snapshot("cp", times)

    """

    def __init__(self, parameter_file: str, dtype: str = DEFAULT_DTYPE):
        super().__init__(parameter_file, False, dtype)
        self._solution_name = SURF_SOLUTION_NAME
        self._time_iter = self._decompose_file_name()
        self._zone = self.zone_names[0]
        self._zone_ids = None

    def _load_zone_ids(self):
        """Load global vertex/field indices of zones.

        TAU surface meshes consist of triangles and/or quadrilaterals.
        The grid file contains lists of global point ids that form
        individual elements. For example, a surface mesh consisting of
        three triangular elements could be described by the tensor
        [[5, 2, 1], [1, 4, 6], [2, 3, 5]], where the first triangle would
        be formed by the points with ids 5, 2, and 1. Each element also
        comes with a boundary marker encoding the zone the element belongs
        to. If a zone contains both triangular and quadrilateral elements,
        the merged list of all elements is ordered such that all triangles
        come first.
        """
        path = join(self._para.path, self._para.config[GRID_FILE_KEY])
        with Dataset(path) as data:
            boundary_markers = pt.tensor(
                data.variables["boundarymarker_of_surfaces"][:], dtype=int)
            surface_tri, surface_quad = None, None
            surface_tri_key = "points_of_surfacetriangles"
            if surface_tri_key in data.variables.keys():
                surface_tri = pt.tensor(
                    data.variables[surface_tri_key][:], dtype=int)
            surface_quad_key = "points_of_surfacequadrilaterals"
            if surface_quad_key in data.variables.keys():
                surface_quad = pt.tensor(
                    data.variables[surface_quad_key][:], dtype=int)
        self._zone_ids = {}
        for zone_name, zone_markers in self._para.config[BMAP_FILE_KEY].items():
            # boolean mask of all surface elements belonging to the current zone
            marker_selection = pt.isin(
                boundary_markers, pt.tensor(zone_markers))
            if surface_tri is not None and surface_quad is not None:
                # pad triangles with NaN to merge them with the quadrilaterals
                expanded = pt.empty((surface_tri.size(0), 4), dtype=pt.float64)
                expanded[:, :3] = surface_tri
                expanded[:, 3] = float("nan")
                merged = pt.unique(
                    pt.cat((expanded, surface_quad), dim=0)[marker_selection].flatten())
                sorted_ids, _ = pt.sort(
                    merged[~pt.isnan(merged)].type(pt.int64))
                self._zone_ids[zone_name] = sorted_ids
                del expanded, merged
            elif surface_tri is not None:
                self._zone_ids[zone_name] = pt.unique(
                    surface_tri[marker_selection].flatten()
                ).type(pt.int64)
            else:
                self._zone_ids[zone_name] = pt.unique(
                    surface_quad[marker_selection].flatten()
                ).type(pt.int64)

    def _load_mesh_data(self):
        """Load mesh vertices for all zones.

        The mesh data is saved as class member `_mesh_data`. The tensor
        for each zone has the dimension n_points x 4; the first three
        columns correspond to the x/y/z coordinates. Loading or computing
        the face area is currently not implemented; instead, the weight
        of each element is set to unity.
        """
        path = join(self._para.path, self._para.config[GRID_FILE_KEY])
        with Dataset(path) as data:
            vertices = pt.stack(
                [pt.tensor(data[key][:], dtype=self._dtype)
                 for key in VERTEX_KEYS],
                dim=-1
            )
        self._mesh_data = {}
        for zone_name, zone_ids in self.zone_ids.items():
            self._mesh_data[zone_name] = pt.ones(
                (zone_ids.size(0), 4), dtype=self._dtype)
            self._mesh_data[zone_name][:, :3] = vertices[zone_ids]

    def _load_single_snapshot(self, field_name: str, time: str) -> pt.Tensor:
        with Dataset(self._file_name(time)) as data:
            global_ids = pt.from_numpy(data.variables["global_id"][:])
            ids = pt.where(
                pt.isin(global_ids, self.zone_ids[self.zone]))[0].numpy()
            field = pt.tensor(
                data.variables[field_name][:], dtype=self._dtype)
        return field[ids]

    @property
    def field_names(self) -> Dict[str, List[str]]:
        """Find available fields in solution files.

        Available fields are determined by matching the number of unique
        surface point ids over all zones with the length of datasets in
        the available solution files.

        :return: dictionary with time as key and list of available
            solution fields as value
        :rtype: Dict[str, List[str]]
        """
        self._field_names = defaultdict(list)
        n_points = pt.unique(
            pt.cat([ids for ids in self.zone_ids.values()])).size(0)
        for time in self.write_times:
            with Dataset(self._file_name(time)) as data:
                for key in data.variables.keys():
                    if data[key].shape[0] == n_points:
                        self._field_names[time].append(key)
        return self._field_names

    @property
    def mesh_data(self) -> Dict[str, pt.Tensor]:
        if self._mesh_data is None:
            self._load_mesh_data()
        return self._mesh_data

    @property
    def vertices(self) -> pt.Tensor:
        return self.mesh_data[self.zone][:, :3]

    @property
    def weights(self) -> pt.Tensor:
        return self.mesh_data[self.zone][:, 3]

    @property
    def zone_ids(self) -> Dict[str, pt.Tensor]:
        if self._zone_ids is None:
            self._load_zone_ids()
        return self._zone_ids

    @property
    def zone_names(self) -> List[str]:
        """Names of available blocks/zones.

        :return: block/zone names
        :rtype: List[str]
        """
        return list(self._para.config[BMAP_FILE_KEY].keys())

    @property
    def zone(self) -> str:
        """Currently selected block/zone.

        :return: block/zone name
        :rtype: str
        """
        return self._zone

    @zone.setter
    def zone(self, value: str):
        """Select active block/zone.

        The selected block remains unchanged if an invalid block name
        is passed.

        :param value: name of block to select
        :type value: str
        """
        if value in self.zone_names:
            self._zone = value
        else:
            print(f"Zone '{value}' not found. Available zones are:")
            print(self.zone_names)