Source code for spine.utils.tracking

import numba as nb
import numpy as np
from scipy.interpolate import UnivariateSpline

import spine.math as sm


[docs] def get_track_length( coordinates: nb.float32[:, :], segment_length: nb.float32 = None, point: nb.float32[:] = None, method: str = "bin_pca", anchor_point: bool = True, min_count: int = 10, spline_smooth: float = None, ) -> nb.float32: """Given a set of point coordinates associated with a track and one of its end points, compute its length. Parameters ---------- coordinates : np.ndarray (N, 3) Coordinates of the points that make up the track segment_length : float, optional Segment length in the units that specify the coordinates point : np.ndarray, optional (3) An end point of the track method : str, default 'bin_pca' Method used to compute the track length (one of 'displacement', 'step', 'step_next', 'bin_pca' or 'spline') anchor_point : bool, default True Weather or not to collapse end point onto the closest track point min_count : int, default 10 Minimum number of points in a segment to use it to evaluate the direction of the next step along the track. spline_smooth : float, optional The smoothing factor to be used in spline regression, when used Returns ------- float Total length of the track """ if method == "displacement": # Project points along the principal component, compute displacement track_dir = sm.decomposition.principal_components(coordinates)[0] pcoordinates = np.dot(coordinates, track_dir) return np.max(pcoordinates) - np.min(pcoordinates) if method in ["step", "step_next", "bin_pca"]: # Segment the track and sum the segment lengths seg_lengths = get_track_segments( coordinates, segment_length, point, method, anchor_point, min_count )[-1] return np.sum(seg_lengths) elif method == "splines": # Fit point along the track with a spline, compute spline length return get_track_spline(coordinates, segment_length, spline_smooth)[-1] else: raise ValueError(f"Track length estimation method not recognized: `{method}`.")
[docs] @nb.njit(cache=True) def check_track_orientation( coordinates: nb.float32[:, :], values: nb.float32[:], start_point: nb.float32[:], end_point: nb.float32[:], method: str = "local", anchor_points: bool = True, local_radius: nb.float32 = 5, segment_method: str = "step_next", segment_length: nb.float32 = 5, segment_min_count: int = 10, ) -> bool: """Given a set of track point coordinates and the track end points, checks which end point is most likely to be the correct start, based on the rate of energy deposition in the track. Parameters ---------- coordinates : np.ndarray (N, 3) Coordinates of the points that make up the track values : np.ndarray (N) Values associated with each point start_point : np.ndarray (3) Start point of the track end_point : np.ndarray (3) End point of the track method : str, default 'local' Method used to orient the track (one of 'local', 'gradient') local_radius : float, default 5 Radius around the end points to used to evaluate the local dE/dx anchor_points : bool, default True Weather or not to collapse end point onto the closest track point segment_method : str, default 'step_next' Method used to segment the track when using the 'gradient' method segment_length : float, default 5 Segment length when using the 'gradient' method segment_min_count : int, default 10 Minimum number of points in a segment when using the 'gradient' method Returns ------- bool Returns `True` if the start point provided is correct, `False` if the end point is more likely to be the start point. """ if method == "local": # If requested, anchor the end points to the closest track points end_points = np.vstack((start_point, end_point)) if anchor_points: dist_mat = sm.distance.cdist(end_points, coordinates) end_ids = sm.argmin(dist_mat, axis=1) end_points = coordinates[end_ids] # Compute the local dE/dx around each end, pick the end with the lowest start_dedx = np.sum(values[dist_mat[0] < local_radius]) / local_radius end_dedx = np.sum(values[dist_mat[1] < local_radius]) / local_radius return start_dedx < end_dedx elif method == "gradient": # Compute the track gradient with respect to either ends grad_start = get_track_deposition_gradient( coordinates, values, start_point, segment_length, segment_method, anchor_points, segment_min_count, )[0] grad_end = get_track_deposition_gradient( coordinates, values, end_point, segment_length, segment_method, anchor_points, segment_min_count, )[0] # Compute the deposition gradient as an average of the two gradient = (grad_start - grad_end) / 2.0 return bool(gradient >= 0.0) else: raise ValueError("Track orientation method not recognized")
[docs] @nb.njit(cache=True) def get_track_deposition_gradient( coordinates: nb.float32[:, :], values: nb.float32[:], start_point: nb.float32[:], segment_length: nb.float32 = 5.0, method: str = "step_next", anchor_point: bool = True, min_count: int = 10, ) -> (nb.float32, nb.float32[:], nb.float32[:], nb.float32[:]): """Given a set of point coordinates and their values associated with a track and a start point, compute the deposition gradient with respect to the start point. Parameters ---------- coordinates : np.ndarray (N, 3) Coordinates of the points that make up the track values : np.ndarray (N) Values associated with each point start_point : np.ndarray (3) End point of the track segment_length : float, default 5 Segment length in the units that specify the coordinates method : str, default 'step_next' Method used to segment the track (one of 'step', 'step_next' or 'bin_pca') anchor_point : bool, default True Weather or not to collapse end point onto the closest track point min_count : int, default 10 Minimum number of points in a segment for it to be valid. If not valid, the dedx value for that segment is not used to compute the gradient. Returns ------- gradient : float Deposition gradient along the track from the start point segment_dedxs : np.ndarray (S) Array of energy/charge deposition rate values segment_rrs : np.ndarray (S) Array of residual ranges (center of the segment w.r.t. end point) segment_lengths : np.ndarray (S) Array of segment lengths """ # Compute the track segment dedxs seg_dedxs, _, seg_rrs, _, _, seg_lengths = get_track_segment_dedxs( coordinates, values, start_point, segment_length, method, anchor_point, min_count, ) valid_index = np.where(seg_dedxs > -1)[0] if not len(valid_index): return 0.0, seg_dedxs, seg_rrs, seg_lengths seg_dedxs = seg_dedxs[valid_index] seg_rrs = seg_rrs[valid_index] # Compute the dE/dx gradient = Cov(x,y) / Var(x) cov = np.cov(seg_rrs, seg_dedxs) gradient = cov[0, 1] / cov[0, 0] if cov[0, 0] > 0.0 else 0.0 return gradient, seg_dedxs, seg_rrs, seg_lengths
[docs] @nb.njit(cache=True) def get_track_segment_dedxs( coordinates: nb.float32[:, :], values: nb.float32[:], end_point: nb.float32[:] = None, segment_length: nb.float32 = 5.0, method: str = "step_next", anchor_point: bool = True, min_count: int = 10, ) -> ( nb.float32[:], nb.float32[:], nb.float32[:], nb.float32[:], nb.float32[:], nb.float32[:], ): """Given a set of point coordinates and their values associated with a track and one of its end points, compute the energy/charge deposition rate as a function of the residual range. Parameters ---------- coordinates : np.ndarray (N, 3) Coordinates of the points that make up the track values : np.ndarray (N) Values associated with each point end_point : np.ndarray, optional (3) End point of the track segment_length : float, default 5. Segment length in the units that specify the coordinates method : str, default 'step_next' Method used to segment the track (one of 'step', 'step_next' or 'bin_pca') anchor_point : bool, default True Weather or not to collapse end point onto the closest track point min_count : int, default 10 Minimum number of points in a segment for it to be valid. If not valid, the dedx value returned for the segment is -1. Returns ------- seg_dedxs : np.ndarray (S) Array of energy/charge deposition rate values seg_errs : np.ndarray (S) Array of uncertainties on the energy/charge deposition rate seg_rrs : np.ndarray (S) Array of residual ranges (center of the segment w.r.t. end point) seg_clusts : List[np.ndarray] (S) List of indexes which correspond to each segment cluster of points seg_dirs : np.ndarray (S, 3) Array of segment direction vectors seg_lengths : np.ndarray (S) Array of segment lengths """ # Get the segment indexes and their lengths seg_clusts, seg_dirs, seg_lengths = get_track_segments( coordinates, segment_length, end_point, method, anchor_point, min_count ) # Compute the dQdxs and residual ranges seg_dedxs = np.empty(len(seg_clusts), dtype=np.float32) seg_errs = np.empty(len(seg_clusts), dtype=np.float32) seg_rrs = np.empty(len(seg_clusts), dtype=np.float32) residual_range = 0.0 for i, seg in enumerate(seg_clusts): # Compute the rate of energy/charge deposition # If the segment has insufficient content, return dummy values dx = seg_lengths[i] if len(seg) >= min_count and dx > 0.0: de = np.sum(values[seg]) dde = np.std(values[seg]) * np.sqrt(len(seg)) seg_dedxs[i] = de / dx seg_errs[i] = dde / dx else: seg_dedxs[i] = -1.0 seg_errs[i] = -1.0 # Compute the residual_range seg_rrs[i] = residual_range + dx / 2.0 residual_range += dx return seg_dedxs, seg_errs, seg_rrs, seg_clusts, seg_dirs, seg_lengths
[docs] @nb.njit(cache=True) def get_track_segments( coordinates: nb.float32[:, :], segment_length: nb.float32, point: nb.float32[:] = None, method: str = "step_next", anchor_point: bool = True, min_count: int = 10, ) -> (nb.types.List(nb.int64[:]), nb.float32[:, :], nb.float64[:]): """Given a set of point coordinates associated with a track and one of its end points, divide the track into segments of the requested length. Parameters ---------- coordinates : np.ndarray (N, 3) Coordinates of the points that make up the track segment_length : float Segment length in the units that specify the coordinates point : np.ndarray, optional (3) A preferred end point of the track from which to start method : str, default 'step_next' Method used to segment the track (one of 'step', 'step_next' or 'bin_pca') anchor_point : bool, default True Weather or not to collapse end point onto the closest track point min_count : int, default 10 Minimum number of points in a segment to use it to evaluate the direction of the next step along the track. Returns ------- segment_clusts : List[np.ndarray] (S) List of indexes which correspond to each segment cluster of points segment_dirs : np.ndarray (S, 3) Array of segment direction vectors segment_lengths : np.ndarray (S) Array of segment lengths """ if method == "step" or method == "step_next": # Determine which point to start stepping from if point is not None: start_point = point if anchor_point: start_id = np.argmin( sm.distance.cdist(np.atleast_2d(point), coordinates) ) start_point = coordinates[start_id] else: # If not specified, pick a random end point of the track start_id = sm.distance.farthest_pair(coordinates)[0] start_point = coordinates[start_id] # Step through the track iteratively empty_list = nb.typed.List.empty_list seg_start = np.copy(start_point) seg_clusts = empty_list(np.empty(0, dtype=np.int64)) seg_dirs_l = empty_list(np.empty(0, dtype=coordinates.dtype)) seg_lengths_l = empty_list(nb.float64) left_index = np.arange(len(coordinates)) while len(left_index): # Compute distances from the segment start point to the all # the leftover points dists = sm.distance.cdist( np.atleast_2d(seg_start), coordinates[left_index] ).flatten() # Select the points that belong to this segment dist_mask = dists <= segment_length pass_index = np.where(dist_mask)[0] fail_index = np.where(~dist_mask)[0] seg_index = left_index[pass_index] # If the next closest point is backwards, make it the last segment if len(fail_index): next_id = np.argmin(dists[fail_index]) next_index = left_index[fail_index][next_id] next_closest = coordinates[next_index] if np.dot(next_closest - seg_start, seg_start - start_point) < 0.0: fail_index = fail_index[:0] if not len(pass_index): break # Estimate the direction of the segment seg_coords = coordinates[seg_index] if ( method == "step" and len(seg_index) > min_count and np.max(dists[pass_index]) > 0.0 ): # Estimate direction w.r.t. the segment start point ('step') direction = sm.mean(seg_coords - seg_start, axis=0) elif len(fail_index): # Take direction as the vector joining the next closest point # ('step_next'). Also apply this method is the `min_count` # condition is not satisfied for this segment. direction = next_closest - seg_start else: # If this is the last segment, find the farthest point from # its start and make it the last point. last_id = np.argmax(dists[pass_index]) direction = seg_coords[last_id] - seg_start if np.linalg.norm(direction): direction /= np.linalg.norm(direction) else: direction = np.array([1.0, 0.0, 0.0], dtype=coordinates.dtype) # Compute the segment length. If it's the last segment, # track the distance to the fathest point in the segment if len(fail_index): length = segment_length else: last_id = np.argmax(dists[pass_index]) to_end = seg_coords[last_id] - seg_start length = np.dot(to_end, direction) # Step the segment start point in the direction of the segment seg_start += length * direction # Update the leftover index left_index = left_index[fail_index] # Store the segment information seg_clusts.append(seg_index) seg_dirs_l.append(direction) seg_lengths_l.append(length) # Convert lists of directions and lengths to numpy.ndarray objects shape = (len(seg_dirs_l), coordinates.shape[1]) seg_dirs = np.empty(shape, dtype=coordinates.dtype) seg_lengths = np.empty(shape[0], dtype=coordinates.dtype) for i in range(len(seg_clusts)): seg_dirs[i] = seg_dirs_l[i] seg_lengths[i] = seg_lengths_l[i] return seg_clusts, seg_dirs, seg_lengths elif method == "bin_pca": # Find the principal component of the whole track track_dir = sm.decomposition.principal_components(coordinates)[0] # If a track end point is provided, check which end the track end point # is on and flip the principal axis coordinates, if needed pcoordinates = np.dot(coordinates, track_dir) if point is not None: pstart = np.dot(point, track_dir) if np.abs(np.min(pcoordinates) - pstart) > np.abs( np.max(pcoordinates) - pstart ): pstart = -pstart pcoordinates = -pcoordinates # Bin the track along the principal component vector if point is not None: boundaries = np.arange( min(pstart, np.min(pcoordinates)), np.max(pcoordinates), segment_length ) else: boundaries = np.arange( np.min(pcoordinates), np.max(pcoordinates), segment_length ) seg_labels = np.digitize(pcoordinates, boundaries) seg_clusts = nb.typed.List.empty_list(np.empty(0, dtype=np.int64)) for l in range(len(boundaries)): seg_clusts.append(np.where(seg_labels == l + 1)[0]) # Compute the segment directions and lengths seg_dirs = np.empty( (len(seg_clusts), coordinates.shape[1]), dtype=coordinates.dtype ) seg_lengths = np.empty(len(seg_clusts), dtype=coordinates.dtype) for i, seg in enumerate(seg_clusts): # If this segment is empty, use track-level information if not len(seg): seg_dirs[i] = track_dir seg_lengths[i] = segment_length continue # Compute principal component of the segment, use it as direction if len(seg) > min_count: direction = sm.decomposition.principal_components(coordinates[seg])[0] if np.dot(direction, track_dir) < 0.0: direction = -direction else: direction = track_dir seg_dirs[i] = direction # Evaluate length of the segment as constrained by the track # principal axis bin that defines it if i < len(seg_clusts) - 1: length = segment_length else: length = np.max(pcoordinates) - boundaries[-1] costh = np.dot(direction, track_dir) if costh != 0.0: length /= costh seg_lengths[i] = length return seg_clusts, seg_dirs, seg_lengths else: raise ValueError("Track segmentation method not recognized")
[docs] def get_track_spline(coordinates, segment_length, s=None): """Estimate the best approximating curve defined by a point cloud using univariate 3D splines. The length is computed by measuring the length of the piecewise linear interpolation of the spline at points defined by the bin size. Parameters ---------- coordinatea : np.ndarray (N, 3) point cloud segment_length : float The subdivision length at which to sample points from the spline. If the track length is less than the segment_length, then the returned length will be computed from the farthest two projected points along the track's principal direction. s : float, optional The smoothing factor to be used in spline regression, by default None Returns ------- u : np.ndarray (N) The principal axis parametrization of the curve C(u) = (spx(u), spy(u), spz(u)) sppoints : np.ndarray (N, 3) The graph of the spline at points u splines : scipy.interpolate.UnivariateSpline Approximating splines for the point cloud defined by points length : float The estimate of the total length of the curve """ # Compute the principal component along which to segment the track track_dir = sm.decomposition.principal_components(coordinates)[0] pcoords = np.dot(coordinates, track_dir) perm = np.argsort(pcoords.squeeze()) u = pcoords[perm] # If there is less than four points, cannot fit a 3D spline if len(coordinates) < 4: # Fall back on displacement to estimate length length = np.max(pcoords) - np.min(pcoords) return u.squeeze, None, None, length # Compute the univariate splines along each axis spx = UnivariateSpline(u, coordinates[perm][:, 0], s=s) spy = UnivariateSpline(u, coordinates[perm][:, 1], s=s) spz = UnivariateSpline(u, coordinates[perm][:, 2], s=s) sppoints = np.hstack([spx(u), spy(u), spz(u)]) splines = [spx, spy, spz] # If track length is less than segment_length, just return length. # Otherwise estimate length by piecewise linear interpolation start, end = u.min(), u.max() length = end - start if length > segment_length: bins = np.arange(u.min(), u.max(), segment_length) bins = np.hstack([bins, np.array([u.max()])]) pt_approx = np.hstack([sp(bins).reshape(-1, 1) for sp in splines]) segments = np.linalg.norm(pt_approx[1:] - pt_approx[:-1], axis=1) length = segments.sum() return u.squeeze(), sppoints, splines, length