Source code for flowtorch.data.foam_dataloader

"""Classes to work with OpenFOAM cases, meshes, and fields.

The :class:`FOAMDataloader` class allows to load fields from
an OpenFOAM simulation folder. Currently, only the ESI-OpenCFD
branch of OpenFOAM is supported (v1912, v2006). The :class:`FOAMCase`
class assembles information about the folder and file structure
of a simulation. The :class:`FOAMMesh` allows loading and parsing
the finite volume mesh.
"""

# standard library packages
import glob
import os
import struct
import sys
from typing import List, Dict, Tuple, Union
# third party packages
import torch as pt
# flowtorch packages
from flowtorch import DEFAULT_DTYPE
from .dataloader import Dataloader
from .utils import check_and_standardize_path, check_list_or_str


FIELD_TYPE_DIMENSION = {
    b"volScalarField": 1,
    b"volVectorField": 3
}
CONSTANT_PATH = "constant/"
POLYMESH_PATH = "constant/polyMesh/"
MESH_FILES = ("points", "owner", "neighbour", "faces", "boundary")


MAX_LINE_HEADER = 30
MAX_LINE_INTERNAL_FIELD = 40

SIZE_OF_CHAR = struct.calcsize("c")
SIZE_OF_INT = struct.calcsize("i")
SIZE_OF_DOUBLE = struct.calcsize("d")


[docs]class FOAMDataloader(Dataloader): """Load internal fields and mesh properties of OpenFOAM cases. The project ofpp_ by Xu Xianghua has been a great help to implement some of the methods. .. _ofpp: https://github.com/xu-xianghua/ofpp Examples >>> from flowtorch import DATASETS >>> from flowtorch.data import FOAMDataloader >>> path = DATASETS["of_cavity_ascii_parallel"] >>> loader = FOAMDataloader(path) >>> loader.write_times ['0', '0.1', '0.2', '0.3', '0.4', '0.5'] >>> loader.field_names {'0': ['p', 'U'], '0.1': ['p', 'phi', 'U'], '0.2': ['p', 'phi', 'U'], '0.3': [ 'p', 'phi', 'U'], '0.4': ['p', 'phi', 'U'], '0.5': ['p', 'phi', 'U']} >>> vertices = loader.vertices >>> vertices[:3] tensor([[0.0025, 0.0025, 0.0050], [0.0075, 0.0025, 0.0050], [0.0125, 0.0025, 0.0050]]) >>> loader.weights[:3] # cell volumes tensor([2.5000e-07, 2.5000e-07, 2.5000e-07]) >>> p = loader.load_snapshot("p", "0.5") >>> p.shape torch.Size([400]) >>> p[:3] tensor([ 4.2993e-06, -5.8226e-03, -1.2960e-02]) """ def __init__(self, path: str, dtype: str = DEFAULT_DTYPE): """Create a FOAMDataloader instance from a path. :param path: path to an OpenFOAM simulation folder. :type path: str :param dtype: tensor type; default is single precision, `torch.float32` :type dtype: str """ self._case = FOAMCase(path) self._mesh = FOAMMesh(self._case) self._dtype = dtype def _parse_data(self, data: List[str]) -> pt.Tensor: field_type = self._field_type(data[:MAX_LINE_HEADER]) if not field_type in FIELD_TYPE_DIMENSION.keys(): raise ValueError(f"Field type {field_type} not supported.") try: if self._case._is_binary(data[:MAX_LINE_HEADER]): field_data = self._unpack_internalfield_binary( data, FIELD_TYPE_DIMENSION[field_type] ) else: field_data = self._unpack_internalfield_ascii( data, FIELD_TYPE_DIMENSION[field_type] ) except Exception as e: print(e) else: return field_data def _find_nonuniform(self, data: List[str]) -> Tuple[int, int]: for i, line in enumerate(data): if b"nonuniform" in line: return i, int(data[i+1]) return 0, 0 def _field_type(self, data: List[str]) -> str: for line in data: if b"class" in line: for field_type in FIELD_TYPE_DIMENSION.keys(): if field_type in line: return field_type return None return None def _unpack_internalfield_ascii(self, data: List[str], dim: int) -> pt.Tensor: start, n_values = self._find_nonuniform(data[:MAX_LINE_INTERNAL_FIELD]) start += 3 if dim == 1: return pt.tensor([float(line) for line in data[start:start + n_values]], dtype=self._dtype) else: return pt.tensor( [list(map(float, line[1:-2].split())) for line in data[start:start + n_values]], dtype=self._dtype ) def _unpack_internalfield_binary(self, data: List[str], dim: int) -> pt.Tensor: start, n_values = self._find_nonuniform(data[:MAX_LINE_INTERNAL_FIELD]) start += 2 buffer = b"".join(data[start:]) values = struct.unpack( "{}d".format(dim*n_values), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_DOUBLE*n_values*dim] ) if dim > 1: return pt.tensor(values, dtype=self._dtype).reshape(n_values, dim) else: return pt.tensor(values, dtype=self._dtype) def _load_single_snapshot(self, field_name: str, time: str) -> pt.Tensor: file_paths = [] if self._case._distributed: for proc in range(self._case._processors): file_paths.append( self._case.build_file_path(field_name, time, proc)) else: file_paths.append(self._case.build_file_path(field_name, time, 0)) field_data = [] for file_path in file_paths: with open(file_path, "rb") as file: field_data.append(self._parse_data(file.readlines())) return pt.cat(field_data) def _load_multiple_snapshots(self, field_name: str, times: List[str]) -> pt.Tensor: return pt.stack( [self._load_single_snapshot(field_name, time) for time in times], dim=-1 )
[docs] def load_snapshot(self, field_name: Union[List[str], str], time: Union[List[str], str]) -> Union[List[pt.Tensor], pt.Tensor]: check_list_or_str(field_name, "field_name") check_list_or_str(time, "time") # load multiple fields if isinstance(field_name, list): if isinstance(time, list): return [self._load_multiple_snapshots(name, time) for name in field_name] else: return [self._load_single_snapshot(name, time) for name in field_name] # load a single field else: if isinstance(time, list): return self._load_multiple_snapshots(field_name, time) else: return self._load_single_snapshot(field_name, time)
@ property def write_times(self) -> List[str]: """ Access to available snapshot/write times via :func:`FOAMCase._eval_write_times`. :getter: returns the available write times :type: List[str] """ return self._case._time_folders @ property def field_names(self) -> Dict[str, List[str]]: """ Access to the available field names for all available write times via :func:`FOAMCase._eval_field_names`. :getter: returns names of availabe fields :type: Dict[str, List[str]] """ return self._case._field_names @ property def vertices(self) -> pt.Tensor: """ In OpenFOAM, field for post-processing are defined at the control volume's center (*vol<Type>Fields*). Therefore, the `vertices` property enables access to cell center locations via :class:`FOAMMesh`. :getter: returns control volume centers :type: pt.Tensor """ return self._mesh.get_cell_centers() @ property def weights(self) -> pt.Tensor: """ For results obtained using a finite volume method with co-located arrangement (OpenFOAM), a sensible weight for a cell-centered value is the cell volume. The cell volumes are availabe via the :class:`FOAMMesh` class. :getter: returns cell volumes :type: pt.Tensor """ return self._mesh.get_cell_volumes()
[docs]class FOAMCase(object): """Class to access and parse OpenFOAM cases. Most of the attributes and methods are private because they are typically accessed via a :class:`FOAMDataloader` instance. .. automethod:: _eval_distributed .. automethod:: _eval_processors .. automethod:: _eval_write_times .. automethod:: _eval_field_names """ def __init__(self, path: str): """Create a `FOAMCase` instance based on a path. :param path: path to OpenFOAM simulation case :type path: str """ self._path = check_and_standardize_path(path) self._distributed = self._eval_distributed() self._processors = self._eval_processors() self._time_folders = self._eval_write_times() self._field_names = self._eval_field_names() if not self._check_mesh_files(): sys.exit("Error: could not find valid mesh in case {:s}".format( self._case._path)) def _is_binary(self, header: List[str]) -> bool: for line in header: if b"format" in line: if b"binary" in line: return True else: return False return False def _check_mesh_files(self) -> bool: """Check if all mesh files are available. """ if self._distributed: files_found = [] for proc in range(self._processors): files_found += [ os.path.isfile( self._path + "/processor{:d}/".format(proc) + POLYMESH_PATH + mesh_file ) for mesh_file in MESH_FILES ] else: files_found = [ os.path.isfile( self._path + "/" + POLYMESH_PATH + mesh_file ) for mesh_file in MESH_FILES ] return all(files_found)
[docs] def _eval_distributed(self) -> bool: """Check if the simulation case is distributed (parallel). .. warning:: Collated output is currently not supported/not detected. :return: `True` if distributed :rtype: bool """ proc_dirs = glob.glob(self._path + "/processor*") return len(proc_dirs) > 0
[docs] def _eval_processors(self) -> int: """Get number of processor folders. :return: number of processor folders or 1 for serial runs :rtype: int """ if self._distributed: return len(glob.glob(self._path + "/processor*")) else: return 1
[docs] def _eval_write_times(self) -> List[str]: """Assemble a list of all write times. :return: a list of all time folders :rtype: list(str) .. warning:: For distributed simulations, it is assumed that all processor folders contain the same time folders. """ if self._distributed: time_path = self._path + "/processor0" else: time_path = self._path dirs = [folder for folder in os.listdir(time_path) if os.path.isdir(os.path.join(time_path, folder))] time_dirs = [] for entry in dirs: try: _ = float(entry) time_dirs.append(entry) except: pass if len(time_dirs) < 2: print( "Warning: found only one or less time folders in {:s}" .format(self._path) ) return sorted(time_dirs, key=float)
[docs] def _eval_field_names(self) -> Dict[str, List[str]]: """Get a dictionary of all fields and files in all time folders. .. warning:: For distributed cases, only *processor0* is evaluated. The fields for all other processors are assumed to be the same. :return: dictionary with write times as keys and a list of field names for each time as values :rtype: dict """ all_time_folders = [ self.build_file_path("", time, 0) for time in self._time_folders ] all_fields = {} for i, folder in enumerate(all_time_folders): all_fields[self._time_folders[i]] = [ field for field in os.listdir(folder) if os.path.isfile(os.path.join(folder, field)) ] return all_fields
[docs] def build_file_path(self, field_name: str, time: str, processor: int = 0) -> str: """Create the path to file inside the time folder of a simulation. :param field_name: name of the field or file, e.g., \"U\" or \"p\" :type field_name: str :param time: name of the time folder, e.g., \"0.01\" :type time: str :param processor: processor folder to load the data from; ignored in serial simulation cases; defaults to `0` :type processor: int, optional :return: path to file inside a time folder :rtype: str Examples >>> from flowtorch.data import FOAMCase >>> case = FOAMCase("./cavity_binary_parallel/") >>> case._distributed True >>> case._processors 4 >>> case._time_folders ['0', '0.1', '0.2', '0.3', '0.4', '0.5'] >>> case._field_names {'0': ['U', 'p'], '0.1': ['U', 'p', 'phi'], '0.2': ['U', 'p', 'phi'], '0.3': [ 'U', 'p', 'phi'], '0.4': ['U', 'p', 'phi'], '0.5': ['U', 'p', 'phi']} >>> case.build_file_path("U", "0.1", 1) './cavity_binary_parallel/processor1/0.1/U' """ if self._distributed: file_path = ( self._path + "/processor{:d}/{:s}/{:s}".format(processor, time, field_name) ) else: file_path = self._path + "/{:s}/{:s}".format(time, field_name) return file_path
[docs]class FOAMMesh(object): """Class to load and process OpenFOAM meshes. OpenFOAM stores the finite volume mesh as a collection of several files located in *constant/polyMesh* or in *processorXX/constant/polyMesh* for serial and distributed cases, respectively. Even though OpenFOAM is a cell-centered finite volume method, the cell-centers and volumes are not explicitly stored. Instead, a so-called face-addressing storage is used. All internal faces have an owner cell and a neighbor cell. Boundary faces only have an owner cell. The mesh attributes are defined in several files: - **points**: list of vertices forming cell faces; the list index of a point is used as label - **faces**: list of all cell faces defined as point labels - **owner**: list of cell labels that are face owners - **neighbour**: list of cell labels that are face neighbors; BE spelling - **boundary**: definition of faces belonging to a patch Examples >>> from flowtorch.data import FOAMMesh >>> mesh = FOAMMesh.from_path("./") >>> centers = mesh.get_cell_centers() >>> centers.size() torch.Size([400, 3]) >>> centers[:2] tensor([[0.0025, 0.0025, 0.0050], [0.0075, 0.0025, 0.0050]]) >>> volumes = mesh.get_cell_volumes() >>> volumes.size() torch.Size([400]) >>> volumes[:2] tensor([2.5000e-07, 2.5000e-07]) .. warning:: Dynamically changing meshes are currently not supported. .. warning:: Distributed meshes may be parsed and concatenated, but the cell centers and volumes won't have the same ordering as when computed from a reconstructed mesh. .. automethod:: _compute_face_centers_and_areas .. automethod:: _compute_cell_centers_and_volumes """ def __init__(self, case: FOAMCase, dtype: str = DEFAULT_DTYPE): """Create FOAMMesh object based on :class:`FOAMCase`. """ if not isinstance(case, FOAMCase): sys.exit("Error: case must be of type FOAMCase, not {:s}" .format(type(case).__name__)) self._case = case self._dtype = dtype self._itype = pt.int64 self._cell_centers = None self._cell_volumes = None
[docs] @ classmethod def from_path(cls, path: str, dtype: str = DEFAULT_DTYPE): """Create FOAMMesh object based on path to OpenFOAM simulation case. """ return cls(FOAMCase(path), dtype)
def _get_list_length(self, data: List[str]) -> Tuple[int, int]: """Find list length of points, faces, and cells. :param data: number of elements in OpenFOAM list and line with first list entry :type data: tuple(int, int) """ for i, line in enumerate(data): try: n_entries = int(line) except: pass else: return i, n_entries return 0, 0 def _get_n_cells(self, mesh_path: str) -> int: """Extract number of cells from *owner* file. :param mesh_path: polyMesh location :type mesh_path: str """ n_cells = 0 with open(mesh_path + "owner", "rb") as file: found = False while not found: line = file.readline() if b"note" in line: tokens = line.split() for token in tokens: if b"nCells" in token: n_cells = int(token.split(b":")[1]) found = True return n_cells def _parse_points(self, mesh_path: str) -> pt.Tensor: """Parse mesh vertices defined in *constant/polyMesh/points*. """ with open(mesh_path + "points", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): start += 1 buffer = b"".join(data[start:]) values = struct.unpack( "{}d".format(3*length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_DOUBLE*length*3] ) return pt.tensor(values, dtype=self._dtype).reshape(length, 3) else: start += 2 return pt.tensor( [list(map(float, line[1:-2].split())) for line in data[start:start + length]], dtype=self._dtype ) def _parse_faces(self, mesh_path: str) -> Tuple[pt.Tensor, pt.Tensor]: """Parse cell faces stored in in *constant/polyMesh/faces*. """ def zero_pad(tensor, new_size): """Increase size of second tensor dimension. """ diff = new_size - tensor.size()[1] pad = pt.zeros((tensor.size()[0], diff), dtype=self._itype) return pt.cat([tensor, pad], dim=1) with open(mesh_path + "faces", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): n_points_faces = pt.zeros((length-1, 1), dtype=self._itype) faces = pt.zeros_like(n_points_faces, dtype=self._itype) start += 1 buffer = b"".join(data[start:]) idx = struct.unpack( "{}i".format(length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR + SIZE_OF_INT*length] ) # search or the next opening bracket to see where the second list starts # the length of the second list is interpreted as a sequence of characters # so looking 50 characters ahead allows for a very large number of faces list_0_end = SIZE_OF_INT*length offset = 0 for i, c in enumerate(buffer[list_0_end:list_0_end + 50*SIZE_OF_CHAR]): if chr(c) == r"(": offset = i+1 values = struct.unpack( "{}i".format(idx[-1]), buffer[offset+SIZE_OF_INT*length:offset + (length+idx[-1])*SIZE_OF_INT] ) for i in range(length-1): face_labels = pt.tensor( values[idx[i]:idx[i+1]], dtype=self._itype) n_points_faces[i] = len(face_labels) if len(face_labels) > faces.size()[1]: faces = zero_pad(faces, len(face_labels)) faces[i][:len(face_labels)] = face_labels else: n_points_faces = pt.zeros((length, 1), dtype=self._itype) faces = pt.zeros_like(n_points_faces, dtype=self._itype) start += 2 for i, line in enumerate(data[start:start + length]): n_points_faces[i] = int(line[:1]) face_labels = pt.tensor( list(map(int, line[2:-2].split())), dtype=self._itype) if len(face_labels) > faces.size()[1]: faces = zero_pad(faces, len(face_labels)) faces[i][:len(face_labels)] = face_labels return n_points_faces, faces def _parse_owners_and_neighbors(self, mesh_path: str) -> Tuple[pt.Tensor, pt.Tensor]: """Parse face owners and neighbors. - owners are parsed from *constant/polyMesh/owner* - neighbors are parsed from *constant/polyMesh/neighbour* """ with open(mesh_path + "owner", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): start += 1 buffer = b"".join(data[start:]) owner_values = struct.unpack( "{}i".format(length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_INT*length] ) else: start += 2 owner_values = [ int(line[:-1]) for line in data[start:start + length] ] with open(mesh_path + "neighbour", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): start += 1 buffer = b"".join(data[start:]) neighbor_values = struct.unpack( "{}i".format(length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_INT*length] ) else: start += 2 neighbor_values = [ int(line[:-1]) for line in data[start:start + length] ] return ( pt.tensor(owner_values, dtype=self._itype), pt.tensor(neighbor_values, dtype=self._itype) ) def _centers_and_volumes_computed(self, path: str) -> bool: """Check if files *C* and *V* exist in the specified location. """ return os.path.isfile(path + "C") and os.path.isfile(path + "V") def _parse_cell_centers(self, path: str) -> pt.Tensor: """Parse cell centers from the constant directory. """ with open(path + "C", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): start += 1 buffer = b"".join(data[start:]) values = struct.unpack( "{}d".format(3*length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_DOUBLE*length*3] ) return pt.tensor(values, dtype=self._dtype).reshape(length, 3) else: start += 2 return pt.tensor( [list(map(float, line[1:-2].split())) for line in data[start:start + length]], dtype=self._dtype ) def _parse_cell_volumes(self, path: str) -> pt.Tensor: """Parse cell volumes from the constant directory. """ with open(path + "V", "rb") as file: data = file.readlines() start, length = self._get_list_length(data[:MAX_LINE_HEADER]) if self._case._is_binary(data[:MAX_LINE_HEADER]): start += 1 buffer = b"".join(data[start:]) values = struct.unpack( "{}d".format(length), buffer[SIZE_OF_CHAR:SIZE_OF_CHAR+SIZE_OF_DOUBLE*length] ) return pt.tensor(values, dtype=self._dtype) else: start += 2 return pt.tensor( [float(line[:-1]) for line in data[start:start + length]], dtype=self._dtype )
[docs] def _compute_face_centers_and_areas(self, points: pt.Tensor, faces: pt.Tensor, n_points_faces: pt.Tensor ) -> Tuple[pt.Tensor, pt.Tensor]: """Compute face centers and areas. The implemented algorithm is close to the one in makeFaceCentresAndAreas_. The main steps are: 1. compute an estimate of the face center by averaging all face vertices 2. decompose the face into triangles 3. compute the sum over all area-weighted triangle centroids, triangle areas, and face area normal vectors 4. compute the face centroid and face area normal from the (weighted) sums .. _makeFaceCentresAndAreas: https://www.openfoam.com/documentation/guides/latest/api/primitiveMeshFaceCentresAndAreas_8C_source.html """ face_centers = pt.zeros( (n_points_faces.size()[0], 3), dtype=self._dtype) face_areas = pt.zeros_like(face_centers, dtype=self._dtype) center_estimates = pt.zeros_like(face_centers, dtype=self._dtype) for i in range(faces.shape[1]): center_estimates += points[faces[:, i]] * \ pt.where(i < n_points_faces, 1, 0) center_estimates /= n_points_faces area_sums = pt.zeros_like(n_points_faces, dtype=self._dtype) for i in range(faces.shape[1]): this_point_mask = pt.where(i < n_points_faces, 1, 0) next_point_mask = pt.where(i+1 < n_points_faces, 1, 0) last_point_mask = pt.where(i+1 == n_points_faces, 1, 0) this_point = points[faces[:, i]] * this_point_mask if i+1 < faces.shape[1]: next_point = points[faces[:, i+1]] * next_point_mask else: next_point = pt.zeros_like(face_centers, dtype=self._dtype) next_point += points[faces[:, 0]] * last_point_mask c = center_estimates * this_point_mask + this_point + next_point n = pt.cross( next_point - this_point, center_estimates * this_point_mask - this_point, dim=1 ) a = pt.norm(n, dim=1).unsqueeze(1) face_centers += c * a area_sums += a face_areas += n face_centers /= (area_sums * 3.0) face_areas *= 0.5 return face_centers, face_areas
[docs] def _compute_cell_centers_and_volumes(self, mesh_path: str) -> Tuple[pt.Tensor, pt.Tensor]: """Compute the cell centers and volumes of an OpenFOAM mesh. The implemented algorithm is close to the one in makeCellCentresAndVols_. The following steps are involved: 1. compute an estimate of the cell center as the average over all face centers 2. compute centroids and volumes of all pyramids formed by the cell faces and and the center estimate 3. the cell volume equals the sum over all pyramid volumes 4. the cell center is the volume-weighted average of all pyramid centroids .. _makeCellCentresAndVols: https://www.openfoam.com/documentation/guides/latest/api/primitiveMeshCellCentresAndVols_8C_source.html """ points = self._parse_points(mesh_path) n_points_faces, faces = self._parse_faces(mesh_path) owners, neighbors = self._parse_owners_and_neighbors(mesh_path) face_centers, face_areas = self._compute_face_centers_and_areas( points, faces, n_points_faces ) n_cells = self._get_n_cells(mesh_path) cell_centers = pt.zeros((n_cells, 3), dtype=self._dtype) cell_volumes = pt.zeros(n_cells, dtype=self._dtype) center_estimate = pt.zeros_like(cell_centers) n_faces_cell = pt.zeros(n_cells, dtype=self._itype) for i, owner in enumerate(owners): center_estimate[owner] += face_centers[i] n_faces_cell[owner] += 1 for i, neigh in enumerate(neighbors): center_estimate[neigh] += face_centers[i] n_faces_cell[neigh] += 1 center_estimate /= n_faces_cell.unsqueeze(-1) for i, owner in enumerate(owners): pyr_3vol = pt.dot( face_areas[i], face_centers[i] - center_estimate[owner] ) pyr_ctr = 3.0/4.0 * face_centers[i] + center_estimate[owner] / 4.0 cell_centers[owner] += pyr_3vol * pyr_ctr cell_volumes[owner] += pyr_3vol for i, neigh in enumerate(neighbors): pyr_3vol = pt.dot( face_areas[i], center_estimate[neigh] - face_centers[i] ) pyr_ctr = 3.0/4.0 * face_centers[i] + center_estimate[neigh] / 4.0 cell_centers[neigh] += pyr_3vol * pyr_ctr cell_volumes[neigh] += pyr_3vol cell_centers /= cell_volumes.unsqueeze(-1) cell_volumes /= 3.0 return cell_centers, cell_volumes
def _load_mesh(self): """Load or compute cell volumes and centers. .. warning:: For distributed cases, individual processor fields are simply concatenated. This reconstruction does not yield volumes and centers in the same order as the reconstructed OpenFOAM mesh. """ if self._case._distributed: proc_data = [] for proc in range(self._case._processors): mesh_location = self._case._path + \ "/processor{:d}/".format(proc) + POLYMESH_PATH proc_data.append( self._compute_cell_centers_and_volumes(mesh_location) ) centers = pt.cat(list(zip(*proc_data))[0]) volumes = pt.cat(list(zip(*proc_data))[1]) else: if self._centers_and_volumes_computed( self._case._path + f"/{CONSTANT_PATH}" ): print( f"Loading precomputed cell centers and volumes from {CONSTANT_PATH}") centers = self._parse_cell_centers( self._case._path + "/" + CONSTANT_PATH) volumes = self._parse_cell_volumes( self._case._path + "/" + CONSTANT_PATH) else: mesh_location = self._case._path + "/" + POLYMESH_PATH print( "Could not find precomputed cell centers and volumes.\n" + "Computing cell geometry from scratch (slow, not recommended for large meshes).\n" + "To compute cell centers and volumes in OpenFOAM, run:\n\n" + "postProcess -func \"writeCellCentres\" -constant -time none\n" + "postProcess -func \"writeCellVolumes\" -constant -time none" ) centers, volumes = self._compute_cell_centers_and_volumes( mesh_location) self._cell_centers = centers self._cell_volumes = volumes
[docs] def get_cell_centers(self) -> pt.Tensor: """Return or compute and return control volume centers. :return: control volume centers :rtype: pt.Tensor """ if self._cell_centers == None: self._load_mesh() return self._cell_centers
[docs] def get_cell_volumes(self) -> pt.Tensor: """Return or compute and return cell volumes. :return: cell volumes :rtype: pt.Tensor """ if self._cell_volumes == None: self._load_mesh() return self._cell_volumes