"""Contains a reader class dedicated to loading data from HDF5 files."""
import os
from dataclasses import fields
from typing import Any
from warnings import warn
import h5py
import numpy as np
import yaml
from yaml.parser import ParserError
import spine.data
from spine.utils.logger import logger
from .base import ReaderBase
__all__ = ["HDF5Reader"]
[docs]
class HDF5Reader(ReaderBase):
    """Class which reads information stored in HDF5 files.

    This class inherits from the :class:`ReaderBase` class. It provides
    methods to load HDF5 files and extract their data products. The files
    must be structured as follows:

    - An `events` dataset with all the region references
    - One dataset (or group) per data product, corresponding to each region
      reference in the `events` dataset
    """

    # Short identifier of this reader.
    # NOTE(review): presumably used to select the reader by name from a
    # configuration - confirm against ReaderBase and the reader factory.
    name: str = "hdf5"
[docs]
def __init__(
    self,
    file_keys: str | list[str] | None = None,
    file_list: str | None = None,
    limit_num_files: int | None = None,
    max_print_files: int = 10,
    n_entry: int | None = None,
    n_skip: int | None = None,
    entry_list: list[int] | None = None,
    skip_entry_list: list[int] | None = None,
    run_event_list: list[list[int]] | None = None,
    skip_run_event_list: list[list[int]] | None = None,
    create_run_map: bool = False,
    build_classes: bool = True,
    skip_unknown_attrs: bool = False,
    run_info_key: str = "run_info",
    allow_missing: bool = False,
    keep_open: bool = True,
    swmr: bool = False,
    ignore_incomplete: bool = False,
) -> None:
    """Initialize the HDF5 file reader.

    Parameters
    ----------
    file_keys : str or list[str], optional
        Path or list of paths to the HDF5 files to be read
    file_list : str, optional
        Path to a text file containing a list of file paths to be read
    limit_num_files : int, optional
        Integer limiting number of files to be taken per data directory
    max_print_files : int, default 10
        Maximum number of loaded file names to be printed
    n_entry : int, optional
        Maximum number of entries to load
    n_skip : int, optional
        Number of entries to skip at the beginning
    entry_list : list[int], optional
        List of integer entry IDs to add to the index
    skip_entry_list : list[int], optional
        List of integer entry IDs to skip from the index
    run_event_list : list[list[int]], optional
        List of (run, subrun, event) triplets to add to the index
    skip_run_event_list : list[list[int]], optional
        List of (run, subrun, event) triplets to skip from the index
    create_run_map : bool, default False
        Initialize a map between (run, subrun, event) triplets and entries.
        For large files, this can be quite expensive (must load every entry).
        Forced to `True` whenever `run_event_list` or `skip_run_event_list`
        is provided.
    build_classes : bool, default True
        If the stored object is a class, build it back
    skip_unknown_attrs : bool, default False
        If `True`, allow a loaded object to have unrecognized attributes.
        This allows backward compatibility with old files, but use with
        extreme caution, as this might hide a fundamental issue with your code.
    run_info_key : str, default 'run_info'
        Name of the data product which contains the run info of the event
    allow_missing : bool, default False
        If `True`, allows missing entries in the entry or event list
    keep_open : bool, default True
        If `True`, keep one read-only HDF5 handle open per file and per
        process. This avoids reopening files for every event access. If
        `False`, open and close the file on each `get` call.
    swmr : bool, default False
        If `True`, open files in HDF5 single-writer/multiple-reader mode.
        This is only relevant when reading files produced by a writer that
        was configured for SWMR-safe operation. The setting is applied both
        to the index-building pass done here and to per-event reads.
    ignore_incomplete : bool, default False
        If `True`, allow opening files marked as incomplete. By default,
        files with an explicit `info.attrs["complete"] = False` marker are
        rejected.

    Raises
    ------
    AssertionError
        If a file has no `events` dataset, or if a run map is requested and
        the run info dataset is missing or lacks the `run`, `subrun` or
        `event` fields.
    RuntimeError
        If a file is marked incomplete and `ignore_incomplete` is `False`.
    """
    # Process the list of files
    self.process_file_paths(file_keys, file_list, limit_num_files, max_print_files)

    # Handle-management state; must exist before any file is touched so
    # that `close` has something consistent to clean up
    self.keep_open = keep_open
    self.swmr = swmr
    self.ignore_incomplete = ignore_incomplete
    self._handle_pid: int | None = None
    self._file_handles: dict[int, h5py.File] = {}

    # If an entry list is requested based on run/subrun/event ID, create map
    if run_event_list is not None or skip_run_event_list is not None:
        create_run_map = True

    # Loop over the input files, build a map from index to file ID
    file_index, run_info = [], []
    self.num_entries = 0
    self.file_offsets = np.empty(len(self.file_paths), dtype=np.int64)
    for i, path in enumerate(self.file_paths):
        # Use the same SWMR setting as `_open_file`, so that a file which
        # is readable at `get` time is also readable here
        with h5py.File(path, "r", swmr=self.swmr) as in_file:
            # Check that there are events in the file
            assert "events" in in_file, "File does not contain an event tree"

            # Reject files explicitly flagged as incomplete, unless overridden
            if (
                "info" in in_file
                and "complete" in in_file["info"].attrs
                and not in_file["info"].attrs["complete"]
                and not self.ignore_incomplete
            ):
                raise RuntimeError(
                    f"HDF5 file '{path}' is marked incomplete. "
                    "Pass ignore_incomplete=True to override."
                )

            events = in_file["events"]
            assert isinstance(
                events, h5py.Dataset
            ), "'events' is not a dataset in the HDF5 file."

            # If requested, register the (run, subrun, event) information
            if create_run_map:
                assert (
                    run_info_key in in_file
                ), f"Must provide {run_info_key} to create run map"
                info = in_file[run_info_key]
                assert isinstance(
                    info, h5py.Dataset
                ), f"{run_info_key} is not a dataset in the HDF5 file."

                # `dtype.names` is None for non-structured datasets; treat
                # that as "no fields" so the assertion below reports it
                info_fields = info.dtype.names or ()
                assert all(
                    k in info_fields for k in ["run", "subrun", "event"]
                ), f"{run_info_key} dataset missing required fields."
                run_info.extend(zip(info["run"], info["subrun"], info["event"]))

            # Update the total number of entries
            num_entries = len(events)
            file_index.append(np.full(num_entries, i, dtype=np.int64))
            self.file_offsets[i] = self.num_entries
            self.num_entries += num_entries

    # Dump the number of entries to load
    logger.info("Total number of entries in the file(s): %d\n", self.num_entries)

    # Concatenate the file indexes into one, set run info if needed
    self.file_index = np.concatenate(file_index)
    self.run_info = run_info if create_run_map else None

    # Process the run information
    self.process_run_info()

    # Process the entry list
    self.process_entry_list(
        n_entry,
        n_skip,
        entry_list,
        skip_entry_list,
        run_event_list,
        skip_run_event_list,
        allow_missing,
    )

    # Store other attributes
    self.build_classes = build_classes
    self.skip_unknown_attrs = skip_unknown_attrs

    # Process the configuration used to produce the HDF5 file
    self.cfg = self.process_cfg()

    # Process the SPINE version used to produce the HDF5 file
    self.version = self.process_version()
[docs]
def close(self) -> None:
"""Close any persistent HDF5 handles owned by this reader.
This only affects handles cached in the current process. It is safe to
call repeatedly.
"""
for handle in getattr(self, "_file_handles", {}).values():
try:
handle.close()
except Exception:
pass
self._file_handles = {}
self._handle_pid = None
def __del__(self) -> None:
    """Best-effort cleanup of persistent read handles on object teardown.

    Simply delegates to `close`, which tolerates a missing handle cache and
    ignores errors raised while closing individual handles.
    """
    self.close()
def _check_handle_pid(self) -> None:
    """Make sure the cached handles were opened by the current process.

    A reader may end up in a different process than the one that opened
    its handles (e.g. when copied into data-loading workers). Handles
    recorded under another PID are dropped here, so that the caller reopens
    them lazily; the current PID is then recorded as the owner.
    """
    pid = _get_reader_pid()
    owner = self._handle_pid

    # Handles recorded under a different PID must not be reused
    if owner is not None and owner != pid:
        self.close()

    self._handle_pid = pid
def _open_file(self, file_idx: int) -> tuple[h5py.File, bool]:
    """Provide a read-only HDF5 handle for one of the input files.

    Parameters
    ----------
    file_idx : int
        Position of the target file in `self.file_paths`

    Returns
    -------
    tuple[h5py.File, bool]
        The HDF5 file handle, along with a flag telling the caller whether
        it must close the handle itself. The flag is `True` only when
        `keep_open=False`; cached handles are owned by the reader.
    """
    if self.keep_open:
        # Drop handles recorded under another process before reusing any
        self._check_handle_pid()

        cached = self._file_handles.get(file_idx)
        if cached is not None and cached.id.valid:
            return cached, False

        # No usable cached handle: open a new one and remember it
        fresh = h5py.File(self.file_paths[file_idx], "r", swmr=self.swmr)
        self._file_handles[file_idx] = fresh
        return fresh, False

    # One-shot handle: the caller is in charge of closing it
    return h5py.File(self.file_paths[file_idx], "r", swmr=self.swmr), True
[docs]
def process_cfg(self) -> dict[str, Any] | None:
    """Fetch the SPINE configuration used to produce the HDF5 file.

    The configuration is read from the `cfg` attribute of the `info` group
    of the first input file only, and parsed as YAML.

    Returns
    -------
    dict or None
        Configuration dictionary, or `None` if the stored string could not
        be parsed (a warning is issued in that case)

    Raises
    ------
    AssertionError
        If the `info` group or its `cfg` attribute is missing, or if the
        attribute is not a string.
    """
    # Fetch the string-form configuration. Open with the reader's SWMR
    # setting so this agrees with how `_open_file` accesses the same file
    # (falls back to a plain read if the attribute was never set).
    swmr = getattr(self, "swmr", False)
    with h5py.File(self.file_paths[0], "r", swmr=swmr) as in_file:
        assert "info" in in_file, "HDF5 file missing 'info' group."
        assert (
            "cfg" in in_file["info"].attrs
        ), "HDF5 file 'info' group missing 'cfg' attribute."
        cfg_str = in_file["info"].attrs["cfg"]

    # The type check is not a parsing failure: keep it out of the `try`
    assert isinstance(cfg_str, str), "'cfg' attribute is not a string."

    # Attempt to parse it (need try for now for SPINE versions < v0.4.0)
    try:
        cfg = yaml.safe_load(cfg_str)
    except ParserError:
        warn(
            "Parsing configuration failed, returning None for SPINE versions < v0.4.0"
        )
        return None

    return cfg
[docs]
def process_version(self) -> str:
    """Return the SPINE release version used to produce the HDF5 file.

    The version is read from the `version` attribute of the `info` group of
    the first input file only.

    Returns
    -------
    str
        SPINE release tag

    Raises
    ------
    AssertionError
        If the `info` group or its `version` attribute is missing, or if
        the attribute is not a string.
    """
    # Fetch the version string. Open with the reader's SWMR setting so this
    # agrees with how `_open_file` accesses the same file (falls back to a
    # plain read if the attribute was never set).
    swmr = getattr(self, "swmr", False)
    with h5py.File(self.file_paths[0], "r", swmr=swmr) as in_file:
        assert "info" in in_file, "HDF5 file missing 'info' group."
        assert (
            "version" in in_file["info"].attrs
        ), "HDF5 file 'info' group missing 'version' attribute."
        version = in_file["info"].attrs["version"]

    assert isinstance(version, str), "'version' attribute is not a string."

    return version
[docs]
def get(self, idx: int) -> dict[str, Any]:
    """Load one entry from the input file(s).

    Parameters
    ----------
    idx : int
        Integer entry ID to access

    Returns
    -------
    data : dict
        Dictionary of data products corresponding to one event. It also
        carries the `file_index`, `file_entry_index` and `index` keys, plus
        whatever `get_source_provenance` provides.

    Raises
    ------
    IndexError
        If `idx` is negative or not smaller than the reader length.
    ValueError
        If the event record does not expose named fields.
    """
    # Reject out-of-range requests up front
    if idx >= len(self) or idx < 0:
        raise IndexError(
            f"Index {idx} out of bounds for dataset of size {len(self)}."
        )

    # Translate the global entry ID into a (file, entry-in-file) pair
    file_idx = self.get_file_index(idx)
    entry_idx = self.get_file_entry_index(idx)

    # Seed the output with bookkeeping information
    data = {"file_index": file_idx, "file_entry_index": entry_idx}
    data.update(self.get_source_provenance(file_idx, entry_idx))

    # Use the event tree to find out what needs to be loaded
    handle, owns_handle = self._open_file(file_idx)
    try:
        events = handle["events"]
        assert isinstance(
            events, h5py.Dataset
        ), "'events' is not a dataset in the HDF5 file."

        event = events[entry_idx]
        field_names = getattr(getattr(event, "dtype", None), "names", None)
        if field_names is None:
            raise ValueError("Event entry does not have named fields.")

        for key in field_names:
            self.load_key(handle, event, data, key)
    finally:
        # Only close handles that are not cached by the reader
        if owns_handle:
            handle.close()

    # Use the global index, not the one read from file
    data["index"] = idx

    return data
[docs]
@staticmethod
def resolve_object_class(class_name: str, array: np.ndarray) -> type:
    """Map a class name stored in an HDF5 file onto a SPINE data class.

    Backward-compatibility special cases live here, so the rest of the
    reader does not need to know about them. Older HDF5 files stored image
    metadata under ``class_name="Meta"``, whereas newer files store the
    explicit ``ImageMeta2D`` / ``ImageMeta3D`` class names.

    Parameters
    ----------
    class_name : str
        Class name stored in the HDF5 dataset metadata
    array : np.ndarray
        Structured array slice containing the serialized objects

    Returns
    -------
    type
        Concrete SPINE data class to reconstruct

    Raises
    ------
    ValueError
        If a legacy ``Meta`` object has a `count` field of length other
        than 2 or 3.
    """
    if class_name == "Meta":
        from spine.data.larcv.meta import ImageMeta2D, ImageMeta3D

        field_names = getattr(array.dtype, "names", None)
        assert field_names is not None and "count" in field_names, (
            "Legacy HDF5 class_name='Meta' requires a structured dtype "
            "with a 'count' field."
        )

        # Without any element to inspect, fall back on the 3D class
        if not len(array):
            return ImageMeta3D

        # The length of the `count` field gives the image dimensionality
        dim = len(array[0]["count"])
        by_dim = {2: ImageMeta2D, 3: ImageMeta3D}
        if dim in by_dim:
            return by_dim[dim]

        raise ValueError(
            f"Unsupported legacy Meta dimensionality: {dim}. Expected 2 or 3."
        )

    # Regular case: the stored name is the name of a class in `spine.data`
    return getattr(spine.data, class_name)
[docs]
def load_key(
    self,
    in_file: h5py.File,
    event: dict[str, Any],
    data: dict[str, Any],
    key: str,
) -> None:
    """Fetch a specific key for a specific event.

    The result is stored under `data[key]`. Three storage layouts are
    handled, depending on what `in_file[key]` is:

    - a dataset without named fields: the referenced values are returned
      as is (first element only if the dataset is flagged as scalar);
    - a dataset with named fields: each referenced row is turned into a
      dictionary, and into an object if `build_classes` is set;
    - a group: the references stored in its `index` dataset are resolved
      against `elements` (1D index) or `element_<i>` (otherwise).

    Parameters
    ----------
    in_file : h5py.File
        HDF5 file instance
    event : dict
        Event record, indexed by data product name, which provides the
        region reference of each product
    data : dict
        Dictionary of data products corresponding to one event
    key: str
        Name of the dataset in the entry

    Raises
    ------
    ValueError
        If `in_file[key]` is neither a dataset nor a group.
    """
    # The event-level information is a region reference: fetch it
    region_ref = event[key]
    dataset = in_file[key]

    if isinstance(dataset, h5py.Dataset):
        names = getattr(getattr(dataset, "dtype", None), "names", None)
        if not names:
            # If the reference points at a simple dataset, return
            values = dataset[region_ref]
            if dataset.attrs["scalar"]:
                values = values[0]
            if len(dataset.shape) > 1:
                # Restore the column count of the stored dataset
                values = values.reshape(-1, dataset.shape[1])
            data[key] = values

        else:
            # If the dataset has multiple attributes, it contains an object.
            # Start by fetching the appropriate class to rebuild
            array = dataset[region_ref]
            class_name = dataset.attrs["class_name"]
            assert isinstance(
                class_name, str
            ), "Dataset missing 'class_name' attribute."
            obj_class = self.resolve_object_class(class_name, array)

            # If needed, work out once which stored fields are recognized
            # by the class (position in the record + field name)
            field_names = array.dtype.names
            kept_fields = []
            if self.skip_unknown_attrs:
                known_attrs = {f.name for f in fields(obj_class)}
                kept_fields = [
                    (pos, name)
                    for pos, name in enumerate(field_names)
                    if name in known_attrs
                ]

            # Load the objects
            objects = []
            for el in array:
                # Fetch the list of key/value pairs, filter if requested
                if self.skip_unknown_attrs:
                    obj_dict = {name: el[pos] for pos, name in kept_fields}
                else:
                    obj_dict = dict(zip(field_names, el))

                # Rebuild an instance of the object class, if requested
                if self.build_classes:
                    objects.append(obj_class.from_dict(obj_dict))
                else:
                    objects.append(obj_dict)

            data[key] = objects[0] if dataset.attrs["scalar"] else objects

    elif isinstance(dataset, h5py.Group):
        # If the reference points at a group, unpack
        index = dataset["index"]
        assert isinstance(index, h5py.Dataset), "Dataset 'index' is missing."
        el_refs = index[region_ref].flatten()

        if len(index.shape) == 1:
            # All elements live in one shared `elements` dataset
            elements = dataset["elements"]
            assert isinstance(
                elements, h5py.Dataset
            ), "Dataset 'elements' is missing."

            ret = np.empty(len(el_refs), dtype=object)
            ret[:] = [elements[r] for r in el_refs]
            if len(elements.shape) > 1:
                for pos, chunk in enumerate(ret):
                    ret[pos] = chunk.reshape(-1, elements.shape[1])

        else:
            # One `element_<i>` dataset per reference
            elements = [dataset[f"element_{i}"] for i in range(len(el_refs))]
            ret = []
            for i, (el, ref) in enumerate(zip(elements, el_refs)):
                assert isinstance(
                    el, h5py.Dataset
                ), f"Dataset 'element_{i}' is missing."
                chunk = el[ref]
                if len(el.shape) > 1:
                    chunk = chunk.reshape(-1, el.shape[1])
                ret.append(chunk)

        data[key] = ret

    else:
        raise ValueError(f"Dataset for key '{key}' is neither a group nor dataset.")
def _get_reader_pid() -> int:
    """Give the ID of the running process, used to track handle ownership."""
    current_pid = os.getpid()
    return current_pid