spine.driver
SPINE driver class.
Takes care of everything in one centralized place:
- Data loading
- ML model and loss forward pass
- Batch unwrapping
- Representation building
- Post-processing
- Analysis script execution
- Writing output to file
Classes
Driver – Central SPINE driver.
- class spine.driver.Driver(cfg: dict[str, Any], rank: int | None = None)[source]
Central SPINE driver.
- Processes global configuration and runs the appropriate modules:
Load data
Run the model forward (including loss) and backward (if training)
Unwrap batched data
Build representations
Run post-processing
Run analysis scripts
Write to file
It takes a configuration dictionary of the form:
base: <Base driver configuration>
geo: <Geometry configuration>
io: <Input/output configuration>
model: <Model architecture>
build: <Rules as to how to build reconstructed object representations>
post: <Post-processors>
ana: <Analysis scripts>
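As a rough illustration, the same layout can be written as a Python dictionary. Only the seven top-level block names and the `base` key names (`seed`, `world_size`, `iterations`, `log_dir`, `unwrap`) come from this page; every value, and the empty `loader` placeholder, is an assumption made for the sketch and is not a working SPINE configuration.

```python
# Minimal sketch of the top-level configuration layout.
# All values are illustrative placeholders (assumptions), not real settings.
cfg = {
    "base": {
        "seed": 0,
        "world_size": 1,
        "iterations": 10,
        "log_dir": "logs",
        "unwrap": True,
    },
    "geo": None,               # geometry configuration, left out here
    "io": {"loader": {}},      # placeholder; a real loader block needs content
    "model": None,             # no model in this sketch
    "build": None,
    "post": None,
    "ana": None,
}

# `io` is the only block this page explicitly marks as mandatory.
assert cfg["io"] is not None

# With SPINE installed and a complete configuration, the dictionary would be
# handed to the driver roughly as follows:
# from spine.driver import Driver
# driver = Driver(cfg)
# driver.run()
```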
Methods
apply_filter([n_entry, n_skip, entry_list, ...]) – Restrict the list of entries.
cleanup() – Close output resources owned by the driver.
extract_driver_base_config(base) – Extract and validate the base keys owned by Driver.
initialize_ana([ana]) – Initialize analysis scripts.
initialize_base(seed, world_size[, dtype, ...]) – Initialize the driver state derived from the base block.
initialize_builder([build]) – Initialize reconstructed/truth representation building.
initialize_geo([geo]) – Initialize the detector geometry singleton.
initialize_io(io) – Initialize the input/output manager.
Initialize CSV and optional TensorBoard logging backends.
initialize_model([model, train]) – Initialize the model manager, if requested.
initialize_post([post]) – Initialize post-processing modules.
log(data, tstamp, iteration[, epoch]) – Log relevant information to CSV files and stdout.
normalize_seed_config(base, io) – Normalize driver and sampler seed configuration in place.
process([entry, run, subrun, event, ...]) – Process one entry or a batch of entries.
process_config([io, base, geo, model, ...]) – Normalize the configuration and record the resolved state.
run() – Loop over the requested number of iterations and process them.
should_log_stdout(iteration) – Return True when a formatted stdout summary should be emitted.
- DRIVER_BASE_KEYS: frozenset[str] = frozenset({'csv_buffer_size', 'distributed', 'dtype', 'epochs', 'iterations', 'log_dir', 'log_step', 'overwrite_log', 'parent_path', 'prefix_log', 'seed', 'split_output', 'tensorboard', 'train', 'unwrap', 'world_size'})
- RUNTIME_BASE_KEYS = frozenset({'gpus', 'torch_sharing_strategy', 'verbosity'})
- process_config(io: dict[str, Any] | None = None, base: dict[str, Any] | None = None, geo: dict[str, Any] | None = None, model: dict[str, Any] | None = None, build: dict[str, Any] | None = None, post: dict[str, Any] | None = None, ana: dict[str, Any] | None = None, rank: int | None = None) tuple[dict[str, Any], dict[str, Any], dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None, dict[str, Any] | None][source]
Normalize the configuration and record the resolved state.
- Parameters:
io (dict[str, Any] | None, optional) – I/O configuration dictionary. Although the argument defaults to None, this section is mandatory.
base (dict[str, Any] | None, optional) – Base driver configuration dictionary.
geo (dict[str, Any] | None, optional) – Geometry configuration dictionary.
model (dict[str, Any] | None, optional) – Model configuration dictionary.
build (dict[str, Any] | None, optional) – Representation-building configuration dictionary.
post (dict[str, Any] | None, optional) – Post-processor configuration dictionary.
ana (dict[str, Any] | None, optional) – Analysis script configuration dictionary.
rank (int, optional) – Rank of the current process.
- Returns:
Tuple containing the normalized base, io, geo, model, build, post, and ana configuration dictionaries, in that order.
- Return type:
tuple
- normalize_seed_config(base: dict[str, Any], io: dict[str, Any]) None[source]
Normalize driver and sampler seed configuration in place.
- Parameters:
base (dict[str, Any]) – Resolved base configuration dictionary.
io (dict[str, Any]) – Resolved I/O configuration dictionary.
- classmethod extract_driver_base_config(base: Mapping[str, Any]) dict[str, Any][source]
Extract and validate the base keys owned by Driver.
- Parameters:
base (Mapping[str, Any]) – Resolved base configuration dictionary.
- Returns:
Subset of the base configuration used to initialize Driver state.
- Return type:
dict[str, Any]
Notes
Keys consumed by launcher/runtime code are permitted in base but are intentionally not forwarded into initialize_base(). Any other key is treated as a configuration error and rejected.
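The partitioning described in these notes can be sketched as follows. This is a standalone stand-in, not the library's implementation: the function name `split_base_config` is hypothetical, the `ValueError` is an assumption (the page only says unknown keys are rejected), and treating `RUNTIME_BASE_KEYS` as the set of launcher/runtime keys is inferred from the attribute name.

```python
from collections.abc import Mapping
from typing import Any

# Key sets copied from the class attributes documented on this page.
DRIVER_BASE_KEYS = frozenset({
    "csv_buffer_size", "distributed", "dtype", "epochs", "iterations",
    "log_dir", "log_step", "overwrite_log", "parent_path", "prefix_log",
    "seed", "split_output", "tensorboard", "train", "unwrap", "world_size",
})
RUNTIME_BASE_KEYS = frozenset({"gpus", "torch_sharing_strategy", "verbosity"})


def split_base_config(base: Mapping[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for the documented key partitioning."""
    unknown = set(base) - DRIVER_BASE_KEYS - RUNTIME_BASE_KEYS
    if unknown:
        # The exception type is an assumption; the page only says "rejected".
        raise ValueError(f"Unrecognized base keys: {sorted(unknown)}")
    # Runtime keys are tolerated but dropped; driver-owned keys are kept.
    return {key: value for key, value in base.items() if key in DRIVER_BASE_KEYS}


driver_base = split_base_config({"seed": 0, "world_size": 1, "gpus": [0]})
print(driver_base)
```

Here `gpus` passes validation but is not forwarded, while a misspelled key such as `sead` would raise.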
- initialize_base(seed: int, world_size: int, dtype: str = 'float32', log_dir: str = 'logs', prefix_log: bool = False, overwrite_log: bool = False, csv_buffer_size: int = 1, parent_path: str | None = None, iterations: int | None = None, epochs: float | None = None, unwrap: bool = False, rank: int | None = None, log_step: int = 1, distributed: bool = False, split_output: bool = False, train: dict[str, Any] | None = None, tensorboard: bool | Mapping[str, Any] | None = None) dict[str, Any] | None[source]
Initialize the driver state derived from the base block.
- Parameters:
seed (int) – Random number generator seed.
world_size (int) – Number of visible accelerator devices available to the run.
dtype (str, default 'float32') – Floating-point dtype used by the model and numerical I/O paths.
log_dir (str, default 'logs') – Directory where CSV logs should be written.
prefix_log (bool, default False) – If True, prefix log file names with an input-derived stem.
overwrite_log (bool, default False) – If True, allow the CSV writer to overwrite an existing log.
csv_buffer_size (int, default 1) – CSV file buffer size. 1 is line buffered (default, safe), -1 uses system default, 0 is unbuffered, >1 is buffer size in bytes.
parent_path (str, optional) – Parent path used to resolve relative analysis-script paths.
iterations (int, optional) – Number of entries or batches to process. None means use the full dataset/loader.
epochs (float, optional) – Number of passes over the full dataset when iterating with a loader.
unwrap (bool, default False) – If True, unwrap batched data into per-entry outputs.
rank (int, optional) – Rank of the current process in distributed execution.
log_step (int, default 1) – Logging period in iterations.
distributed (bool, default False) – If True, mark this process as participating in distributed execution.
split_output (bool, default False) – If True, write one output file per input file.
train (dict[str, Any] | None, optional) – Training configuration dictionary. This method does not interpret the content; it returns it so the model manager can do so.
tensorboard (bool | Mapping[str, Any] | None, optional) – TensorBoard logging configuration. False or None disable TensorBoard logging, True uses default settings, and a mapping overrides defaults such as output directory and flush interval.
- Returns:
Training configuration dictionary to forward into the model manager, if any.
- Return type:
dict[str, Any] | None
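The values listed for `csv_buffer_size` match the `buffering` argument of Python's built-in `open()`. Assuming the driver passes the value through more or less unchanged (this page does not confirm that), the default of 1 would behave like the sketch below, where each completed row is visible on disk before the log is closed. The file name is a placeholder.

```python
import os
import tempfile

# buffering=1 requests line buffering in text mode: every completed line is
# flushed, so another reader can see the row while the writer is still open.
path = os.path.join(tempfile.mkdtemp(), "log.csv")
writer = open(path, "w", buffering=1)
writer.write("iteration,loss\n")

# Read the file back through a second handle before closing the writer.
with open(path) as reader:
    flushed = reader.read()

writer.close()
print(repr(flushed))
```

Note that `open()` only accepts `buffering=0` in binary mode, so an unbuffered CSV log would need a binary file handle.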
- initialize_io(io: Mapping[str, Any]) None[source]
Initialize the input/output manager.
- Parameters:
io (Mapping[str, Any]) – Top-level I/O configuration mapping. This may contain loader, reader, and/or writer sections.
- initialize_geo(geo: Mapping[str, Any] | None = None) None[source]
Initialize the detector geometry singleton.
- Parameters:
geo (Mapping[str, Any] | None, optional) – Geometry configuration mapping. If None, geometry-dependent modules are left uninitialized until they are explicitly requested.
- initialize_model(model: Mapping[str, Any] | None = None, train: Mapping[str, Any] | None = None) None[source]
Initialize the model manager, if requested.
- Parameters:
model (Mapping[str, Any] | None, optional) – Model configuration mapping.
train (Mapping[str, Any] | None, optional) – Training configuration mapping extracted from the base block.
Notes
A model requires a loader-backed input pipeline. If a train block is provided without a model block, initialization fails because there is no model to optimize.
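The two constraints in these notes amount to a small consistency check, sketched below. The helper `check_model_request` is hypothetical and not part of the SPINE API; the sketch assumes that "loader-backed" means the `io` mapping contains a `loader` section, and the `ValueError` type is likewise an assumption.

```python
from collections.abc import Mapping
from typing import Any, Optional


def check_model_request(
    io: Mapping[str, Any],
    model: Optional[Mapping[str, Any]] = None,
    train: Optional[Mapping[str, Any]] = None,
) -> bool:
    """Return True if a model should be built; raise on inconsistent input.

    Hypothetical helper, not part of the SPINE API.
    """
    if model is None:
        if train is not None:
            # Nothing to optimize without a model block.
            raise ValueError("A train block was given without a model block.")
        return False
    if "loader" not in io:
        # Assumption: "loader-backed" means io has a `loader` section.
        raise ValueError("A model requires a loader-backed input pipeline.")
    return True


print(check_model_request({"loader": {}}, model={}))  # True
print(check_model_request({"reader": {}}))            # False
```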
- initialize_builder(build: Mapping[str, Any] | None = None) None[source]
Initialize reconstructed/truth representation building.
- Parameters:
build (Mapping[str, Any] | None, optional) – Representation-building configuration mapping.
Notes
Builder execution happens after optional model forwarding and optional unwrapping. If a model is present, its output must be unwrapped and converted to NumPy before representations can be built.
- initialize_post(post: Mapping[str, Any] | None = None) None[source]
Initialize post-processing modules.
- Parameters:
post (Mapping[str, Any] | None, optional) – Post-processing configuration mapping.
Notes
Post-processors operate on per-entry data products. When used after a model, the model output must therefore be unwrapped first.
- initialize_ana(ana: Mapping[str, Any] | None = None) None[source]
Initialize analysis scripts.
- Parameters:
ana (Mapping[str, Any] | None, optional) – Analysis configuration mapping.
Notes
Analysis scripts run on the same per-entry view of the data as post-processors. When used after a model, the model output must be unwrapped first.
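The builder, post-processor, and analysis notes share one rule: when a model is present, its output must be unwrapped before any per-entry module runs. A minimal sketch of that rule as a configuration check follows. Both helpers are hypothetical, and the sketch assumes that `base.unwrap` is the flag that triggers unwrapping.

```python
from typing import Any


def needs_unwrap(cfg: dict[str, Any]) -> bool:
    """Return True when per-entry modules are requested downstream of a model."""
    has_model = cfg.get("model") is not None
    per_entry = any(cfg.get(key) is not None for key in ("build", "post", "ana"))
    return has_model and per_entry


def check_unwrap(cfg: dict[str, Any]) -> None:
    """Hypothetical consistency check; not part of the SPINE API."""
    # Assumption: the `unwrap` key of the base block controls unwrapping.
    unwrap = (cfg.get("base") or {}).get("unwrap", False)
    if needs_unwrap(cfg) and not unwrap:
        raise ValueError("build/post/ana after a model require unwrap to be enabled")


# Accepted: a model followed by post-processing, with unwrapping enabled.
check_unwrap({"base": {"unwrap": True}, "model": {}, "post": {}})
```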
- process(entry: int | None = None, run: int | None = None, subrun: int | None = None, event: int | None = None, iteration: int | None = None, epoch: float | None = None) dict[str, Any][source]
Process one entry or a batch of entries.
Run a single step of the main SPINE driver. This includes data loading, model forwarding, data structure building, post-processing, and appending the desired information to each row of the output CSV files.
- Parameters:
entry (int, optional) – Entry number to load
run (int, optional) – Run number to load
subrun (int, optional) – Subrun number to load
event (int, optional) – Event number to load
iteration (int, optional) – Iteration number. Only needed to train models and/or to apply time-dependent model losses; ignored otherwise
epoch (float, optional) – Epoch fraction. Only needed to train models; ignored otherwise
- Returns:
Processed data dictionary. If loader output was unwrapped, values inside the dictionary may be per-entry lists.
- Return type:
dict[str, Any]
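The order of operations within one step can be pictured with a toy pipeline. This is only a sketch of the sequencing listed on this page (model, unwrap, build, post-processing, analysis), with absent stages skipped; `process_once` and `tag` are hypothetical names, and the stages merely record that they ran.

```python
from typing import Any, Callable, Optional

Stage = Optional[Callable[[dict[str, Any]], dict[str, Any]]]


def process_once(
    data: dict[str, Any],
    model: Stage = None,
    unwrap: Stage = None,
    build: Stage = None,
    post: Stage = None,
    ana: Stage = None,
) -> dict[str, Any]:
    """Apply the configured stages in the documented order, skipping absent ones."""
    for stage in (model, unwrap, build, post, ana):
        if stage is not None:
            data = stage(data)
    return data


def tag(name: str) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Build a toy stage that appends its name to a trace list."""
    def stage(data: dict[str, Any]) -> dict[str, Any]:
        return {**data, "trace": data.get("trace", []) + [name]}
    return stage


result = process_once(
    {"entry": 0}, model=tag("model"), unwrap=tag("unwrap"), post=tag("post")
)
print(result["trace"])  # ['model', 'unwrap', 'post']
```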
- apply_filter(n_entry: int | None = None, n_skip: int | None = None, entry_list: list[int] | None = None, skip_entry_list: list[int] | None = None, run_event_list: list[tuple[int, int, int]] | None = None, skip_run_event_list: list[tuple[int, int, int]] | None = None) None[source]
Restrict the list of entries.
- Parameters:
n_entry (int, optional) – Maximum number of entries to load
n_skip (int, optional) – Number of entries to skip at the beginning
entry_list (list, optional) – List of integer entry IDs to add to the index
skip_entry_list (list, optional) – List of integer entry IDs to skip from the index
run_event_list (list((int, int, int)), optional) – List of (run, subrun, event) triplets to add to the index
skip_run_event_list (list((int, int, int)), optional) – List of (run, subrun, event) triplets to skip from the index
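To make the entry-based options concrete, here is a minimal sketch of how such a filter could act on a plain list of entry indices. The function `filter_entries` is hypothetical, and the order in which the options are combined is an assumption; this page does not say how the driver resolves several options given together.

```python
from typing import Optional


def filter_entries(
    n_total: int,
    n_entry: Optional[int] = None,
    n_skip: Optional[int] = None,
    entry_list: Optional[list[int]] = None,
    skip_entry_list: Optional[list[int]] = None,
) -> list[int]:
    """Hypothetical index filter; the order of operations is an assumption."""
    # Start from an explicit selection if given, else from every entry.
    entries = list(entry_list) if entry_list is not None else list(range(n_total))
    if skip_entry_list is not None:
        skipped = set(skip_entry_list)
        entries = [e for e in entries if e not in skipped]
    if n_skip is not None:
        entries = entries[n_skip:]   # drop entries at the beginning
    if n_entry is not None:
        entries = entries[:n_entry]  # cap the number of entries
    return entries


print(filter_entries(10, n_skip=2, n_entry=3))        # [2, 3, 4]
print(filter_entries(10, skip_entry_list=[0, 1, 9]))  # [2, 3, 4, 5, 6, 7, 8]
```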
- log(data: dict[str, Any], tstamp: str, iteration: int, epoch: float | None = None) None[source]
Log relevant information to CSV files and stdout.
- Parameters:
data (dict) – Dictionary of data products to extract scalars from
tstamp (str) – Time when this iteration was run
iteration (int) – Iteration counter
epoch (float, optional) – Progress in the training process in number of epochs
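The idea of extracting scalars from the data dictionary and writing them as one CSV row can be sketched with the standard library alone. The helper `scalar_row`, the column names `iteration` and `tstamp`, and the rule for what counts as a scalar are all assumptions for illustration; the driver's own writer may differ.

```python
import csv
import io
from numbers import Number
from typing import Any


def scalar_row(data: dict[str, Any], iteration: int, tstamp: str) -> dict[str, Any]:
    """Keep only plain numeric scalars (bools excluded) plus bookkeeping columns."""
    row: dict[str, Any] = {"iteration": iteration, "tstamp": tstamp}
    for key, value in data.items():
        if isinstance(value, Number) and not isinstance(value, bool):
            row[key] = value
    return row


row = scalar_row(
    {"loss": 0.25, "accuracy": 0.9, "points": [1, 2, 3]},
    iteration=7,
    tstamp="2024-01-01 00:00:00",
)

buffer = io.StringIO()  # stands in for the CSV log file
writer = csv.DictWriter(buffer, fieldnames=list(row))
writer.writeheader()
writer.writerow(row)
print(buffer.getvalue())
```

The list-valued `points` entry is dropped, so the row holds only the bookkeeping columns and the two scalar metrics.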