"""Module with a general-purpose geometry class.
This class supports the storage of:
- TPC boundaries
- Optical detector shape/locations
- CRT detector shape/locations
It also provides a plethora of useful functions to query the geometry.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import numpy as np
from scipy.spatial.distance import cdist
from .detector import Box, CRTDetector, OptDetector, TPCDetector
from .utils import normalize_version
[docs]
@dataclass
class Geometry(Box):
"""Handles all geometry functions for a collection of box-shaped TPCs with
a arbitrary set of optical detectors organized in optical volumes and
auxiliary CRT planes.
Attributes
----------
name : str
Name of the detector
tag : str
Tag or label for the geometry instance
version : int
Version number of the geometry
tpc : TPCDetector
TPC detector properties
optical : OptDetector, optional
Optical detector properties
crt : CRTDetector, optional
CRT detector properties
gdml : str, optional
GDML file name associated with the geometry
crs_files : List[str], optional
CRS (Charge Readout System) geometry file(s) reference (flow only)
lrs_file : str, optional
LRS (Light Readout System) geometry file reference (flow only)
"""
name: str
tag: str
version: str
tpc: TPCDetector
optical: OptDetector | None = None
crt: CRTDetector | None = None
gdml: str | None = None
crs_files: list[str] | None = None
lrs_file: str | None = None
[docs]
def __init__(
self,
name: str,
tag: str,
version: str,
tpc: dict[str, Any],
optical: dict[str, Any] | None = None,
crt: dict[str, Any] | None = None,
gdml: str | None = None,
crs_files: list[str] | None = None,
lrs_file: str | None = None,
) -> None:
"""Initialize the detector geometry.
Parameters
----------
name : str
Name of the detector
tag : str
Tag or label for the geometry instance
version : int
Version number of the geometry
tpc : dict
Detector boundary configuration
optical : dict, optional
Optical detector configuration
crt : dict, optional
CRT detector configuration
gdml : str, optional
GDML file name associated with the geometry
crs_files : str or list of str, optional
CRS (Cosmic Ray System) geometry file(s)
lrs_file : str, optional
LRS (Light Readout System) geometry file
Returns
-------
np.ndarray
Lower boundaries of the overall geometry
np.ndarray
Upper boundaries of the overall geometry
"""
# Store basic geometry information
self.name = name
self.tag = tag
self.version = normalize_version(version) or ""
self.gdml = gdml
self.crs_files = crs_files
self.lrs_file = lrs_file
# Load the charge detector boundaries
self.tpc = TPCDetector(**tpc)
lower = self.tpc.lower
upper = self.tpc.upper
# Load the optical detectors
if optical is not None:
self.optical = self.parse_optical(**optical)
lower = np.minimum(lower, self.optical.lower)
upper = np.maximum(upper, self.optical.upper)
# Load the CRT detectors
if crt is not None:
self.crt = CRTDetector(**crt)
lower = np.minimum(lower, self.crt.lower)
upper = np.maximum(upper, self.crt.upper)
# Initialize the parent Box
super().__init__(lower, upper)
[docs]
def parse_optical(self, volume: str, **optical: Any) -> OptDetector:
"""Parse the optical detector configuration.
Parameters
----------
volume : str
Optical volume boundaries (one of 'tpc' or 'module')
**optical : dict
Reset of the optical detector configuration
Returns
-------
OptDetector
Optical detector object
"""
# Get the number of optical volumes based on the the volume type
if volume not in ("module", "tpc"):
raise ValueError(
"Optical detector positions must be provided by TPC or module."
)
if volume == "module":
offsets = [module.center for module in self.tpc.modules]
else:
offsets = [chamber.center for chamber in self.tpc.chambers]
# Initialize the optical detector object
return OptDetector(volume, offsets, **optical)
[docs]
@dataclass
class ContDefinition:
"""Helper class to store containment volume definitions.
Attributes
----------
volumes : List[np.ndarray]
(N, 3, 2) set of volume boundaries
use_source : bool
Whether to use the source of the points to determine the volume
limit_normals : List[np.ndarray]
List of limit plane normals
limit_thresholds : List[float]
List of limit thresholds along the plane normal
"""
volumes: np.ndarray
use_source: bool
limit_normals: list[np.ndarray]
limit_thresholds: list[float]
[docs]
def get_boundaries(
self, with_optical: bool = True, with_crt: bool = True
) -> np.ndarray:
"""Fetch the overall geometry boundaries, optionally including
optical and CRT detectors.
Parameters
----------
with_optical : bool, default True
Whether to include optical detector boundaries
with_crt : bool, default True
Whether to include CRT detector boundaries
Returns
-------
np.ndarray
Lower and upper boundaries of the overall geometry
"""
# Get the TPC boundaries
lower = self.tpc.lower
upper = self.tpc.upper
# Expand to include optical and CRT detectors, if requested
if with_optical:
if self.optical is None:
raise ValueError(
"This geometry does not have optical detectors to include."
)
lower = np.minimum(lower, self.optical.lower)
upper = np.maximum(upper, self.optical.upper)
if with_crt:
if self.crt is None:
raise ValueError(
"This geometry does not have CRT detectors to include."
)
lower = np.minimum(lower, self.crt.lower)
upper = np.maximum(upper, self.crt.upper)
return np.vstack((lower, upper)).T
[docs]
def get_sources(self, sources: np.ndarray) -> np.ndarray:
"""Converts logical TPC indexes to physical TPC indexes.
Parameters
----------
sources : np.ndarray
(N, 2) Array of logical [module ID, tpc ID] pairs, one per point
Returns
----------
np.ndarray
(N, 2) Array of physical [module ID, tpc ID] pairs, one per point
"""
# If logical and physical TPCs are aligned, nothing to do
if self.tpc.det_ids is None:
return sources.astype(int)
# Otherwise, map logical to physical
sources = np.copy(sources)
sources[:, 1] = self.tpc.det_ids[sources[:, 1].astype(int)]
return sources
[docs]
def get_chambers(self, sources: np.ndarray) -> np.ndarray:
"""Converts logical TPC indexes to unique chamber indexes.
Parameters
----------
sources : np.ndarray
(N, 2) Array of logical [module ID, tpc ID] pairs, one per point
Returns
----------
np.ndarray
(N) Array of physical chamber indexes, one per point
"""
# Convert the physical indexes to unique chamber indexes
sources = self.get_sources(sources)
return sources[:, 0] * self.tpc.num_chambers_per_module + sources[:, 1]
[docs]
def get_contributors(self, sources: np.ndarray) -> list[np.ndarray]:
"""Gets the list of [module ID, tpc ID] pairs that contributed to an
object, as defined in this geometry.
Parameters
----------
sources : np.ndarray
(N, 2) Array of [module ID, tpc ID] pairs, one per point
Returns
-------
List[np.ndarray]
(2, N_t) Pair of arrays: the first contains the list of
contributing modules, the second of contributing tpcs.
"""
# Fetch the list of unique logical [module ID, tpc ID] pairs
sources = np.unique(sources, axis=0)
# If the logical TPCs differ from the physical TPCs, convert
if self.tpc.det_ids is not None:
sources = self.get_sources(sources)
sources = np.unique(sources, axis=0)
# Return as a list of physical [module ID, tpc ID] pairs
return list(sources.T)
[docs]
def get_volume_index(
self, sources: np.ndarray, module_id: int, tpc_id: int | None = None
) -> np.ndarray:
"""Gets the list of indices of points that belong to a certain
detector volume (module or individual TPC).
Parameters
----------
sources : np.ndarray
(N, 2) Array of [module ID, tpc ID] pairs, one per point
module_id : int
ID of the module
tpc_id : int, optional
ID of the TPC within the module. If not specified, the volume
offsets are estimated w.r.t. the module itself
Returns
-------
np.ndarray
(N) Index of points that belong to the requested detector volume
"""
# If the logical TPCs differ from the physical TPCs, convert
sources = self.get_sources(sources)
# Compute and return the index
if tpc_id is None:
return np.where(sources[:, 0] == module_id)[0]
else:
return np.where((sources == [module_id, tpc_id]).all(axis=-1))[0]
[docs]
def get_closest_tpc(self, points: np.ndarray) -> np.ndarray:
"""For each point, find the ID of the closest TPC.
There is a natural assumption that all TPCs are boxes of identical
sizes, so that the relative proximitity of a point to a TPC is
equivalent to its proximity to the TPC center.
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
Returns
-------
np.ndarray
(N) List of TPC indexes, one per input point
"""
# Get the TPC centers
centers = np.asarray([chamber.center for chamber in self.tpc.chambers])
# Compute the pair-wise distances between points and TPC centers
dists = cdist(points, centers)
# Return the closest center index as the closest centers
return np.argmin(dists, axis=1)
[docs]
def get_closest_module(self, points: np.ndarray) -> np.ndarray:
"""For each point, find the ID of the closest module.
There is a natural assumption that all modules are boxes of identical
sizes, so that the relative proximitity of a point to a module is
equivalent to its proximity to the module center.
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
Returns
-------
np.ndarray
(N) List of module indexes, one per input point
"""
# Get the module centers
centers = np.asarray([module.center for module in self.tpc.modules])
# Compute the pair-wise distances between points and module centers
dists = cdist(points, centers)
# Return the closest center index as the closest centers
return np.argmin(dists, axis=1)
[docs]
def get_closest_tpc_indexes(self, points: np.ndarray) -> list[np.ndarray]:
"""For each TPC, get the list of points that live closer to it than any
other TPC in the detector.
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
Returns
-------
List[np.ndarray]
List of index of points that belong to each TPC
"""
# Start by finding the closest TPC to each of the points
closest_ids = self.get_closest_tpc(points)
# For each TPC, append the list of point indices associated with it
tpc_indexes = []
for t in range(self.tpc.num_chambers):
tpc_indexes.append(np.where(closest_ids == t)[0])
return tpc_indexes
[docs]
def get_closest_module_indexes(self, points: np.ndarray) -> list[np.ndarray]:
"""For each module, get the list of points that live closer to it
than any other module in the detector.
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
Returns
-------
List[np.ndarray]
List of index of points that belong to each module
"""
# Start by finding the closest TPC to each of the points
closest_ids = self.get_closest_module(points)
# For each module, append the list of point indices associated with it
module_indexes = []
for m in range(self.tpc.num_modules):
module_indexes.append(np.where(closest_ids == m)[0])
return module_indexes
[docs]
def get_volume_offsets(
self, points: np.ndarray, module_id: int, tpc_id: int | None = None
) -> np.ndarray:
"""Compute how far each point is from a certain detector volume.
Parameters
----------
points : np.ndarray
(N, 3) : Point coordinates
module_id : int
ID of the module
tpc_id : int, optional
ID of the TPC within the module. If not specified, the volume
offsets are estimated w.r.t. the module itself
Returns
-------
np.ndarray
(N, 3) Offsets w.r.t. to the volume boundaries
"""
# Compute the axis-wise distances of each point to each boundary
module = self.tpc[module_id]
if tpc_id is None:
boundaries = module.boundaries
else:
boundaries = module[tpc_id].boundaries
dists = points[..., None] - boundaries
# If a point is between two boundaries, the distance is 0. If it is
# outside, the distance is that of the closest boundary
signs = (np.sign(dists[..., 0]) + np.sign(dists[..., 1])) / 2
offsets = signs * np.min(np.abs(dists), axis=-1)
return offsets
[docs]
def get_min_volume_offset(
self, points: np.ndarray, module_id: int, tpc_id: int | None = None
) -> np.ndarray:
"""Get the minimum offset to apply to a point cloud to bring it
within the boundaries of a volume.
Parameters
----------
points : np.ndarray
(N, 3) : Point coordinates
module_id : int
ID of the module
tpc_id : int, optional
ID of the TPC within the module. If not specified, the volume
offsets are estimated w.r.t. the module itself
Returns
-------
np.ndarray
(3) Offsets w.r.t. to the volume location
"""
# Compute the distance for each point, get the maximum necessary offset
offsets = self.get_volume_offsets(points, module_id, tpc_id)
offsets = offsets[np.argmax(np.abs(offsets), axis=0), np.arange(3)]
return offsets
[docs]
def translate(
self,
points: np.ndarray,
source_id: int,
target_id: int,
factor: float | np.ndarray | None = None,
) -> np.ndarray:
"""Moves a point cloud from one module to another one
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
source_id: int
Module ID from which to move the point cloud
target_id : int
Module ID to which to move the point cloud
factor : Union[float, np.ndarray], optional
Multiplicative factor to apply to the offset. This is necessary if
the points are not expressed in detector coordinates
Returns
-------
np.ndarray
(N, 3) Set of translated point coordinates
"""
# If the source and target are the same, nothing to do here
if target_id == source_id:
return np.copy(points)
# Fetch the inter-module shift
offset = self.tpc[target_id].center - self.tpc[source_id].center
if factor is not None:
offset *= factor
# Translate
return points + offset
[docs]
def split(
self,
points: np.ndarray,
target_id: int,
sources: np.ndarray | None = None,
meta: Any | None = None,
) -> tuple[np.ndarray, list[np.ndarray]]:
"""Migrate all points to a target module, organize them by module ID.
Parameters
----------
points : np.ndarray
(N, 3) Set of point coordinates
target_id : int
Module ID to which to move the point cloud
sources : np.ndarray, optional
(N, 2) Array of [module ID, tpc ID] pairs, one per voxel
meta : Meta, optional
Meta information about the voxelized image. If provided, the
points are assumed to be provided in voxel coordinates.
Returns
-------
np.ndarray
(N, 3) Shifted set of points
List[np.ndarray]
List of index of points that belong to each module
"""
# Check that the target ID exists
if target_id < 0 or target_id >= self.tpc.num_modules:
raise ValueError("Target ID should be in [0, N_modules[.")
# Get the module ID of each of the input points
convert = False
if sources is not None:
# If sources are provided, simply use that
module_indexes = []
for m in range(self.tpc.num_modules):
module_indexes.append(np.where(sources[:, 0] == m)[0])
else:
# If the points are expressed in pixel coordinates, translate
if meta is not None:
convert = True
points = meta.to_cm(points, center=True)
# If not provided, find module each point belongs to by proximity
module_indexes = self.get_closest_module_indexes(points)
# Now shifts all points that are not in the target
for module_id, module_index in enumerate(module_indexes):
# If this is the target module, nothing to do here
if module_id == target_id:
continue
# Shift the coordinates
points[module_index] = self.translate(
points[module_index], module_id, target_id
)
# Bring the coordinates back to pixels, if they were shifted
if meta is not None and convert:
points = meta.to_px(points, floor=True)
return points, module_indexes
[docs]
def check_containment(
self,
definition: ContDefinition,
points: np.ndarray,
sources: np.ndarray | None = None,
allow_multi_module: bool = False,
summarize: bool = True,
) -> bool | np.ndarray:
"""Check whether a point cloud comes within some distance of the boundaries
of a certain subset of detector volumes, depending on the mode.
Must define containment volumes first using `define_containment_volumes`.
Parameters
----------
definition : ContDefinition
Containment volume definition to check against
points : np.ndarray
(N, 3) Set of point coordinates
sources : np.ndarray, optional
(S, 2) : List of [module ID, tpc ID] pairs that created the
point cloud
allow_multi_module : bool, default `False`
Whether to allow points to span multiple modules
summarize : bool, default `True`
If `True`, only returns a single flag for the whole cloud.
Otherwise, returns a boolean array corresponding to each point.
Returns
-------
Union[bool, np.ndarray]
`True` if the points are contained, `False` if not
"""
# If sources are provided, only consider source volumes
if definition.use_source:
# Get the contributing TPCs
if sources is None or len(points) != len(sources):
raise ValueError(
"Need to provide sources to make a source-based check."
)
contributors = self.get_contributors(sources)
if not allow_multi_module and len(np.unique(contributors[0])) > 1:
return False
# Define the smallest box containing all contributing TPCs
# TODO: this is not ideal, should check each source separately?
index = contributors[0] * self.tpc.num_chambers_per_module + contributors[1]
volume = self.merge_volumes(definition.volumes[index])
volumes = [volume]
else:
volumes = definition.volumes
# Loop over volumes, make sure the cloud is contained in at least one
if summarize:
contained = False
for v in volumes:
if (points > v[:, 0]).all() and (points < v[:, 1]).all():
contained = True
break
else:
contained = np.zeros(len(points), dtype=bool)
for v in volumes:
contained |= (points > v[:, 0]).all(axis=1) & (points < v[:, 1]).all(
axis=1
)
# If requested, check points against the active volume limits
if not summarize or contained:
for normal, threshold in zip(
definition.limit_normals, definition.limit_thresholds
):
active = np.dot(points, normal) < threshold
if summarize:
active = bool(active.all())
contained &= active
return contained
[docs]
def define_containment_volumes(
self,
margin: float | list[float] | np.ndarray,
cathode_margin: float | None = None,
mode: str = "module",
include_limits: bool = True,
) -> ContDefinition:
"""This function defines a list of volumes to check containment against.
If the containment is checked against a constant volume, it is a lot
more efficient to call this function once and call `check_containment`
reapitedly after.
Parameters
----------
margin : Union[float, List[float], np.array]
Minimum distance from a detector wall to be considered contained.
A scalar applies the same buffer to all six walls. A three-element
array applies one shared distance per axis. A `(3, 2)` array
specifies the lower and upper margin independently for each axis.
cathode_margin : float, optional
If specified, sets a different margin for the cathode boundaries
mode : str, default 'module'
Containment criterion. `tpc` requires containment within a single
TPC, `module` within a single module, `detector` within the full
detector volume and `source` uses voxel provenance to determine the
contributing TPCs and define the containment volumes accordingly.
include_limits : bool, default True
If `True`, the TPC active region limits are checked against
Returns
-------
ContDefinition
Object containing the list of containment volumes, and other
information to check containment against
"""
# Translate the margin parameter to a (3,2) matrix
margin = np.array(margin)
if len(margin.shape) == 0:
margin = np.full((3, 2), margin)
elif len(margin.shape) == 1:
if len(margin) != 3:
raise ValueError("Must provide one value per axis.")
margin = np.tile(margin, (2, 1)).T
else:
if np.array(margin).shape != (3, 2):
raise ValueError("Must provide two values per axis.")
# Establish the volumes to check against
cont_volumes = []
cont_use_source = False
if mode in ["tpc", "source"]:
for m, module in enumerate(self.tpc):
for t, tpc in enumerate(module):
vol = self.adapt_volume(
tpc.boundaries, margin, cathode_margin, m, t
)
cont_volumes.append(vol)
cont_use_source = mode == "source"
elif mode == "module":
for module in self.tpc:
vol = self.adapt_volume(module.boundaries, margin)
cont_volumes.append(vol)
elif mode == "detector":
vol = self.adapt_volume(self.tpc.boundaries, margin)
cont_volumes.append(vol)
else:
raise ValueError(f"Containement check mode not recognized: {mode}.")
cont_volumes = np.array(cont_volumes)
# Establish active region limits to check against, if requested
limit_normals, limit_thresholds = [], []
if include_limits and self.tpc.limits is not None:
if not np.all(margin == margin[0, 0]):
raise ValueError(
"No clear way to include active region limit checks when "
"margins are different in different axes. Abort."
)
limit_margin = margin[0, 0]
for limit in self.tpc.limits:
limit_normals.append(limit.norm)
limit_thresholds.append(limit.boundary - limit_margin)
# Return containment conditions as a ContDefinition object
return self.ContDefinition(
volumes=cont_volumes,
use_source=cont_use_source,
limit_normals=limit_normals,
limit_thresholds=limit_thresholds,
)
[docs]
def adapt_volume(
self,
ref_volume: np.ndarray,
margin: np.ndarray,
cathode_margin: float | None = None,
module_id: int | None = None,
tpc_id: int | None = None,
) -> np.ndarray:
"""Apply margins from a given volume. Takes care of subtleties
associated with the cathode, if needed.
Parameters
----------
ref_volume : np.ndarray
(3, 2) Array of volume boundaries
margin : np.ndarray
Minimum distance from a detector wall to be considered contained as
[[x_low,x_up], [y_low,y_up], [z_low,z_up]], i.e. distance is
specified individually of each wall.
cathode_margin : float, optional
If specified, sets a different margin for the cathode boundaries
module_id : int, optional
ID of the module
tpc_id : int, optional
ID of the TPC within the module
Returns
-------
np.ndarray
(3, 2) Updated array of volume boundaries
"""
# Reduce the volume according to the margin
volume = np.copy(ref_volume)
volume[:, 0] += margin[:, 0]
volume[:, 1] -= margin[:, 1]
# If a cathode margin is provided, adapt the cathode wall differently
if cathode_margin is not None:
if module_id is None or tpc_id is None:
raise ValueError(
"Module and TPC ID must be provided when using a cathode margin."
)
axis = self.tpc[module_id][tpc_id].drift_axis
side = self.tpc[module_id][tpc_id].cathode_side
flip = (-1) ** side
volume[axis, side] += flip * (cathode_margin - margin[axis, side])
return volume
[docs]
@staticmethod
def merge_volumes(volumes: np.ndarray) -> np.ndarray:
"""Given a list of volumes and their boundaries, find the smallest box
that encompass all volumes combined.
Parameters
----------
volumes : np.ndarray
(N, 3, 2) List of volume boundaries
Returns
-------
np.ndarray
(3, 2) Boundaries of the combined volume
"""
volume = np.empty((3, 2))
volume[:, 0] = np.min(volumes, axis=0)[:, 0]
volume[:, 1] = np.max(volumes, axis=0)[:, 1]
return volume