"""Module to read and write the internal flowTorch data format.
The :class:`HDF5Writer` class allows to write field and mesh data
into an HDF5 file. It also creates an XDMF accessor file for postprocessing
in ParaView. The XDMF file creation is implemented in the :class:`XDMFWriter`
class. The :class:`HDF5Dataloader` class is the complementary writer for the
flowTorch data format. Moreover, the :class:`FOAM2HDF5` class allows conversion
of reconstructed OpenFOAM simulation cases into the HDF5-based flowTorch format.
"""
# standard library packages
from os.path import isfile, exists
from os import remove
from typing import List, Tuple, Dict, Union
import sys
# third party packages
import torch as pt
from h5py import File
# flowtorch packages
from flowtorch import DEFAULT_DTYPE
from .dataloader import Dataloader
from .foam_dataloader import FOAMCase, FOAMMesh, FOAMDataloader, POLYMESH_PATH, MAX_LINE_HEADER, FIELD_TYPE_DIMENSION
from .utils import check_list_or_str
CONST_GROUP = "constant"
VAR_GROUP = "variable"
VERTICES_DS = "vertices"
CONNECTIVITY_DS = "connectivity"
CENTERS_DS = "centers"
VOLUMES_DS = "volumes"
XDMF_HEADER = """<source lang="xml" line="1">
<Domain>
<Grid Name="flowTorch" GridType="Collection" CollectionType="Temporal">
"""
XDMF_FOOTER = """
</Grid>
</Domain>
</source>
"""
TOPOLOGY = "topology"
GEOMETRY = "geometry"
dtype_conversion = {
pt.float32: "f4",
pt.float64: "f8",
pt.int32: "i4",
pt.int64: "i8",
"float32": "f4",
"float64": "f8",
"int32": "i4",
"int64": "i8"
}
xdmf_attributes = {
1: "Scalar",
2: "Vector",
3: "Vector",
6: "Tensor6",
9: "Tensor"
}
xdmf_dtype_conversion = {
"f4": ("Float", 4),
"f8": ("Float", 8),
"i4": ("Int", 4),
"i8": ("Int", 8),
"float32": ("Float", 4),
"float64": ("Float", 8),
"int32": ("Int", 4),
"int64": ("Int", 8)
}
[docs]class HDF5Dataloader(Dataloader):
"""Load HDF5-based flowTorch data.
Examples
>>> from flowtorch import HDF5Dataloader
>>> loader = HDF5Dataloader("flowtorch.hdf5")
>>> times = loader.write_times
>>> p, U = loader.load_snapshot(["p", "U"], write_times)
>>> vertices = loader.vertices
"""
def __init__(self, file_path: str, dtype: str = DEFAULT_DTYPE):
"""Create HDF5Dataloader instance.
:param file_path: path to HDF5 file
:type file_path: str
:param dtype: tensor type of floating point values, defaults to DEFAULT_DTYPE
:type dtype: str, optional
"""
if not isfile(file_path):
raise FileNotFoundError(f"Could not find file {file_path}")
self._file_path = file_path
self._dtype = dtype
self._file = File(self._file_path, mode="a")
[docs] def load_snapshot(self, field_name: Union[List[str], str],
time: Union[List[str], str]) -> Union[List[pt.Tensor], pt.Tensor]:
check_list_or_str(field_name, "field_name")
check_list_or_str(time, "time")
# load multiple fields
if isinstance(field_name, list):
if isinstance(time, list):
return [pt.stack([pt.tensor(
self._file[f"{VAR_GROUP}/{t}/{field}"][:], dtype=self._dtype
).squeeze() for t in time], dim=-1) for field in field_name]
else:
return [pt.tensor(
self._file[f"{VAR_GROUP}/{time}/{field}"][:], dtype=self._dtype
).squeeze() for field in field_name]
# load single field
else:
if isinstance(time, list):
return pt.stack([pt.tensor(
self._file[f"{VAR_GROUP}/{t}/{field_name}"][:], dtype=self._dtype
).squeeze() for t in time], dim=-1)
else:
return pt.tensor(
self._file[f"{VAR_GROUP}/{time}/{field_name}"][:], dtype=self._dtype
).squeeze()
@property
def write_times(self) -> List[str]:
return list(self._file[VAR_GROUP].keys())
@property
def field_names(self) -> Dict[str, List[str]]:
times = self.write_times
field_names = dict.fromkeys(times, [])
for time in times:
field_names[time] = list(self._file[f"{VAR_GROUP}/{time}"].keys())
return field_names
@property
def vertices(self) -> pt.Tensor:
return pt.tensor(
self._file[f"{CONST_GROUP}/{CENTERS_DS}"][:], dtype=self._dtype
)
@property
def weights(self) -> pt.Tensor:
return pt.tensor(
self._file[f"{CONST_GROUP}/{VOLUMES_DS}"][:], dtype=self._dtype
)
[docs]class HDF5Writer(object):
"""Class to write flowTorch data to HDF5 file.
Two types of data are supported:
- variable: (field) data that changes with times, e.g, snapshots
- constant: constant data like mesh vertices or cell volumes
An XDMF accessor file can be created to support visual post-processing
with ParaView and other XDMF-compatible software packages.
Examples
>>> import torch as pt
>>> from flowtorch.data import HDF5Writer
>>> writer = HDF5Writer("test_file.hdf5")
>>> writer.write("ones", (3, 2), pt.ones((3, 2)), "0.01")
>>> int_data = pt.ones((3, 2), dtype=pt.int32)
>>> writer.write("ones_int", (3, 2), int_data, dtype=pt.int32)
"""
def __init__(self, file: str):
"""Write flowTorch HDF5 files.
:param file: path and file name to HDF5 file.
:type file: str
"""
self._file_path = file
self._file = File(file, mode="a")
[docs] def write(self,
name: str,
size: tuple,
data: pt.Tensor = None,
time: str = None,
dtype: str = DEFAULT_DTYPE
):
"""Write data to HDF5 file.
:param name: dataset name
:type name: str
:param size: dataset shape
:type size: tuple
:param data: data to write; if None, dataset is only allocated
:type data: pt.Tensor, optional
:param time: snapshot time, dataset if created in VAR_GROUP if present
:type time: str, optional
:param dtype: data type, defaults to pt.float32
:type dtype: str, optional
"""
if time is not None:
ds_name = VAR_GROUP + "/{:s}/{:s}".format(time, name)
else:
ds_name = CONST_GROUP + "/{:s}".format(name)
if dtype in dtype_conversion.keys():
if ds_name in self._file:
del self._file[ds_name]
ds = self._file.create_dataset(
ds_name,
size,
dtype=dtype_conversion[dtype]
)
if data is not None:
shape_diff = len(size) - len(data.size())
if shape_diff == 1:
data = data.unsqueeze(-1)
elif shape_diff == -1:
data = data.squeeze()
ds[:] = data.numpy()
else:
print(
"Warning: invalid data type {:s} for field {:s}. Skipping field.".format(
str(dtype), name)
)
[docs] def write_xdmf(self):
writer = XDMFWriter(self._file_path, self._file)
writer.create_xdmf("flowtorch.xdmf")
[docs]class FOAM2HDF5(object):
"""Convert reconstructed OpenFOAM cases to flowTorch HDF5 format.
If the simulation case is decomposed into processor folders/domains,
an info statement is displayed and no conversion is performed.
Examples
>>> from flowtorch import DATASETS
>>> from flowtorch.data import FOAM2HDF5
>>> converter = FOAM2HDF5(DATASETS["of_cavity_ascii"])
>>> converter.convert("cavity.hdf5", skip_zero=True)
"""
def __init__(self, path: str, dtype=DEFAULT_DTYPE):
"""Create FOAM2HDF5 converter from path.
:param path: path to OpenFOAM case
:type path: str
:param dtype: tensor type, defaults to DEFAULT_DTYPE
:type dtype: str, optional
"""
self._loader = FOAMDataloader(path, dtype)
self._dtype = dtype
self._topology = None
self._mesh_points = None
[docs] def convert(self, filename: str, skip_zero: bool = True):
"""Convert OpenFOAM case to flowTorch HDF5 file.
:param filename: name of the HDF5 file
:type filename: str
:param skip_zero: skip zero folder if true; defaults to True
:type skip_zero: bool, optional
"""
file_path = self._loader._case._path + "/" + filename
self._remove_file_if_present(file_path)
if self._loader._case._distributed:
message = """The direct conversion of distributed cases is currently not supported.\n
Workaround:
1. run reconstructPar (OpenFOAM utility)
1.1 if there is no reconstructed mesh, run also reconstructParMesh
2. remove all processor* folders
3. perform the conversion again (flowTorch)
"""
print(message)
else:
print("Writing data to file {:s}".format(file_path))
writer = HDF5Writer(file_path)
print("Converting mesh.")
self._convert_mesh(writer)
print("Converting fields.")
self._convert_fields(writer, skip_zero)
print("Conversion finished. Writing XDMF file.")
writer.write_xdmf()
def _remove_file_if_present(self, file_path: str):
"""Remove output file from previous runs if present
:param file_path: path to file
:type file_path: str
"""
if exists(file_path):
print("Removing old file {:s}".format(file_path))
remove(file_path)
def _convert_mesh(self, writer: HDF5Writer):
mesh_path = self._loader._case._path + "/" + POLYMESH_PATH
n_cells, n_points, n_top = self._gather_mesh_information(mesh_path)
data = self._get_vertices(mesh_path, job=0)
writer.write(VERTICES_DS, (n_points, 3), data, None, self._dtype)
data = self._get_topology(job=0)
writer.write(CONNECTIVITY_DS, (n_top,), data, None, pt.int32)
data = self._get_cell_centers(job=1)
writer.write(CENTERS_DS, (n_cells, 3), data, None, self._dtype)
data = self._get_cell_volumes(job=1)
writer.write(VOLUMES_DS, (n_cells,), data, None, self._dtype)
def _gather_mesh_information(self, mesh_path: str):
"""Gather information for parallel writing of mesh data.
:param mesh_path: path to polyMesh folder
:type mesh_path: str
"""
n_cells = self._loader._mesh._get_n_cells(mesh_path)
n_points_faces, faces = self._loader._mesh._parse_faces(mesh_path)
owners, neighbors = self._loader._mesh._parse_owners_and_neighbors(
mesh_path)
self._mesh_points = self._loader._mesh._parse_points(mesh_path)
cell_faces = [[] for _ in range(n_cells)]
n_faces_cell = pt.zeros(n_cells, dtype=pt.int32)
n_face_labels = 0
for i, owner in enumerate(owners):
cell_faces[owner].append(faces[i][:n_points_faces[i]])
n_face_labels += n_points_faces[i]
n_faces_cell[owner] += 1
for i, neigh in enumerate(neighbors):
cell_faces[neigh].append(faces[i][:n_points_faces[i]])
n_face_labels += n_points_faces[i]
n_faces_cell[neigh] += 1
topology_length = n_cells * 2 + \
pt.sum(n_faces_cell).item() + n_face_labels
self._topology = pt.zeros(topology_length, dtype=pt.int32)
marker = 0
for i, faces in enumerate(cell_faces):
self._topology[marker] = 16
self._topology[marker+1] = len(faces)
marker += 2
for j in range(len(faces)):
n_labels = faces[j].size()[0]
self._topology[marker] = n_labels
self._topology[marker+1:marker+1+n_labels] = faces[j]
marker += n_labels + 1
return n_cells, self._mesh_points.size()[0], self._topology.size()[0]
def _get_topology(self, job: int = 0):
return self._topology
def _get_vertices(self, mesh_path: str, job: int = 0):
return self._mesh_points
def _get_cell_centers(self, job: int = 0):
return self._loader._mesh.get_cell_centers()
def _get_cell_volumes(self, job: int = 0):
return self._loader._mesh.get_cell_volumes()
def _convert_fields(self, writer: HDF5Writer, skip_zero: bool):
"""Convert convert OpenFOAM fields to HDF5.
:param writer: HDF5 writer
:type writer: :class:`HDF5Writer`
:param skip_zero: skip zero folder if true
:type skip_zero: bool
"""
field_info = self._gather_field_information(skip_zero)
for job, info in enumerate(field_info):
print(
f"Converting field {info[0]} at time {info[1]}, dimension {info[2]}")
data = self._load_field(*info[:2], job=job)
writer.write(info[0], info[2], data, info[1])
def _gather_field_information(self, skip_zero: bool) -> List[list]:
"""Gather field information for parallel writing.
- check if field type is supported
- determine data size
:param skip_zero: skip zero folder if true
:type skip_zero: bool
:return: list of all fields; each list element is a list
with the entries [name, time, shape]
:rtype: list
"""
def load_n_lines(file_name, n):
lines = []
with open(file_name, "rb") as file:
for _ in range(n):
lines.append(file.readline())
return lines
field_info = []
mesh_path = self._loader._case._path + "/" + POLYMESH_PATH
n_cells = self._loader._mesh._get_n_cells(mesh_path)
all_fields = self._loader.field_names
if skip_zero and "0" in all_fields.keys():
del all_fields["0"]
for time in all_fields.keys():
for name in all_fields[time]:
path = self._loader._case.build_file_path(name, time)
header = load_n_lines(path, MAX_LINE_HEADER)
field_type = self._loader._field_type(header)
if field_type in FIELD_TYPE_DIMENSION.keys():
field_info.append(
[name, time, (n_cells, FIELD_TYPE_DIMENSION[field_type])])
return field_info
def _load_field(self, field: str, time: str, job: int = 0) -> pt.Tensor:
return self._loader.load_snapshot(field, time)
[docs]class XDMFWriter(object):
"""Create XDMF file to open flowTorch HDF5 files in ParaView.
Example
>>> from flowtorch.data import XDMFWriter
>>> writer = XDMFWriter("flowtorch.hdf5")
>>> writer.create_xdmf("flowtorch.xdmf")
"""
def __init__(self, file_path: str, hdf5_file: File):
"""Create XDMFWriter instance from path and file.
:param file_path: path to DHF5 file
:type file_path: str
:param hdf5_file: HDF5 file instance
:type hdf5_file: File
"""
if "/" in file_path:
self._path = file_path[:file_path.rfind("/")]
self._hdf5_filename = file_path[file_path.rfind("/") + 1:]
else:
self._path = "."
self._hdf5_filename = file_path
self._file = hdf5_file
self._n_cells = self._get_n_cells()
[docs] @classmethod
def from_filepath(cls, file_path: str):
"""Create XDMFWriter from file path.
:param file_path: path to HDF5 file
:type file_path: str
"""
return cls(
file_path,
File(file_path, mode="a")
)
def _get_n_cells(self) -> int:
n_cells = 0
location = "/{:s}/{:s}".format(CONST_GROUP, VOLUMES_DS)
if location in self._file:
n_cells = self._file[location].shape[0]
if n_cells == 0:
print("XDMF warning: could not determine number of cells.")
return n_cells
def _add_grid(self, time: str, offset: str = "") -> str:
"""
"""
grid = offset + "<Grid Name=\"Grid\" GridType=\"Uniform\">\n"
grid += self._add_topology(offset + " "*4)
grid += self._add_geometry(offset + " "*4)
if time is not None:
grid += offset + " "*4 + "<Time Value=\"{:s}\"/>\n".format(time)
for key in self._find_attributes(time):
grid += self._add_attribute(time, key, offset + " "*4)
grid += offset + "</Grid>\n"
return grid
def _find_attributes(self, time: str) -> List[str]:
location = "/{:s}/{:s}".format(VAR_GROUP, time)
keys = self._file[location].keys()
valid_attr = []
for key in keys:
loc = location + "/{:s}".format(key)
first_dim = self._file[loc].shape[0]
if first_dim == self._n_cells:
valid_attr.append(key)
return valid_attr
def _add_topology(self, offset: str = "") -> str:
topology = offset + \
"<Topology Name=\"{:s}\" TopologyType=\"Mixed\">\n".format(
TOPOLOGY)
location = self._hdf5_filename + \
":/{:s}/{:s}".format(CONST_GROUP, CONNECTIVITY_DS)
topology += self._add_dataitem(location, offset + " "*4)
topology += offset + "</Topology>\n"
return topology
def _add_geometry(self, offset: str = "") -> str:
geometry = offset + "<Geometry GeometryType=\"XYZ\">\n"
location = self._hdf5_filename + \
":/{:s}/{:s}".format(CONST_GROUP, VERTICES_DS)
geometry += self._add_dataitem(location, offset + " "*4)
geometry += offset + "</Geometry>\n"
return geometry
def _add_attribute(self, time: str, name: str, offset: str = "") -> str:
location = self._hdf5_filename + ":/{:s}/{:s}/{:s}".format(
VAR_GROUP, time, name
)
shape = self._file[location.split(":")[-1]].shape
tensor_type = xdmf_attributes[len(shape)]
attribute = offset + "<Attribute Name=\"{:s}\" AttributeType=\"{:s}\" Center=\"Cell\">\n".format(
name, tensor_type
)
attribute += self._add_dataitem(location, offset + " "*4)
attribute += offset + "</Attribute>\n"
return attribute
def _add_dataitem(self, location: str, offset: str = "") -> str:
path_in_file = location.split(":")[-1]
shape = self._file[path_in_file].shape
dimensions = " ".join(["{:d}".format(i) for i in shape])
dtype, precision = xdmf_dtype_conversion[
str(self._file[path_in_file].dtype)
]
dataitem = offset + "<DataItem Dimensions=\"{:s}\" NumberType=\"{:s}\" Precision=\"{:d}\" Format=\"HDF\">\n".format(
dimensions, dtype, precision
)
dataitem += offset + " "*4 + "{:s}\n".format(location)
dataitem += offset + "</DataItem>\n"
return dataitem
[docs] def create_xdmf(self, filename: str = None):
"""
:param filename: [description]
:type filename: [type]
"""
xdmf_str = XDMF_HEADER
times = list(self._file[VAR_GROUP].keys())
if len(times) > 0:
for time in times:
xdmf_str += self._add_grid(time, " "*12)
else:
xdmf_str += self._add_grid(None, " "*12)
xdmf_str = xdmf_str[:-1] # remove last linebreak
xdmf_str += XDMF_FOOTER
if filename is None:
if "." in self._hdf5_filename:
filename = self._hdf5_filename[:self._hdf5_filename.rfind(
".")] + ".xdmf"
else:
filename = self._hdf5_filename + ".xdmf"
print(
"Writing file {:s} as wrapper for {:s} at location {:s}".format(
filename, self._hdf5_filename, self._path
)
)
with open(self._path + "/" + filename, "w") as file:
file.write(xdmf_str)