Source code for spine.utils.ppn

"""Module which contains utility function to process PPN information.

It contains functions to produce PPN labels and functions to process the
PPN predictions into something human-readable.
"""

from warnings import warn

import numba as nb
import numpy as np

import spine.math as sm
from spine.constants import (
    BATCH_COL,
    COORD_COLS,
    DELTA_SHP,
    LOWES_SHP,
    MICHL_SHP,
    PPN_CLASS_COLS,
    PPN_END_COLS,
    PPN_OCC_COL,
    PPN_ROFF_COLS,
    PPN_ROFF_COLS_HI,
    PPN_ROFF_COLS_LO,
    PPN_RPOS_COLS,
    PPN_RPOS_COLS_HI,
    PPN_RPOS_COLS_LO,
    PPN_RTYPE_COLS,
    PPN_SCORE_COLS,
    PPN_SHAPE_COL,
    SHOWR_SHP,
    TRACK_SHP,
    UNKWN_SHP,
)
from spine.data import IndexBatch, TensorBatch

from .conditional import torch
from .jit import numbafy
from .torch.scripts import cdist_fast


[docs] class PPNPredictor: """PPN post-processing class to convert PPN raw predictions into points.""" def __init__( self, score_threshold=0.5, type_score_threshold=0.5, type_dist_threshold=1.999, pool_score_fn="max", pool_dist=1.999, enforce_type=True, classes=[SHOWR_SHP, TRACK_SHP, MICHL_SHP, DELTA_SHP], apply_deghosting=False, ): """Initialize the PPN post-processor. Parameters ---------- score_threshold : float, default 0.5 Score above which a point is considered to be active type_score_threshold : float, default 0.5 Score above which a type prediction must be to be considered type_dist_threshold : float, default 1.999 Distance threshold for matching with semantic type predictions pool_score_fn : str, default 'max' Which operation to use to pool PPN points scores ('max' or 'mean') pool_dist : float, default 1.999 Distance below which PPN points should be merged into one (DBSCAN) enforce_type : bool, default True Whether to force PPN points predicted of type X to be within N voxels of a voxel with same predicted semantic type classes : List[int], default [0, 1, 2, 3] Number of semantic classes apply_deghosting : bool, default False Whether to deghost the input, if a `ghost` tensor is provided """ # Store the parameters self.score_threshold = score_threshold self.type_score_threshold = type_score_threshold self.type_dist_threshold = type_dist_threshold self.enforce_type = enforce_type self.classes = classes self.apply_deghosting = apply_deghosting # Store the score pooling function self.pool_dist = pool_dist self.pool_score_fn = pool_score_fn def __call__( self, ppn_points, ppn_coords, ppn_masks, ppn_classify_endpoints=None, segmentation=None, ghost=None, entry=None, selection=None, **kwargs, ): """Converts the batched raw output of PPN to a discrete set of proposed points of interest. Notes ----- This function works on both wrapped (:class:`TensorBatch`) and unwrapped (`List[np.ndarray]`) batches of data. Parameters ---------- ppn_points : Union[TensorBatch, List[np.ndarray]] Raw output of PPN ppn_coords : Union[List[TensorBatch], List[List[np.ndarray]] Coordinates of the image at each PPN layer ppn_masks : Union[List[TensorBatch], List[List[np.ndarray]] Predicted masks of at each PPN layer ppn_classify_endpoints : Union[TensorBatch, List[np.ndarray]], optional Raw logits from the end point classification layer of PPN segmentation : Union[TensorBatch, List[np.ndarray]], optional Raw logits from the semantic segmentation network output ghost : Union[TensorBatch, List[np.ndarray]], optional Raw logits from the ghost segmentation network output entry : int, optional Entry in the batch for which to compute the point predictions selection : Union[IndexBatch, List[np.ndarray]], optional List of indexes to consider exclusively (e.g. to get PPN predictions within a list of clusters) **kwargs : dict, optional Extraneous outputs not used in this post-processor Returns ------- Union[TensorBatch, List[np.ndarray]] (N, P) Tensor of predicted points with P divided between [batch_id, x, y, z, validity scores (2), occupancy, type scores (5), predicted type, endpoint type] """ # Set the list of entries to loop over if entry is not None: assert isinstance(entry, int), "If entry is specified, must be integer" entries = [entry] else: entries = range(len(ppn_points)) # Loop over the entries, process it ppn_pred = [] ppn_classify_endpoints_b, segmentation_b, ghost_b, selection_b = ( None, None, None, None, ) for b in range(len(ppn_points)): # Prepare input for that entry ppn_points_b = ppn_points[b] if isinstance(ppn_points, TensorBatch): ppn_coords_b = ppn_coords[-1][b][:, COORD_COLS] ppn_mask_b = ppn_masks[-1][b].flatten() else: ppn_coords_b = ppn_coords[b][-1][:, COORD_COLS] ppn_mask_b = ppn_masks[b][-1].flatten() if ppn_classify_endpoints is not None: ppn_classify_endpoints_b = ppn_classify_endpoints[b] if segmentation is not None: segmentation_b = segmentation[b] if ghost is not None: ghost_b = ghost[b] if selection is not None: selection_b = selection[b] # Append self.entry = b ppn_pred.append( self.process_single( ppn_points_b, ppn_coords_b, ppn_mask_b, ppn_classify_endpoints_b, segmentation_b, ghost_b, selection_b, ) ) # Return if entry is not None: return ppn_pred[0] elif not isinstance(ppn_points, TensorBatch): return ppn_pred else: tensor = TensorBatch.from_list(ppn_pred) tensor.coord_cols = COORD_COLS return tensor
[docs] def process_single( self, ppn_raw, ppn_coords, ppn_mask, ppn_ends=None, segmentation=None, ghost=None, selection=None, ): """Converts the PPN output from a single entry into points of interests for that entry. Notes ----- This function works with both `torch.Tensor` and `np.ndarray` inputs. Parameters ---------- ppn_raw : Union[torch.Tensor, np.ndarray] Raw output of PPN ppn_coords : Union[torch.Tensor, np.ndarray] Coordinates of the image at each PPN layer ppn_masks : Union[torch.Tensor, np.ndarray] Predicted masks of at each PPN layer ppn_ends : Union[torch.Tensor, np.ndarray], optional Raw logits from the end point classification layer of PPN segmentation : Union[torch.Tensor, np.ndarray], optional Raw logits from the semantic segmentation network output ghost : Union[torch.Tensor, np.ndarray], optional Raw logits from the ghost segmentation network output selection : Union[torch.Tensor, np.ndarray], optional List of indexes to consider exclusively (e.g. to get PPN predictions within a list of clusters) Returns ------- Union[torch.Tensor, np.ndarray] Predicted points encoded as rows containing coordinates, validity scores, occupancy, type scores, predicted type and endpoint type. """ # Define operations on the basis of the input type if torch.is_tensor(ppn_raw): dtype, device = ppn_raw.dtype, ppn_raw.device cat, unique, argmax = torch.cat, torch.unique, torch.argmax where, mean, softmax = torch.where, torch.mean, torch.softmax cdist = cdist_fast empty = lambda x: torch.empty(x, dtype=dtype, device=device) zeros = lambda x: torch.zeros(x, dtype=dtype, device=device) pool_fn = getattr(torch, self.pool_score_fn) if self.pool_score_fn == "max": pool_fn = torch.amax else: cat, unique, argmax = np.concatenate, np.unique, np.argmax where, mean = np.where, np.mean softmax, cdist = sm.softmax, sm.distance.cdist empty = lambda x: np.empty(x, dtype=ppn_raw.dtype) zeros = lambda x: np.zeros(x, dtype=ppn_raw.dtype) pool_fn = getattr(np, self.pool_score_fn) # Fetch the segmentation tensor, if needed if self.enforce_type: assert ( segmentation is not None ), "Must provide the segmentation tensor to enforce types" if ghost is not None and self.apply_deghosting: mask_ghost = where(argmax(ghost, 1) == 0)[0] segmentation = segmentation[mask_ghost] # Restrict the PPN output to points above the score threshold scores = softmax(ppn_raw[:, PPN_RPOS_COLS], 1) mask = ppn_mask & (scores[:, -1] > self.score_threshold) # Restrict the PPN output to a subset of points, if requested if selection is not None: mask_update = zeros(mask.shape, dtype=bool) if entry is not None: assert len(selection) == len(ppn_points) and not np.issclar( selection[0] ) mask_update[selection[b]] = True else: assert len(selection) and np.issclar(selection[0]) mask_update[selection] = True mask &= mask_update # Apply the mask mask = where(mask)[0] scores = scores[mask] ppn_raw = ppn_raw[mask] ppn_coords = ppn_coords[mask] if ppn_ends is not None: ppn_ends = ppn_ends[mask] # Get the type predictions type_scores = softmax(ppn_raw[:, PPN_RTYPE_COLS], 1) type_pred = argmax(type_scores, 1) if ppn_ends is not None: end_scores = softmax(ppn_ends, 1) # Get the PPN point predictions coords = ppn_coords + 0.5 + ppn_raw[:, PPN_ROFF_COLS] if self.enforce_type: # Loop over the invidual classes seg_masks = [] for c in self.classes: # Restrict the points to a specific class seg_pred = argmax(segmentation[mask], 1) seg_mask = seg_pred == c seg_mask &= type_scores[:, c] > self.type_score_threshold seg_mask = where(seg_mask)[0] # Make sure the points are within range of compatible class dist_mat = cdist(coords[seg_mask], ppn_coords[seg_mask]) dist_mask = (dist_mat < self.type_dist_threshold).any(1) seg_mask = seg_mask[dist_mask] seg_masks.append(seg_mask) # Restrict the available points further seg_mask = cat(seg_masks) coords = coords[seg_mask] scores = scores[seg_mask] type_pred = type_pred[seg_mask] type_scores = type_scores[seg_mask] if ppn_ends is not None: end_scores = end_scores[seg_mask] # At this point, if there are no valid proposed points left, abort if not len(coords): return empty((0, 13 + 2 * (ppn_ends is not None))) # Cluster nearby points together if torch.is_tensor(coords): clusts = self.dbscan_points(coords.detach().cpu().numpy()) else: clusts = self.dbscan_points(coords) ppn_pred = empty((len(clusts), 13 + 2 * (ppn_ends is not None))) for i, c in enumerate(clusts): types, cnts = unique(type_pred[c], return_counts=True) type_c = types[argmax(cnts)] ppn_pred[i, BATCH_COL] = self.entry ppn_pred[i, COORD_COLS] = mean(coords[c], 0) ppn_pred[i, PPN_SCORE_COLS] = pool_fn(scores[c], 0) ppn_pred[i, PPN_OCC_COL] = len(c) ppn_pred[i, PPN_CLASS_COLS] = pool_fn(type_scores[c], 0) ppn_pred[i, PPN_SHAPE_COL] = type_c if ppn_ends is not None: ppn_pred[i, PPN_END_COLS] = pool_fn(end_scores[c], 0) return ppn_pred
[docs] def dbscan_points(self, coordinates): """Form clusters of predited points based on proximity. Parameters ---------- coordinates : np.ndarray Coordinates of the points to cluster Returns ------- List[np.ndarray] List of proposed point cluster indexes """ # Assign cluster labels to all proposed poins labels = sm.cluster.dbscan(coordinates, eps=self.pool_dist, min_samples=1) # Convert the list of labels into a list of cluster indexes clusts = [] for c in np.unique(labels): clusts.append(np.where(labels == c)[0]) return clusts
[docs] class ParticlePointPredictor: """Produces start/end points given a list of particles and PPN predictions. Given a list particle or fragment clusters, leverage the raw PPN output to produce a list of start points for shower objects and of start/end points for track objects: - For showers, pick the most likely PPN point - For tracks, pick the two points farthest away from each other """ def __init__( self, use_numpy=True, contained_first=True, anchor_points=True, enhance_track_points=False, approx_farthest_points=True, ): """Initialize the particle point predictor. Parameters ---------- use_numpy : bool, default True Compute the particle start/end points on CPU using numpy+numba contained_first : bool, default True If `True`, for shower points, give precedence to voxels which predict a point within one voxel of their location anchor_points : bool, default True If `True`, the point estimates are brought to the closest cluster voxel enhance_track_points, default False If `True`, tracks leverage PPN predictions to provide a more accurate estimate of the end points. This needs to be avoided for track fragments, as PPN is typically not trained to find end points for them. If set to `False`, the two voxels farthest away from each other are picked. approx_farthest_points: bool, default True If `True`, approximate the computation of the two farthest points """ # Store the point predictor parameters self.use_numpy = use_numpy self.contained_first = contained_first self.anchor_points = anchor_points self.enhance_track_points = enhance_track_points self.approx_farthest_points = approx_farthest_points def __call__(self, data, clusts, clust_shapes, ppn_points): """Assign start/end points to one batch of events. Parameters ---------- data : Union[np.ndarray, torch.Tensor, TensorBatch] (N, 1 + D + N_f) tensor of voxel/value pairs - N is the the total number of voxels in the image - 1 is the batch ID - D is the number of dimensions in the input image - N_f is the number of features per voxel clusts : Union[List[np.ndarray], IndexBatch] List of particle clusters clust_shapes : Union[np.ndarray, torch.Tensor, TensorBatch] Semantic type of each of the clusters ppn_points : TensorBatch Raw output of PPN """ # Bring TensorBatch to torch.Tensor, if needed is_batch = False if isinstance(data, TensorBatch): is_batch = True if not isinstance(clusts, IndexBatch): raise TypeError( "Batched particle point prediction expects an IndexBatch." ) if not clusts.is_list: raise TypeError( "Particle point prediction expects list-backed cluster indexes." ) data, clust_shapes, ppn_points = ( data.tensor, clust_shapes.tensor, ppn_points.tensor, ) clusts, counts = clusts.index_list, clusts.counts # Get cluster point coordinates, dispatch points = data[:, COORD_COLS] if not self.use_numpy: # Check type, pass to torch function assert isinstance( data, torch.Tensor ), "When using `use_numpy=False`, must use torch data." end_points = self.get_end_points_torch( points, clusts, clust_shapes, ppn_points ) else: # Pass to numpy function (takes care of object conversion) end_points = self.get_end_points_numpy( points, clusts, clust_shapes, ppn_points ) # Transform back to TensorBatch, if needed, then return if is_batch: end_points = TensorBatch(end_points, counts, coord_cols=np.arange(6)) return end_points
[docs] def get_end_points_torch(self, points, clusts, clusts_seg, ppn_points): """Torch function to fetch each of the cluster end points. Parameters ---------- points : torch.Tensor (N, 3) Image point coordinates clusts : List[np.ndarray] List of particle clusters clust_shapes : np.ndarray Semantic type of each of the clusters ppn_points : torch.Tensor Raw output of PPN """ # Loop over the relevant clusters end_points = torch.empty( (len(clusts), 6), dtype=points.dtype, device=points.device ) for i, c in enumerate(clusts): # Get cluster coordinates points_c = points[c] ppn_points_c = ppn_points[c] # For tracks, find the two poins farthest away from each other if clusts_seg[i] == TRACK_SHP: # Get the two most separated points in the cluster idx = torch.argmax(cdist_fast(points_c, points_c)) idxs = sorted([int(idx // len(points_c)), int(idx % len(points_c))]) track_points = points_c[idxs] # If requested, enhance using the PPN predictions. Only consider # points in the cluster that have a positive score if self.enhance_track_points: pos_mask = ( ppn_points_c[idxs, PPN_RPOS_COLS[1]] >= ppn_points_c[idxs, PPN_RPOS_COLS[0]] ) track_points += pos_mask * ( ppn_points_c[idxs][:, PPN_ROFF_COLS] + 0.5 ) # If needed, anchor the track endpoints to the track cluster if self.anchor_points: dist_mat = cdist_fast(track_points, points_c) track_points = points_c[torch.argmin(dist_mat, 1)] # Store end_points[i] = track_points.flatten() # For showers, find the most likely point else: # Only use positive voxels and give precedence to predictions # that are contained within the voxel making the prediction. ppn_scores = torch.softmax(ppn_points_c[:, PPN_RPOS_COLS], 1)[:, -1] if self.contained_first: dists = torch.abs(ppn_points_c[:, PPN_ROFF_COLS]) val_index = torch.where((ppn_scores > 0.5) & (dists < 1.0).all(1))[ 0 ] if len(val_index): best_id = val_index[torch.argmax(ppn_scores[val_index])] else: best_id = torch.argmax(ppn_scores) else: best_id = torch.argmax(ppn_scores) start_point = ( points_c[best_id] + ppn_points_c[best_id, PPN_ROFF_COLS] + 0.5 ) # If needed, anchor the shower start point to the shower cluster if self.anchor_points: dists = cdist_fast(start_point[None, :], points_c) start_point = points_c[torch.argmin(dists)] # Store twice to preserve the feature vector length end_points[i] = torch.cat((start_point, start_point), 0) # Return points return end_points
[docs] @numbafy( cast_args=["points", "ppn_points"], list_args=["clusts"], keep_torch=True, ref_arg="points", ) def get_end_points_numpy(self, points, clusts, clust_shapes, ppn_points): """Parallelized numba function to fetch each of the cluster end points. Parameters ---------- points : np.darray (N, 3) Image point coordinates clusts : List[np.ndarray] List of particle clusters clust_shapes : np.darray Semantic type of each of the clusters ppn_points : np.ndarray Raw output of PPN """ # If there are no clusters, nothing to do if len(clusts) == 0: return np.empty((0, 6), dtype=points.dtype) return self._get_end_points_numpy( points, clusts, clust_shapes, ppn_points, self.contained_first, self.anchor_points, self.enhance_track_points, self.approx_farthest_points, )
@staticmethod @nb.njit(cache=True, parallel=True, nogil=True) def _get_end_points_numpy( points: nb.float32[:, :], clusts: nb.types.List(nb.int64[:]), clust_shapes: nb.int64[:], ppn_points: nb.float32[:, :], contained_first: nb.boolean, anchor_points: nb.boolean, enhance_track_points: nb.boolean, approx_farthest_pair: nb.boolean, ): # Loop over the relevant clusters end_points = np.empty((len(clusts), 6), dtype=points.dtype) for k in nb.prange(len(clusts)): # Get cluster coordinates c = clusts[k] points_c = points[c] ppn_points_c = ppn_points[c] # For tracks, find the two poins farthest away from each other if clust_shapes[k] == TRACK_SHP: # Get the two most separated points in the cluster idxs = np.sort( np.array( sm.distance.farthest_pair(points_c, approx_farthest_pair)[:2] ) ) track_points = points_c[idxs] # If requested, enhance using the PPN predictions. Only consider # points in the cluster that have a positive score if enhance_track_points: pos_mask = ( ppn_points_c[idxs, PPN_RPOS_COLS[1]] >= ppn_points_c[idxs, PPN_RPOS_COLS[0]] ) track_points += pos_mask * ( ppn_points_c[idxs][:, PPN_ROFF_COLS_LO:PPN_ROFF_COLS_HI] + np.array(0.5, dtype=points.dtype) ) # If needed, anchor the track endpoints to the track cluster if anchor_points: dist_mat = sm.distance.cdist(track_points, points_c) track_points = points_c[np.argmin(dist_mat, 1)] # Store end_points[k] = track_points.flatten() # For showers, find the most likely point else: # Only use positive voxels and give precedence to predictions # that are contained within the voxel making the prediction. ppn_scores = sm.softmax( ppn_points_c[:, PPN_RPOS_COLS_LO:PPN_RPOS_COLS_HI], 1 )[:, -1] if contained_first: dists = np.abs(ppn_points_c[:, PPN_ROFF_COLS_LO:PPN_ROFF_COLS_HI]) val_index = np.where((ppn_scores > 0.5) & sm.all(dists < 1.0, 1))[0] if len(val_index): best_id = val_index[np.argmax(ppn_scores[val_index])] else: best_id = np.argmax(ppn_scores) else: best_id = np.argmax(ppn_scores) start_point = ( points_c[best_id] + ppn_points_c[best_id, PPN_ROFF_COLS_LO:PPN_ROFF_COLS_HI] + np.array(0.5, dtype=points.dtype) ) # If needed, anchor the shower start point to the shower cluster if anchor_points: dists = sm.distance.cdist(start_point[None, :], points_c) start_point = points_c[np.argmin(dists)] # Store twice to preserve the feature vector length end_points[k] = np.concatenate((start_point, start_point)) return end_points
[docs] def check_track_orientation_ppn(start_point, end_point, ppn_candidates): """Use PPN end point predictions to predict track orientation. Use the PPN point assignments as a basis to orient a track. Match the end points of a track to the closest PPN candidate and pick the candidate with the highest start score as the start point Parameters ---------- start_point : np.ndarray (3) Start point of the track end_point : np.ndarray (3) End point of the track ppn_candidates : np.ndarray (N, 10) PPN point candidates and their associated scores Returns ------- bool Returns `True` if the start point provided is correct, `False` if the end point is more likely to be the start point. """ # If there's no PPN candidates, nothing to do here if not len(ppn_candidates): return True # Get the candidate coordinates and end point classification predictions ppn_points = ppn_candidates[:, COORD_COLS] end_scores = ppn_candidates[:, PPN_END_COLS] # Compute the distance between the track end points and the PPN candidates end_points = np.vstack([start_point, end_point]) dist_mat = sm.distance.cdist(end_points, ppn_points) # If both track end points are closest to the same PPN point, the start # point must be closest to it if the score is high, farthest otherwise argmins = np.argmin(dist_mat, axis=1) if argmins[0] == argmins[1]: label = np.argmax(end_scores[argmins[0]]) dists = dist_mat[[0, 1], argmins] return (label == 0 and dists[0] < dists[1]) or ( label == 1 and dists[1] < dists[0] ) # In all other cases, check that the start point is associated with the PPN # point with the lowest end score end_scores = end_scores[argmins, -1] return end_scores[0] < end_scores[1]
[docs] def get_ppn_labels( particle_v, meta, dtype, dim=3, min_voxel_count=1, min_energy_deposit=0, include_point_tagging=True, ): """Gets particle point coordinates and informations for running PPN. We skip some particles under specific conditions (e.g. low energy deposit, low voxel count, nucleus track, etc.) Parameters ---------- particle_v : List[larcv.Particle] List of LArCV particle objects in the image meta : larcv::Voxel3DMeta or larcv::ImageMeta Metadata information dtype : str Typing of the output PPN labels dim : int, default 3 Number of dimensions of the image min_voxel_count : int, default 5 Minimum number of voxels associated with a particle to be included min_energy_deposit : float, default 0 Minimum energy deposition associated with a particle to be included include_point_tagging : bool, default True If True, include an a label of 0 for start points and 1 for end points Returns ------- np.array Array of points of shape (N, 5/6) where 5/6 = x,y,z + point type + particle index [+ start (0) or end (1) point tagging] """ # Check on dimension if dim not in [2, 3]: raise ValueError( "The image dimension must be either 2 or 3, " f"got {dim} instead." ) # Loop over true particles part_info = [] for part_index, particle in enumerate(particle_v): # Check that the particle has the expected index if part_index != particle.id(): warn("Particle list index does not match its `id` attribute.") # If the particle does not meet minimum energy/size requirements, skip if ( particle.energy_deposit() < min_energy_deposit or particle.num_voxels() < min_voxel_count ): continue # If the particle is a nucleus, skip. # TODO: check if it's useful pdg_code = abs(particle.pdg_code()) if pdg_code > 1000000000: continue # If a shower has its first step outside of detector boundaries, skip # TODO: check if it's useful if pdg_code == 11 or pdg_code == 22: if not image_contains(meta, particle.first_step(), dim): continue # Skip low energy scatters and unknown shapes shape = particle.shape() if particle.shape() in [LOWES_SHP, UNKWN_SHP]: continue # Append the start point with the rest of the particle information first_step = image_coordinates(meta, particle.first_step(), dim) part_extra = ( [shape, part_index, 0] if include_point_tagging else [shape, part_index] ) part_info.append(first_step + part_extra) # Append the end point as well, for tracks only if shape == TRACK_SHP: last_step = image_coordinates(meta, particle.last_step(), dim) part_extra = ( [shape, part_index, 1] if include_point_tagging else [shape, part_index] ) part_info.append(last_step + part_extra) if not len(part_info): return np.empty((0, 5 + include_point_tagging), dtype=dtype) return np.array(part_info, dtype=dtype)
[docs] def get_vertex_labels(particle_v, neutrino_v, meta, dtype): """Gets particle vertex coordinates. It provides the coordinates of points where multiple particles originate: - If `neutrino_v` is provided, it uses the neutrino interaction points. - If `particle_v` is provided instead, it looks for ancestor positions shared by at least two primary particles. Parameters ---------- particle_v : List[larcv.Particle] List of LArCV particle objects in the image neutrino_v : List[larcv.Neutrino] List of LArCV neutrino objects in the image meta : larcv::Voxel3DMeta or larcv::ImageMeta Metadata information dtype : str Typing of the output PPN labels Returns ------- np.array Array of points of shape (N, 4) where 4 = x, y, z, vertex_id """ # If the particles are provided, find unique ancestors vertexes = [] if particle_v is not None: # Fetch all ancestor positions of primary particles anc_positions = [] for i, p in enumerate(particle_v): if p.parent_id() == p.id() or p.ancestor_pdg_code() == 111: if image_contains(meta, p.ancestor_position()): anc_pos = image_coordinates(meta, p.ancestor_position()) anc_positions.append(anc_pos) # If there is no primary, nothing to do if not len(anc_positions): return np.empty((0, 4), dtype=dtype) # Find those that appear > once anc_positions = np.vstack(anc_positions) unique_positions, counts = np.unique(anc_positions, return_counts=True, axis=0) for i, idx in enumerate(np.where(counts > 1)[0]): vertexes.append([*unique_positions[idx], i]) # If the neutrinos are provided, straightforward if neutrino_v is not None: for i, n in enumerate(neutrino_v): if image_contains(meta, n.position()): nu_pos = image_coordinates(meta, n.position()) vertexes.append([*nu_pos, i]) # If there are no vertex, nothing to do if not len(vertexes): return np.empty((0, 4), dtype=dtype) return np.vstack(vertexes).astype(dtype)
[docs] def image_contains(meta, point, dim=3): """Checks whether a point is contained in the image box defined by meta. Parameters ---------- meta : larcv::Voxel3DMeta or larcv::ImageMeta Metadata information point : larcv::Point3D or larcv::Point2D Point to check on dim: int, default 3 Number of dimensions of the image Returns ------- bool True if the point is contained in the image box """ if dim == 3: return ( point.x() >= meta.min_x() and point.y() >= meta.min_y() and point.z() >= meta.min_z() and point.x() <= meta.max_x() and point.y() <= meta.max_y() and point.z() <= meta.max_z() ) else: return ( point.x() >= meta.min_x() and point.x() <= meta.max_x() and point.y() >= meta.min_y() and point.y() <= meta.max_y() )
[docs] def image_coordinates(meta, point, dim=3): """Returns the coordinates of a point in units of pixels with an image. Parameters ---------- meta : larcv::Voxel3DMeta or larcv::ImageMeta Metadata information point : larcv::Point3D or larcv::Point2D Point to convert the units of dim: int, default 3 Number of dimensions of the image Returns ------- bool True if the point is contained in the image box """ x, y, z = point.x(), point.y(), point.z() if dim == 3: x = (x - meta.min_x()) / meta.size_voxel_x() y = (y - meta.min_y()) / meta.size_voxel_y() z = (z - meta.min_z()) / meta.size_voxel_z() return [x, y, z] else: x = (x - meta.min_x()) / meta.size_voxel_x() y = (y - meta.min_y()) / meta.size_voxel_y() return [x, y]