Source code for spine.io.parse.clean_data

"""Module which contains functions used to clean up cluster data.

When loading :class:`larcv.Cluster3DVoxelTensor` objects into tensors,
there can be duplicate voxels. These routines are used to remove these
duplicates and ensure the ordering of the output.
"""

from __future__ import annotations

from typing import Literal, overload

import numba as nb
import numba.typed as nbt
import numpy as np

from spine.constants import SHAPE_COL, SHAPE_PREC


@overload
def clean_sparse_data(
    cluster_voxels: np.ndarray,
    cluster_data: np.ndarray,
    sparse_voxels: np.ndarray | None = None,
    sum_cols: np.ndarray | None = None,
    avg_cols: np.ndarray | None = None,
    prec_col: int | None = SHAPE_COL,
    precedence: np.ndarray | list[int] | tuple[int, ...] | None = SHAPE_PREC,
    *,
    return_index: Literal[False] = False,
) -> tuple[np.ndarray, np.ndarray]: ...


@overload
def clean_sparse_data(
    cluster_voxels: np.ndarray,
    cluster_data: np.ndarray,
    sparse_voxels: np.ndarray | None = None,
    sum_cols: np.ndarray | None = None,
    avg_cols: np.ndarray | None = None,
    prec_col: int | None = SHAPE_COL,
    precedence: np.ndarray | list[int] | tuple[int, ...] | None = SHAPE_PREC,
    *,
    return_index: Literal[True],
) -> tuple[np.ndarray, np.ndarray, np.ndarray]: ...


[docs] def clean_sparse_data( cluster_voxels: np.ndarray, cluster_data: np.ndarray, sparse_voxels: np.ndarray | None = None, sum_cols: np.ndarray | None = None, avg_cols: np.ndarray | None = None, prec_col: int | None = SHAPE_COL, precedence: np.ndarray | list[int] | tuple[int, ...] | None = SHAPE_PREC, *, return_index: bool = False, ) -> tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, np.ndarray]: """Clean and align cluster voxels against an optional sparse reference. This function does the following: 1. Lexicographically sort group data (images are lexicographically sorted) 2. Choose only one group per voxel (by lexicographic order or precedence) 3. Remove voxels from cluster data that are not in the image data (optional) The set of sparse voxels must be a subset of the set of cluster voxels and it must not contain any duplicates. If not provided, this function can also be used to remove duplicates when overlaying multiple images together. Parameters ---------- cluster_voxels : np.ndarray (N, 3) Matrix of voxel coordinates in the cluster3d tensor cluster_data : np.ndarray (N, F) Matrix of voxel values corresponding to each voxel in the cluster3d tensor sparse_voxels : np.ndarray, optional (M, 3) Matrix of voxel coordinates in the reference sparse tensor sum_cols : np.ndarray, optional List of feature columns to sum when removing duplicates avg_cols : np.ndarray, optional List of feature columns to average when removing duplicates prec_col : int, default SHAPE_COL Column in the input feature tensor to use as a precdence source precedence : np.ndarray or list[int], default SHAPE_PREC (C) Array of classes in the reference array, ordered by precedence return_index : bool, default False If ``True``, also return the selected row indexes into the original ``cluster_voxels`` / ``cluster_data`` arrays. Returns ------- cluster_voxels : np.ndarray (M, 3) Ordered and filtered set of voxel coordinates cluster_data : np.ndarray (M, F) Ordered and filtered set of voxel values index : np.ndarray, optional (M) Selected row indexes in the original input ordering. Only returned when ``return_index`` is ``True``. """ # Lexicographically sort cluster and sparse data perm = np.lexsort(cluster_voxels.T) cluster_voxels = cluster_voxels[perm] cluster_data = cluster_data[perm] # Find duplicates (and the groups they belong to) if prec_col is not None or sum_cols is not None or avg_cols is not None: if prec_col is not None and precedence is None: raise ValueError("Precedence must be provided if `prec_col` is not None.") reference = cluster_data[:, prec_col] if prec_col is not None else None precedence_list = nbt.List(precedence) if prec_col is not None else None # type: ignore[attr-defined] duplicate_mask, groups = filter_duplicate_voxels_group( cluster_voxels, reference, precedence_list ) else: duplicate_mask = filter_duplicate_voxels(cluster_voxels) groups = None # Sum the values of duplicate voxels, if requested if sum_cols is not None and groups is not None and len(groups) > 0: cluster_data = aggregate_sum_features(cluster_data, groups, sum_cols) # Average the values of duplicate voxels, if requested if avg_cols is not None and groups is not None and len(groups) > 0: cluster_data = aggregate_mean_features(cluster_data, groups, avg_cols) # Remove duplicates duplicate_index = np.where(duplicate_mask)[0] cluster_voxels = cluster_voxels[duplicate_index] cluster_data = cluster_data[duplicate_index] index = perm[duplicate_index] # Remove voxels not present in the sparse matrix, if needed if sparse_voxels is not None: perm = np.lexsort(sparse_voxels.T) sparse_voxels = sparse_voxels[perm] non_ref_mask = filter_voxels_ref(cluster_voxels, sparse_voxels) non_ref_index = np.where(non_ref_mask)[0] cluster_voxels = cluster_voxels[non_ref_index] cluster_data = cluster_data[non_ref_index] index = index[non_ref_index] if return_index: return cluster_voxels, cluster_data, index return cluster_voxels, cluster_data
[docs] @nb.njit(cache=True) def filter_duplicate_voxels(data: np.ndarray) -> np.ndarray: """Returns a mask of non-duplicate voxels. If there are multiple voxels with the same coordinates, this algorithm simply keeps the last one in the list. Parameters ---------- data: np.ndarray (N, 3) Lexicographically sorted matrix of voxel coordinates Returns ------- np.ndarray (N) Boolean mask which is False for pixels to remove """ # For each voxel, check if the next one shares its coordinates num_voxels = data.shape[0] mask = np.ones(num_voxels, dtype=np.bool_) for i in range(1, num_voxels): if np.all(data[i - 1] == data[i]): mask[i - 1] = False return mask
[docs] @nb.njit(cache=True) def filter_duplicate_voxels_group( data: np.ndarray, reference: np.ndarray | None = None, precedence: list[int] | None = None, ) -> tuple[np.ndarray, dict[int, np.ndarray]]: """Returns a mask of non-duplicate voxels and a list of duplicate groups. If there are multiple voxels with the same coordinates, this algorithm simply keeps the last one in the list. If a precedence is defined and there are multiple voxels with the same coordinates, this algorithm picks the voxel which has the label that comes first in order of precedence. If multiple voxels with the same precedence index share voxel coordinates, the last one is picked. The duplicate voxel groups map the chosen voxel indices to the set of voxels which share voxel coordinates. Parameters ---------- data: np.ndarray (N, 3) Lexicographically sorted matrix of voxel coordinates reference: np.ndarray, optional (N) Array of values which have to follow the precedence order precedence: list, optional (C) Array of classes in the reference array, ordered by precedence Returns ------- np.ndarray (N) Boolean mask which is False for pixels to remove dict[int, np.ndarray] Map between kept voxel indexes onto voxels which share the same coordinates """ # Precedence cannot be None if reference is not None if reference is not None and precedence is None: raise ValueError("Precedence must be provided if reference is provided.") # Find all the voxels which are duplicated and organize them in groups num_voxels = data.shape[0] mask = np.ones(num_voxels, dtype=np.bool_) tmp_list = nbt.List.empty_list(nb.int64) groups = [] for i in range(1, num_voxels): # Check if the current voxel matches the previous one same = np.all(data[i - 1] == data[i]) # If it does, create/append the duplicate group if same: if not tmp_list: tmp_list.extend([i - 1, i]) else: tmp_list.append(i) # If it does not, store an any existing duplicate group if tmp_list and (not same or i == num_voxels - 1): groups.append(np.asarray(tmp_list)) tmp_list = nbt.List.empty_list(nb.int64) # For each group, pick the voxel with the label that comes first # in order of precedence, track duplicate groups merge = nbt.Dict.empty(key_type=nb.int64, value_type=nb.int64[:]) for group in groups: if reference is not None and precedence is not None: # Order the voxels in the group by precedence ref = np.array([precedence.index(int(r)) for r in reference[group]]) args = np.argsort(-ref, kind="mergesort") # Preserve duplicate order # Modify the mask and store the group indexes mask[group[args[:-1]]] = False merge[group[args[-1]]] = group else: # Pick the last element in the group mask[group[:-1]] = False merge[group[-1]] = group return mask, merge
[docs] @nb.njit(cache=True) def filter_voxels_ref(data: np.ndarray, reference: np.ndarray) -> np.ndarray: """Removes voxels thsat do not appear in a reference tensor. Returns an array which does not contain any voxels which do not belong to the reference array. The reference array must contain a subset of the voxels in the array to be filtered. Assumes both arrays are lexicographically sorted, the reference matrix contains no duplicates and is a subset of the matrix to be filtered. Parameters ---------- data: np.ndarray (N, 3) Lexicographically sorted matrix of voxel coordinates to filter reference: np.ndarray (N, 3) Lexicographically sorted matrix of voxel coordinates to match Returns ------- np.ndarray (N) Boolean mask which is False for pixels to remove """ # Try to match each voxel in the data tensor to one in the reference tensor n_data, n_ref = data.shape[0], reference.shape[0] d, r = 0, 0 mask = np.ones(n_data, dtype=np.bool_) while d < n_data and r < n_ref: if np.all(data[d] == reference[r]): # Voxel is in both matrices d += 1 r += 1 else: # Voxel is in data, but not reference mask[d] = False d += 1 # Need to go through rest of data, if any is left while d < n_data: mask[d] = False d += 1 return mask
[docs] @nb.njit(cache=True) def aggregate_features( data: np.ndarray, groups: dict[int, np.ndarray], cols: np.ndarray ) -> np.ndarray: """Aggregate features by summing the requested columns over groups.""" return aggregate_sum_features(data, groups, cols)
[docs] @nb.njit(cache=True) def aggregate_sum_features( data: np.ndarray, groups: dict[int, np.ndarray], cols: np.ndarray ) -> np.ndarray: """Aggregate the information in pre-defined voxel groups. Parameters ---------- data : np.ndarray (N, F) Matrix of voxel features to aggregate groups : dict[int, np.ndarray] Map between kept voxel indexes onto voxels which share the same coordinates cols : np.ndarray List of feature columns to modify Returns ------- np.ndarray (N, F) Matrix of aggregated voxel features """ for col in cols: for idx, group in groups.items(): data[idx, col] = np.sum(data[group, col]) return data
[docs] @nb.njit(cache=True) def aggregate_mean_features( data: np.ndarray, groups: dict[int, np.ndarray], cols: np.ndarray ) -> np.ndarray: """Average the information in pre-defined voxel groups.""" for col in cols: for idx, group in groups.items(): data[idx, col] = np.mean(data[group, col]) return data