Source code for spine.io.write.csv

"""Module to write log files to CSV."""

import os
from types import TracebackType
from typing import Any

# Names exported by a star-import of this module; CSVWriter is the only one listed.
__all__ = ["CSVWriter"]


[docs] class CSVWriter: """Writes data to a CSV file with optimized performance. Builds a CSV file to store the output of the analysis tools. It can only be used to store relatively basic quantities (scalars, strings, etc.). **Performance Optimization**: This writer keeps the file handle open during its lifetime, eliminating the overhead of opening/closing the file on every write operation. This provides significant speedup when writing many rows. By default, uses line buffering (buffer_size=1) to ensure each row is safely written while maintaining excellent performance. **Usage**: The writer should be properly closed when done: 1. Using context manager (recommended): .. code-block:: python with CSVWriter('output.csv') as writer: writer.append({'col1': 1, 'col2': 2}) writer.append({'col1': 3, 'col2': 4}) # File automatically closed and flushed 2. Manual management (used by AnaBase): .. code-block:: python writer = CSVWriter('output.csv') writer.append({'col1': 1, 'col2': 2}) writer.close() # Must call explicitly! **Configuration**: Buffer size can be configured: - In analysis scripts (YAML config): .. code-block:: yaml ana: buffer_size: 1 # Line buffered (default, safe and fast) my_analysis: ... - In driver logging (YAML config): .. code-block:: yaml base: csv_buffer_size: 1 # For driver log file """ name = "csv"
[docs] def __init__( self, file_name: str = "output.csv", directory: str | None = None, overwrite: bool = False, append: bool = False, accept_missing: bool = False, buffer_size: int = 1, ) -> None: """Initialize the basics of the output file. Parameters ---------- file_name : str, default 'output.csv' Name of the output CSV file directory : str, optional Output directory. When provided, the CSV file is written under this directory using the basename of ``file_name``. overwrite : bool, default False If True, overwrite the output file if it already exists append : bool, default False If True, add more rows to an existing CSV file accept_missing : bool, default True Tolerate missing keys buffer_size : int, default 1 Buffer size for file writing. 1 is line buffered (default, safe), -1 uses system default buffering, 0 is unbuffered, >1 is buffer size in bytes """ if directory is not None: file_name = os.path.join(directory, os.path.basename(file_name)) # Check that output file does not already exist, if requested if not overwrite and not append and os.path.isfile(file_name): raise FileExistsError(f"File with name {file_name} already exists.") # Store persistent attributes self.file_name = file_name self.append_file = append self.accept_missing = accept_missing self.buffer_size = buffer_size self.keys = None self.file_handle = None # If appending, check that the file exists and read the header if self.append_file: if not os.path.isfile(file_name): raise FileNotFoundError( f"File not found at path: {file_name}. When using " "`append=True` in CSVWriter, the file must exist at " "the prescribed path before data is written to it." ) with open(self.file_name, "r", encoding="utf-8") as out_file: self.keys = out_file.readline().strip().split(",")
def __enter__(self) -> "CSVWriter": """Context manager entry. Opens the file handle. Returns ------- CSVWriter Self reference for context manager """ self.open() return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool: """Context manager exit. Closes the file handle. Parameters ---------- exc_type : type Exception type if an exception occurred exc_val : Exception Exception value if an exception occurred exc_tb : TracebackType Exception traceback if an exception occurred Returns ------- bool False to propagate exceptions """ self.close() return False
[docs] def open(self) -> None: """Open the file handle for writing. If the file handle is already open, this does nothing. The file is opened in append mode if append_file is True and the file exists, otherwise in write mode. """ if self.file_handle is None: mode = "a" if self.append_file and os.path.isfile(self.file_name) else "w" self.file_handle = open( self.file_name, mode, encoding="utf-8", buffering=self.buffer_size )
[docs] def close(self) -> None: """Close the file handle and ensure all data is written. This flushes any buffered data before closing. After calling this, the writer cannot be used unless open() is called again. """ if self.file_handle is not None: self.file_handle.flush() self.file_handle.close() self.file_handle = None
[docs] def flush(self) -> None: """Explicitly flush the file buffer to disk. This forces any buffered data to be written to disk without closing the file. Useful for ensuring data persistence at specific checkpoints. """ if self.file_handle is not None: self.file_handle.flush()
[docs] def create(self, data: dict[str, Any]) -> None: """Initialize the header of the CSV file, record the keys to be stored. Parameters ---------- data : dict Dictionary containing the output of the reconstruction chain """ # Save the list of keys to store self.keys = list(data.keys()) # Open the file handle if not already open self.open() # File handle is guaranteed to be open here assert self.file_handle is not None # Create a header and write it to file header_str = ",".join(self.keys) self.file_handle.write(header_str + "\n")
[docs] def append(self, data: dict[str, Any]) -> None: """Append the CSV file with the output. Parameters ---------- result_blob : dict Dictionary containing the output of the reconstruction chain """ # Fetch the values to store if self.keys is None: # If this function has never been called, initialiaze the CSV file self.create(data) else: # If it has, check that the list of keys is identical if list(data.keys()) != self.keys: # If it is not identical, check the discrepancies missing = self.array_diff(self.keys, data.keys()) excess = self.array_diff(data.keys(), self.keys) if len(excess): raise AssertionError( "There are keys in this entry which were not " "present when the CSV file was initialized. " f"New keys: {list(excess)}" ) if len(missing) and not self.accept_missing: raise AssertionError( "There are keys missing in this entry which were " "present when the CSV file was initialized. " f"Missing keys: {list(missing)}" ) new_data = {k: -1 for k in self.keys} for k, v in data.items(): new_data[k] = v data = new_data # Ensure file is open if self.file_handle is None: self.open() # File handle is guaranteed to be open here assert self.file_handle is not None assert self.keys is not None # Append to file (no open/close overhead!) result_str = ",".join([str(data[k]) for k in self.keys]) self.file_handle.write(result_str + "\n")
[docs] @staticmethod def array_diff(array_x: list[str], array_y: list[str]) -> set[str]: """Compare the content of two arrays. This functions returns the elemnts of the first array that do not appear in the second array. Parameters ---------- array_x : List[str] First array of strings array_y : List[str] Second array of strings Returns ------- Set[str] Set of keys that appear in `array_x` but not in `array_y`. """ return set(array_x).difference(set(array_y))