Source code for spine.io.parse.larcv.cluster

"""Module that contains all parsers related to LArCV cluster data.

Contains the following parsers:
- :class:`LArCVCluster2DParser`
- :class:`LArCVCluster3DParser`
- :class:`LArCVCluster3DAggregateParser`
- :class:`LArCVCluster3DChargeRescaledParser`
"""

from __future__ import annotations

from collections import OrderedDict
from typing import Any
from warnings import warn

import numpy as np

from spine.constants import (
    ANCST_COL,
    CLUST_COL,
    DELTA_SHP,
    GROUP_COL,
    INTER_COL,
    NU_COL,
    PART_COL,
    SHAPE_COL,
    SHAPE_PREC,
    VALUE_COL,
)
from spine.data import Meta
from spine.math.cluster import dbscan
from spine.math.distance import METRICS
from spine.utils.conditional import larcv
from spine.utils.particles import process_particle_event
from spine.utils.ppn import image_coordinates

from ..base import ParserBase
from ..clean_data import clean_sparse_data
from ..data import ParserTensor
from .sparse import (
    LArCVSparse3DAggregateParser,
    LArCVSparse3DChargeRescaledParser,
    LArCVSparse3DParser,
)

__all__ = [
    "LArCVCluster2DParser",
    "LArCVCluster3DParser",
    "LArCVCluster3DAggregateParser",
    "LArCVCluster3DChargeRescaledParser",
]


[docs] class LArCVCluster2DParser(ParserBase): """Class that retrieves and parses a 2D cluster list. .. code-block. yaml schema: cluster_label: parser: cluster2d cluster_event: cluster2d_pcluster """ # Name of the parser (as specified in the configuration) name = "cluster2d" # Type of object(s) returned by the parser returns = "tensor" def __init__(self, dtype: str, cluster_event: str, projection_id: int) -> None: """Initialize the parser. Parameters ---------- cluster_event : larcv.EventClusterPixel2D Event which contains the 2D clusters projection_id : int Projection ID to get the 2D images from """ # Initialize the parent class super().__init__(dtype, cluster_event=cluster_event) # Store the revelant attributes self.projection_id = projection_id # Define the overlay strategy parameters self.index_cols = np.array([CLUST_COL]) self.sum_cols = np.array([VALUE_COL]) def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process(self, cluster_event: Any) -> ParserTensor: """Converts a 2D clusters tensor into a single tensor. Parameters ---------- cluster_event : larcv.EventClusterPixel2D Event which contains the 2D clusters Returns ------- ParserTensor coords : np.ndarray (N, 2) array of [x, y] coordinates features : np.ndarray (N, 2) array of [pixel value, cluster ID] meta : Meta Metadata of the parsed image """ # Get the cluster from the appropriate projection cluster_event_p = cluster_event.cluster_pixel_2d(self.projection_id) # Loop over clusters, store information meta = cluster_event_p.meta() num_clusters = cluster_event_p.size() clusters_voxels, clusters_features = [], [] for i in range(num_clusters): cluster = cluster_event_p.as_vector()[i] num_points = cluster.as_vector().size() if num_points > 0: x = np.empty(num_points, dtype=np.int32) y = np.empty(num_points, dtype=np.int32) value = np.empty(num_points, dtype=np.float32) larcv.as_flat_arrays(cluster, meta, x, y, value) pixels = np.stack([x, y], axis=1).astype(self.itype) value = value.astype(self.ftype) cluster_id = np.full(num_points, i, dtype=self.ftype) clusters_voxels.append(pixels) clusters_features.append(np.column_stack([value, cluster_id])) # Concatenate clusters if not clusters_voxels: np_voxels = np.empty((0, 2), dtype=self.ftype) np_features = np.empty((0, 2), dtype=self.ftype) else: np_voxels = np.concatenate(clusters_voxels, axis=0) np_features = np.concatenate(clusters_features, axis=0) # Evaluate shifts to apply to each index column index_shifts = np.max(np_features[:, -1], keepdims=True, initial=-1) + 1 return ParserTensor( coords=np_voxels, features=np_features, meta=Meta.from_larcv(meta), remove_duplicates=True, index_shifts=index_shifts, index_cols=self.index_cols, sum_cols=self.sum_cols, )
[docs] class LArCVCluster3DParser(ParserBase): """Class that retrieves and parses a 3D cluster list. .. code-block. yaml schema: cluster_label: parser: cluster3d cluster_event: cluster3d_pcluster particle_event: particle_pcluster particle_mpv_event: particle_mpv neutrino_event: neutrino_mpv sparse_semantics_event: sparse3d_semantics sparse_value_event: sparse3d_pcluster add_particle_info: true clean_data: true type_include_mpr: false type_include_secondary: false primary_include_mpr: true break_clusters: false """ # Name of the parser (as specified in the configuration) name = "cluster3d" # Type of object(s) returned by the parser returns = "tensor" def __init__( self, dtype: str, particle_event: Any | None = None, add_particle_info: bool = False, clean_data: bool = False, type_include_mpr: bool = True, type_include_secondary: bool = True, primary_include_mpr: bool = True, break_clusters: bool = False, break_eps: float = 1.1, break_metric: str = "chebyshev", shape_precedence: np.ndarray | list[int] | tuple[int, ...] = SHAPE_PREC, **kwargs: Any, ) -> None: """Initialize the parser. Parameters ---------- particle_event : larcv.EventParticle, optional List of true particle information. If prodided, allows to fetch more information about each of the pixels in the image add_particle_info : bool, default False If `True`, adds truth information from the true particle list clean_data : bool, default False If `True`, removes duplicate voxels type_include_mpr : bool, default False If `False`, sets all PID labels to -1 for MPR particles type_include_secondary : bool, default False If `False`, sets all PID labels to -1 for secondary particles primary_include_mpr : bool, default False If `False`, sets all primary labels to -1 for MPR particles break_clusters : bool, default False If `True`, runs DBSCAN on each cluster, assigns different cluster IDs to different fragments of the broken-down cluster break_eps : float, default 1.1 Distance scale used in the break up procedure break_metric : str, default 'chebyshev' Distance metric used in the break up produce shape_precedence: list, default SHAPE_PREC Array of classes in the reference array, ordered by precedence **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__(dtype, particle_event=particle_event, **kwargs) # Store the revelant attributes self.add_particle_info = add_particle_info self.clean_data = clean_data self.type_include_mpr = type_include_mpr self.type_include_secondary = type_include_secondary self.primary_include_mpr = primary_include_mpr self.shape_precedence = np.asarray(shape_precedence) # Intialize DBSCAN if the clusters are to be broken up self.break_clusters = break_clusters self.break_eps = break_eps self.break_metric_id = METRICS[break_metric] # Intialize the sparse and particle parsers self.sparse_parser = LArCVSparse3DParser(dtype, sparse_event="dummy") # If particle information is to be incldued, check that it is provided if self.add_particle_info: # Must provide particle event to build particle info assert particle_event is not None, ( "If `add_particle_info` is `True`, must provide the " "`particle_event` argument." ) # Define the overlay strategy parameters self.sum_cols = np.array([VALUE_COL]) if not self.add_particle_info: self.index_cols = np.array([CLUST_COL]) self.prec_col = None else: self.index_cols = np.array( [CLUST_COL, PART_COL, GROUP_COL, ANCST_COL, INTER_COL, NU_COL] ) self.prec_col = SHAPE_COL def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process(**self.get_input_data(trees))
[docs] def process( self, cluster_event: Any, particle_event: Any | None = None, particle_mpv_event: Any | None = None, neutrino_event: Any | None = None, sparse_semantics_event: Any | None = None, sparse_value_event: Any | None = None, ) -> ParserTensor: """Parse a list of 3D clusters into a single tensor. Parameters ---------- cluster_event : larcv.EventClusterVoxel3D Event which contains the 3D clusters particle_event : larcv.EventParticle, optional List of true particle information. If prodided, allows to fetch more information about each of the pixels in the image particle_mpv_event : larcv.EventParticle, optional List of true particle information for MPV particles only. If provided, it is used to determine which particles are MPV particle_mpv_event: larcv.EventNeutrino, optional List of true neutrino information. If provided, it is used to determine which particles are MPV sparse_semantics_event : larcv.EventSparseTensor3D, optional Semantics of each of the voxels in the image. If provided, overrides the order of precedence used in combining clusters which share voxels. sparse_value_event : larcv.EventSparseTensor3D, optional Value of each of the voxels in the image. If provided, overrides the value provided byt eh list of 3D clusters itself Returns ------- ParserTensor coords : np.ndarray (N, 3) array of [x, y, z] coordinates features : np.ndarray (N, 2/14) array of features, minimally [voxel value, cluster ID]. If `add_particle_info` is `True`, the additonal columns are [particle ID, group ID, interaction ID, neutrino ID, particle type, group primary bool, interaction primary bool, vertex x, vertex y, vertex z, momentum, semantic type] meta : Meta Metadata of the parsed image """ # Get the cluster-wise information first meta = cluster_event.meta() num_clusters = cluster_event.as_vector().size() labels = OrderedDict() labels["cluster"] = np.arange(num_clusters) num_particles = num_clusters num_neutrinos = 0 if self.add_particle_info: # Check that that particle objects are of the expected length assert particle_event is not None # Guaranteed by __init__ num_particles = particle_event.size() assert num_particles in (num_clusters, num_clusters - 1), ( f"The number of particles ({num_particles}) must be " f"aligned with the number of clusters ({num_clusters}). " f"There can me one more catch-all cluster at the end." ) # Load up the particle list particles = list(particle_event.as_vector()) # Fetch the variables missing from the larcv objects ( _, group_ids, ancestor_ids, interaction_ids, nu_ids, group_primaries, inter_primaries, types, ) = process_particle_event( particle_event, particle_mpv_event, neutrino_event ) # Store the cluster ID information labels["cluster"] = [p.id() for p in particles] labels["part"] = [p.id() for p in particles] labels["group"] = group_ids labels["ancst"] = ancestor_ids labels["inter"] = interaction_ids labels["nu"] = nu_ids # Store the type/primary status labels["type"] = types labels["pgroup"] = group_primaries labels["pinter"] = inter_primaries # Store the vertex and momentum anc_pos = np.empty((len(particles), 3), dtype=self.ftype) for i, p in enumerate(particles): anc_pos[i] = image_coordinates(meta, p.ancestor_position()) labels["vtx_x"] = anc_pos[:, 0] labels["vtx_y"] = anc_pos[:, 1] labels["vtx_z"] = anc_pos[:, 2] labels["p"] = [p.p() for p in particles] # Store the shape last (consistent with semantics tensor) labels["shape"] = [p.shape() for p in particles] # If requested, give invalid labels to a subset of particles if not self.type_include_secondary: secondary_mask = np.where(np.array(labels["pinter"]) < 1)[0] labels["type"] = np.asarray(labels["type"]) labels["type"][secondary_mask] = -1 if not self.type_include_mpr or not self.primary_include_mpr: mpr_mask = np.where(np.array(labels["nu"]) < 0)[0] if not self.type_include_mpr: labels["type"] = np.asarray(labels["type"]) labels["type"][mpr_mask] = -1 if not self.primary_include_mpr: labels["pinter"] = np.asarray(labels["pinter"]) labels["pinter"][mpr_mask] = -1 # Count the number of neutrinos if neutrino_event is not None: num_neutrinos = len(neutrino_event.as_vector()) else: num_neutrinos = np.max(nu_ids, initial=-1) + 1 # Loop over clusters, store information clusters_voxels, clusters_features = [], [] id_offset = 0 for i in range(num_clusters): cluster = cluster_event.as_vector()[i] num_points = cluster.as_vector().size() if num_points > 0: # Get the position and pixel value from EventSparseTensor3D x = np.empty(num_points, dtype=np.int32) y = np.empty(num_points, dtype=np.int32) z = np.empty(num_points, dtype=np.int32) value = np.empty(num_points, dtype=np.float32) larcv.as_flat_arrays(cluster, meta, x, y, z, value) voxels = np.stack([x, y, z], axis=1).astype(self.itype) value = value.astype(self.ftype) clusters_voxels.append(voxels) # Append the cluster-wise information features = [value] for l in labels.values(): val = l[i] if i < num_particles else -1 features.append(np.full(num_points, val, dtype=self.ftype)) # If requested, break cluster into detached pieces if self.break_clusters: frag_labels = dbscan( voxels, eps=self.break_eps, min_samples=1, metric_id=self.break_metric_id, ) features[1] = id_offset + frag_labels id_offset += np.max(frag_labels, initial=-1) + 1 clusters_features.append(np.column_stack(features)) # If there are no non-empty clusters, return. Concatenate otherwise if not clusters_voxels: np_voxels = np.empty((0, 3), dtype=self.itype) np_features = np.empty((0, len(labels) + 1), dtype=self.ftype) else: np_voxels = np.concatenate(clusters_voxels, axis=0) np_features = np.concatenate(clusters_features, axis=0) # If requested, remove duplicate voxels (cluster overlaps) and # match the semantics to those of the provided reference if len(np_voxels) and ( (sparse_semantics_event is not None) or (sparse_value_event is not None) ): if not self.clean_data: warn( "You must set `clean_data` to `True` if you specify a " "sparse tensor in `Cluster3DParser`." ) self.clean_data = True # Extract voxels and features assert self.add_particle_info, ( "Need to add particle info to fetch particle " "semantics for each voxel." ) assert ( sparse_semantics_event is not None ), "Need to provide a semantics tensor to clean up output." tensor_seg = self.sparse_parser.process(sparse_semantics_event) np_voxels, np_features = clean_sparse_data( np_voxels, np_features, tensor_seg.coords, precedence=self.shape_precedence, sum_cols=self.sum_cols - VALUE_COL, ) # Match the semantic column to the reference tensor np_features[:, -1] = tensor_seg.features[:, -1] # Set all cluster labels to -1 if semantic class is LE or ghost shape_mask = tensor_seg.features[:, -1] > DELTA_SHP np_features[shape_mask, 1:-1] = -1 # If a value tree is provided, override value colum if sparse_value_event: tensor_val = self.sparse_parser.process(sparse_value_event) np_features[:, 0] = tensor_val.features[:, 0] # Evaluate shifts to apply to each index column clust_shift = np.max(np_features[:, 1], initial=-1) + 1 if not self.add_particle_info: index_shifts = np.array([clust_shift]) else: cs, ps, ns = clust_shift, num_particles, num_neutrinos index_shifts = np.array([cs, ps, ps, ps, ps, ns]) return ParserTensor( coords=np_voxels, features=np_features, meta=Meta.from_larcv(meta), remove_duplicates=True, index_shifts=index_shifts, index_cols=self.index_cols, sum_cols=self.sum_cols, prec_col=self.prec_col, precedence=self.shape_precedence, )
[docs] class LArCVCluster3DAggregateParser(LArCVCluster3DParser): """Identical to :class:`Cluster3DParser`, but aggregates charge information from multiple value sources. """ # Name of the parser (as specified in the configuration) name = "cluster3d_aggr" def __init__( self, dtype: str, sparse_value_event_list: list[str], value_aggr: str, **kwargs: Any, ) -> None: """Initialize the parser. Parameters ---------- sparse_value_event_list : List[larcv.EventSparseTensor3D] List of sparse tensors used to compute the aggregated charge value_aggr : str Value aggregation function to apply ('sum', 'mean', 'max', etc.) **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__( dtype, sparse_value_event_list=sparse_value_event_list, **kwargs ) # Initialize the sparse parser which computes the rescaled charge self.sparse_aggr_parser = LArCVSparse3DAggregateParser( dtype, sparse_event_list=sparse_value_event_list, aggr=value_aggr ) def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process_aggr(**self.get_input_data(trees))
[docs] def process_aggr( self, sparse_value_event_list: list[Any], **kwargs: Any ) -> ParserTensor: """Parse a list of 3D clusters into a single tensor and fetch the value column by aggregating multiple tensor features. Parameters ---------- sparse_value_event_list : List[larcv.EventSparseTensor3D] List of sparse value tensors **kwargs : dict, optional Extra data products to pass to the parent Cluster3DParser Returns ------- ParserTensor coords : np.ndarray (N, 3) array of [x, y, z] coordinates features : np.ndarray (N, 2/14) array of features, minimally [voxel value, cluster ID]. If `add_particle_info` is `True`, the additonal columns are [group ID, interaction ID, neutrino ID, particle type, group primary bool, interaction primary bool, vertex x, vertex y, vertex z, momentum, semantic type, particle ID] meta : Meta Metadata of the parsed image """ # Process the input using the main parser tensor = self.process(**kwargs) # Modify the value column using the aggregate tensor values tensor_val = self.sparse_aggr_parser.process_aggr(sparse_value_event_list) tensor.features[:, 0] = tensor_val.features[:, 0] return tensor
[docs] class LArCVCluster3DChargeRescaledParser(LArCVCluster3DParser): """Identical to :class:`Cluster3DParser`, but computes rescaled charges on the fly. """ # Name of the parser (as specified in the configuration) name = "cluster3d_rescale_charge" def __init__( self, dtype: str, sparse_value_event_list: list[str], collection_only: bool = False, collection_id: int = 2, **kwargs: Any, ) -> None: """Initialize the parser. Parameters ---------- sparse_value_event_list : List[larcv.EventSparseTensor3D] (7) List of sparse tensors used to compute the rescaled charge - Charge value of each of the contributing planes (3) - Index of the plane hit contributing to the space point (3) - Semantic labels (1) collection_only : bool, default False If True, only uses the collection plane charge collection_id : int, default 2 Index of the collection plane **kwargs : dict, optional Data product arguments to be passed to the `process` function """ # Initialize the parent class super().__init__( dtype, sparse_value_event_list=sparse_value_event_list, **kwargs ) # Initialize the sparse parser which computes the rescaled charge self.sparse_rescale_parser = LArCVSparse3DChargeRescaledParser( dtype, sparse_event_list=sparse_value_event_list, collection_only=collection_only, collection_id=collection_id, ) def __call__(self, trees: dict[str, Any]) -> ParserTensor: """Parse one entry. Parameters ---------- trees : dict Dictionary which maps each data product name to a LArCV object """ return self.process_rescale(**self.get_input_data(trees))
[docs] def process_rescale( self, sparse_value_event_list: list[Any], **kwargs: Any ) -> ParserTensor: """Parse a list of 3D clusters into a single tensor and reset the value column by rescaling the charge coming from 3 wire planes. Parameters ---------- sparse_value_event_list : List[larcv.EventSparseTensor3D] (7) List of sparse tensors used to compute the rescaled charge - Charge value of each of the contributing planes (3) - Index of the plane hit contributing to the space point (3) - Semantic labels (1) **kwargs : dict, optional Extra data products to pass to the parent Cluster3DParser Returns ------- ParserTensor coords : np.ndarray (N, 3) array of [x, y, z] coordinates features : np.ndarray (N, 2/14) array of features, minimally [voxel value, cluster ID]. If `add_particle_info` is `True`, the additonal columns are [group ID, interaction ID, neutrino ID, particle type, group primary bool, interaction primary bool, vertex x, vertex y, vertex z, momentum, semantic type, particle ID] meta : Meta Metadata of the parsed image """ # Process the input using the main parser tensor = self.process(**kwargs) # Modify the value column using the charge rescaled on the fly tensor_val = self.sparse_rescale_parser.process_rescale(sparse_value_event_list) tensor.features[:, 0] = tensor_val.features[:, 0] return tensor