"""Module that contains parsers that do not fit in other categories.
Contains the following parsers:
- :class:`LArCVMetaParser`
- :class:`LArCVRunInfoParser`
- :class:`LArCVFlashParser`
- :class:`LArCVCRTHitParser`
- :class:`LArCVTriggerParser`
"""
from __future__ import annotations
from typing import Any
import numpy as np
from spine.data import CRTHit, Flash, Meta, RunInfo, Trigger
from spine.geo import GeoManager
from spine.utils.conditional import larcv
from spine.utils.optical import FlashMerger
from ..base import ParserBase
from ..data import ParserObjectList
__all__ = [
"LArCVMetaParser",
"LArCVRunInfoParser",
"LArCVFlashParser",
"LArCVCRTHitParser",
"LArCVTriggerParser",
]
[docs]
class LArCVRunInfoParser(ParserBase):
    """Parser which extracts the run information (run, subrun, event number).

    .. code-block:: yaml

        schema:
          run_info:
            parser: run_info
            sparse_event: sparse3d_pcluster
    """

    # Name of the parser (as specified in the configuration)
    name = "run_info"

    # Type of object(s) returned by the parser
    returns = "object"

    # Overlay strategy for the objects returned by the parser
    overlay = "cat"

    def __call__(self, trees: dict[str, Any]) -> RunInfo:
        """Parse one entry.

        Parameters
        ----------
        trees : dict
            Dictionary which maps each data product name to a LArCV object

        Returns
        -------
        RunInfo
            Run information object
        """
        inputs = self.get_input_data(trees)
        return self.process(**inputs)

    def process(
        self, sparse_event: Any | None = None, cluster_event: Any | None = None
    ) -> RunInfo:
        """Build the run information from the one event object provided.

        Exactly one of the two inputs must be given.

        Parameters
        ----------
        sparse_event : Union[larcv.EventSparseTensor2D,
                             larcv.EventSparseTensor3D], optional
            Tensor which contains the run information as an attribute
        cluster_event : Union[larcv.EventClusterPixel2D,
                              larcv.EventClusterVoxel3D], optional
            Cluster which contains the run information as an attribute

        Returns
        -------
        RunInfo
            Run information object

        Raises
        ------
        ValueError
            If neither or both of `sparse_event` and `cluster_event` are
            provided
        """
        # Both inputs missing or both inputs present are equally invalid:
        # the two "is None" tests must disagree
        if (sparse_event is None) == (cluster_event is None):
            raise ValueError("Must specify either `sparse_event` or `cluster_event`.")

        # Use whichever source was provided as the reference event
        if sparse_event is not None:
            ref_event = sparse_event
        else:
            ref_event = cluster_event

        return RunInfo.from_larcv(ref_event)
[docs]
class LArCVFlashParser(ParserBase):
    """Copy construct Flash and return an array of `Flash`.

    This parser also takes care of flashes that have been split between their
    respective optical volumes, provided a `flash_event_list`. This parser
    assumes that the trees are provided in order of the volume ID they
    correspond to.

    .. code-block:: yaml

        schema:
          flashes:
            parser: flash
            flash_event_list:
              - flash_cryoE
              - flash_cryoW
    """

    # Name of the parser (as specified in the configuration)
    name = "flash"

    # Alternative allowed names of the parser
    aliases = ("opflash",)

    # Type of object(s) returned by the parser
    returns = "object_list"

    def __init__(self, merge: dict[str, Any] | None = None, **kwargs: Any) -> None:
        """Initialize the flash parser.

        Parameters
        ----------
        merge : dict, optional
            Flash merging configuration, passed as keyword arguments to
            `FlashMerger`. If not provided, no merging is performed.
        **kwargs : dict, optional
            Data product arguments to be passed to the `process` function

        Raises
        ------
        ValueError
            If the geometry does not provide optical detector information
        """
        # Initialize the parent class
        super().__init__(**kwargs)

        # Initialize the flash merging class, if needed
        self.merger = None
        if merge is not None:
            self.merger = FlashMerger(**merge)

        # Fetch the geometry information to resize the flash objects
        geo = GeoManager.get_instance()
        if geo.optical is None:
            raise ValueError(
                "Optical geometry not found, required to resize the flash objects."
            )

        self.num_channels_per_volume = geo.optical.num_channels_per_volume

    def __call__(self, trees: dict[str, Any]) -> ParserObjectList:
        """Parse one entry.

        Parameters
        ----------
        trees : dict
            Dictionary which maps each data product name to a LArCV object

        Returns
        -------
        ParserObjectList
            List of optical flash objects
        """
        return self.process(**self.get_input_data(trees))

    def process(
        self,
        flash_event: Any | None = None,
        flash_event_list: list[Any] | None = None,
    ) -> ParserObjectList:
        """Fetches the list of optical flashes.

        Exactly one of the two inputs must be given.

        Parameters
        ----------
        flash_event : larcv.EventFlash, optional
            Optical flash event which contains a list of flash objects
        flash_event_list : List[larcv.EventFlash], optional
            List of optical flash events, each a list of flash objects

        Returns
        -------
        ParserObjectList
            List of optical flash objects, built from the parsed flashes and
            a default-constructed `Flash`

        Raises
        ------
        ValueError
            If neither or both of `flash_event` and `flash_event_list` are
            provided, or if one entry of `flash_event_list` is `None`
        """
        # Check on the input: exactly one of the two sources must be provided
        if (flash_event is None) == (flash_event_list is None):
            raise ValueError("Must specify either `flash_event` or `flash_event_list`.")

        # Parse flash objects
        flashes = []
        if flash_event is not None:
            # If there is a single flash event, parse it as is
            flashes = [
                Flash.from_larcv(larcv.Flash(f)) for f in flash_event.as_vector()
            ]

        elif flash_event_list is not None:
            # Otherwise, set the volume ID of the flash to the source index
            # and number the flashes sequentially across all sources. The loop
            # variable is deliberately not named `flash_event`, so that it
            # does not shadow the parameter of the same name.
            for volume_id, volume_event in enumerate(flash_event_list):
                if volume_event is None:
                    raise ValueError(f"Flash event at index {volume_id} is None.")

                for f in volume_event.as_vector():
                    # Cast and update attributes; the running index is the
                    # number of flashes stored so far
                    flash = Flash.from_larcv(f)
                    flash.id = len(flashes)
                    flash.volume_id = volume_id
                    flashes.append(flash)

        # Resize the flash objects to match the geometry
        num_channels = self.num_channels_per_volume
        for flash in flashes:
            num_values = len(flash.pe_per_ch)
            if num_values > num_channels:
                # If the flash spans > 1 optical volume, keep only the range
                # of channels which belongs to the volume of this flash
                lower = flash.volume_id * num_channels
                flash.pe_per_ch = flash.pe_per_ch[lower : lower + num_channels]

            elif num_values < num_channels:
                # Pad with zeros if it does not fill the full length. The
                # buffer is only allocated here, where it is actually needed.
                padded = np.zeros(num_channels, dtype=flash.pe_per_ch.dtype)
                padded[:num_values] = flash.pe_per_ch
                flash.pe_per_ch = padded

            # Otherwise the length already matches, nothing to do

        # If requested, merge flashes which match in time
        if self.merger is not None:
            flashes, _ = self.merger(flashes)

        return ParserObjectList(flashes, Flash())
[docs]
class LArCVCRTHitParser(ParserBase):
    """Copy construct CRTHit and return an array of `CRTHit`.

    .. code-block:: yaml

        schema:
          crthits:
            parser: crthit
            crthit_event: crthit_crthit
    """

    # Name of the parser (as specified in the configuration)
    name = "crthit"

    # Type of object(s) returned by the parser
    returns = "object_list"

    def __call__(self, trees: dict[str, Any]) -> ParserObjectList:
        """Parse one entry.

        Parameters
        ----------
        trees : dict
            Dictionary which maps each data product name to a LArCV object

        Returns
        -------
        ParserObjectList
            List of CRT hit objects
        """
        inputs = self.get_input_data(trees)
        return self.process(**inputs)

    def process(self, crthit_event: Any) -> ParserObjectList:
        """Fetches the list of CRT hits.

        Parameters
        ----------
        crthit_event : larcv.CRTHitEvent
            Event object which exposes its CRT hits through `as_vector`

        Returns
        -------
        ParserObjectList
            List of CRT hit objects, built from the parsed hits and a
            default-constructed `CRTHit`
        """
        # Copy construct each LArCV CRT hit, then cast it to a `CRTHit`
        crthits = []
        for larcv_hit in crthit_event.as_vector():
            hit_copy = larcv.CRTHit(larcv_hit)
            crthits.append(CRTHit.from_larcv(hit_copy))

        return ParserObjectList(crthits, CRTHit())
[docs]
class LArCVTriggerParser(ParserBase):
    """Copy construct Trigger and return a `Trigger`.

    .. code-block:: yaml

        schema:
          trigger:
            parser: trigger
            trigger_event: trigger_base
    """

    # Name of the parser (as specified in the configuration)
    name = "trigger"

    # Type of object(s) returned by the parser
    returns = "object"

    # Overlay strategy for the objects returned by the parser
    overlay = "cat"

    def __call__(self, trees: dict[str, Any]) -> Trigger:
        """Parse one entry.

        Parameters
        ----------
        trees : dict
            Dictionary which maps each data product name to a LArCV object

        Returns
        -------
        Trigger
            Trigger object
        """
        inputs = self.get_input_data(trees)
        return self.process(**inputs)

    def process(self, trigger_event: Any) -> Trigger:
        """Fetches the trigger information.

        Parameters
        ----------
        trigger_event : larcv.TriggerEvent
            Event object which the LArCV trigger is copy constructed from

        Returns
        -------
        Trigger
            Trigger object
        """
        # Copy construct the LArCV trigger, then cast it to a `Trigger`
        larcv_trigger = larcv.Trigger(trigger_event)
        return Trigger.from_larcv(larcv_trigger)