Source code for spine.utils.vertex

import numba as nb
import numpy as np

import spine.math as sm
from spine.constants import INTER_COL, PRINT_COL, TRACK_SHP, VTX_COLS


[docs] def get_vertex( start_points, end_points, directions, semantics, anchor_vertex=True, touching_threshold=2.0, return_mode=False, ): """Reconstruct the vertex of an individual interaction. Parameters ---------- start_points : np.ndarray (P, 3) Particle start points end_points : np.ndarray (P, 3) Particle end points directions : np.ndarray (P, 3) Particle directions semantics : np.ndarray (P) : particle semantic type anchor_vertex : bool, default True If true, anchor the candidate vertex to particle objects, with the expection of interactions only composed of showers. touching_threshold : float, default 2.0 Maximum distance for two particle points to be considered touching return_mode : bool, default False If `True`, return the method used to find the vertex """ # If there is no particle: return default values if not len(start_points): if return_mode: return np.full(3, -np.inf), "no_particle" return np.full(3, -np.inf) # If there is only one particle: choose the start point if len(start_points) == 1: if return_mode: return start_points[0], "single_start" return start_points[0] # If this more than one particle, anchor the vertex to a particle point track_mask = semantics == TRACK_SHP track_ids = np.where(track_mask)[0] if anchor_vertex: # If there is a unique point where >=2 particles meet, pick it. Include # track start and end points, to not rely on direction predictions vertices = get_confluence_points(start_points, end_points, touching_threshold) if len(vertices) == 1: if return_mode: return vertices[0], "confluence_nodir" return vertices[0] # If there is more than one option, restrict track end points to the # predicted start points (relies on direction prediction), check again if len(vertices) > 1 and len(track_ids): vertices = get_confluence_points( start_points, touching_threshold=touching_threshold ) if len(vertices) == 1: if return_mode: return vertices[0], "confluence_dir" return vertices[0] # If there are no obvious confluence points, but there is a single # track and N showers, pick the track end-point which minimizes the # cosine distance between the normalized vertex-to-point vectors # and the shower direction estimates. if len(track_ids) == 1: candidates = np.vstack([start_points[track_mask], end_points[track_mask]]) losses = angular_loss( candidates, start_points[~track_mask], directions[~track_mask] ) if return_mode: return candidates[np.argmin(losses)], "confluence_track_showers" return candidates[np.argmin(losses)] # If all else fails on track groups, simply pick the longest # track, take its starting point and hope for the best... if len(track_ids): track_lengths = np.linalg.norm( end_points[track_mask] - start_points[track_mask], axis=-1 ) if return_mode: return ( start_points[track_mask][np.argmax(track_lengths)], "track_length", ) return start_points[track_mask][np.argmax(track_lengths)] # If there is only showers (or the vertex is not anchored): find the point which # minimizes the sum of distances from the vertex w.r.t. to all direction lines. # TODO: Find the point which minimizes the angular difference between the # vertex-to-point vector and the shower direction estimates (much better). if return_mode: return get_pseudovertex(start_points, directions), "pseudo_vertex" return get_pseudovertex(start_points, directions)
[docs] @nb.njit(cache=True) def angular_loss( candidates: nb.float32[:, :], points: nb.float32[:, :], directions: nb.float32[:, :], use_cos: bool = True, ) -> nb.float32: """Computes the angular/cosine distance between vectors that join candidate points to the start points of particles and their respective direction estimates. Values are normalized between 0 (perfect fit) and 1 (complete disagreement). Parameters ---------- candidates : np.ndarray (C, 3) Vertex coordinates points : np.ndarray (P, 3) Particle start points directions : np.ndarray (P, 3) Particle directions use_cos : bool Whether or not to use the cosine as a metric Returns ------- np.ndarray (C) Loss for each of the candidates """ n_c = len(candidates) losses = np.empty(n_c, dtype=np.float32) for i, c in enumerate(candidates): loss = 0.0 for p, d in zip(points, directions): v = p - c v /= np.linalg.norm(v) if use_cos: loss += (1.0 - np.sum(v * d)) / 2 / n_c else: loss += np.arccos(np.sum(v * d)) / np.pi / n_c losses[i] = loss return losses
[docs] @nb.njit(cache=True) def get_confluence_points( start_points: nb.float32[:, :], end_points: nb.float32[:, :] = None, touching_threshold: nb.float32 = 2.0, ) -> nb.types.List(nb.float32[:]): """Find the points where multiple particles touch. Parameters ---------- start_points : np.ndarray (P, 3) Particle start points end_points : np.ndarray, optional (P, 3) Particle end points touching_threshold : float, default 2.0 Maximum distance for two particle points to be considered touching Returns ------- List[np.ndarray] List of vertices that correspond to the confluence points """ # Create a particle-to-particle distance matrix n_part = len(start_points) dist_mat = np.zeros((n_part, n_part), dtype=start_points.dtype) end_mat = np.zeros((n_part, n_part), dtype=np.int32) if end_points is None: for i, si in enumerate(start_points): for j, sj in enumerate(start_points): if j > i: dist_mat[i, j] = np.linalg.norm(sj - si) if j < i: dist_mat[i, j] = dist_mat[j, i] else: for i, (si, ei) in enumerate(zip(start_points, end_points)): pointsi = np.vstack((si, ei)) for j, (sj, ej) in enumerate(zip(start_points, end_points)): if j > i: pointsj = np.vstack((sj, ej)) submat = sm.distance.cdist(pointsi, pointsj) mini, minj = np.argmin(submat) // 2, np.argmin(submat) % 2 dist_mat[i, j] = submat[mini, minj] end_mat[i, j], end_mat[j, i] = mini, minj if j < i: dist_mat[i, j] = dist_mat[j, i] # Convert distance matrix to an adjacency matrix, compute # the square graphic to find the number of walks adj_mat = (dist_mat < touching_threshold).astype( np.float32 ) # @ does not like integers walk_mat = adj_mat @ adj_mat for i in range(n_part): walk_mat[i, i] = np.max(walk_mat[i][np.arange(n_part) != i]) # Find cycles to build particle groups and confluence points (vertices) leftover = np.ones(n_part, dtype=np.bool_) max_walks = sm.amax(walk_mat, axis=1) vertices = nb.typed.List.empty_list(np.empty(0, dtype=start_points.dtype)) while np.any(leftover): # Find the longest available cycle (must be at least 2 particles) left_ids = np.where(leftover)[0] max_id = left_ids[np.argmax(max_walks[leftover])] max_walk = max_walks[max_id] if max_walk < 2: break # Form the particle group that make up the cycle leftover[walk_mat[max_id] == max_walk] = False group = np.where(walk_mat[max_id] == max_walk)[0] # Take the barycenter of the touching particle ends as the vertex if end_points is None: vertices.append(sm.mean(start_points[group], axis=0)) else: vertex = np.zeros(3, dtype=start_points.dtype) for i, t in enumerate(group): end_id = np.argmax( np.bincount(end_mat[t][group][np.arange(len(group)) != i]) ) vertex += ( start_points[t] / len(group) if not end_id else end_points[t] / len(group) ) vertices.append(vertex) return vertices
[docs] @nb.njit(cache=True) def get_pseudovertex( start_points: nb.float32[:, :], directions: nb.float32[:, :], dim: int = 3 ) -> nb.float32[:]: """Finds the vertex which minimizes the total distance from itself to all the lines defined by the start points of particles and their directions. Parameters ---------- start_points : np.ndarray (P, 3) Particle start points directions : np.ndarray (P, 3) Particle directions dim : int Number of dimensions """ assert len(start_points), "Cannot reconstruct pseudovertex without points." if len(start_points) == 1: return start_points[0] S = np.zeros((dim, dim), dtype=start_points.dtype) C = np.zeros((dim,), dtype=start_points.dtype) for p, d in zip(start_points, directions): x = np.outer(d, d) - np.eye(dim, dtype=start_points.dtype) S += x C += x @ np.ascontiguousarray(p) if np.linalg.det(S) == 0.0: return sm.mean(start_points, axis=0) # TODO remove casting to float64 when the crash goes away # TODO go back to pinv when fixed and get rid of singularity check return np.linalg.inv(S.astype(np.float64)).astype(start_points.dtype) @ C
[docs] @nb.njit(cache=True) def get_weighted_pseudovertex( start_points: nb.float32[:, :], directions: nb.float32[:, :], weights: nb.float32[:], dim: int = 3, ) -> nb.float32[:]: """Finds the vertex which minimizes the total distance from itself to all the lines defined by the start points of particles and their directions. Parameters ---------- start_points : np.ndarray (P, 3) Particle start points directions : np.ndarray (P, 3) Particle directions dim : int Number of dimensions """ assert len(start_points), "Cannot reconstruct pseudovertex without points." if len(start_points) == 1: return start_points[0] S = np.zeros((dim, dim), dtype=start_points.dtype) C = np.zeros((dim,), dtype=start_points.dtype) for i, (p, d) in enumerate(zip(start_points, directions)): x = weights[i] * (np.outer(d, d) - np.eye(dim, dtype=start_points.dtype)) S += x C += x @ np.ascontiguousarray(p) if np.linalg.det(S) == 0.0: return sm.sum(weights * start_points, axis=0) / np.sum(weights) # TODO remove casting to float64 when the crash goes away # TODO go back to pinv when fixed and get rid of singularity check return np.linalg.inv(S.astype(np.float64)).astype(start_points.dtype) @ C