Data Structures¶
Provides custom data structures used by other Ataraxis and Sun Lab libraries.
- class ataraxis_data_structures.data_structures.JobState(job_name, specifier='', status=ProcessingStatus.SCHEDULED, executor_id=None, error_message=None, started_at=None, completed_at=None)¶
Bases: object
Stores the metadata and the current runtime status of a single job in the processing pipeline.
- completed_at: int | None¶
The UTC timestamp (microsecond-precision epoch) when the job completed (succeeded or failed).
- error_message: str | None¶
An optional error message describing why the job failed.
- executor_id: str | None¶
An optional identifier for the executor running the job (e.g. a SLURM job ID, a process PID, or any user-defined string).
- job_name: str¶
The descriptive name of the job.
- specifier: str¶
An optional specifier that differentiates instances of the same job, for example, when running the same job over multiple batches of data.
- started_at: int | None¶
The UTC timestamp (microsecond-precision epoch) when the job started running.
- status: ProcessingStatus¶
The current status of the job.
- class ataraxis_data_structures.data_structures.ProcessingStatus(*values)¶
Bases: IntEnum
Defines the status codes used by ProcessingTracker instances to communicate the runtime state of each job making up the managed data processing pipeline.
- FAILED = 3¶
Indicates the job encountered a runtime error and was not completed.
- RUNNING = 1¶
Indicates the job is currently being executed.
- SCHEDULED = 0¶
Indicates the job is scheduled for execution.
- SUCCEEDED = 2¶
Indicates the job has been completed successfully.
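Taken together, the two classes above can be re-created as a minimal, self-contained sketch. The field names, defaults, and status values are taken from this page; everything else (and the exact real implementation) is assumed:

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import IntEnum


class ProcessingStatus(IntEnum):
    """Status codes re-created from the values documented above."""

    SCHEDULED = 0
    RUNNING = 1
    SUCCEEDED = 2
    FAILED = 3


@dataclass
class JobState:
    """A hypothetical stand-in for the documented JobState structure."""

    job_name: str
    specifier: str = ""
    status: ProcessingStatus = ProcessingStatus.SCHEDULED
    executor_id: str | None = None
    error_message: str | None = None
    started_at: int | None = None   # UTC epoch timestamp, microseconds
    completed_at: int | None = None


# A freshly created job starts in the SCHEDULED state.
job = JobState(job_name="process_data", specifier="batch_101")
```

Because ProcessingStatus is an IntEnum, its members compare equal to their integer codes, which is what allows the tracker to serialize statuses as plain numbers.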
- class ataraxis_data_structures.data_structures.ProcessingTracker(file_path, jobs=<factory>)¶
Bases: YamlConfig
Tracks the state of a data processing pipeline and provides tools for communicating this state between multiple processes and host machines.
Notes
All modifications to the tracker file require the acquisition of the .lock file, which ensures exclusive access to the tracker’s data, allowing multiple independent processes (jobs) to safely work with the same tracker file.
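The documentation specifies only that the .lock file grants exclusive access and that acquisition can time out; a minimal sketch of one common way to implement such a lock, assuming atomic exclusive file creation (`O_CREAT | O_EXCL`) and a polling loop (the real mechanism may differ):

```python
import os
import time
from pathlib import Path


def acquire_lock(lock_path: Path, timeout: float = 5.0, poll: float = 0.05) -> None:
    """Spin until the lock file can be created exclusively, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # O_EXCL makes creation fail if the file already exists,
            # which is atomic on POSIX filesystems.
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            os.close(fd)
            return
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"Could not acquire {lock_path}")
            time.sleep(poll)


def release_lock(lock_path: Path) -> None:
    """Delete the lock file so that other processes can acquire it."""
    lock_path.unlink(missing_ok=True)
```

Any process that fails to create the file backs off and retries until the deadline, which matches the TimeoutError behavior documented for the tracker methods below.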
- property complete: bool¶
Returns True if the tracked processing pipeline has been completed successfully.
Notes
The pipeline is considered complete if all jobs have been marked as succeeded.
- complete_job(job_id)¶
Marks a target job as successfully completed.
- Parameters:
job_id (str) – The unique identifier of the job to mark as complete.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If the specified job ID is not found in the managed tracker file.
- Return type:
None
- property encountered_error: bool¶
Returns True if the tracked processing pipeline has been terminated due to a runtime error.
Notes
The pipeline is considered to have encountered an error if any job has been marked as failed.
- fail_job(job_id, error_message=None)¶
Marks the target job as failed.
- Parameters:
job_id (str) – The unique identifier of the job to mark as failed.
error_message (str | None, default: None) – An optional error message describing why the job failed.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If the specified job ID is not found in the managed tracker file.
- Return type:
None
- file_path: Path¶
The path to the .yaml file used to cache the tracker’s data on disk.
- find_jobs(job_name=None, specifier=None)¶
Searches for jobs matching the given name and/or specifier patterns.
Supports partial matching (substring search) on job names and specifiers. If both parameters are provided, jobs must match both patterns.
- Parameters:
job_name (str | None, default: None) – A substring to match against job names. If None, matches any job name.
specifier (str | None, default: None) – A substring to match against specifiers. If None, matches any specifier.
- Return type:
dict[str, tuple[str, str]]
- Returns:
A dictionary mapping matching job IDs to (job_name, specifier) tuples.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If both job_name and specifier are None.
- static generate_job_id(job_name, specifier='')¶
Generates a unique hexadecimal job identifier based on the job’s name and optional specifier using the xxHash64 checksum generator.
- Parameters:
job_name (str) – The descriptive name for the processing job (e.g., ‘process_data’).
specifier (str, default: '') – An optional specifier that differentiates instances of the same job (e.g., ‘batch_101’).
- Return type:
str
- Returns:
The unique hexadecimal identifier for the target job.
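The method derives the ID with the xxHash64 checksum generator, which is a third-party package; a sketch of the same deterministic-ID idea using the stdlib blake2b hash as a stand-in (the `_` separator and the 8-byte digest size are assumptions, not the library's actual scheme):

```python
import hashlib


def generate_job_id(job_name: str, specifier: str = "") -> str:
    """Deterministic hexadecimal job ID from a name plus optional specifier.

    The real method uses xxHash64; blake2b (stdlib) stands in here with an
    8-byte digest, producing a comparable 16-character hex string.
    """
    # Joining name and specifier with "_" is a hypothetical choice for this sketch.
    payload = f"{job_name}_{specifier}" if specifier else job_name
    return hashlib.blake2b(payload.encode("utf-8"), digest_size=8).hexdigest()
```

Because the same inputs always map to the same ID, repeated pipeline runs can look up previously initialized jobs in the tracker file without storing any extra state.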
- get_job_info(job_id)¶
Returns the full JobState object for the specified job.
- Parameters:
job_id (str) – The unique identifier of the job to query.
- Return type:
JobState
- Returns:
The JobState object containing all metadata for the job.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If the specified job ID is not found in the managed tracker file.
- get_job_status(job_id)¶
Queries the current runtime status of the target job.
- Parameters:
job_id (str) – The unique identifier of the job for which to query the runtime status.
- Return type:
ProcessingStatus
- Returns:
The current runtime status of the job.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If the specified job ID is not found in the managed tracker file.
- get_jobs_by_status(status)¶
Returns all job IDs that have the specified status.
- Parameters:
status (ProcessingStatus | str) – The status to filter jobs by.
- Return type:
list[str]
- Returns:
A list of job IDs with the specified status.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
- get_summary()¶
Returns a summary of job counts by status.
- Return type:
dict[ProcessingStatus, int]
- Returns:
A dictionary mapping each ProcessingStatus to the count of jobs with that status.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
- initialize_jobs(jobs)¶
Configures the tracker with the list of one or more jobs to be executed during the pipeline’s runtime.
Notes
If the job already has a section in the tracker, this method does not duplicate or modify the existing job entry. Use the reset() method to clear all cached job states.
- Parameters:
jobs (list[tuple[str, str]]) – A list of (job_name, specifier) tuples defining the jobs to track. Each tuple contains the descriptive job name and an optional specifier string. Use an empty string for jobs without a specifier.
- Return type:
list[str]
- Returns:
A list of job IDs corresponding to the input jobs.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
- jobs: dict[str, JobState]¶
Maps the unique identifiers of the jobs that make up the processing pipeline to their current state and metadata.
- lock_path: str¶
The path to the .lock file used to ensure thread-safe access to the tracker’s data.
- reset()¶
Resets the tracker file to the default state.
- Return type:
None
- retry_failed_jobs()¶
Resets all failed jobs back to SCHEDULED status for retry.
This clears the error_message, started_at, and completed_at fields for each failed job.
- Return type:
list[str]
- Returns:
A list of job IDs that were reset for retry.
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
- start_job(job_id, executor_id=None)¶
Marks the target job as running and optionally records the executor identifier.
- Parameters:
job_id (str) – The unique identifier of the job to mark as started.
executor_id (str | None, default: None) – An optional identifier for the executor running the job (e.g., a SLURM job ID, a process PID, or any user-defined string).
- Raises:
TimeoutError – If the .lock file for the tracker .yaml file cannot be acquired within the timeout period.
ValueError – If the specified job ID is not found in the managed tracker file.
- Return type:
None
- class ataraxis_data_structures.data_structures.YamlConfig¶
Bases: object
Extends the standard Python dataclass with methods to save and load its data from a .yaml (YAML) file.
Notes
This class is designed to be subclassed by custom dataclasses so that they inherit the YAML saving and loading functionality. Serialization automatically converts Path instances to strings, Enum members to their raw values, and tuples to lists. Deserialization reverses these conversions based on the dataclass’s type annotations.
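The serialization conversions described above (Path to string, Enum to raw value, tuple to list) can be sketched without assuming a YAML backend; this hypothetical helper shows only the conversion step that runs before the data is written out:

```python
from enum import Enum
from pathlib import Path


def to_yaml_safe(value):
    """Recursively apply the documented serialization conversions:
    Path -> str, Enum -> raw value, tuple -> list."""
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, (list, tuple)):
        return [to_yaml_safe(item) for item in value]
    if isinstance(value, dict):
        return {key: to_yaml_safe(val) for key, val in value.items()}
    return value


class Color(Enum):
    RED = "red"


converted = to_yaml_safe({"root": Path("data") / "runs", "color": Color.RED, "shape": (1, 2)})
```

After this pass, every value is a YAML-native type, so a plain dumper can write the file; deserialization applies the reverse conversions using the dataclass field annotations, as described in the Notes for from_yaml() below.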
- classmethod from_yaml(file_path)¶
Instantiates the class using the data loaded from the provided .yaml (YAML) file.
Notes
Deserialization automatically converts YAML-native types back to the annotated Python types: strings to Path instances, raw values to Enum members, and lists to tuples where applicable. Type hooks are derived from the dataclass’s field annotations, so no manual conversion boilerplate is needed in subclasses.
- Parameters:
file_path (Path) – The path to the .yaml file that stores the instance’s data.
- Return type:
Self
- Returns:
A new class instance that stores the data read from the .yaml file.
- Raises:
ValueError – If the provided file path does not point to a .yaml or .yml file.
- to_yaml(file_path)¶
Saves the instance’s data as the specified .yaml (YAML) file.
Notes
Path fields are serialized as strings, Enum fields as their raw values, and tuples as lists. This keeps YAML files human-readable while preserving full type fidelity on round-trip via from_yaml().
- Parameters:
file_path (Path) – The path to the .yaml file to write.
- Raises:
ValueError – If the file_path does not point to a file with a ‘.yaml’ or ‘.yml’ extension.
- Return type:
None
Data Loggers¶
Provides assets for saving (logging) various forms of data to disk.
- class ataraxis_data_structures.data_loggers.DataLogger(output_directory, instance_name, thread_count=5, poll_interval=5)¶
Bases: object
Manages the runtime of a data logger that saves serialized data collected from multiple concurrently active sources.
This class manages the runtime of a data logger running in a separate process. The logger uses multiple concurrent threads to optimize the I/O operations associated with saving the data to disk, achieving high throughput under a wide range of scenarios.
Notes
Initializing the class does not start the logger! Call the start() method to ensure that the logger is fully initialized before submitting data for logging.
Use the multiprocessing Queue exposed via the ‘input_queue’ property to send data to the logger. The data must be packaged into a LogPackage instance before it is submitted to the queue.
- Parameters:
output_directory (Path) – The directory where the logged data is saved. The data is saved under a subdirectory named after the logger instance.
instance_name (str) – The name of the logger instance. This name has to be unique across all concurrently active DataLogger instances.
thread_count (int, default: 5) – The number of threads to use for saving the data to disk. It is recommended to use multiple threads to parallelize the I/O operations associated with writing the logged data to disk.
poll_interval (int, default: 5) – The interval, in milliseconds, between polls of the input queue. Primarily, this is designed to reduce CPU usage during light workloads. Setting this to 0 disables the polling delay mechanism.
- _started¶
Tracks whether the logger process is running.
- _mp_manager¶
Stores the manager object used to instantiate and manage the multiprocessing Queue.
- _thread_count¶
Stores the number of concurrently active data saving threads.
- _poll_interval¶
Stores the data queue poll interval, in milliseconds.
- _name¶
Stores the name of the data logger instance.
- _output_directory¶
Stores the directory where the data is saved.
- _input_queue¶
Stores the multiprocessing Queue used to buffer and pipe the data to the logger process.
- _logger_process¶
Stores the Process object that runs the data logging cycle.
- _terminator_array¶
Stores the shared memory array used to terminate (shut down) the logger process.
- _watchdog_thread¶
Stores the thread used to monitor the runtime status of the remote logger process.
- property alive: bool¶
Returns True if the instance’s logger process is currently running.
- property input_queue: Queue¶
Returns the multiprocessing Queue used to buffer and pipe the data to the logger process.
- property name: str¶
Returns the name of the instance.
- property output_directory: Path¶
Returns the path to the directory where the data is saved.
- start()¶
Starts the remote logger process and the assets used to control and monitor the logger’s uptime.
- Return type:
None
- stop()¶
Stops the logger process once it saves all buffered data and releases reserved resources.
- Return type:
None
- class ataraxis_data_structures.data_loggers.LogArchiveReader(archive_path, onset_us=None)¶
Bases: object
Reads and iterates through .npz log archives generated by DataLogger instances.
This class provides efficient access to log archive contents with support for parallel batch processing. It handles onset timestamp discovery, message iteration, and batch assignment for multiprocessing workflows.
Notes
Each .npz archive contains messages from a single source (producer). Messages are stored with the structure: [source_id (1 byte)][timestamp (8 bytes)][payload (N bytes)].
The first message with a timestamp value of 0 contains the onset timestamp as its payload. This onset timestamp is the UTC epoch reference used to convert elapsed microseconds to absolute timestamps.
For multiprocessing workflows, the main process should create a reader to discover the onset timestamp and generate batch assignments. Worker processes can then create lightweight reader instances by passing the pre-discovered onset_us to skip redundant onset scanning.
- Parameters:
archive_path (Path) – The path to the .npz log archive file to read.
onset_us (uint64 | None, default: None) – The pre-discovered onset timestamp in microseconds since epoch. If provided, skips onset discovery. Use this in worker processes to avoid redundant scanning.
- _archive_path¶
Stores the path to the log archive file.
- _onset_us¶
Stores the onset timestamp if pre-provided.
- _message_keys¶
Caches the list of message keys (excluding the onset message).
- Raises:
FileNotFoundError – If the specified archive file does not exist.
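The documented message layout, [source_id (1 byte)][timestamp (8 bytes)][payload (N bytes)], can be sketched with the stdlib struct module. The little-endian integer encoding used here is an assumption; the page does not specify the byte order:

```python
import struct


def pack_message(source_id: int, timestamp_us: int, payload: bytes) -> bytes:
    """Serialize a message in the documented layout:
    [source_id (1 byte)][timestamp (8 bytes)][payload (N bytes)].
    "<BQ" = little-endian unsigned char + unsigned 64-bit int (assumed order)."""
    return struct.pack("<BQ", source_id, timestamp_us) + payload


def unpack_message(raw: bytes) -> tuple[int, int, bytes]:
    """Split a raw message back into (source_id, timestamp_us, payload)."""
    source_id, timestamp_us = struct.unpack_from("<BQ", raw)
    return source_id, timestamp_us, raw[9:]  # skip the 1 + 8 header bytes
```

A timestamp of 0 marks the special onset message, whose payload holds the UTC epoch reference; every other timestamp is interpreted as microseconds elapsed since that onset.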
- get_batches(workers=-1, batch_multiplier=4)¶
Divides message keys into batches optimized for parallel processing.
Notes
Uses over-batching (creating more batches than workers) to improve load balancing when message processing times vary. The batch_multiplier parameter controls the degree of over-batching.
For archives with fewer messages than the parallel processing threshold (2000), returns a single batch containing all message keys.
- Parameters:
workers (int, default: -1) – The number of worker processes to optimize batching for. A value less than 1 uses all available CPU cores minus 2.
batch_multiplier (int, default: 4) – The over-batching factor. Creates (workers * batch_multiplier) batches for better load distribution.
- Return type:
list[list[str]]
- Returns:
A list of message key batches. Each batch is a list of string keys that can be passed to iter_messages().
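The over-batching strategy described in the Notes can be sketched as follows; the chunking arithmetic is a plausible implementation, not the library's actual code:

```python
import math


def make_batches(keys: list[str], workers: int, batch_multiplier: int = 4) -> list[list[str]]:
    """Split keys into roughly equal batches, creating more batches than
    workers (over-batching) so a slow batch does not stall the whole pool."""
    if not keys:
        return []
    target = max(1, workers) * batch_multiplier
    batch_count = min(target, len(keys))
    size = math.ceil(len(keys) / batch_count)
    return [keys[i:i + size] for i in range(0, len(keys), size)]
```

With 2 workers and the default multiplier, 100 keys become 8 batches of roughly 13 keys each, so idle workers keep pulling fresh batches while slower ones finish.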
- iter_messages(keys=None)¶
Iterates through messages in the archive, yielding LogMessage instances.
Notes
Opens the archive with memory mapping for efficient access. The archive is kept open for the duration of iteration.
If keys is provided, only iterates through the specified messages. This is useful for processing a batch of messages in a worker process.
- Parameters:
keys (list[str] | None, default: None) – Optional list of message keys to iterate. If None, iterates through all data messages in the archive.
- Yields:
LogMessage instances containing the absolute timestamp and payload for each message.
- Return type:
Iterator[LogMessage]
- property message_count: int¶
Returns the number of data messages in the archive, excluding the onset message.
- Returns:
The count of data messages available for iteration.
- property onset_timestamp_us: uint64¶
Returns the onset timestamp in microseconds since epoch.
Notes
The onset timestamp is the UTC epoch reference stored in the first message with a timestamp value of 0. All other message timestamps are stored as elapsed microseconds relative to this onset.
If onset_us was provided during initialization, returns that value without scanning the archive. Otherwise, scans the archive to discover the onset timestamp and caches the result.
- Returns:
The onset timestamp as a numpy uint64 value representing microseconds since epoch.
- Raises:
ValueError – If the archive does not contain a valid onset timestamp message.
- read_all_messages()¶
Reads all messages from the archive and returns them as arrays.
Notes
This method loads all messages into memory at once. For very large archives, consider using iter_messages() with batching instead.
- Return type:
tuple[ndarray[tuple[Any, ...], dtype[uint64]], list[ndarray[tuple[Any, ...], dtype[uint8]]]]
- Returns:
A tuple of two elements. The first element is a numpy array of absolute timestamps in microseconds. The second element is a list of payload arrays, one for each message.
- class ataraxis_data_structures.data_loggers.LogMessage(timestamp_us, payload)¶
Bases: object
Stores a single message extracted from a log archive.
Notes
This class is yielded by the LogArchiveReader.iter_messages() method for each message in the archive. The structure of the payload is domain-specific and must be parsed by the consumer.
- payload: ndarray[tuple[Any, ...], dtype[uint8]]¶
The message payload as a byte array.
- timestamp_us: uint64¶
The absolute UTC timestamp of when the message was logged, in microseconds since epoch.
- class ataraxis_data_structures.data_loggers.LogPackage(source_id, acquisition_time, serialized_data)¶
Bases: object
Stores the data and ID information to be logged by the DataLogger class and exposes methods for packaging this data into the format expected by the logger.
Notes
During runtime, the DataLogger class expects all data sent for logging via the input Queue object to be packaged into an instance of this class.
- acquisition_time: uint64¶
The timestamp of when the data was acquired. This value typically communicates the number of microseconds elapsed since the onset of the data acquisition runtime.
- property data: tuple[str, ndarray[tuple[Any, ...], dtype[uint8]]]¶
Returns the filename and the serialized data package to be processed by a DataLogger instance.
Notes
This property is designed to be exclusively accessed by the DataLogger instance.
- serialized_data: ndarray[tuple[Any, ...], dtype[uint8]]¶
The serialized data to be logged, stored as a one-dimensional NumPy array of bytes (uint8).
- source_id: uint8¶
The ID code of the source that produced the data. Has to be unique across all systems that send data to the same DataLogger instance during runtime.
- ataraxis_data_structures.data_loggers.assemble_log_archives(log_directory, max_workers=None, *, remove_sources=True, memory_mapping=True, verbose=False, verify_integrity=False)¶
Consolidates all .npy files in the target log directory into .npz archives, one for each unique source.
This function is designed to post-process the directories filled by DataLogger instances during runtime.
Notes
All log entries inside each archive are grouped by their acquisition timestamp value before consolidation. The consolidated archive names include the ID code of the source that generated the original log entries.
- Parameters:
log_directory (Path) – The path to the directory that stores the log entries as .npy files.
max_workers (int | None, default: None) – Determines the number of threads used to process the data in parallel. If set to None, the function uses the number of CPU cores - 2 threads.
remove_sources (bool, default: True) – Determines whether to remove the .npy files after consolidating their data into .npz archives.
memory_mapping (bool, default: True) – Determines whether to memory-map or load the processed data into RAM during processing. Because Windows does not release memory-mapped file handles, this function always loads the data into RAM when running on Windows.
verbose (bool, default: False) – Determines whether to communicate the log assembly progress via the terminal.
verify_integrity (bool, default: False) – Determines whether to verify the integrity of the created archives against the original log entries before removing sources.
- Return type:
None
Processing¶
Provides utilities for data integrity verification, directory transfer, and data interpolation.
- ataraxis_data_structures.processing.calculate_directory_checksum(directory, num_processes=None, *, progress=False, save_checksum=True, excluded_files=None)¶
Calculates the xxHash3-128 checksum for the input directory.
Notes
The function can be configured to write the generated checksum as a hexadecimal string to the ax_checksum.txt file stored at the highest level of the input directory.
The xxHash3 checksum is not suitable for security purposes and is only used to ensure data integrity.
The returned checksum accounts for both the contents of each file and the layout of the input directory structure.
- Parameters:
directory (Path) – The path to the directory for which to generate the checksum.
num_processes (int | None, default: None) – The number of processes to use for parallelizing checksum calculation. If set to None, the function uses all available CPU cores.
progress (bool, default: False) – Determines whether to track the checksum calculation progress using a progress bar.
save_checksum (bool, default: True) – Determines whether the checksum should be saved (written) to a .txt file.
excluded_files (set[str] | None, default: None) – The set of filenames to exclude from the checksum calculation. If set to None, defaults to {"ax_checksum.txt"}.
- Return type:
str
- Returns:
The xxHash3-128 checksum for the input directory as a hexadecimal string.
- ataraxis_data_structures.processing.delete_directory(directory_path)¶
Deletes the target directory and all its subdirectories using parallel processing.
- Parameters:
directory_path (Path) – The path to the directory to delete.
- Return type:
None
- ataraxis_data_structures.processing.interpolate_data(source_coordinates, source_values, target_coordinates, *, is_discrete)¶
Interpolates the data values at the requested coordinates using the source coordinate-value distribution.
Notes
This function expects ‘source_coordinates’ and ‘target_coordinates’ arrays to be monotonically increasing.
Discrete interpolated data is returned as an array with the same datatype as the input data. Continuous interpolated data is returned as a float64 datatype array.
Continuous data is interpolated using the linear interpolation method. Discrete data is interpolated to the last known value to the left of each interpolated coordinate.
- Parameters:
source_coordinates (ndarray[tuple[Any, ...], dtype[number[Any]]]) – The source coordinate values.
source_values (ndarray[tuple[Any, ...], dtype[number[Any]]]) – The data values at each source coordinate.
target_coordinates (ndarray[tuple[Any, ...], dtype[number[Any]]]) – The target coordinates for which to interpolate the data values.
is_discrete (bool) – Determines whether the interpolated data is discrete or continuous.
- Return type:
ndarray[tuple[Any, ...], dtype[number[Any]]]
- Returns:
A one-dimensional NumPy array with the same length as the ‘target_coordinates’ array that stores the interpolated data values.
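The two documented modes, linear interpolation for continuous data and last-value-to-the-left for discrete data, can be sketched without NumPy using the stdlib bisect module. The edge-clamping behavior outside the source range is an assumption (it mirrors numpy.interp, which the real function may or may not use):

```python
from bisect import bisect_right


def interpolate(xs, ys, targets, *, is_discrete):
    """xs must be monotonically increasing, mirroring the documented requirement."""
    out = []
    for t in targets:
        i = bisect_right(xs, t)  # index of the first source coordinate > t
        if is_discrete:
            # Hold the last known value at or to the left of the target coordinate.
            out.append(ys[max(i - 1, 0)])
        elif i == 0:
            out.append(float(ys[0]))    # clamp before the first point (assumed)
        elif i == len(xs):
            out.append(float(ys[-1]))   # clamp after the last point (assumed)
        else:
            x0, x1, y0, y1 = xs[i - 1], xs[i], ys[i - 1], ys[i]
            out.append(y0 + (y1 - y0) * (t - x0) / (x1 - x0))
    return out
```

For discrete data this produces a step function, which is why the output keeps the input datatype; linear interpolation produces fractional values, which is why the continuous mode returns float64.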
- ataraxis_data_structures.processing.transfer_directory(source, destination, num_threads=1, *, verify_integrity=False, remove_source=False, progress=False)¶
Copies the contents of the input source directory to the destination directory while preserving the underlying directory hierarchy.
Notes
This function recreates the moved directory hierarchy on the destination if the hierarchy does not exist. This is done before copying the files.
The function executes a multithreaded copy operation and does not by default remove the source data after the copy is complete.
If the function is configured to verify the transferred data’s integrity, it generates an xxHash3-128 checksum of the data before and after the transfer and compares the two checksums to detect data corruption.
- Parameters:
source (Path) – The path to the directory to be transferred.
destination (Path) – The path to the destination directory to which the contents of the source directory are moved.
num_threads (int, default: 1) – The number of threads to use for the parallel file transfer. Setting this value to a number below 1 instructs the function to use all available CPU threads.
verify_integrity (bool, default: False) – Determines whether to perform integrity verification for the transferred files.
remove_source (bool, default: False) – Determines whether to remove the source directory after the transfer is complete and (optionally) verified.
progress (bool, default: False) – Determines whether to track the transfer progress using a progress bar.
- Raises:
RuntimeError – If the transferred files do not pass the xxHash3-128 checksum integrity verification.
- Return type:
None