Nested Dictionary
This module contains the NestedDictionary class that wraps a nested Python dictionary and exposes methods for manipulating dictionary keys and values through a path-based interface.
The primary advantage of the NestedDictionary class is that it simplifies working with nested Python dictionaries without a-priori knowledge of their structure. In turn, this is helpful when writing pipelines that should work for a wide range of under-specified dictionary layouts. Additionally, even when the dictionary layout is known, many class methods conveniently simplify complex operations, such as replacing the datatype of all dictionary keys.
- class ataraxis_data_structures.data_structures.nested_dictionary.NestedDictionary(seed_dictionary=None, path_delimiter='.')
Bases:
object
Wraps a nested (hierarchical) python dictionary and provides methods for manipulating its values.
This class is primarily designed to abstract working with nested dictionaries without a-priori knowledge of the dictionary layout. For example, when using data logging methods that produce nested .json or .yml files that can be parsed into dictionaries, the data can subsequently be wrapped and processed using this class. Alternatively, this class is used by the AtaraxisData class to provide a compressed map of the data stored in the unified HDF dataset to optimize data access and manipulation.
Notes
All class methods that modify the wrapped dictionary can be used to either modify the dictionary in-place or to return a new NestedDictionary class instance that wraps the modified dictionary.
While this class will work for both nested and shallow (one-level) dictionaries, it would be inefficient to leverage the class machinery for non-nested dictionaries. For shallow dictionaries, using pure python methods is preferred.
- _valid_datatypes
Stores supported dictionary key datatypes as a tuple. The class is guaranteed to recognize and work with these datatypes. This variable is used during input checks and for error messages related to key datatype conversion errors.
- _nested_dictionary
Stores the managed dictionary object. This object should never be accessed directly!
- _path_delimiter
Stores the sequence used to separate path nodes for input and output dictionary variable paths. The paths are used for purposes like accessing specific nested values.
- _key_datatypes
A set that stores the unique string names for the datatypes used by the keys in the dictionary. The datatype names are extracted from the __name__ property of the keys, so the class should be able to recognize more or less any type of keys. That said, support beyond the standard key datatypes listed in valid_datatypes is not guaranteed.
- Parameters:
  - seed_dictionary (dict[Any, Any] | None, default: None) – The ‘seed’ dictionary object to be used by the class. If not provided, the class will generate an empty shallow dictionary and use that as the initial object. This argument allows re-initializing nested dictionaries when they are loaded from .yaml or .json files, by passing the loaded dictionaries as seeds.
  - path_delimiter (str, default: '.') – The delimiter used to separate keys in string variable paths. It is generally advised to stick to the default delimiter for most use cases. Only use a custom delimiter if any of the dictionary or sub-dictionary keys reserve the default delimiter for other purposes (for example, if the delimiter is part of a string key). Note, all methods in the class refer to this variable during runtime, so all inputs to the class have to use the class delimiter where necessary to avoid unexpected behavior.
- Raises:
TypeError – If input arguments are not of the supported type.
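For reference, a minimal instantiation sketch; the seed dictionary and values below are made-up examples, not data shipped with the library:

```python
# A minimal sketch of wrapping a dictionary, assuming the module path documented above.
from ataraxis_data_structures.data_structures.nested_dictionary import NestedDictionary

# Wrap an existing (e.g. loaded from .yaml / .json) dictionary; the layout is a made-up example.
seed = {"outer_sub_dict": {"inner_sub_dict": {"var_1": 42}}}
nested = NestedDictionary(seed_dictionary=seed, path_delimiter=".")

# Wrap a fresh empty dictionary to build a hierarchy from scratch.
empty = NestedDictionary()
```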
- __repr__()
Returns a string representation of the class instance.
- Return type:
str
- _convert_key_to_datatype(key, datatype)
Converts the input key to the requested datatype.
This method is designed to be used by other class methods when working with dictionary paths and should not be called directly by the user.
- Parameters:
  - key (Any) – The key to convert to the requested datatype. Generally expected to be one of the standard variable types (int, str and float).
  - datatype (Literal['int', 'str', 'float', 'NoneType']) – The string-option that specifies the datatype to convert the key into. Available options are: “int”, “str”, “float” and “NoneType”.
- Return type:
int | str | float | None
- Returns:
The key converted to the requested datatype.
- Raises:
ValueError – If the requested datatype is not one of the supported datatypes. If the key value cannot be converted to the requested datatype.
- _convert_variable_path_to_keys(variable_path)
Converts the input variable_path to the tuple of keys, which is the format preferred by all class methods.
This is a utility method not intended to be called from outside the class. It verifies the input variable_path in addition to handling the necessary type-conversions to transform the input path into a valid tuple of nested key-values. For string variable_path inputs, it converts all keys in the string to the datatype used by the dictionary. For tuple, list or numpy array inputs, it assumes that the keys inside the iterable are formatted correctly, but checks other iterable properties, such as the number of dimensions.
Notes
Numpy arrays are not valid inputs if the dictionary uses more than a single datatype as they cannot represent mixed key types.
- Parameters:
  - variable_path (str | ndarray[tuple[int, ...], dtype[Any]] | tuple[Any, ...] | list[Any]) – A string, tuple, list or numpy array that provides the sequence of keys pointing to the variable of interest inside the wrapped nested dictionary.
- Return type:
tuple[Any, ...]
- Returns:
The tuple of keys that point to a specific unique value in the dictionary. For input string paths, the keys are converted to the (only) datatype used by the dictionary. For input key iterables, the input is converted into a tuple, but does not undergo datatype-conversion for individual keys.
- Raises:
TypeError – If the input variable_path is not of a correct type.
ValueError – If the input variable_path is a string that ends with the class delimiter. If the input variable_path is a string or numpy array, and the dictionary keys use more than a single datatype. If the input numpy array has more than a single dimension. If the dictionary has an undefined key_datatypes property (most often an empty set), likely due to the class wrapping an empty dictionary.
- _extract_key_datatypes()
Extracts datatype names used by keys in the wrapped dictionary and returns them as a set.
Saves extracted datatypes as a set and keeps only unique datatype names. Primarily, this information is useful for determining whether dictionary variables can be safely indexed using string paths. For example, if the length of the set is greater than 1, the dictionary uses at least two unique datatypes for keys and, otherwise, the dictionary only uses one datatype. The latter case enables the use of string variable paths, whereas the former only allows key iterables to be used as variable paths.
- Return type:
set[str]
- Returns:
A set of string-names that describe unique datatypes used by the dictionary keys. The names are extracted from each key class using its __name__ property.
- convert_all_keys_to_datatype(datatype, *, modify_class_dictionary=True)
Converts all keys inside the class dictionary to use the requested datatype.
This method is designed to un-mix dictionaries that use multiple key datatypes. Generally, it is preferable for dictionaries to use the same datatype (most commonly, string) for all keys. Working with these dictionaries is more efficient, and it is possible to use path strings, rather than key tuples, for improved user experience. Therefore, successfully running this method on mixed-datatype dictionaries can often lead to better user-experience.
- Parameters:
  - datatype (Literal['str', 'int']) – The datatype to convert the dictionary keys to. Currently, only accepts ‘int’ and ‘str’ string-options as valid arguments, as these are the two most common (and most likely to be successfully resolved) datatypes.
  - modify_class_dictionary (bool, default: True) – Determines whether the method will replace the class dictionary instance with the modified dictionary generated during runtime (if True) or generate and return a new NestedDictionary instance built around the modified dictionary (if False). In the latter case, the new class will inherit the ‘path_delimiter’ attribute from the original class.
- Return type:
Optional[NestedDictionary]
- Returns:
If modify_class_dictionary flag is False, a NestedDictionary instance that wraps the modified dictionary. If modify_class_dictionary flag is True, returns None and replaces the class dictionary with the altered dictionary.
- Raises:
ValueError – If the value for the datatype argument is not a supported datatype string-option.
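As a hedged illustration, the sketch below un-mixes a dictionary that uses both int and str keys (the example dictionary is made up; it reuses the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: converting mixed int/str keys to str so that string variable paths become usable.
mixed = NestedDictionary(seed_dictionary={1: {"a": 10}, "2": {"b": 20}})
unified = mixed.convert_all_keys_to_datatype(datatype="str", modify_class_dictionary=False)
# 'unified' is a new NestedDictionary instance; 'mixed' is left unchanged because
# modify_class_dictionary was set to False.
```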
- delete_nested_value(variable_path, *, modify_class_dictionary=True, delete_empty_sections=True, allow_missing=False)
Deletes the target value from nested dictionary using the provided variable_path.
This method recursively crawls the nested dictionary hierarchy using the provided variable_path until it reaches the terminal key. For that key, it deletes the variable or hierarchy (sub-dictionary) referenced by the key. If requested, the method can remove hierarchical trees that were vacated by the terminal key deletion, potentially optimizing the dictionary structure by removing unused (empty) sub-dictionaries.
Notes
This method uses recursive self-calls to crawl the dictionary. This can lead to stack overflow for very deep nested dictionaries, although this is not a concern for most use cases.
- Parameters:
  - variable_path (str | tuple[Any, ...] | list[Any] | ndarray[tuple[int, ...], dtype[Any]]) – The string specifying the hierarchical path to the variable to be deleted, using the class ‘path_delimiter’ to separate successive keys (nesting hierarchy levels). Example: ‘outer_sub_dict.inner_sub_dict.var_1’ (using dot (.) delimiters). Alternatively, a tuple, list or numpy array of keys that make up the full terminal variable path. Example: (‘outer_sub_dict’, 1, ‘variable_6’).
  - modify_class_dictionary (bool, default: True) – Determines whether the method will replace the class dictionary instance with the modified dictionary generated during runtime (if True) or generate and return a new NestedDictionary instance built around the modified dictionary (if False). In the latter case, the new class will inherit the ‘path_delimiter’ attribute from the original class.
  - delete_empty_sections (bool, default: True) – Determines whether dictionary sections made empty by the deletion of underlying section / variable keys are also deleted. It is generally recommended to keep this flag set to True to optimize memory usage.
  - allow_missing (bool, default: False) – Determines whether missing keys in the variable_path should trigger exceptions. If True, missing keys are treated like deleted keys and the method will handle them as if the deletion was carried out as expected. If False, the method will notify the user if a particular key is not found in the dictionary by raising an appropriate KeyError exception.
- Return type:
Optional[NestedDictionary]
- Returns:
If modify_class_dictionary flag is False, a NestedDictionary instance that wraps the modified dictionary. If modify_class_dictionary flag is True, returns None and replaces the class dictionary with the altered dictionary.
- Raises:
KeyError – If any of the target keys are not found at the expected dictionary level, and missing keys are not allowed.
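A minimal deletion sketch (the dictionary layout is a made-up example; it reuses the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: deleting a terminal value and pruning the sections it vacates.
nd = NestedDictionary(seed_dictionary={"outer": {"inner": {"var_1": 42}}})
nd.delete_nested_value(
    variable_path="outer.inner.var_1",
    delete_empty_sections=True,  # 'inner' (and then 'outer') become empty and are removed as well
    allow_missing=False,
)
```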
- extract_nested_variable_paths(*, return_raw=False)
Crawls the wrapped nested dictionary and extracts the full path from the top of the dictionary to each non-dictionary value.
The extracted paths can be converted to delimiter-delimited strings or returned as a tuple of key tuples. The former format is more user-friendly, but may not contain enough information to fully individuate each path. The latter format allows each path to be truly unique at the cost of being less user-friendly.
Notes
The output format to choose depends on the configuration of the nested dictionary. If the dictionary only contains keys of the same datatype, the delimited strings are the preferred path format and otherwise the raw tuple is the preferred format. When this method is called from other NestedDictionary methods, the most optimal format is selected automatically.
This method uses recursive self-calls to crawl the dictionary. This can lead to stack overflow for very deep nested dictionaries, although this is not a concern for most use cases.
This method treats empty sub-dictionaries as valid terminal paths and returns them alongside the paths to terminal values.
- Parameters:
  - return_raw (bool, default: False) – Determines whether the method formats the result as the tuple of key tuples or the tuple of delimiter-delimited strings. See notes above for more information.
- Return type:
tuple[str] | tuple[tuple[Any, ...], ...]
- Returns:
If return_raw is true, a tuple of tuples, where each sub-tuple stores a sequence of dictionary path keys. If return_raw is false, a tuple of delimiter-delimited path strings.
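A minimal extraction sketch; the printed outputs are illustrative expectations for the made-up dictionary, not verified values (reusing the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: discovering the paths to all terminal values in a wrapped dictionary.
nd = NestedDictionary(seed_dictionary={"a": {"b": 1, "c": 2}})
print(nd.extract_nested_variable_paths())                 # expected: ('a.b', 'a.c')
print(nd.extract_nested_variable_paths(return_raw=True))  # expected: (('a', 'b'), ('a', 'c'))
```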
- find_nested_variable_path(target_key, search_mode='terminal_only', *, return_raw=False)
Extracts the path(s) to the target variable (key) from the wrapped hierarchical dictionary.
This method is designed to ‘find’ requested variables and return their paths, so that they can be modified by other class methods. This is primarily helpful when no a-priori dictionary layout information is available.
To do so, the method uses the extract_nested_variable_paths() method from this class to discover the paths to each non-dictionary variable and then iterates over all keys in each of the extracted paths until it finds all keys that match the ‘target_key’ argument.
Notes
The method evaluates both the value and the datatype of the input key when searching for matches. If more than one match is found for the input target_key, all discovered paths will be returned as a tuple, in the order of discovery.
The output format to choose depends on the configuration of the nested dictionary. If the dictionary only contains keys of the same datatype, the delimited strings are the preferred path format and otherwise the raw tuple is the preferred format. When this method is called from other NestedDictionary methods, the most optimal format is selected automatically.
- Parameters:
  - target_key (str | int | float | None) – A key which points to the value of interest (variable name). Can be a terminal key pointing to a variable value or an intermediate key pointing to a sub-dictionary (section). The method will account for the input key datatype when searching for the target variable inside the class dictionary.
  - search_mode (Literal['terminal_only', 'intermediate_only', 'all'], default: 'terminal_only') – Specifies the search mode for the method. Currently, supports 3 search modes: ‘terminal_only’, ‘intermediate_only’ and ‘all’. ‘terminal_only’ mode only searches the terminal (non-dictionary) keys in each path. ‘intermediate_only’ mode only searches non-terminal (section / dictionary) keys in each path. ‘all’ searches all keys in each path.
  - return_raw (bool, default: False) – Determines whether the method formats the result as the tuple of key tuples or the tuple of delimiter-delimited strings. See notes above for more information.
- Return type:
tuple[tuple[Any, ...] | str, ...] | tuple[Any, ...] | str | None
- Returns:
If return_raw is true, a tuple of tuples, where each sub-tuple stores a sequence of dictionary path keys. If return_raw is false, returns a tuple of delimiter-delimited path strings. If only a single matching path was found, returns it as a tuple of keys or a string, depending on the value of the return_raw flag. If no matching path was found, returns None.
- Raises:
TypeError – If the input target_key argument is not of the correct type.
ValueError – If the input search_mode is not one of the supported options.
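A minimal search sketch; the dictionary and the expected output are illustrative (reusing the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: locating every terminal path that ends with the key 'var_1'.
nd = NestedDictionary(seed_dictionary={"outer": {"inner": {"var_1": 42}}, "var_1": 7})
paths = nd.find_nested_variable_path(target_key="var_1", search_mode="terminal_only")
# expected: a tuple of delimited path strings such as ('outer.inner.var_1', 'var_1')
```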
- property key_datatypes: tuple[str, ...]
Returns unique datatypes used by dictionary keys as a sorted tuple.
- property path_delimiter: str
Returns the delimiter used to separate keys in string variable paths.
- read_nested_value(variable_path)
Reads the requested value from the nested dictionary using the provided variable_path.
This method allows accessing individual values stored anywhere across the nested dictionary structure. It can return both primitive types and dictionaries of any dimensionality. Therefore, it can be used to slice the nested dictionary as needed in addition to reading concrete values.
- Parameters:
  - variable_path (str | tuple[Any, ...] | list[Any] | ndarray[tuple[int, ...], dtype[Any]]) – The string specifying the retrievable variable path using the class ‘path_delimiter’ to separate successive keys (nesting hierarchy levels). Example: ‘outer_sub_dict.inner_sub_dict.var_1’ (using dot (.) delimiters). Alternatively, a tuple, list or numpy array of keys that make up the full terminal variable path. Example: (‘outer_sub_dict’, 1, ‘variable_6’). Regardless of the input format, the path has to be relative to the highest level of the nested dictionary.
- Return type:
Any
- Returns:
The value retrieved from the dictionary using the provided hierarchical variable path. The value can be a variable or a section (dictionary).
- Raises:
KeyError – If any key in the variable_path is not found at the expected nested dictionary level. If a non-terminal key in the key sequence returns a non-dictionary value, forcing the retrieval to be aborted before fully evaluating the entire variable path.
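A minimal read sketch; the dictionary is made up and the comments show the expected results (reusing the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: reading a terminal value and a whole section via string paths.
nd = NestedDictionary(seed_dictionary={"outer": {"inner": {"var_1": 42}}})
value = nd.read_nested_value("outer.inner.var_1")   # 42
section = nd.read_nested_value("outer.inner")       # {'var_1': 42}
```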
- set_path_delimiter(new_delimiter)
Sets the path_delimiter class attribute to the provided delimiter value.
This method can be used to replace the string-path delimiter of an initialized NestedDictionary class.
- Parameters:
  - new_delimiter (str) – The new delimiter to be used for separating keys in string variable paths.
- Raises:
TypeError – If new_delimiter argument is not a string.
- Return type:
None
- write_nested_value(variable_path, value, *, modify_class_dictionary=True, allow_terminal_overwrite=True, allow_intermediate_overwrite=False)
Writes the input value to the requested level of the nested dictionary using the provided variable_path.
This method allows modifying individual values stored anywhere across the nested dictionary structure. It can be used to target both terminal values and sections (sub-dictionaries). If any of the keys in the variable_path are missing from the dictionary, the method will create and insert new empty sub-dictionaries to add the missing keys to the dictionary. This way, the method can be used to set up whole new hierarchies of keys.
Since the dictionary is modified, rather than re-created, all new subsections will be inserted after existing subsections, for each respective hierarchy. For example, when adding ‘variable_3’ subsection to a section that contains ‘variable_1, variable_2 and variable_4’ (in that order), the result will be: ‘variable_1, variable_2, variable_4, variable_3’.
- Parameters:
  - variable_path (str | tuple[Any, ...] | list[Any] | ndarray[tuple[int, ...], dtype[Any]]) – The string specifying the hierarchical path to the variable to be modified / written, using the class ‘path_delimiter’ to separate successive keys (nesting hierarchy levels). Example: ‘outer_sub_dict.inner_sub_dict.var_1’ (using dot (.) delimiters). Alternatively, a tuple, list or numpy array of keys that make up the full terminal variable path. Example: (‘outer_sub_dict’, 1, ‘variable_6’). You can use multiple non-existent keys to specify a new hierarchy to add to the dictionary, as each missing key will be used to create an empty section (sub-dictionary) within the parent dictionary.
  - value (Any) – The value to be written. The value is written using the terminal key of the sequence.
  - modify_class_dictionary (bool, default: True) – Determines whether the method will replace the class dictionary instance with the modified dictionary generated during runtime (if True) or generate and return a new NestedDictionary instance built around the modified dictionary (if False). In the latter case, the new class will inherit the ‘path_delimiter’ attribute from the original class.
  - allow_terminal_overwrite (bool, default: True) – Determines whether the method is allowed to overwrite already existing terminal key values (to replace the values associated with the last key in the sequence).
  - allow_intermediate_overwrite (bool, default: False) – Determines whether the method is allowed to overwrite non-dictionary intermediate key values (to replace a variable with a section if the variable is encountered when indexing one of the intermediate keys).
- Return type:
Optional[NestedDictionary]
- Returns:
If modify_class_dictionary flag is False, a NestedDictionary instance that wraps the modified dictionary. If modify_class_dictionary flag is True, returns None and replaces the class dictionary with the altered dictionary.
- Raises:
KeyError – If overwriting is disabled, but the evaluated terminal key is already in target dictionary. If any of the intermediate (non-terminal) keys points to an existing non-dictionary variable and overwriting intermediate values is not allowed.
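A minimal write sketch; the dictionary and values are made up (reusing the NestedDictionary import from the instantiation sketch above):

```python
# Sketch: writing values; missing intermediate keys are created as empty sections.
nd = NestedDictionary(seed_dictionary={"outer": {"existing": 1}})
nd.write_nested_value(variable_path="outer.inner.var_1", value=42)  # creates the 'inner' section
nd.write_nested_value(variable_path="outer.existing", value=2, allow_terminal_overwrite=True)
```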
Yaml Config
This module contains the YamlConfig class, which is an extension of the standard Python dataclass that comes with methods to save and load itself to / from a .yml (YAML) file.
Primarily, this class is designed to be used for storing configuration data used by other runtimes in non-volatile memory in a human-readable format. However, it can also be adapted for intermediate-term data storage, if needed.
- class ataraxis_data_structures.data_structures.yaml_config.YamlConfig
Bases:
object
A Python dataclass bundled with methods to save and load itself from a .yml (YAML) file.
This class extends the base functionality of Python dataclasses by bundling them with the ability to serialize the data into non-volatile memory as .yml files. Primarily, this is used to store configuration information for various runtimes, but this can also be adapted as a method of storing data.
Notes
The class is intentionally kept as minimal as possible and does not include built-in data verification. You need to implement your own data verification methods if you need that functionality. NestedDictionary class from this library may be of help, as it was explicitly designed to simplify working with complex dictionary structures, such as those obtained by casting a deeply nested dataclass as a dictionary.
To use this class, use it as a superclass for your custom dataclass. This way, the subclass automatically inherits the methods to cast itself to .yaml and load itself from .yaml.
- classmethod from_yaml(file_path)
Instantiates the class using the data loaded from the provided .yaml (YAML) file.
This method is designed to re-initialize dataclasses from the data stored in non-volatile memory as .yaml / .yml files. The method uses dacite, which adds support for complex nested configuration class structures.
Notes
This method disables built-in dacite type-checking before instantiating the class. Therefore, you may need to add explicit type-checking logic for the resultant class instance to verify it was instantiated correctly.
- Parameters:
  - file_path (Path) – The path to the .yaml file to read the class data from.
- Return type:
YamlConfig
- Returns:
A new dataclass instance created using the data read from the .yaml file.
- Raises:
ValueError – If the provided file path does not point to a .yaml or .yml file.
- to_yaml(file_path)
Converts the class instance to a dictionary and saves it as a .yml (YAML) file at the provided path.
This method is designed to dump the class data into an editable .yaml file. This allows storing the data in non-volatile memory and manually editing the data between save / load cycles.
- Parameters:
  - file_path (Path) – The path to the .yaml file to write. If the file does not exist, it will be created, alongside any missing directory nodes. If it exists, it will be overwritten (re-created). The path has to end with a ‘.yaml’ or ‘.yml’ extension suffix.
- Raises:
ValueError – If the output path does not point to a file with a ‘.yaml’ or ‘.yml’ extension.
- Return type:
None
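A minimal subclassing sketch, assuming standard dataclass usage; the field names, values and file path are made-up examples:

```python
from dataclasses import dataclass
from pathlib import Path

from ataraxis_data_structures.data_structures.yaml_config import YamlConfig


@dataclass
class ExperimentConfig(YamlConfig):
    # Made-up configuration fields for illustration.
    subject_name: str = "unknown"
    trial_count: int = 10


config = ExperimentConfig(subject_name="mouse_01", trial_count=20)
config.to_yaml(file_path=Path("experiment_config.yaml"))

# Later (or in another process), restore the dataclass from the saved file:
restored = ExperimentConfig.from_yaml(file_path=Path("experiment_config.yaml"))
```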
Data Loggers
This module contains the DataLogger class that allows efficiently saving serialized byte-array data collected from different Processes.
DataLogger works by creating the requested number of multithreaded logger processes and exposing a single shared Queue that is used to buffer and pipe the data to be logged to the saver processes. The class is optimized for working with byte-serialized payloads stored in Numpy arrays.
- class ataraxis_data_structures.data_loggers.serialized_data_logger.DataLogger(output_directory, instance_name='data_logger', process_count=1, thread_count=5, sleep_timer=5000, exist_ok=False)
Bases:
object
Saves input data as uncompressed byte numpy array (.npy) files using the requested number of cores and threads.
This class instantiates and manages the runtime of a logger distributed over the requested number of cores and threads. The class exposes a shared multiprocessing Queue via the ‘input_queue’ property, which can be used to buffer and pipe the data to the logger from other Processes. The class expects the data to be packaged into a LogPackage class instance (also available from this library) before it is sent to the logger via the queue object.
Notes
Initializing the class does not start the logger processes! Call the start() method to start the logger processes.
Once the logger process(es) have been started, the class also initializes and maintains a watchdog thread that monitors the runtime status of the processes. If a process shuts down, the thread will detect this and raise the appropriate error to notify the user. Make sure the main process periodically releases GIL to allow the thread to assess the state of the remote process!
This class is designed to only be instantiated once. However, for particularly demanding use cases with many data producers, the shared Queue may become the bottleneck. In this case, you can initialize multiple DataLogger instances, each using a unique instance_name argument.
Tweak the number of processes and threads as necessary to keep up with the load and share the input_queue of the initialized DataLogger with all classes that need to log serialized data. For most use cases, using a single process (core) with 5-10 threads will be enough to prevent the buffer from filling up. For demanding runtimes, you can increase the number of cores as necessary to keep up with the demand.
This class will log data from all sources and Processes into the same directory to allow for the most efficient post-runtime compression. Since all arrays are saved using the source_id as part of the filename, it is possible to demix the data based on its source during post-processing. Additionally, the sequence numbers of logged arrays are also used in file names to aid sorting saved data.
- Parameters:
  - output_directory (Path) – The directory where the log folder will be created.
  - instance_name (str, default: 'data_logger') – The name of the data logger instance. Critically, this is the name used to initialize the SharedMemory buffer used to control the child processes, so it has to be unique across all other Ataraxis codebase instances that also use shared memory.
  - process_count (int, default: 1) – The number of processes to use for logging data.
  - thread_count (int, default: 5) – The number of threads to use for logging data. Note, this number of threads will be created for each process.
  - sleep_timer (int, default: 5000) – The time in microseconds to delay between polling the queue. This parameter may help with managing the power and thermal load of the cores assigned to the data logger by temporarily suspending their activity. It is likely that delays below 1 millisecond (1000 microseconds) will not produce a measurable impact, as the cores execute a ‘busy’ wait sequence for very short delay periods. Set this argument to 0 to disable delays entirely.
  - exist_ok (bool, default: False) – Determines how the class behaves if a SharedMemory buffer with the same name as the one used by the class already exists. If this argument is set to True, the class will destroy the existing buffer and make a new buffer for itself. If the class is used correctly, the only case where a buffer would already exist is if the class ran into an error during the previous runtime, so setting this to True should be safe for most runtimes.
- _process_count
The number of processes to use for data saving.
- _thread_count
The number of threads to use for data saving. Note, this number of threads will be created for each process.
- _sleep_timer
The time in microseconds to delay between polling the queue.
- _name
Stores the name of the data logger instance.
- _output_directory
The directory where the log folder will be created.
- _started
A boolean flag used to track whether Logger processes are running.
- _mp_manager
A manager object used to instantiate and manage the multiprocessing Queue.
- _input_queue
The multiprocessing Queue used to buffer and pipe the data to the logger processes.
- _logger_processes
A tuple of Process objects, each representing a logger process.
- _terminator_array
A shared memory array used to terminate (shut down) the logger processes.
- _watchdog_thread
A thread used to monitor the runtime status of remote logger processes.
- _exist_ok
Determines how the class handles already existing shared memory buffer errors.
- __del__()
Ensures that logger resources are properly released when the class is garbage collected.
- Return type:
None
- __repr__()
Returns a string representation of the DataLogger instance.
- Return type:
str
- static _log_cycle(input_queue, terminator_array, output_directory, thread_count, sleep_time=1000)
The function passed to Process classes to log the data.
This function sets up the necessary assets (threads and queues) to accept, preprocess, and save the input data as .npy files.
- Parameters:
  - input_queue (Queue) – The multiprocessing Queue object used to buffer and pipe the data to the logger processes.
  - terminator_array (SharedMemoryArray) – A shared memory array used to terminate (shut down) the logger processes.
  - output_directory (Path) – The path to the directory where to save the data.
  - thread_count (int) – The number of threads to use for logging.
  - sleep_time (int, default: 1000) – The time in microseconds to delay between polling the queue once it has been emptied. If the queue is not empty, this process will not sleep.
- Return type:
None
- static _save_data(filename, data)
Thread worker function that saves the data.
- Parameters:
  - filename (Path) – The name of the file to save the data to. Note, the name has to be suffix-less, as the ‘.npy’ suffix will be appended automatically.
  - data (ndarray[tuple[int, ...], dtype[uint8]]) – The data to be saved, packaged into a one-dimensional bytes array.
- Return type:
None
Since data saving is primarily IO-bound, using multiple threads per Process is likely to achieve the best saving performance.
- _watchdog()
This function should be used by the watchdog thread to ensure the logger processes are alive during runtime.
This function will raise a RuntimeError if it detects that a process has prematurely shut down. It will verify process states every ~20 ms and will release the GIL between checking the states.
- Return type:
None
- compress_logs(remove_sources=False, memory_mapping=False, verbose=False, compress=True, verify_integrity=False, max_workers=None)
Consolidates all .npy files in the target log directory into a compressed .npz archive for each source_id.
All entries within each source are grouped by their acquisition timestamp value before compression. The compressed archive names include the ID code of the source that generated the original log entries. This method compresses the log directory generated by this DataLogger instance; the module-level compress_npy_logs function offers the same functionality without an initialized DataLogger.
Notes
Primarily, this method functions as a wrapper around the instance-independent ‘compress_npy_logs’ function exposed by this library. It automatically resolves the path to the uncompressed log directory using instance attributes.
To improve runtime efficiency, the function parallelizes all data processing steps. The exact number of parallel threads used by the function depends on the number of available CPU cores. This number can be further adjusted by modifying the max_workers argument.
This function requires all data from the same source to be loaded into RAM before it is added to the .npz archive. While this should not be an issue for most runtimes and expected use patterns, the function can be configured to use memory-mapping instead of directly loading data into RAM. This noticeably reduces processing speed and is not recommended for most users.
Since this function is intended to optimize how logs are stored on disk, it can be configured (via the remove_sources argument) to remove the source .npy files after generating the compressed .npz entries. As an extra security measure, it is possible to request the function to verify the integrity of the compressed data against the sources before removing the source files. However, this is heavily discouraged, as it adds a noticeable performance (runtime speed) overhead, and data corruption is generally extremely uncommon and unlikely.
Additionally, it is possible to disable log compression and instead just aggregate the log entries into an uncompressed .npz file. This is not recommended, since compression is very fast, does not majorly affect the runtime speed, and may noticeably reduce disk usage. However, decompression takes considerable time, so some processing runtimes may benefit from not compressing the generated logs if fast decompression speed is a priority.
- Parameters:
  - remove_sources (bool, default: False) – Determines whether to remove the individual .npy files after they have been consolidated into .npz archives.
  - memory_mapping (bool, default: False) – Determines whether the function uses memory-mapping (disk) to stage the data before compression or loads all data into RAM. Disabling this option makes the function considerably faster, but may lead to out-of-memory errors in very rare use cases. Note, due to collisions with Windows not releasing memory-mapped files, this argument does not do anything on Windows.
  - verbose (bool, default: False) – Determines whether to print compression progress to terminal.
  - compress (bool, default: True) – Determines whether to compress the output .npz archive file for each source. While the intention behind this function is to compress archive data, it is possible to use the function to just aggregate the data into .npz files without compression.
  - verify_integrity (bool, default: False) – Determines whether to verify the integrity of compressed data against the original log entries before removing sources. Since it is highly unlikely that compression alters the data, it is recommended to have this option disabled for most runtimes.
  - max_workers (int | None, default: None) – Determines the number of threads used to carry out various processing phases in-parallel. Note, some processing phases parallelize log source processing and others parallelize log entry processing. Therefore, it is generally desirable to use as many threads as possible. If set to None, the function uses the number of (logical) CPU cores - 2 threads.
- Return type:
None
- property input_queue: Queue
Returns the multiprocessing Queue used to buffer and pipe the data to the logger processes.
Share this queue with all source processes that need to log data. To ensure correct data packaging, package the data using the LogPackage class exposed by this library before putting it into the queue.
- property name: str
Returns the name of the DataLogger instance.
- property output_directory: Path
Returns the path to the directory where the data is saved.
- start()
Starts the logger processes and the assets used to control and ensure the processes are alive.
Once this method is called, data submitted to the ‘input_queue’ of the class instance will be saved to disk via the started Processes.
- Return type:
None
- property started: bool
Returns True if the DataLogger has been started and is actively logging data.
- stop()
Stops the logger processes once they save all buffered data and releases reserved resources.
- Return type:
None
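A minimal lifecycle sketch for the class, based on the documented public API; the output directory, source_id, time_stamp and payload values are made-up examples:

```python
from pathlib import Path

import numpy as np

from ataraxis_data_structures.data_loggers.serialized_data_logger import DataLogger, LogPackage

logger = DataLogger(output_directory=Path("demo_logs"), instance_name="demo_logger")
logger.start()  # spawns the logger process(es) and the watchdog thread

# Producer processes package each entry as a LogPackage and put it into the shared queue.
payload = np.frombuffer(b"example serialized payload", dtype=np.uint8)
package = LogPackage(
    source_id=np.uint8(1),
    time_stamp=np.uint64(123456789),
    serialized_data=payload,
)
logger.input_queue.put(package)

logger.stop()  # saves all buffered data and releases the reserved resources
```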
- class ataraxis_data_structures.data_loggers.serialized_data_logger.LogPackage(source_id, time_stamp, serialized_data)
Bases:
object
Stores the data and ID information to be logged by the DataLogger class and exposes methods for packaging this data into the format expected by the logger.
This class collects, preprocesses, and stores the data to be logged by the DataLogger instance. To be logged, entries have to be packed into this class instance and submitted (put) into the logger input queue exposed by the DataLogger class.
Notes
This class is optimized for working with other Ataraxis libraries. It expects the time to come from ataraxis-time (PrecisionTimer) and other data from Ataraxis libraries designed to interface with various hardware.
- get_data()
Constructs and returns the filename and the serialized data package to be logged.
- Return type:
tuple[str, ndarray[tuple[int, ...], dtype[uint8]]]
- Returns:
A tuple of two elements. The first element is the name to use for the log file, which consists of zero-padded source id and zero-padded time stamp, separated by an underscore. The second element is the data to be logged as a one-dimensional bytes numpy array. The logged data includes the original data object and the pre-pended source id and time stamp.
- serialized_data: ndarray[tuple[int, ...], dtype[uint8]]
The data to be logged, stored as a one-dimensional bytes numpy array.
- source_id: uint8
The ID code of the source that produced the data. Has to be unique across all systems that send data to the same DataLogger instance during runtime, as this information is used to identify sources inside log files!
- time_stamp: uint64
The data acquisition time. Tracks when the data was originally acquired.
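For illustration, the sketch below shows how a package resolves into a log entry name and array, reusing the 'package' object from the DataLogger sketch above; the comments restate the documented return format rather than verified output:

```python
# Sketch: inspecting the filename stem and the array that the logger will save.
file_stem, log_entry = package.get_data()
# 'file_stem' joins the zero-padded source id and time stamp with an underscore;
# 'log_entry' is the one-dimensional uint8 array written to disk as <file_stem>.npy.
```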
- ataraxis_data_structures.data_loggers.serialized_data_logger._compare_arrays(source_id, stem, original_array, compressed_array)
Compares a pair of NumPy arrays for exact equality.
This is a service function used during log verification to compare source and compressed log entry data in-parallel.
- Parameters:
  - source_id (int) – The ID-code for the source, whose compressed data is verified by this function.
  - stem (str) – The file name of the verified log entry.
  - original_array (ndarray[tuple[int, ...], dtype[Any]]) – The source data array from the .npy file.
  - compressed_array (ndarray[tuple[int, ...], dtype[Any]]) – The compressed array from the .npz archive.
- Raises:
ValueError – If the arrays don’t match.
- Return type:
None
- ataraxis_data_structures.data_loggers.serialized_data_logger._compress_source(output_directory, source_id, source_data, compress)
Compresses all log entries for a single source (producer) into an .npz archive.
This helper function is used during log compression to compress all available sources in parallel. If compression is enabled, the function uses the default NumPy compression method (Deflate), which typically has a fast compression speed, but very slow decompression speed.
Notes
Depending on the ‘compression’ flag, this function can be used to either aggregate the log entries into a file or to both aggregate and compress the entries. While it is recommended to always compress the log entries, this is not required.
- Parameters:
  - source_id (int) – The ID-code for the source whose data will be compressed by the function.
  - source_data (dict[str, ndarray[tuple[int, ...], dtype[Any]]]) – A dictionary that uses log-entries (entry names) as keys and stores the loaded or memory-mapped source data as a NumPy array value for each key.
  - compress (bool) – Determines whether to compress the output archive. If this flag is false, the data is saved as an uncompressed .npz archive. Note, compression speed is typically very fast, so it is advised to have this enabled for all use cases.
  - verify_integrity – Determines whether to verify the integrity of the compressed log entries against the original data before removing the source files. This is only used if remove_sources is True.
- Return type:
tuple[int, Path]
- Returns:
A tuple of two elements. The first element contains the archive file stem (file name without extension), and the second element contains the path to the compressed log file.
- ataraxis_data_structures.data_loggers.serialized_data_logger._load_numpy_archive(file_path)
Loads a numpy .npz archive containing multiple arrays as a dictionary.
This is a service function used during compressed log verification to load all entries from a compressed log archive into memory in-parallel. To achieve the best runtime performance, this function should be passed to a process executor. Assuming archives are compressed with Deflate (default behavior of the log compression method), this is usually the longest step of the log processing sequence.
- Parameters:
  - file_path (Path) – The path to the .npz log archive to load.
- Return type:
dict[str, ndarray[tuple[int, ...], dtype[Any]]]
- Returns:
A dictionary that uses log entry names as keys and loaded log entry data arrays as values.
- ataraxis_data_structures.data_loggers.serialized_data_logger._load_numpy_files(file_paths, mem_map=False)
Loads multiple .npy files either into memory or as a memory-mapped array.
This is a service function used during log compression to load all raw log files into memory in-parallel for faster processing. This function should be used by a parallel executor to process the entire raw .npy dataset evenly split between all available workers to achieve maximum loading speed.
- Parameters:
  - file_paths (tuple[Path, ...]) – The paths to the .npy files to load.
  - mem_map (bool, default: False) – Determines whether to memory-map the files or load them into RAM.
- Return type:
tuple[tuple[str, ...], tuple[ndarray[tuple[int, ...], dtype[Any]], ...]]
- Returns:
A tuple of two elements. The first element stores a tuple of loaded file names (without extension), and the second stores a tuple of loaded data arrays.
- ataraxis_data_structures.data_loggers.serialized_data_logger.compress_npy_logs(log_directory, remove_sources=False, memory_mapping=False, verbose=False, compress=True, verify_integrity=False, max_workers=None)
Consolidates all .npy files in the target log directory into a compressed .npz archive for each source_id.
All entries within each source are grouped by their acquisition timestamp value before compression. The compressed archive names include the ID code of the source that generated the original log entries. This function can compress any log directory generated by a DataLogger instance and can be used without an initialized DataLogger.
Notes
To improve runtime efficiency, the function parallelizes all data processing steps. The exact number of parallel threads used by the function depends on the number of available CPU cores. This number can be further adjusted by modifying the max_workers argument.
This function requires all data from the same source to be loaded into RAM before it is added to the .npz archive. While this should not be an issue for most runtimes and expected use patterns, the function can be configured to use memory-mapping instead of directly loading data into RAM. This noticeably reduces processing speed and is not recommended for most users.
Since this function is intended to optimize how logs are stored on disk, it can be configured (via the remove_sources argument) to remove the source .npy files after generating the compressed .npz entries. As an extra security measure, it is possible to request the function to verify the integrity of the compressed data against the sources before removing the source files. However, this is heavily discouraged, as it adds a noticeable performance (runtime speed) overhead, and data corruption is generally extremely uncommon and unlikely.
Additionally, it is possible to disable log compression and instead just aggregate the log entries into an uncompressed .npz file. This is not recommended, since compression is very fast, does not majorly affect the runtime speed, and may noticeably reduce disk usage. However, decompression takes considerable time, so some processing runtimes may benefit from not compressing the generated logs if fast decompression speed is a priority.
- Parameters:
  - log_directory (Path) – The path to the directory used to store uncompressed log .npy files. Usually, this path is obtained from the ‘output_directory’ property of the DataLogger class.
  - remove_sources (bool, default: False) – Determines whether to remove the individual .npy files after they have been consolidated into .npz archives.
  - memory_mapping (bool, default: False) – Determines whether the function uses memory-mapping (disk) to stage the data before compression or loads all data into RAM. Disabling this option makes the function considerably faster, but may lead to out-of-memory errors in very rare use cases. Note, due to collisions with Windows not releasing memory-mapped files, this argument does not do anything on Windows.
  - verbose (bool, default: False) – Determines whether to print compression progress to terminal.
  - compress (bool, default: True) – Determines whether to compress the output .npz archive file for each source. While the intention behind this function is to compress archive data, it is possible to use the function to just aggregate the data into .npz files without compression.
  - verify_integrity (bool, default: False) – Determines whether to verify the integrity of compressed data against the original log entries before removing sources. Since it is highly unlikely that compression alters the data, it is recommended to have this option disabled for most runtimes.
  - max_workers (int | None, default: None) – Determines the number of threads used to carry out various processing phases in-parallel. Note, some processing phases parallelize log source processing and others parallelize log entry processing. Therefore, it is generally desirable to use as many threads as possible. If set to None, the function uses the number of (logical) CPU cores - 2 threads.
- Return type:
None
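A minimal compression sketch, assuming a log directory produced by a DataLogger instance; the path below is a made-up example (it is usually obtained from the DataLogger ‘output_directory’ property):

```python
from pathlib import Path

from ataraxis_data_structures.data_loggers.serialized_data_logger import compress_npy_logs

compress_npy_logs(
    log_directory=Path("demo_logs"),  # usually DataLogger.output_directory
    remove_sources=True,              # delete the .npy sources once the .npz archives are written
    verify_integrity=False,           # keep disabled unless corruption is a genuine concern
    compress=True,
)
```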