Source code for spine.io.read.hdf5

"""Contains a reader class dedicated to loading data from HDF5 files."""

import os
from dataclasses import fields
from typing import Any
from warnings import warn

import h5py
import numpy as np
import yaml
from yaml.parser import ParserError

import spine.data
from spine.utils.logger import logger

from .base import ReaderBase

__all__ = ["HDF5Reader"]


class HDF5Reader(ReaderBase):
    """Class which reads information stored in HDF5 files.

    This class inherits from the :class:`ReaderBase` class. It provides
    methods to load HDF5 files and extract their data products. The files
    must be structured as follows:

    - An `events` dataset with all the region references
    - One dataset per data product corresponding to each region reference in
      the `events` dataset
    """

    name: str = "hdf5"

    def __init__(
        self,
        file_keys: str | list[str] | None = None,
        file_list: str | None = None,
        limit_num_files: int | None = None,
        max_print_files: int = 10,
        n_entry: int | None = None,
        n_skip: int | None = None,
        entry_list: list[int] | None = None,
        skip_entry_list: list[int] | None = None,
        run_event_list: list[list[int]] | None = None,
        skip_run_event_list: list[list[int]] | None = None,
        create_run_map: bool = False,
        build_classes: bool = True,
        skip_unknown_attrs: bool = False,
        run_info_key: str = "run_info",
        allow_missing: bool = False,
        keep_open: bool = True,
        swmr: bool = False,
        ignore_incomplete: bool = False,
    ) -> None:
        """Initialize the HDF5 file reader.

        Parameters
        ----------
        file_keys : str or list[str], optional
            Path or list of paths to the HDF5 files to be read
        file_list : str, optional
            Path to a text file containing a list of file paths to be read
        limit_num_files : int, optional
            Integer limiting number of files to be taken per data directory
        max_print_files : int, default 10
            Maximum number of loaded file names to be printed
        n_entry : int, optional
            Maximum number of entries to load
        n_skip : int, optional
            Number of entries to skip at the beginning
        entry_list : list[int], optional
            List of integer entry IDs to add to the index
        skip_entry_list : list[int], optional
            List of integer entry IDs to skip from the index
        run_event_list : list[list[int]], optional
            List of (run, subrun, event) triplets to add to the index
        skip_run_event_list : list[list[int]], optional
            List of (run, subrun, event) triplets to skip from the index
        create_run_map : bool, default False
            Initialize a map between (run, subrun, event) triplets and entries.
            For large files, this can be quite expensive (must load every
            entry).
        build_classes : bool, default True
            If the stored object is a class, build it back
        skip_unknown_attrs : bool, default False
            If `True`, allow a loaded object to have unrecognized attributes.
            This allows backward compatibility with old files, but use with
            extreme caution, as this might hide a fundamental issue with your
            code.
        run_info_key : str, default 'run_info'
            Name of the data product which contains the run info of the event
        allow_missing : bool, default False
            If `True`, allows missing entries in the entry or event list
        keep_open : bool, default True
            If `True`, keep one read-only HDF5 handle open per file and per
            process. This avoids reopening files for every event access. If
            `False`, open and close the file on each `get` call.
        swmr : bool, default False
            If `True`, open files in HDF5 single-writer/multiple-reader mode.
            This is only relevant when reading files produced by a writer that
            was configured for SWMR-safe operation.
        ignore_incomplete : bool, default False
            If `True`, allow opening files marked as incomplete. By default,
            files with an explicit `info.attrs["complete"] = False` marker are
            rejected.

        Raises
        ------
        RuntimeError
            If a file is explicitly marked incomplete and `ignore_incomplete`
            is `False`.
        """
        # Process the list of files
        self.process_file_paths(file_keys, file_list, limit_num_files, max_print_files)

        # Handle-management state. It is set before any file is opened so
        # that `close` and `_open_file` always find it.
        self.keep_open = keep_open
        self.swmr = swmr
        self.ignore_incomplete = ignore_incomplete
        self._handle_pid: int | None = None
        self._file_handles: dict[int, h5py.File] = {}

        # If an entry list is requested based on run/subrun/event ID, create map
        if run_event_list is not None or skip_run_event_list is not None:
            create_run_map = True

        # Loop over the input files, build a map from index to file ID
        file_index, run_info = [], []
        self.num_entries = 0
        self.file_offsets = np.empty(len(self.file_paths), dtype=np.int64)
        for i, path in enumerate(self.file_paths):
            # Honor the SWMR setting here as well, so the scan uses the same
            # access mode as the per-event reads done through `_open_file`.
            with h5py.File(path, "r", swmr=self.swmr) as in_file:
                # Check that there are events in the file
                assert "events" in in_file, "File does not contain an event tree"

                # Reject files explicitly flagged as incomplete, unless told
                # otherwise. Files without the marker are accepted.
                if (
                    "info" in in_file
                    and "complete" in in_file["info"].attrs
                    and not in_file["info"].attrs["complete"]
                    and not self.ignore_incomplete
                ):
                    raise RuntimeError(
                        f"HDF5 file '{path}' is marked incomplete. "
                        "Pass ignore_incomplete=True to override."
                    )

                events = in_file["events"]
                assert isinstance(
                    events, h5py.Dataset
                ), "'events' is not a dataset in the HDF5 file."

                # If requested, register the (run, subrun, event) information
                if create_run_map:
                    assert (
                        run_info_key in in_file
                    ), f"Must provide {run_info_key} to create run map"
                    info = in_file[run_info_key]
                    assert isinstance(
                        info, h5py.Dataset
                    ), f"{run_info_key} is not a dataset in the HDF5 file."

                    # `dtype.names` is None for non-structured datasets; fall
                    # back to an empty tuple so the assertion reports it.
                    info_names = info.dtype.names or ()
                    assert all(
                        k in info_names for k in ["run", "subrun", "event"]
                    ), f"{run_info_key} dataset missing required fields."
                    run_info.extend(zip(info["run"], info["subrun"], info["event"]))

                # Update the total number of entries
                num_entries = len(events)
                file_index.append(np.full(num_entries, i, dtype=np.int64))
                self.file_offsets[i] = self.num_entries
                self.num_entries += num_entries

        # Dump the number of entries to load
        logger.info("Total number of entries in the file(s): %d\n", self.num_entries)

        # Concatenate the file indexes into one, set run info if needed
        self.file_index = np.concatenate(file_index)
        self.run_info = run_info if create_run_map else None

        # Process the run information
        self.process_run_info()

        # Process the entry list
        self.process_entry_list(
            n_entry,
            n_skip,
            entry_list,
            skip_entry_list,
            run_event_list,
            skip_run_event_list,
            allow_missing,
        )

        # Store other attributes
        self.build_classes = build_classes
        self.skip_unknown_attrs = skip_unknown_attrs

        # Process the configuration used to produce the HDF5 file
        self.cfg = self.process_cfg()

        # Process the SPINE version used to produce the HDF5 file
        self.version = self.process_version()

    def close(self) -> None:
        """Close any persistent HDF5 handles owned by this reader.

        This only affects handles cached in the current process. It is safe
        to call repeatedly.
        """
        # `getattr` keeps this safe on a partially constructed instance
        # (e.g. when `__init__` raised before the cache was created).
        for handle in getattr(self, "_file_handles", {}).values():
            try:
                handle.close()
            except Exception:
                # Deliberate best effort: this also runs from `__del__`, and
                # one handle failing to close must not block the others.
                pass

        self._file_handles = {}
        self._handle_pid = None

    def __del__(self) -> None:
        """Best-effort cleanup of persistent read handles on object teardown."""
        self.close()

    def _check_handle_pid(self) -> None:
        """Ensure cached handles belong to the current process.

        Reader instances may be copied into worker processes by data-loading
        frameworks. When that happens, inherited file handles must not be
        reused. This method drops any cached handles on PID changes and lets
        the caller reopen them lazily in the new process.
        """
        current_pid = _get_reader_pid()

        # First use in this process: simply claim ownership
        if self._handle_pid is None:
            self._handle_pid = current_pid
            return

        # The cache was filled by another process: drop it and take over
        if self._handle_pid != current_pid:
            self.close()
            self._handle_pid = current_pid

    def _open_file(self, file_idx: int) -> tuple[h5py.File, bool]:
        """Return a readable HDF5 handle for one input file.

        Parameters
        ----------
        file_idx : int
            Position of the target file in `self.file_paths`

        Returns
        -------
        tuple[h5py.File, bool]
            The opened HDF5 file handle and a flag indicating whether the
            caller is responsible for closing it. The flag is `True` only
            when `keep_open=False`.
        """
        # Without caching, hand out a fresh handle that the caller must close
        if not self.keep_open:
            return h5py.File(self.file_paths[file_idx], "r", swmr=self.swmr), True

        # With caching, reuse the handle if it is still valid in this process
        self._check_handle_pid()
        handle = self._file_handles.get(file_idx)
        if handle is None or not handle.id.valid:
            handle = h5py.File(self.file_paths[file_idx], "r", swmr=self.swmr)
            self._file_handles[file_idx] = handle

        return handle, False

    def process_cfg(self) -> dict[str, Any] | None:
        """Fetches the SPINE configuration used to produce the HDF5 file.

        The configuration is read from the `cfg` attribute of the `info`
        group of the first file in `self.file_paths`.

        Returns
        -------
        dict
            Configuration dictionary, or `None` if the stored string could
            not be parsed as YAML (a warning is emitted in that case)
        """
        # Fetch the string-form configuration
        with h5py.File(self.file_paths[0], "r", swmr=self.swmr) as in_file:
            assert "info" in in_file, "HDF5 file missing 'info' group."
            assert (
                "cfg" in in_file["info"].attrs
            ), "HDF5 file 'info' group missing 'cfg' attribute."
            cfg_str = in_file["info"].attrs["cfg"]

        # Attempt to parse it (need try for now for SPINE versions < v0.4.0).
        # Catch the YAML base error: a legacy configuration may fail in the
        # scanner or the constructor, not only in the parser.
        try:
            assert isinstance(cfg_str, str), "'cfg' attribute is not a string."
            cfg = yaml.safe_load(cfg_str)
        except yaml.YAMLError:
            warn(
                "Parsing configuration failed, returning None for SPINE versions < v0.4.0"
            )
            return None

        return cfg

    def process_version(self) -> str:
        """Returns the SPINE release version used to produce the HDF5 file.

        The version is read from the `version` attribute of the `info` group
        of the first file in `self.file_paths`.

        Returns
        -------
        str
            SPINE release tag
        """
        # Fetch the string-form version
        with h5py.File(self.file_paths[0], "r", swmr=self.swmr) as in_file:
            assert "info" in in_file, "HDF5 file missing 'info' group."
            assert (
                "version" in in_file["info"].attrs
            ), "HDF5 file 'info' group missing 'version' attribute."
            version = in_file["info"].attrs["version"]

        assert isinstance(version, str), "'version' attribute is not a string."

        return version

    def get(self, idx: int) -> dict[str, Any]:
        """Returns a specific entry in the file.

        Parameters
        ----------
        idx : int
            Integer entry ID to access

        Returns
        -------
        data : dict
            Dictionary of data products corresponding to one event

        Raises
        ------
        IndexError
            If `idx` is negative or not smaller than `len(self)`
        ValueError
            If the selected `events` entry does not have named fields
        """
        # Get the appropriate entry index
        if idx < 0 or idx >= len(self):
            raise IndexError(
                f"Index {idx} out of bounds for dataset of size {len(self)}."
            )
        file_idx = self.get_file_index(idx)
        entry_idx = self.get_file_entry_index(idx)

        # Use the event tree to find out what needs to be loaded
        data = {"file_index": file_idx, "file_entry_index": entry_idx}
        data.update(self.get_source_provenance(file_idx, entry_idx))

        in_file, should_close = self._open_file(file_idx)
        try:
            events = in_file["events"]
            assert isinstance(
                events, h5py.Dataset
            ), "'events' is not a dataset in the HDF5 file."

            # Each named field of the entry is one data product to load
            event = events[entry_idx]
            names = getattr(getattr(event, "dtype", None), "names", None)
            if names is None:
                raise ValueError("Event entry does not have named fields.")

            for key in names:
                self.load_key(in_file, event, data, key)
        finally:
            # Only close handles that are not owned by the persistent cache
            if should_close:
                in_file.close()

        # Use the global index, not the one read from file
        data["index"] = idx

        return data

    @staticmethod
    def resolve_object_class(class_name: str, array: np.ndarray) -> type:
        """Resolve an HDF5 object class name to the concrete SPINE class.

        This keeps backward-compatibility quirks localized in the reader. In
        particular, older HDF5 files stored image metadata with
        ``class_name="Meta"``. Newer files store the explicit ``ImageMeta2D``
        / ``ImageMeta3D`` class names instead.

        Parameters
        ----------
        class_name : str
            Class name stored in the HDF5 dataset metadata
        array : np.ndarray
            Structured array slice containing the serialized objects

        Returns
        -------
        type
            Concrete SPINE data class to reconstruct

        Raises
        ------
        ValueError
            If a legacy ``Meta`` object has a `count` field whose length is
            neither 2 nor 3
        """
        # Every class name other than the legacy one maps directly
        if class_name != "Meta":
            return getattr(spine.data, class_name)

        from spine.data.larcv.meta import ImageMeta2D, ImageMeta3D

        names = getattr(array.dtype, "names", None)
        assert names is not None and "count" in names, (
            "Legacy HDF5 class_name='Meta' requires a structured dtype "
            "with a 'count' field."
        )

        # Without any stored object to inspect, fall back to the 3D class
        if not len(array):
            return ImageMeta3D

        # The length of the `count` field selects the dimensionality
        dim = len(array[0]["count"])
        if dim == 2:
            return ImageMeta2D
        if dim == 3:
            return ImageMeta3D

        raise ValueError(
            f"Unsupported legacy Meta dimensionality: {dim}. Expected 2 or 3."
        )

    def load_key(
        self,
        in_file: h5py.File,
        event: dict[str, Any],
        data: dict[str, Any],
        key: str,
    ) -> None:
        """Fetch a specific key for a specific event.

        The result is stored in `data[key]`; nothing is returned.

        Parameters
        ----------
        in_file : h5py.File
            HDF5 file instance
        event : dict
            Entry of the `events` dataset, indexable by product name, which
            holds one region reference per data product
        data : dict
            Dictionary of data products corresponding to one event
        key : str
            Name of the dataset in the entry

        Raises
        ------
        ValueError
            If `in_file[key]` is neither a dataset nor a group
        """
        # The event-level information is a region reference: fetch it
        region_ref = event[key]
        dataset = in_file[key]

        if isinstance(dataset, h5py.Dataset):
            names = getattr(getattr(dataset, "dtype", None), "names", None)
            if not names:
                # If the reference points at a simple dataset, return
                data[key] = dataset[region_ref]
                if dataset.attrs["scalar"]:
                    data[key] = data[key][0]
                if len(dataset.shape) > 1:
                    data[key] = data[key].reshape(-1, dataset.shape[1])

            else:
                # If the dataset has multiple attributes, it contains an object.
                # Start by fetching the appropriate class to rebuild
                array = dataset[region_ref]
                class_name = dataset.attrs["class_name"]
                assert isinstance(
                    class_name, str
                ), "Dataset missing 'class_name' attribute."
                obj_class = self.resolve_object_class(class_name, array)

                # If needed, work out once which stored fields the class
                # recognizes, as (position, name) pairs
                names = array.dtype.names
                kept_fields = []
                if self.skip_unknown_attrs:
                    known_attrs = {f.name for f in fields(obj_class)}
                    kept_fields = [
                        (pos, name)
                        for pos, name in enumerate(names)
                        if name in known_attrs
                    ]

                # Load the objects
                objects = []
                for el in array:
                    # Fetch the list of key/value pairs, filter if requested
                    if self.skip_unknown_attrs:
                        obj_dict = {name: el[pos] for pos, name in kept_fields}
                    else:
                        obj_dict = dict(zip(names, el))

                    # Rebuild an instance of the object class, if requested
                    if self.build_classes:
                        objects.append(obj_class.from_dict(obj_dict))
                    else:
                        objects.append(obj_dict)

                data[key] = objects
                if dataset.attrs["scalar"]:
                    data[key] = data[key][0]

        elif isinstance(dataset, h5py.Group):
            # If the reference points at a group, unpack
            index = dataset["index"]
            assert isinstance(index, h5py.Dataset), "Dataset 'index' is missing."
            el_refs = index[region_ref].flatten()

            if len(index.shape) == 1:
                # One-dimensional index: all references point into one
                # shared `elements` dataset
                elements = dataset["elements"]
                assert isinstance(
                    elements, h5py.Dataset
                ), "Dataset 'elements' is missing."

                ret = np.empty(len(el_refs), dtype=object)
                ret[:] = [elements[r] for r in el_refs]
                if len(elements.shape) > 1:
                    for i, el in enumerate(ret):
                        ret[i] = el.reshape(-1, elements.shape[1])

            else:
                # Otherwise the i-th reference points into its own
                # `element_{i}` dataset
                elements = [dataset[f"element_{i}"] for i in range(len(el_refs))]
                ret = []
                for i, el in enumerate(elements):
                    assert isinstance(
                        el, h5py.Dataset
                    ), f"Dataset 'element_{i}' is missing."
                    ret.append(el[el_refs[i]])
                    if len(el.shape) > 1:
                        ret[i] = ret[i].reshape(-1, el.shape[1])

            data[key] = ret

        else:
            raise ValueError(f"Dataset for key '{key}' is neither a group nor dataset.")
def _get_reader_pid() -> int: """Return the current process ID for HDF5 handle ownership checks.""" return os.getpid()