Source code for spine.ana.manager

"""Manages the operation of analysis scripts."""

from collections import OrderedDict, defaultdict
from copy import deepcopy

import numpy as np

from spine.utils.stopwatch import StopwatchManager

from .factories import ana_script_factory


[docs] class AnaManager: """Manager class to initialize and execute analysis scripts. Analysis scripts use the output of the reconstruction chain and the post-processors and produce simple CSV files. It loads all the analysis scripts and feeds them data. It initializes CSV writers needed to store the output of the analysis scripts. """ def __init__(self, cfg, log_dir=None, prefix=None): """Initialize the analysis manager. Parameters ---------- cfg : dict Analysis script configurations log_dir : str Output CSV file directory (shared with driver log) prefix : str, optional Input file prefix. If requested, it will be used to prefix all the output CSV files. """ # Parse the analysis block configuration self.parse_config(log_dir, prefix, **cfg)
[docs] def parse_config( self, log_dir, prefix, overwrite=None, prefix_output=False, buffer_size=1, **modules, ): """Parse the analysis tool configuration. Parameters ---------- log_dir : str Output CSV file directory (shared with driver log) prefix : str Input file prefix. If requested, it will be used to prefix all the output CSV files. overwrite : bool, optional If `True`, overwrite the CSV logs if they already exist prefix_output : bool, optional If `True`, will prefix the output CSV names with the input file name buffer_size : int, default 1 CSV file buffer size. 1 is line buffered (default), -1 uses system default, 0 is unbuffered, >1 is buffer size in bytes **modules : dict List of analysis script modules """ # Loop over the analyzer modules and get their priorities modules = deepcopy(modules) keys = np.array(list(modules.keys())) priorities = -np.ones(len(keys), dtype=np.int32) for i, k in enumerate(keys): if "priority" in modules[k]: priorities[i] = modules[k].pop("priority") # Only use the prefix if the output is to be prefixed if not prefix_output: prefix = None # Add the modules to a processor list in decreasing order of priority self.watch = StopwatchManager() self.modules = OrderedDict() keys = keys[np.argsort(-priorities)] for k in keys: # Profile the module self.watch.initialize(k) # Append self.modules[k] = ana_script_factory( k, modules[k], overwrite, log_dir, prefix, buffer_size )
def __call__(self, data): """Pass one batch of data through the analysis scripts Parameters ---------- data : dict Dictionary of data products """ # Reset active stopwatches self.watch.reset_if_active() # Loop over the analysis script modules single_entry = np.isscalar(data["index"]) for key, module in self.modules.items(): # Run the analysis script on each entry self.watch.start(key) if single_entry: num_entries = 1 result = module(data) else: num_entries = len(data["index"]) result = defaultdict(list) for entry in range(num_entries): result_e = module(data, entry) if result_e is not None: for k, v in result_e.items(): result[k].append(v) self.watch.stop(key) # Update the input dictionary if result is not None: for key, val in result.items(): if not single_entry: assert len(val) == num_entries, ( f"The number {key} ({len(val)}) does not match " f"the number of entries ({num_entries})." ) data[key] = val
[docs] def close(self): """Close all analysis script writers and flush remaining data. This should be called when analysis is complete to ensure all CSV files are properly closed and data is written. """ for module in self.modules.values(): module.close_writers()
[docs] def flush(self): """Flush all analysis script writer buffers. This forces any buffered data to be written to disk without closing the files. """ for module in self.modules.values(): module.flush_writers()
def __del__(self): """Destructor to ensure analysis writers are closed. Acts as a safety net in case close() is not called explicitly. """ self.close()