Source code for spine.io.parse.larcv.particle

"""Module that contains all parsers related to LArCV particle data.

Contains the following parsers:
- :class:`LArCVParticleParser`
- :class:`LArCVNeutrinoParser`
- :class:`LArCVParticlePointParser`
- :class:`LArCVParticleCoordinateParser`
- :class:`LArCVParticleGraphParser`
- :class:`LArCVSingleParticlePIDParser`
- :class:`LArCVSingleParticleEnergyParser`
"""

from __future__ import annotations

from typing import Any

import numpy as np

from spine.constants import (
    INVAL_ID,
    PDG_TO_PID,
    PID_MASSES,
    PPN_LPART_COL,
    TRACK_SHP,
    VALUE_COL,
    NuInteractionScheme,
)
from spine.constants.factory import enum_factory
from spine.data import Meta, Neutrino, Particle
from spine.utils.conditional import larcv
from spine.utils.gnn.network import filter_invalid_nodes
from spine.utils.particles import process_particles
from spine.utils.ppn import get_ppn_labels, get_vertex_labels, image_coordinates

from ..base import ParserBase
from ..data import ParserEdgeIndex, ParserObjectList, ParserTensor

__all__ = [
    "LArCVParticleParser",
    "LArCVNeutrinoParser",
    "LArCVParticlePointParser",
    "LArCVParticleCoordinateParser",
    "LArCVVertexPointParser",
    "LArCVParticleGraphParser",
    "LArCVSingleParticlePIDParser",
    "LArCVSingleParticleEnergyParser",
]


[docs] class LArCVParticleParser(ParserBase): """Class which loads larcv.Particle objects to local Particle ones. .. code-block. yaml schema: particles: parser: particle particle_event: particle_pcluster cluster_event: cluster3d_pcluster asis: False pixel_coordinates: True post_process: True """ # Name of the parser (as specified in the configuration) name = "particle" # Type of object(s) returned by the parser returns = "object_list" def __init__( self, pixel_coordinates: bool = True, post_process: bool = True, skip_empty: bool = False, asis: bool = False, **kwargs: Any, ) -> None: """Initialize the parser. Parameters ---------- pixel_coordinates : bool, default True If set to `True`, the parser rescales the truth positions (start, end, etc.) to voxel coordinates post_process : bool, default True Processes particles to add/correct missing attributes skip_empty : bool, default False Do not read the truth information corresponding to empty particles. This saves considerable read time when there are a lot of irrelevant particle stored in the LArCV file. It puts an empty `Particle` object in place of empty particles, to preserve list size and typing. asis : bool, default False Load the objects as larcv objects, do not build local data class **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(**kwargs) # Store the revelant attributes self.pixel_coordinates = pixel_coordinates self.post_process = post_process self.skip_empty = skip_empty self.asis = asis def __call__(self, trees: dict[str, Any]) -> ParserObjectList: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, particle_event: Any, sparse_event: Any | None = None, cluster_event: Any | None = None, particle_mpv_event: Any | None = None, neutrino_event: Any | None = None, ) -> ParserObjectList: """Fetch the list of true particle objects. Parameters ---------- particle_event : larcv.EventParticle Particle event which contains the list of true particles sparse_event : larcv.EventSparseTensor3D, optional Tensor which contains the metadata needed to convert the positions in voxel coordinates cluster_event : larcv.EventClusterVoxel3D, optional Cluster which contains the metadata needed to convert the positions in voxel coordinates particle_mpv_event : larcv.EventParticle, optional Particle event which contains the list of true MPV particles neutrino_event : larcv.EventNeutrino, optional Neutrino event which contains the list of true neutrinos Returns ------- List[Particle] List of true particle objects """ # If asis is true, return larcv objects particle_list = list(particle_event.as_vector()) if self.asis: if self.pixel_coordinates: raise ValueError( "If `asis` is True, `pixel_coordinates` must be False." ) if self.post_process: raise ValueError("If `asis` is True, `post_process` must be False.") if self.skip_empty: raise ValueError("If `asis` is True`, `skip_empty` must be False.") return ParserObjectList(particle_list, larcv.Particle()) # Convert to a list of particle objects particles = [] for p in particle_list: if not self.skip_empty or p.num_voxels() > 0 or p.id() == p.group_id(): particles.append(Particle.from_larcv(p)) else: particles.append(Particle()) # If requested, post-process the particle list if self.post_process: process_particles( particles, particle_event, particle_mpv_event, neutrino_event ) # If requested, convert the point positions to pixel coordinates if self.pixel_coordinates: # Fetch the metadata if not (sparse_event is not None) ^ (cluster_event is not None): raise ValueError( "Must provide either `sparse_event` or `cluster_event` to " "get the metadata and convert positions to voxel units." ) ref_event = sparse_event if sparse_event is not None else cluster_event assert ref_event is not None # Guaranteed by the check above meta = Meta.from_larcv(ref_event.meta()) # Convert all the relevant attributes for p in particles: if p.id > -1: p.to_px(meta) # Define the shifts to be applied to each index attribute num_particles = len(particles) if neutrino_event is not None: num_neutrinos = len(neutrino_event.as_vector()) else: nu_ids = np.array([part.nu_id for part in particles], dtype=int) num_neutrinos = np.max(nu_ids, initial=-1) + 1 index_shifts = {} for attr in Particle().index_attrs: index_shifts[attr] = num_particles if attr != "nu_id" else num_neutrinos # Return return ParserObjectList(particles, Particle(), index_shifts)
[docs] class LArCVNeutrinoParser(ParserBase): """Class which loads larcv.Neutrino objects to local Neutrino ones. .. code-block. yaml schema: neutrinos: parser: neutrino neutrino_event: neutrino_mpv cluster_event: cluster3d_pcluster pixel_coordinates: True asis: False """ # Name of the parser (as specified in the configuration) name = "neutrino" # Type of object(s) returned by the parser returns = "object_list" def __init__( self, pixel_coordinates: bool = True, asis: bool = False, interaction_scheme: int | str = NuInteractionScheme.LARSOFT, **kwargs: Any, ) -> None: """Initialize the parser. Parameters ---------- pixel_coordinates : bool, default True If set to `True`, the parser rescales the truth positions (start, end, etc.) to voxel coordinates asis : bool, default False Load the objects as larcv objects, do not build local data class interaction_scheme : str, int or NuInteractionScheme, default LARSOFT Convention used to interpret the neutrino interaction mode/type codes. **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(**kwargs) # Store the revelant attributes self.pixel_coordinates = pixel_coordinates self.asis = asis if isinstance(interaction_scheme, str): self.interaction_scheme = enum_factory( "interaction_scheme", interaction_scheme ) else: self.interaction_scheme = int(interaction_scheme) def __call__(self, trees: dict[str, Any]) -> ParserObjectList: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, neutrino_event: Any, sparse_event: Any | None = None, cluster_event: Any | None = None, ) -> ParserObjectList: """Fetch the list of true neutrino objects. Parameters ---------- neutrino_event : larcv.EventNeutrino Neutrino event which contains the list of true neutrinos sparse_event : larcv.EventSparseTensor3D, optional Tensor which contains the metadata needed to convert the positions in voxel coordinates cluster_event : larcv.EventClusterVoxel3D, optional Cluster which contains the metadata needed to convert the positions in voxel coordinates Returns ------- List[Neutrino] List of true neutrino objects """ # If asis is true, return larcv objects neutrino_list = list(neutrino_event.as_vector()) if self.asis: if self.pixel_coordinates: raise ValueError( "If `asis` is True, `pixel_coordinates` must be False." ) return ParserObjectList(neutrino_list, larcv.Neutrino()) # Convert to a list of neutrino objects neutrinos = [ Neutrino.from_larcv(n, interaction_scheme=self.interaction_scheme) for n in neutrino_list ] # If requested, convert the point positions to pixel coordinates if self.pixel_coordinates: # Fetch the metadata if not (sparse_event is not None) ^ (cluster_event is not None): raise ValueError( "Must provide either `sparse_event` or `cluster_event` to " "get the metadata and convert positions to voxel units." ) ref_event = sparse_event if sparse_event is not None else cluster_event assert ref_event is not None # Guaranteed by the check above meta = Meta.from_larcv(ref_event.meta()) # Convert all the relevant attributes for n in neutrinos: n.to_px(meta) # Define the shifts to be applied to each index attribute inter_ids = [n.interaction_id for n in neutrinos] num_neutrinos = len(neutrino_event.as_vector()) max_inter = np.max(inter_ids, initial=-1) + 1 index_shifts = {"id": num_neutrinos, "interaction_id": max_inter} return ParserObjectList(neutrinos, Neutrino(), index_shifts)
[docs] class LArCVParticlePointParser(ParserBase): """Class that retrieves the points of interests. It provides the coordinates of the end points, types and particle index. .. code-block. yaml schema: points: parser: particle_points particle_event: particle_pcluster sparse_event: sparse3d_pcluster include_point_tagging: True """ # Name of the parser (as specified in the configuration) name = "particle_points" # Type of object(s) returned by the parser returns = "tensor" def __init__(self, include_point_tagging: bool = True, **kwargs: Any) -> None: """Initialize the parser. Parameters ---------- include_point_tagging : bool, default True If `True`, includes start vs end point tagging **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(**kwargs) # Store the revelant attributes self.include_point_tagging = include_point_tagging # Define the output columns containing indexes self.index_cols = np.array([PPN_LPART_COL]) def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, particle_event: Any, sparse_event: Any | None = None, cluster_event: Any | None = None, ) -> ParserTensor: """Fetch the list of label points of interest. Parameters ---------- particle_event : larcv.EventParticle Particle event which contains the list of true particles sparse_event : larcv.EventSparseTensor3D, optional Tensor which contains the metadata needed to convert the positions in voxel coordinates cluster_event : larcv.EventClusterVoxel3D, optional Cluster which contains the metadata needed to convert the positions in voxel coordinates Returns ------- np_voxels : np.ndarray (N, 3) array of [x, y, z] coordinates np_features : np.ndarray (N, 2/3) array of [point type, particle index(, end point tagging)] meta : Meta Metadata of the parsed image """ # Fetch the metadata if not (sparse_event is not None) ^ (cluster_event is not None): raise ValueError( "Must provide either `sparse_event` or `cluster_event` to " "get the metadata and convert positions to voxel units." ) ref_event = sparse_event if sparse_event is not None else cluster_event assert ref_event is not None # Guaranteed by the check above meta = ref_event.meta() # Get the point labels particle_v = particle_event.as_vector() point_labels = get_ppn_labels( particle_v, meta, self.ftype, include_point_tagging=self.include_point_tagging, ) return ParserTensor( coords=point_labels[:, :3], features=point_labels[:, 3:], meta=Meta.from_larcv(meta), index_cols=self.index_cols, index_shifts=np.array([particle_event.size()]), )
[docs] class LArCVParticleCoordinateParser(ParserBase): """Class that retrieves that end points of particles. It provides the coordinates of the end points, time and shape. .. code-block. yaml schema: coords: parser: particle_coordinates particle_event: particle_pcluster sparse_event: sparse3d_pcluster """ # Name of the parser (as specified in the configuration) name = "particle_coords" # Type of object(s) returned by the parser returns = "tensor" def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, particle_event: Any, sparse_event: Any | None = None, cluster_event: Any | None = None, ) -> ParserTensor: """Fetch the start/end point and time of each true particle. Parameters ---------- particle_event : larcv.EventParticle Particle event which contains the list of true particles sparse_event : larcv.EventSparseTensor3D, optional Tensor which contains the metadata needed to convert the positions in voxel coordinates cluster_event : larcv.EventClusterVoxel3D, optional Cluster which contains the metadata needed to convert the positions in voxel coordinates Returns ------- np_voxels : np.ndarray (N, 6) array of [x_s, y_s, z_s, x_e, y_e, z_e] start and end point coordinates np_features : np.ndarray (N, 2) array of [first_step_t, shape_id] meta : Meta Metadata of the parsed image """ # Fetch the metadata if not (sparse_event is not None) ^ (cluster_event is not None): raise ValueError( "Must provide either `sparse_event` or `cluster_event` to " "get the metadata and convert positions to voxel units." ) ref_event = sparse_event if sparse_event is not None else cluster_event assert ref_event is not None # Guaranteed by the check above meta = ref_event.meta() # Scale particle coordinates to image size particle_v = particle_event.as_vector() # Make features coord_labels = np.empty((len(particle_v), 8), dtype=self.ftype) for i, p in enumerate(particle_v): start_point = last_point = image_coordinates(meta, p.first_step()) if p.shape() == TRACK_SHP: # End point only meaningful for tracks last_point = image_coordinates(meta, p.last_step()) extra = [p.t(), p.shape()] coord_labels[i] = np.concatenate((start_point, last_point, extra)) return ParserTensor( coords=coord_labels[:, :6], features=coord_labels[:, 6:], meta=Meta.from_larcv(meta), )
[docs] class LArCVVertexPointParser(ParserBase): """Class that retrieves the vertices. It provides the coordinates of points where multiple particles originate: - If the `neutrino_event` is provided, it simply uses the coordinates of the neutrino interaction points. - If the `particle_event` is provided instead, it looks for ancestor point positions shared by at least two particles. .. code-block. yaml schema: vertices: parser: vertex_points particle_event: particle_pcluster #neutrino_event: neutrino_mpv sparse_event: sparse3d_pcluster include_point_tagging: True """ # Name of the parser (as specified in the configuration) name = "vertex_points" # Type of object(s) returned by the parser returns = "tensor" def __init__(self, **kwargs: Any) -> None: """Initialize the parser. Parameters ---------- **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(**kwargs) # Define the output columns containing indexes self.index_cols = np.array([VALUE_COL]) def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, particle_event: Any | None = None, neutrino_event: Any | None = None, sparse_event: Any | None = None, cluster_event: Any | None = None, ) -> ParserTensor: """Fetch the list of label vertex points. Parameters ---------- particle_event : larcv.EventParticle Particle event which contains the list of true particles neutrino_event : larcv.EventNeutrino Neutrino event which contains the list of true neutrinos sparse_event : larcv.EventSparseTensor3D, optional Tensor which contains the metadata needed to convert the positions in voxel coordinates cluster_event : larcv.EventClusterVoxel3D, optional Cluster which contains the metadata needed to convert the positions in voxel coordinates Returns ------- np_voxels : np.ndarray (N, 3) array of [x, y, z] coordinates np_features : np.ndarray (N, 1) array of [vertex ID] meta : Meta Metadata of the parsed image """ # Check that only one source of vertex is provided if not (particle_event is not None) ^ (neutrino_event is not None): raise ValueError( "Must provide either `particle_event` or `sparse_event` to " "get the vertex points, not both." ) # Fetch the metadata if not (sparse_event is not None) ^ (cluster_event is not None): raise ValueError( "Must provide either `sparse_event` or `cluster_event` to " "get the metadata and convert positions to voxel units." ) ref_event = sparse_event if sparse_event is not None else cluster_event assert ref_event is not None # Guaranteed by the check above meta = ref_event.meta() # Get the vertex labels particle_v = particle_event.as_vector() if particle_event else None neutrino_v = neutrino_event.as_vector() if neutrino_event else None point_labels = get_vertex_labels(particle_v, neutrino_v, meta, self.ftype) # Get the index shift to apply index_shifts = np.max(point_labels[:, 0], keepdims=True, initial=-1) + 1 return ParserTensor( coords=point_labels[:, :3], features=point_labels[:, 3:], meta=Meta.from_larcv(meta), index_cols=self.index_cols, index_shifts=index_shifts, )
[docs] class LArCVParticleGraphParser(ParserBase): """Class that uses larcv.EventParticle to construct edges between particles (i.e. clusters). .. code-block. yaml schema: graph: parser: particle_graph particle_event: particle_pcluster cluster_event: cluster3d_pcluster include_fragment_edges: false """ # Name of the parser (as specified in the configuration) name = "particle_graph" # Type of object(s) returned by the parser returns = "tensor" def __init__(self, include_fragment_edges: bool = False, **kwargs: Any) -> None: """Initialize the parser. Parameters ---------- include_fragment_edges : bool, default False If `True`, includes edges which join particles in the same group **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(**kwargs) # Store the revelant attributes self.include_fragment_edges = include_fragment_edges def __call__(self, trees: dict[str, Any]) -> ParserEdgeIndex: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, particle_event: Any, cluster_event: Any | None = None ) -> ParserEdgeIndex: """Fetch the parentage connections from the true particle list. Configuration ------------- particle_event : larcv.EventParticle Particle event which contains the list of true particles cluster_event : larcv.EventClusterVoxel3D, optional Cluster used to check if particles have 0 pixel in the image. If so, the edges to those clusters are removed and the broken parantage is subsequently patched. Returns ------- ParserEdgeIndex Parsed edge-index payload with shape ``(2, E)`` and the number of particles stored as the global shift span. """ # Check on the cluster input, if provided particles_v = particle_event.as_vector() num_particles = particles_v.size() clusters_v = None if cluster_event is not None: # Check that the cluster list is of the expected length clusters_v = cluster_event.as_vector() num_clusters = len(clusters_v) if num_particles not in (num_clusters, num_clusters - 1): raise ValueError( f"The number of particles ({num_particles}) must be " f"aligned with the number of clusters ({num_clusters}). " "There can me one more catch-all cluster at the end." ) # Build a list of edges edges, zero_nodes = [], [] for cluster_id, part in enumerate(particles_v): # If the parent ID is invalid (broken parentage), skip if part.parent_id() == INVAL_ID: continue # Only include edges within particle groups if explicitely requested if not self.include_fragment_edges and part.group_id() != cluster_id: continue # Add edge between particle and its direct parent if part.parent_id() != part.group_id(): edges.append([int(part.parent_id()), cluster_id]) # If the cluster event is provided, keep track of empty nodes if clusters_v is not None and clusters_v[cluster_id].size() == 0: zero_nodes.append(cluster_id) # Convert the list of edges to a numpy array if not edges: edges = np.empty((0, 2), dtype=np.int64) else: edges = np.vstack(edges).astype(np.int64) # Remove zero-pixel nodes, if possible if len(zero_nodes) > 0 and len(edges) > 0: edges = filter_invalid_nodes(edges, zero_nodes) return ParserEdgeIndex(features=edges.T, span=num_particles)
[docs] class LArCVSingleParticlePIDParser(ParserBase): """Get the first true particle's species. .. code-block. yaml schema: image_pid: parser: single_particle_pid particle_event: particle_pcluster """ # Name of the parser (as specified in the configuration) name = "single_particle_pid" # Type of object(s) returned by the parser returns = "scalar" # Overlay strategy for the objects returned by the parser overlay = "cat" def __call__(self, trees: dict[str, Any]) -> int: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process(self, particle_event: Any) -> int: """Fetch the species of the first particle. Configuration ------------- particle_event : larcv.EventParticle Particle event which contains the list of true particles Returns ------- int Species of the first particle """ pid = -1 for p in particle_event.as_vector(): if p.track_id() == 1: if int(p.pdg_code()) in PDG_TO_PID: pid = PDG_TO_PID[int(p.pdg_code())] break return pid
[docs] class LArCVSingleParticleEnergyParser(ParserBase): """Get the first true particle's kinetic energy. .. code-block. yaml schema: image_energy: parser: single_particle_energy particle_event: particle_pcluster """ # Name of the parser (as specified in the configuration) name = "single_particle_energy" # Type of object(s) returned by the parser returns = "scalar" # Overlay strategy for the objects returned by the parser overlay = "cat" def __call__(self, trees: dict[str, Any]) -> float: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process(self, particle_event: Any) -> float: """Fetch the kinetic energy of the first particle. Configuration ------------- particle_event : larcv.EventParticle Particle event which contains the list of true particles Returns ------- float Kinetic energy of the first particle """ ke = -1.0 for p in particle_event.as_vector(): if p.track_id() == 1: if int(p.pdg_code()) in PDG_TO_PID: einit = p.energy_init() pid = PDG_TO_PID[int(p.pdg_code())] mass = PID_MASSES[pid] ke = einit - mass break return ke