API Reference
Algorithm Utils
Copyright 2024 TESCAN 3DIM, s.r.o. All rights reserved
- class algorithm_utils.AlgorithmDeployer.AlgorithmDeployer(algorithm_directory: str)
The AlgorithmDeployer class is used to deploy an algorithm to the algorithm store in the database. The algorithm directory is automatically split up: all .py files are detected, zipped into a python module and stored in the module-store collection, while all other files are stored as assets in the asset-store collection. The algorithm metadata is stored in the algorithm-store collection as a json file containing the algorithm name, major and minor version, the module id, the assets dictionary and the timestamp of when the algorithm was stored.
- Parameters:
algorithm_directory (str) – The path to the algorithm directory.
- static calculate_etag(file: bytes) str
Calculate the etag hash of a file.
- Parameters:
file (bytes) – The file.
- Returns:
The etag hash.
- Return type:
str
- static check_if_zip_is_importable(path_to_zip: str) bool
Check if a zipped module is importable.
- Parameters:
path_to_zip (str) – The path to the zip file.
- Returns:
True if the module is importable, False otherwise.
- Return type:
bool
- static find_other_than_py_files(directory: str, ignore_pycache: bool = True, ignore_gitignore: bool = True) list[str]
Find all the files in a directory other than .py files.
- Parameters:
directory (str) – The directory to search.
ignore_pycache (bool, optional) – Whether to ignore the __pycache__ directory. The default is True.
ignore_gitignore (bool, optional) – Whether to ignore the .gitignore file. The default is True.
- Returns:
The list of files other than .py files.
- Return type:
list[str]
- static find_py_files(directory: str, ignore_pycache: bool = True) list[str]
Find all the .py files in a directory recursively.
- Parameters:
directory (str) – The directory to search.
ignore_pycache (bool, optional) – Whether to ignore the __pycache__ directory. The default is True.
- Returns:
The list of .py files.
- Return type:
list[str]
- static generate_uuid(version: int = 1) str
Generate a uuid.
- Parameters:
version (int, optional) – The version of the uuid. The default is 1.
- Returns:
The uuid.
- Return type:
str
- Raises:
ValueError – If the version of the uuid is not 1 or 4.
- static hash_directory(directory: str) str
Compute the md5 hash of a directory based on its contents.
- Parameters:
directory (str) – The path to the directory.
- Returns:
The md5 hash.
- Return type:
str
- static hash_py_file(file: str) str
Compute the md5 hash of a .py file.
- Parameters:
file (str) – The path to the file.
- Returns:
The md5 hash.
- Return type:
str
- parse_pyproject_toml(path_to_algorithm_directory: str) dict
Parse the pyproject.toml file in the algorithm directory to get the algorithm name, major version and minor version.
- Parameters:
path_to_algorithm_directory (str) – The path to the algorithm directory.
- Returns:
The algorithm name, major version and minor version.
- Return type:
dict
- Raises:
FileNotFoundError – If pyproject.toml not found in algorithm directory.
- static process_path_to_dict_key(path: str) str
This method takes a file path and substitutes any backslashes and double backslashes with forward slashes. It also removes the leading forward slash if present. This is necessary to store the directory structure as a dictionary key that the server can access independently of the operating system on which the deployment is performed.
- Parameters:
path (str) – A path to a file.
- Returns:
The processed path with forward slashes and without the leading forward slash.
- Return type:
str
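A minimal sketch of the expected normalization; the import path mirrors the class path shown above, and the example path is hypothetical:
```python
from algorithm_utils.AlgorithmDeployer import AlgorithmDeployer

# A Windows-style path with backslashes and a leading separator.
key = AlgorithmDeployer.process_path_to_dict_key("\\files\\weights.pth")
print(key)  # expected: "files/weights.pth"
```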
- store_algorithm(database_connection: BaseConnection | None = None, separate_runner_path: str | None = None, algorithm_collection_name: str = 'algorithm-store', module_collection_name: str = 'module-store', asset_collection_name: str = 'asset-store') str
Store the algorithm to the algorithm store.
- Parameters:
database_connection (BaseConnection.BaseConnection | None) – The database connection object. Can be None if the algorithm is not supposed to be stored in the database (e.g. for local testing and development).
separate_runner_path (str | None, optional) – The path to the runner file. Use this if the runner file is not in the root of the algorithm directory. The default is None.
algorithm_collection_name (str, optional) – The name of the collection to store the algorithm. The default is “algorithm-store”.
module_collection_name (str, optional) – The name of the collection to store the module. The default is “module-store”.
asset_collection_name (str, optional) – The name of the collection to store the assets. The default is “asset-store”.
- Returns:
algorithm id
- Return type:
str
- Raises:
Exception – If storing the algorithm module or assets failed.
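A minimal deployment sketch; the directory name is hypothetical, and database_connection=None is used to skip the database store, as described above:
```python
from algorithm_utils.AlgorithmDeployer import AlgorithmDeployer

# "./my_algorithm" is a hypothetical directory containing pyproject.toml,
# the runner .py files and any asset files.
deployer = AlgorithmDeployer("./my_algorithm")

# None skips the database store (e.g. for local testing); pass a
# BaseConnection instance to actually deploy.
algorithm_id = deployer.store_algorithm(database_connection=None)
```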
- class algorithm_utils.AlgorithmManager.AlgorithmManager(database_connection: BaseConnection, algorithms_collection: str = 'algorithm-store', module_collection: str = 'module-store', assets_collection: str = 'asset-store')
This class is responsible for managing the algorithms, modules and assets in the database. It provides methods to list or delete algorithms, modules and assets. To store the algorithms, modules and assets, use the AlgorithmDeployer class.
- Parameters:
database_connection (BaseConnection.BaseConnection) – The database connection to use for the operations.
algorithms_collection (str) – The name of the collection where the algorithms are stored.
module_collection (str) – The name of the collection where the modules are stored.
assets_collection (str) – The name of the collection where the assets are stored.
- delete_algorithms(name: str | None = None, major_version: str | None = None, minor_version: str | None = None) None
Delete an algorithm and associated modules and assets.
- Parameters:
name (str | None) – The name of the algorithm to delete.
major_version (str | None) – The major version of the algorithm to delete.
minor_version (str | None) – The minor version of the algorithm to delete.
- Return type:
None
- Raises:
ValueError – If name, major_version or minor_version is not specified.
- list_algorithms(name: str | None = None, major_version: str | None = None, minor_version: str | None = None) list[dict]
List all algorithms stored in the database. Optionally can filter by name, major version or minor version.
- Parameters:
name (str | None, optional) – Can be used to filter the algorithms by name.
major_version (str | None, optional) – Can be used to filter the algorithms by major version.
minor_version (str | None, optional) – Can be used to filter the algorithms by minor version.
- Returns:
The list of algorithms, each defined by its json metadata.
- Return type:
list[dict]
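A sketch of listing and deleting; the TempfileConnection (described under Database Connection below) stands in for a real database connection, and the algorithm name and version are placeholders:
```python
from algorithm_utils.AlgorithmManager import AlgorithmManager
from database_connection.TempfileConnection import TempfileConnection

manager = AlgorithmManager(database_connection=TempfileConnection())

# List all algorithms, optionally filtered by name and version.
for record in manager.list_algorithms(name="my_algorithm"):
    print(record)

# All three identifiers must be specified, otherwise ValueError is raised.
manager.delete_algorithms(name="my_algorithm", major_version="1", minor_version="0")
```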
- class algorithm_utils.BaseRunner.BaseRunner
Base class for all runners. Specifies the architecture of a runner and the required methods.
When implementing a new runner, the following methods need to be implemented:
- preprocess: Preprocess the input data.
- inference: Run the inference on the output of the preprocessing.
- postprocess: Postprocess the output of the inference.
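A skeleton subclass implementing these requirements; the class name and method bodies are placeholders:
```python
from typing import Any

from algorithm_utils.BaseRunner import BaseRunner


class MyRunner(BaseRunner):
    """Hypothetical runner; the three abstract methods are stubbed out."""

    def preprocess(self, input_data: dict, args: dict = {}) -> Any:
        # Convert the raw request payload into model-ready input.
        return input_data

    def inference(self, data: Any, args: dict = {}) -> Any:
        # Run the model on the preprocessed data (identity stand-in).
        return data

    def postprocess(self, data: Any, args: dict = {}) -> list[str]:
        # Upload the results and return the output dataset ids.
        return []
```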
- property device: str
Get the device on which the model and inference will be run. This is set during the initialization of the runner.
- Returns:
The device that will be used to run the model and inference
- Return type:
str
- fetch_asset(asset_path: str) BytesIO
Fetches an asset as bytes from the database by its path relative to the algorithm Runner class.
- Parameters:
asset_path (str) – The path to the asset relative to the algorithm Runner class, e.g. “files/weights.pth”.
- Returns:
The asset as bytes.
- Return type:
io.BytesIO
- fetch_data(file_ids: list[dict], pydantic_data_schema: Type[DataSchema], *keys: str, parallel: bool = False) list[dict]
Fetches the data from the database. A pydantic schema must be provided to validate the data. The data is fetched as a list of dictionaries, where each dictionary represents a dataset. Specific keys can be provided to fetch from the HDF5 file; if not provided, all keys will be fetched. This method is a wrapper around the fetch_data method of the TaskHandler class.
- Parameters:
file_ids (list[dict]) – The identifiers of the data files in the database.
pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.
*keys (str) – Optional keys to fetch from the HDF5 file, if not provided, all keys will be fetched.
parallel (bool, optional) – If True, the data will be fetched in parallel. Default is False.
- Returns:
List of the datasets fetched from the database as dictionaries.
- Return type:
list[dict]
- abstractmethod inference(data: Any, args: dict = {}) Any
Run the inference.
- Parameters:
data (Any) – The input data.
args (dict) – Additional arguments.
- Returns:
The output data.
- Return type:
Any
- Raises:
NotImplementedError –
- inference_base(data: Any, args: dict = {}) Any
Run the inference.
- Parameters:
data (Any) – The input data.
args (dict) – Additional arguments.
- Returns:
The output data.
- Return type:
Any
- initialize(device: str | None = None) None
Initialize the runner with the given device. This method is called by the TaskHandler when the algorithm is fetched. It is used to set the device on which the model and inference will be run.
- Parameters:
device (str | None) – The device on which the model and inference will be run. e.g. “cpu”, “cuda:0” or “cuda:1”. This is set during the initialization of the runner.
- Return type:
None
- load_assets()
This method should be overridden to load all necessary assets for the algorithm, such as trained models, precomputed data, or other resources.
Assets must be loaded using self.fetch_asset() instead of accessing the file system directly. All assets should be stored as attributes on the runner instance.
WARNING: The attributes set in this method will be protected against reassignment in other parts of the code, so they should not be modified after this method is called. However, this protection does not hold for mutating mutable types with in-place operations (e.g., appending to a list or modifying a dictionary). If you need to modify such attributes, consider using a different approach.
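A sketch of a load_assets override, assuming PyTorch weights stored under a hypothetical asset path; it continues the MyRunner skeleton above:
```python
import torch  # assumption: the algorithm ships PyTorch weights as an asset

from algorithm_utils.BaseRunner import BaseRunner


class MyRunner(BaseRunner):
    def load_assets(self):
        # Assets must come through fetch_asset, never the file system;
        # "files/weights.pth" is a hypothetical asset path.
        weights_bytes = self.fetch_asset("files/weights.pth")
        self.model = torch.load(weights_bytes, map_location=self.device)
```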
- load_item_from_session(key: str) Any
Fetch an item from the session cache.
- Parameters:
key (str) – The key to fetch the item.
- Returns:
The item fetched from the session cache.
- Return type:
Any
- log_message(message: str, logging_level: str = 'INFO') None
Log a message.
- Parameters:
message (str) – The message to log.
logging_level (str) – The logging level as defined in the logging module. Default is “INFO”.
- Return type:
None
- Raises:
ValueError – If an invalid logging level is provided.
- post_data(data: list[dict], pydantic_data_schema: Type[DataSchema], parallel: bool = False) list[str]
Uploads a list of datasets to the database. Each dataset is a dictionary where the keys are the names of the datasets and the values are the datasets themselves (e.g. numpy arrays). A pydantic schema must be provided to validate the data before uploading. The data is uploaded as HDF5 files. This method is a wrapper around the post_data method of the TaskHandler class.
- Parameters:
data (list[dict]) – List of the datasets to upload. Each dataset is defined as a dictionary.
pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.
parallel (bool, optional) – If True, the data will be uploaded in parallel. Default is False.
- Returns:
List of the identifiers of the uploaded datasets.
- Return type:
list[str]
- abstractmethod postprocess(data: Any, args: dict = {}) list[str]
Postprocess the output data.
- Parameters:
data (Any) – The input data.
args (dict) – Additional arguments.
- Returns:
The ids of the output datasets.
- Return type:
list[str]
- Raises:
NotImplementedError –
- postprocess_base(data: Any, args: dict = {}) list[str]
Postprocess the output data.
- Parameters:
data (Any) – The input data.
args (dict) – Additional arguments.
- Returns:
The ids of the output datasets.
- Return type:
list[str]
- abstractmethod preprocess(input_data: dict, args: dict = {}) Any
Preprocess the input data.
- Parameters:
input_data (dict) – The input data.
args (dict) – Additional arguments.
- Returns:
The preprocessed input data.
- Return type:
Any
- Raises:
NotImplementedError –
- preprocess_base(input_data: dict, args: dict = {}) Any
Preprocess the input data.
- Parameters:
input_data (dict) – The input data.
args (dict) – Additional arguments.
- Returns:
The preprocessed input data.
- Return type:
Any
- remove_item_from_session(key: str) None
Remove an item from the session cache.
- Parameters:
key (str) – The key to remove the item.
- Return type:
None
- run(input_data: dict, args: dict = {}) None
Run the algorithm.
- Parameters:
input_data (dict) – The input data.
args (dict) – Additional arguments.
- Return type:
None
- Raises:
Exception – If an error occurs during the execution.
- property runner_context: dict
Get the current runner context. This is used to access the runner context methods and attributes.
- Returns:
current runner context
- Return type:
dict
- save_item_to_session(obj: Any, key: str) None
Save an item to the session cache.
- Parameters:
obj (Any) – The item to save.
key (str) – The key to save the item.
- Return type:
None
- set_progress(progress: float) None
Set the progress of the execution. The progress must be a float between 0 and 1.
- Parameters:
progress (float) – The progress of the execution.
- Raises:
ValueError – If progress is not a float between 0 and 1.
- property task_handler: TaskHandler
Get the current task handler. This is used to access the task handler methods and attributes.
- Returns:
Current task handler.
- Return type:
TaskHandler
- Raises:
ValueError – If task handler is not set.
Tasks
- class tasks.TaskHandler.TaskHandler(task_id: str, database_connection: S3Connection, database_update: bool = True, task_session: TaskSession | None = None)
Task handler class for the execution task. This class is used to update the progress, status and log of the execution task. Also contains methods to fetch the algorithm, assets and data from the database server of choice.
- Parameters:
task_id (str) – The identifier of the task. Typically a UUID.
database_connection (S3Connection) – The database connection object instance. Must inherit from the BaseConnection class and implement the required methods.
database_update (bool, optional) – Whether to update the execution record in the database, by default True. Can be set to False, for example, when debugging locally.
task_session (TaskSession | None, optional) – The task session object instance. Must inherit from the TaskSession class, by default None.
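A minimal sketch of driving a task by hand; the endpoint, credentials, task id and algorithm id are placeholders:
```python
from database_connection.S3Connection import S3Connection
from tasks.TaskHandler import TaskHandler

connection = S3Connection(
    endpoint_url="http://localhost:9000",  # placeholder endpoint
    aws_access_key_id="minioadmin",        # placeholder credentials
    aws_secret_access_key="minioadmin",
)

handler = TaskHandler(task_id="example-task-id", database_connection=connection)
handler.set_as_current_task_handler()

runner = handler.fetch_algorithm(algorithm_id="example-algorithm-id")
handler.progress = 0.5  # progress lives in [0., 1.]
handler.mark_as_completed(output_dataset_ids=[])
```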
- fetch_algorithm(algorithm_id: str, execution_device_override: str | None = None) object
Fetches the algorithm from the database and imports its corresponding Python module and runner class.
- Parameters:
algorithm_id (str) – The id of the algorithm.
execution_device_override (str | None, optional) – The computing device override, by default None. If provided, the device will be set to the computing device override (if the device is supported and available).
- Returns:
The algorithm Runner object.
- Return type:
object
- Raises:
ValueError – If fetch algorithm failed.
- fetch_asset(asset_path: str) BytesIO
Fetches an asset as bytes from the database by its path relative to the algorithm Runner class.
- Parameters:
asset_path (str) – The path to the asset relative to the algorithm Runner class. e.g. “files/weights.pth”
- Returns:
The asset as bytes.
- Return type:
io.BytesIO
- Raises:
ValueError – If fetch asset failed.
- fetch_data(file_ids: list[dict], pydantic_data_schema: Type[DataSchema], *keys: str, parallel: bool = False) list[dict]
Fetches the data from the database. A pydantic schema must be provided to validate the data. The data is fetched as a list of dictionaries, where each dictionary represents a dataset. Specific keys can be provided to fetch from the HDF5 file; if not provided, all keys will be fetched.
- Parameters:
file_ids (list[dict]) – The identifiers of the data files in the database.
pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.
*keys (str) – Optional keys to fetch from the HDF5 file, if not provided, all keys will be fetched.
parallel (bool, optional) – If True, the data will be fetched in parallel. Default is False.
- Returns:
List of the datasets fetched from the database as dictionaries.
- Return type:
list[dict]
- Raises:
Exception –
- load_item_from_session(key: str) Any
Load an object from the task session.
- Parameters:
key (str) – The key of the object to load.
- Returns:
The object loaded from the task session.
- Return type:
Any
- Raises:
Exception – If the task session is not initialized.
ValueError – If task session is not initialized.
- mark_as_completed(output_dataset_ids: list[str]) None
Mark the task as completed and update its record in the database. This will set the progress to 1.0, the status to “COMPLETED” and the time completed to the current time.
- Parameters:
output_dataset_ids (list[str]) – The output dataset identifiers of the task.
- Return type:
None
- mark_as_failed(e: Exception | None = None) None
Mark the task as failed and update its record in the database. This will set the progress to 1.0, the status to “FAILED” and the time completed to the current time. The exception that caused the task to fail will be logged in the task log.
- Parameters:
e (Exception | None, optional) – The exception that caused the task to fail, by default None. It will be logged in the task log.
- Return type:
None
- property output_dataset_ids
The output dataset identifiers of the task.
- Getter:
Returns the output dataset identifiers of the task.
- Setter:
Sets the output dataset identifiers of the task.
- Type:
list[str]
- post_data(result: list[dict], pydantic_data_schema: Type[DataSchema], parallel: bool = False) list[str]
Uploads a list of datasets to the database. Each dataset is a dictionary where the keys are the names of the datasets and the values are the datasets themselves (e.g. numpy arrays). A pydantic schema must be provided to validate the data before uploading. The data is uploaded as HDF5 files.
- Parameters:
result (list[dict]) – The result to upload to the database.
pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.
parallel (bool, optional) – If True, the data will be uploaded in parallel. Default is False.
- Returns:
The dataset identifiers of the uploaded datasets.
- Return type:
list[str]
- Raises:
Exception –
- property progress
The progress of the task in the range [0., 1.].
- Getter:
Returns the progress of the task.
- Setter:
Sets the progress of the task.
- Type:
float
- remove_item_from_session(key: str) None
Remove an object from the task session.
- Parameters:
key (str) – The key of the object to remove.
- Return type:
None
- Raises:
Exception –
ValueError – If task session is not initialized.
- save_item_to_session(obj: Any, key: str) None
Save an object to the task session.
- Parameters:
obj (Any) – The object to save.
key (str) – The key to save the object under.
- Return type:
None
- Raises:
Exception –
ValueError – If task session is not initialized.
- property session_token
The identifier of the session. Typically a UUID.
- Getter:
Returns the session id.
- Setter:
Sets the session id.
- Type:
str
- set_as_current_task_handler() None
Set this task handler as the current task handler in the current_task_handler context variable. This is used to access the current task handler from anywhere in the code.
- Return type:
None
- property status
The status of the task. e.g. “RUNNING”, “COMPLETED”, “FAILED”
- Getter:
Returns the status of the task.
- Setter:
Sets the status of the task.
- Type:
str
- property task_id
The identifier of the task. Typically a UUID.
- Getter:
Returns the task id.
- Setter:
Sets the task id.
- Type:
str
- property time_completed
The time the task was completed.
- Getter:
Returns the time the task was completed.
- Setter:
Sets the time the task was completed.
- Type:
str
- update_log() None
Update the log of the task in the database. This method is called automatically when the task is completed or failed. It can also be called manually to update the log during the execution of the task.
- Return type:
None
- Raises:
Exception –
- class tasks.DebuggingTaskHandler.DebuggingTaskHandler(task_id: str)
TaskHandler for debugging algorithm runners locally, without the need for a running server. Works with the local filesystem instead of a database server.
- Parameters:
task_id (str) – The task id.
- fetch_algorithm(path_to_algorithm: str, device: str = 'cpu') object
Fetches the algorithm from the local filesystem.
- Parameters:
path_to_algorithm (str) – The path to the algorithm.
device (str) – The device to run the algorithm on.
- Returns:
The algorithm runner instance.
- Return type:
object
- Raises:
ImportError – If algorithm runner could not be imported.
- fetch_asset(path_to_asset: str) BytesIO
Fetches the asset from the local filesystem.
- Parameters:
path_to_asset (str) – The path to the asset.
- Returns:
The asset as a BytesIO object.
- Return type:
io.BytesIO
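A sketch of local debugging; the algorithm path is hypothetical, and the shape of the input_data payload is an assumption that depends on the concrete runner:
```python
from tasks.DebuggingTaskHandler import DebuggingTaskHandler

handler = DebuggingTaskHandler(task_id="local-debug")
runner = handler.fetch_algorithm("./my_algorithm", device="cpu")

# The expected contents of input_data depend on the concrete runner.
runner.run(input_data={}, args={})
```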
Sessions
- class session.TaskSession.TaskSession(session_token: str | None = None, max_number_of_data_caches: int = 5, max_cache_size: int = 5, max_cache_memory_mb: int | None = None, expire_hours: int = 24, not_implemented: bool = False)
The TaskSession class serves as a common interface for individual TaskHandler instances. The session is identified by a session token. The main purpose of a session is to handle in-memory data caches for algorithms. This is useful because some algorithms need to quickly access and modify data without repeatedly storing it in and fetching it from the database. The data is stored in a dictionary-like structure, where the key is the session token and the value is the data cache object. The session token is a unique identifier generated for each session. If the client wishes to continue the session, the session token is returned in the execution response, and the client can pass it in the session_token field of subsequent requests. A new session is then created with that session token and with access to the data stored in the cache under that token.
TODO: this currently only works for a single process. If we want to scale this to multiple processes, we need to use a shared memory object with access across the individual worker nodes.
- data_caches
Dictionary storing all session caches. Keys are session tokens.
- Type:
dict
- Parameters:
session_token (str | None) – The identifier of the session. Typically a UUID.
max_number_of_data_caches (int) – The maximum number of data caches which will be stored in memory.
max_cache_size (int) – The maximum size of the cache.
max_cache_memory_mb (int | None) – The maximum memory in MB that the cache can use.
expire_hours (int) – The number of hours after which the session expires.
not_implemented (bool) – Bool which marks the session as not supported. This is currently used to mark the session as not supported for celery tasks.
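A minimal construction sketch; the limits shown are just the documented defaults, and the assumption that a token is generated when none is supplied follows the description above:
```python
from session.TaskSession import TaskSession

session = TaskSession(
    max_number_of_data_caches=5,  # documented default
    max_cache_size=5,
    expire_hours=24,
)
print(session.session_token)  # generated token (assumption: auto-generated)
```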
- add_item(obj: Any, key: str) None
Store the item in the cache.
- Parameters:
obj (Any) – The item to store.
key (str) – The key to store the item with.
- Return type:
None
- Raises:
NotImplementedError –
- clear_cache()
Clear the cache.
- remove_item(key: str)
Remove the item from the cache.
- Parameters:
key (str) – The key to remove.
- Raises:
NotImplementedError –
- property session_token
The identifier of the session. Typically a UUID.
- Getter:
Returns the session id.
- Setter:
Sets the session id.
- Type:
str
- class session.DataCache.DataCache(max_size: int = 5, max_memory_mb: int | None = None)
This class serves as a data cache for the task handler. It is used to store data in memory for quick access and modification. The cache is identified by a key which is used to store and retrieve the data. The cache has a maximum size and memory limit. If the cache exceeds the maximum size, the oldest item is removed. If the cache exceeds the maximum memory limit, the cache is cleared.
- Parameters:
max_size (int) – The maximum size of the cache.
max_memory_mb (int | None) – The maximum memory in MB that the cache can use.
- add_item(obj: Any, key: str)
Add an item to the cache.
- Parameters:
obj (Any) – The item to add to the cache.
key (str) – The key of the item.
- clear()
Clear the cache.
- remove_item(key: str)
Remove an item from the cache.
- Parameters:
key (str) – The key of the item to remove.
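A usage sketch of the documented methods; the eviction comment follows the class description above, and the keys and arrays are placeholders:
```python
import numpy as np

from session.DataCache import DataCache

cache = DataCache(max_size=2, max_memory_mb=256)
cache.add_item(np.zeros((512, 512)), key="slice-0")
cache.add_item(np.ones((512, 512)), key="slice-1")
# A third item exceeds max_size, so the oldest entry is evicted.
cache.add_item(np.full((512, 512), 2.0), key="slice-2")
cache.remove_item("slice-2")
cache.clear()
```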
Database Connection
- class database_connection.BaseConnection.BaseConnection
A generic database connection class. This class is meant to be inherited by specific database connection classes. It defines the methods for interacting with the object storage database. It assumes that the database is structured as a set of collections, where each collection contains a set of objects. The objects can be any type of data, such as files, images, or other objects. Both the objects and the collections are accessed by their names. For example, in an S3 database, the collections would be the buckets, and the objects would be the files in the buckets.
- check_collections_exists(collection_names: list[str]) list[bool]
Checks if collections exist.
- Parameters:
collection_names (list[str]) – The collection names.
- Returns:
The list of booleans indicating if the collections exist.
- Return type:
list[bool]
- Raises:
NotImplementedError –
- check_objects_exist(collection_name: str, object_names: list[str]) list[bool]
Checks if objects exist in a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
- Returns:
The list of booleans indicating if the objects exist.
- Return type:
list[bool]
- Raises:
NotImplementedError –
- create_collections(collection_names: list[str]) None
Creates collections.
- Parameters:
collection_names (list[str]) – The collection names.
- Raises:
NotImplementedError –
- delete_collections(collection_names: list[str]) None
Deletes collections.
- Parameters:
collection_names (list[str]) – The collection names.
- Raises:
NotImplementedError –
- delete_objects(collection_name: str, object_names: list[str]) None
Deletes objects from a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
- Return type:
None
- Raises:
NotImplementedError –
- get_objects(collection_name: str, object_names: list[str]) list[bytes]
Gets objects from a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
- Returns:
The list of bytes objects.
- Return type:
list[bytes]
- Raises:
NotImplementedError –
- list_collections() list
Lists all object collections.
- Returns:
The list of object collections.
- Return type:
list
- Raises:
NotImplementedError –
- list_objects(collection_name: str) list[dict] | list[str]
Lists all objects in a collection.
- Parameters:
collection_name (str) – The collection name.
- Returns:
The list of objects in the collection.
- Return type:
list[dict] | list[str]
- Raises:
NotImplementedError –
- put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None
Puts objects into a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
object (list[bytes] | list[str]) – The byte objects.
- Return type:
None
- Raises:
NotImplementedError –
- put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[bool] | list[str]
Puts objects into a collection with a duplicity check. Returns the list of object names, where the names of objects for which duplicates were found are substituted with the object names of the duplicates.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
object (list[bytes]) – The byte objects.
- Returns:
The list of object names.
- Return type:
list[bool] | list[str]
- Raises:
NotImplementedError –
- class database_connection.S3Connection.S3Connection(endpoint_url: str, aws_access_key_id: str, aws_secret_access_key: str, region_name: str | None = None, data_store_expire_days: int = 1, collection_prefix: str = '')
A connection class for an S3 object storage database. This class inherits from the BaseConnection class and implements the methods for interacting with an S3 object storage database.
- Parameters:
endpoint_url (str) – The endpoint URL.
aws_access_key_id (str) – The AWS access key ID.
aws_secret_access_key (str) – The AWS secret access key.
region_name (str | None) – The region name.
data_store_expire_days (int) – The number of days after which the objects in the data-store bucket expire. Default is 1.
collection_prefix (str) – The prefix for the actual bucket names. The bucket names are constructed as {collection_prefix}{collection_name}. Default is an empty string.
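An instantiation sketch against a local MinIO server; the endpoint, credentials and prefix are placeholders:
```python
from database_connection.S3Connection import S3Connection

connection = S3Connection(
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    collection_prefix="dev-",  # buckets become e.g. "dev-algorithm-store"
)
connection.create_collections(["algorithm-store", "asset-store"])
print(connection.list_collections())
```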
- check_collections_exists(collection_names: list[str]) list[bool]
Checks if buckets exist.
- Parameters:
collection_names (list[str]) – The collection names.
- Returns:
The list of booleans indicating if the collections exist.
- Return type:
list[bool]
- check_objects_exist(collection_name: str, object_names: list[str]) list[bool]
Checks if objects exist in a bucket.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object keys.
- Returns:
The list of booleans indicating if the objects exist.
- Return type:
list[bool]
- create_collections(collection_names: list[str]) None
Creates collections.
- Parameters:
collection_names (list[str]) – The collection names.
- delete_collections(collection_names: list[str]) None
Deletes collections.
- Parameters:
collection_names (list[str]) – The collection names.
- delete_objects(collection_name: str, object_names: list[str]) None
Deletes objects in a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object keys.
- generate_presigned_url(client_method: str, collection_name: str, object_name: str, expiration: int = 3600) str
Generate a generic presigned URL.
- Parameters:
client_method (str) – The S3 client method to use (e.g., ‘get_object’ or ‘put_object’).
collection_name (str) – The name of the bucket where the object will be stored.
object_name (str) – The key of the object in the bucket.
expiration (int, optional) – Time in seconds until the URL expires.
- Returns:
A presigned URL.
- Return type:
str
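A usage sketch, reusing the connection object from the instantiation example above; the bucket and object key are placeholders:
```python
# One-hour download link for a hypothetical object.
url = connection.generate_presigned_url(
    client_method="get_object",
    collection_name="data-store",
    object_name="datasets/volume.h5",
    expiration=3600,
)
print(url)
```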
- get_objects(collection_name: str, object_names: list[str]) list[bytes]
Gets objects from a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object keys.
- Returns:
The list of object bytes.
- Return type:
list[bytes]
- get_presigned_download_url(collection_name: str, object_name: str, expiration: int = 3600) str
Generate a presigned URL for downloading an object.
- Parameters:
collection_name (str) – The name of the bucket where the object is stored.
object_name (str) – The key of the object in the bucket.
expiration (int, optional) – Time in seconds until the URL expires.
- Returns:
A presigned URL that can be used to download the object.
- Return type:
str
- get_presigned_upload_url(collection_name: str, object_name: str, expiration: int = 3600) str
Generate a presigned URL for uploading an object.
- Parameters:
collection_name (str) – The name of the bucket where the object will be stored.
object_name (str) – The key of the object in the bucket.
expiration (int, optional) – Time in seconds until the URL expires.
- Returns:
A presigned URL that can be used to upload the object.
- Return type:
str
- list_collections() list
Lists all collections.
- Returns:
The list of collections.
- Return type:
list
- list_objects(collection_name: str) list[dict]
Lists all objects in a collection.
- Parameters:
collection_name (str) – The collection name.
- Returns:
The list of object keys.
- Return type:
list[dict]
- put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None
Puts objects into a collection.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object keys.
object (list[bytes] | list[str]) – The byte objects.
- put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[str]
Puts objects into a collection with a duplicity check. Returns the list of object keys, where the keys of objects for which duplicates were found are substituted with the object keys of the duplicates. The check is based on the ETag.
- Parameters:
collection_name (str) – The collection name.
object_names (list[str]) – The object names.
object (list[bytes]) – The byte objects.
- Returns:
The list of object names.
- Return type:
list[str]
- class database_connection.TempfileConnection.TempfileConnection(temp_folder_name: str = 'pcb_temp')
A connection class for a local file system “database”. This class inherits from the BaseConnection class and implements the methods for interacting with a local tempfile file structure mimicking an object storage database. Can be used for testing and debugging purposes, or when a real database is not available for local deployment of the application.
- Parameters:
temp_folder_name (str) – The name of the temporary folder.
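A round-trip sketch using only the documented methods; the collection and file names are placeholders:
```python
from database_connection.TempfileConnection import TempfileConnection

# A throwaway "database" backed by a temporary folder on disk.
connection = TempfileConnection(temp_folder_name="pcb_temp")
connection.create_collections(["asset-store"])
connection.put_objects("asset-store", ["hello.txt"], [b"hello"])
print(connection.get_objects("asset-store", ["hello.txt"]))  # [b'hello']
```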
- check_collections_exists(collection_names: list[str]) list[bool]
Check if the subdirectories exist in the temporary folder.
- Parameters:
collection_names (list[str]) – The subdirectory names.
- Returns:
The list of booleans indicating if the subdirectories exist.
- Return type:
list[bool]
- check_objects_exist(collection_name: str, object_names: list[str]) list[bool]
Check if files exist in a subdirectory.
- Parameters:
collection_name (str) – The subdirectory name.
object_names (list[str]) – The file names.
- Returns:
The list of booleans indicating if the files exist.
- Return type:
list[bool]
- create_collections(collection_names: list[str]) None
Create subdirectories in the temporary folder.
- Parameters:
collection_names (list[str]) – The subdirectory names.
- delete_collections(collection_names: list[str]) None
Delete the subdirectories in the temporary folder including all files.
- Parameters:
collection_names (list[str]) – The subdirectory names.
- delete_objects(collection_name: str, object_names: list[str]) None
Delete files in a subdirectory.
- Parameters:
collection_name (str) – The subdirectory name.
object_names (list[str]) – The file names.
- get_objects(collection_name: str, object_names: list[str]) list[bytes]
Get files from a subdirectory.
- Parameters:
collection_name (str) – The subdirectory name.
object_names (list[str]) – The file names.
- Returns:
The list of file bytes.
- Return type:
list[bytes]
- list_collections() list
List all subdirectories in the temporary folder.
- Returns:
The list of subdirectories.
- Return type:
list
- list_objects(collection_name: str) list[str]
List all files in a subdirectory.
- Parameters:
collection_name (str) – The subdirectory name.
- Returns:
The list of files.
- Return type:
list[str]
- put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None
Put files in a subdirectory.
- Parameters:
collection_name (str) – The subdirectory name.
object_names (list[str]) – The file names.
object (list[bytes] | list[str]) – The file bytes.
- put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[bool]
Put files in a subdirectory with a check for existing files.
- Parameters:
collection_name (str) – The subdirectory name.
object_names (list[str]) – The file names.
object (list[bytes]) – The file bytes.
- Returns:
The list of booleans indicating if the files were put.
- Return type:
list[bool]
- class database_connection.database_utils.S3FileUploader(s3_client: client, chunk_size: int = 8388608, num_threads: int = 8)
File uploader to S3.
- Parameters:
s3_client (boto3.client) – The s3 client.
chunk_size (int) – The size of the chunks to upload. The default is 8 * 1024 * 1024.
num_threads (int) – The number of threads to use. The default is 8.
- upload_file_multipart(bytes: bytes, key: str, bucket: str) None
Upload a file to S3 using multipart upload. This is useful for large files. We use a thread pool to upload the file in parallel.
- Parameters:
bytes (bytes) – The file bytes.
key (str) – The key of the file in the bucket.
bucket (str) – The bucket name.
- upload_part(part: bytes, key: str, bucket: str, part_number: int, upload_id: str) dict
Upload a part of a file to S3.
- Parameters:
part (bytes) – The part of the file.
key (str) – The key of the file in the bucket.
bucket (str) – The bucket name.
part_number (int) – The part number.
upload_id (str) – The upload id.
- Return type:
dict
- database_connection.database_utils.calculate_etag(bytes_obj: bytes) str
Calculate the etag hash of a file. The etag should be the same as the etag calculated internally by the boto3/minio client.
- Parameters:
bytes_obj (bytes) – The file bytes to calculate the etag hash of.
- Returns:
The etag hash.
- Return type:
str
- database_connection.database_utils.calculate_etag_multipart(bytes_obj: bytes, chunk_size: int) str
Calculate the etag hash of a file uploaded using multipart upload. The etag should be the same as the etag calculated internally by the boto3/minio client.
- Parameters:
bytes_obj (bytes) – The file bytes to calculate the etag hash of.
chunk_size (int) – The size of the chunks used for the multipart upload.
- Returns:
The etag hash.
- Return type:
str
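For reference, the usual S3 multipart ETag convention is the md5 of the concatenated per-chunk md5 digests, suffixed with the part count. A hedged sketch of that convention follows; the library function above may differ in edge cases such as single-part uploads:
```python
import hashlib


def multipart_etag_sketch(bytes_obj: bytes, chunk_size: int) -> str:
    # md5 each chunk, md5 the concatenated digests, append "-{parts}".
    digests = [
        hashlib.md5(bytes_obj[i:i + chunk_size]).digest()
        for i in range(0, len(bytes_obj), chunk_size)
    ]
    return hashlib.md5(b"".join(digests)).hexdigest() + f"-{len(digests)}"
```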
Server Utils
- server_utils.algorithm_cache(maxsize=None)
A cache decorator for algorithms. The cache is keyed by the algorithm_id and device and is implemented as a dictionary with a maximum size. When an algorithm is requested, the cache is checked, and if an algorithm with the same algorithm_id and device is found, its Runner object is returned from the cache. If the algorithm is not found in the cache, the algorithm is executed and the result is stored in the cache. If the cache size limit is reached, the oldest cache entry is invalidated.
- Parameters:
maxsize (int, optional) – The maximum size of the cache. The default is None.
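A decoration sketch; load_runner is a hypothetical function whose algorithm_id and device arguments drive the cache key described above:
```python
from server_utils import algorithm_cache


@algorithm_cache(maxsize=4)
def load_runner(algorithm_id: str, device: str):
    ...  # expensive fetch and import of the algorithm's Runner object
```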
- server_utils.calculate_s3_etag(bytes_obj: BytesIO) str
Calculate the etag hash of a file. The etag should be the same as the etag calculated internally by the boto3/minio client.
- Parameters:
bytes_obj (io.BytesIO) – The file bytes to calculate the etag hash of.
- Returns:
The etag hash.
- Return type:
str
- server_utils.check_and_create_database_collections(collection_names: list[str], database_connection: BaseConnection) list[str]
Checks if the collections exist in the database and creates them if they do not exist.
- Parameters:
collection_names (list[str]) – The collection names.
database_connection (BaseConnection.BaseConnection) – The database connection object.
- Returns:
The list of newly created collections.
- Return type:
list[str]
- server_utils.check_mps_availability() bool
Check if MacOS MPS (Metal Performance Shaders) is available.
- Returns:
True if MPS is available, False otherwise.
- Return type:
bool
- server_utils.check_system_gpu_availability() tuple[bool | None, int | None]
Check if system has GPU support.
- Returns:
bool | None – True if CUDA is available, False otherwise.
int | None – The number of available GPUs.
- server_utils.check_torch_with_cuda_available() bool
Check if PyTorch has CUDA support.
- Returns:
True if PyTorch has CUDA support, False otherwise.
- Return type:
bool
- server_utils.data_cache(maxsize=None)
A cache decorator for data. The cache is keyed by the unique file key and is implemented as a dictionary with a maximum size. When a file is requested, the cache is checked, and if a file with the same key is found, it is returned from the cache. If the file is not found in the cache, the file is read and the result is stored in the cache. If the cache size limit is reached, the oldest cache entry is invalidated.
- Parameters:
maxsize (int, optional) – The maximum size of the cache. The default is None.
- server_utils.find_algorithm_by_id(algorithm_id: str, bucket_contents: list[dict], separator: str = '~') tuple
Find an algorithm by its id.
- Parameters:
algorithm_id (str) – The id of the algorithm.
bucket_contents (list[dict]) – The bucket contents.
separator (str, optional) – The separator between the fields in the key. The default is “~”.
- Returns:
The algorithm key, id, name, major version, minor version.
- Return type:
tuple
- server_utils.generate_uuid(version: int = 1) str
Generate a uuid.
- Parameters:
version (int, optional) – The version of the uuid. The default is 1.
- Returns:
The uuid.
- Return type:
str
- Raises:
ValueError – If uuid version is not 1 or 4.
- server_utils.get_subprocess_fn()
Get the subprocess function appropriate for the current operating system.
- Returns:
A callable object used to launch subprocesses.
- Return type:
partial[JobPOpen.JobPOpen]
- Raises:
ValueError – If the operating system is not supported.
- server_utils.weak_lru(maxsize=128, typed=False)
LRU Cache decorator that keeps a weak reference to “self”
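Since the decorator keeps only a weak reference to self, it suits instance methods whose caches should not keep the instance alive; a sketch with a hypothetical service class:
```python
from server_utils import weak_lru


class DatasetService:
    @weak_lru(maxsize=32)
    def load(self, dataset_id: str):
        ...  # expensive load, cached without keeping the instance alive
```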
Server Endpoints
- routers.algorithms_controller.get_algorithm(algorithm_name: str, algorithm_major_version: str, request: Request) AlgorithmRegisteredResponse | FailedAlgorithmRegisteredResponse | JSONResponse
Returns algorithm by its name and version.
- Parameters:
algorithm_name (str) – Algorithm name.
algorithm_major_version (str) – Algorithm major version.
request (Request) – The request.
- Returns:
The algorithm.
- Return type:
Union[AlgorithmRegisteredResponse, FailedAlgorithmRegisteredResponse, JSONResponse]
- async routers.algorithms_controller.list_model_files(request: Request, positive_tag: List[str] | None = Query([]), negative_tag: List[str] | None = Query([]), algorithm_type: str | None = Query(None), supported_devices: List[str] | None = Query([])) List[S3ModelFileRecord]
Lists all available algorithms.
- Parameters:
request (Request) – The request.
positive_tag (List[str] | None) – A list of tags the algorithm must have.
negative_tag (List[str] | None) – A list of tags the algorithm must not have.
algorithm_type (str | None) – The type of the algorithm.
supported_devices (List[str] | None) – The devices the algorithm is compatible with.
- Returns:
The list of algorithms.
- Return type:
List[S3ModelFileRecord]
- async routers.file_controller.delete_dataset(id: str, request: Request) ResponseMessage
Deletes a dataset from the database.
- Parameters:
id (str) – The id of the dataset.
request (Request) – The request.
- Return type:
ResponseMessage
- async routers.file_controller.download_dataset(id: str, request: Request) StreamingResponse
Downloads a dataset from database.
- Parameters:
id (str) – The id of the dataset.
request (Request) – The request.
- Returns:
The dataset.
- Return type:
StreamingResponse
- async routers.file_controller.upload_dataset(request: Request) FileUploadResponse
Uploads an image stack as an HDF5 file to the database.
- Parameters:
request (Request) – The request.
- Returns:
The file upload response.
- Return type:
FileUploadResponse
- routers.execution_controller.execute_algorithm(request: Request, incoming_execution_request: IncomingExecutionRequest) ExecutionResponse
Executes an algorithm on a dataset.
- Parameters:
request (Request) – The request.
incoming_execution_request (IncomingExecutionRequest) – The incoming execution request.
- Returns:
The execution response.
- Return type:
ExecutionResponse
- Raises:
Exception – If the server backend is not supported or saving the execution record fails.
- async routers.execution_controller.get_execution_record(id: str, request: Request) ExecutionRecord
Get execution record by id.
- Parameters:
id (str) – The id of the execution record.
request (Request) – The request.
- Returns:
The execution record.
- Return type:
ExecutionRecord
Pydantic Models
- class pydantic_models.Algorithm(*, algorithm_name: str, algorithm_major_version: str)
Algorithm model.
- algorithm_name
The name of the algorithm.
- Type:
str
- algorithm_major_version
The major version of the algorithm.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.AlgorithmRegisteredResponse(*, algorithm_id: str, algorithm_name: str, algorithm_version: str, algorithm_minor_version: str, algorithm_input_queue: str, algorithm_type: str, algorithm_tags: list[str], algorithm_description: str, supported_devices: list[str] = [], default_device: str, additional_parameters: list[AdditionalParameterSchema] = [])
Algorithm registered response model.
- algorithm_id
The id of the algorithm.
- Type:
str
- algorithm_name
The name of the algorithm.
- Type:
str
- algorithm_version
The major version of the algorithm.
- Type:
str
- algorithm_minor_version
The minor version of the algorithm.
- Type:
str
- algorithm_input_queue
The input queue of the algorithm.
- Type:
str
- algorithm_type
The type of the algorithm.
- Type:
str
- algorithm_tags
The tags of the algorithm.
- Type:
list[str]
- algorithm_description
Description of the algorithm.
- Type:
str
- supported_devices
The supported devices.
- Type:
list[str]
- default_device
The default device.
- Type:
str
- additional_parameters
The additional parameters.
- Type:
list[AdditionalParameterSchema]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.ExecutionLogRecord(*, log: str)
Execution log record model.
- log
The log.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.ExecutionRecord(*, execution_id: str, algorithm_id: str, input_dataset_ids: list[str], execution_device_override: str | None = None, additional_parameters: dict, session_token: str | None, output_dataset_ids: list[str], status: str, progress: float, time_started: str, time_completed: str, log: str)
Execution record model.
- execution_id
The id of the execution.
- Type:
str
- algorithm_id
The id of the algorithm.
- Type:
str
- input_dataset_ids
The ids of the input datasets.
- Type:
list[str]
- execution_device_override
The execution device override.
- Type:
Optional[str]
- additional_parameters
The additional parameters.
- Type:
dict
- session_token
The string identifier of the session.
- Type:
Union[str, None]
- output_dataset_ids
The ids of the output datasets.
- Type:
list[str]
- status
The status of the execution.
- Type:
str
- progress
The progress of the execution.
- Type:
float
- time_started
The time the execution started.
- Type:
str
- time_completed
The time the execution completed.
- Type:
str
- log
The log of the execution.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.ExecutionResponse(*, execution_id: str)
Execution response model.
- execution_id
The id of the execution.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.FailedAlgorithmRegisteredResponse(*, algorithm_name: str, algorithm_version: str, message: str)
Failed algorithm response model.
- algorithm_name
The name of the algorithm.
- Type:
str
- algorithm_version
The version of the algorithm.
- Type:
str
- message
The message.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.FileUploadBody(*, file_body: List)
File upload body model.
- file_body
The file body.
- Type:
List
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.FileUploadResponse(*, file_id: str)
File upload response model.
- file_id
The id of the file.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.IncomingExecutionRequest(*, algorithm_id: str, input_dataset_ids: list[str], execution_device_override: str = None, additional_parameters: dict = {}, session_token: str | None = None)
Incoming execution request model.
- algorithm_id
The id of the algorithm.
- Type:
str
- input_dataset_ids
The id of the input dataset.
- Type:
list[str]
- execution_device_override
The execution device override.
- Type:
str
- additional_parameters
The additional parameters.
- Type:
dict
- session_token
The string identifier of the session.
- Type:
Union[str, None]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.MinioServer(*, executable_path: str, storage_path: str, console_address: str, address: str)
Minio server model.
- executable_path
The path to the minio executable.
- Type:
str
- storage_path
The path to the minio storage.
- Type:
str
- console_address
The address of the minio console.
- Type:
str
- address
The address of the minio server.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.MinioServerInfo(*, storage_path: str, console_address: str, address: str)
Minio server info model.
- storage_path
The path to the minio storage.
- Type:
str
- console_address
The address of the minio console.
- Type:
str
- address
The address of the minio server.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.ResponseMessage(*, detail: str | None = None)
Response message model.
- detail
The message.
- Type:
str | None
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.RootMessage(*, name: str, tags: list[str], group: str, organization: str, domain: str, version: str, cuda_available: bool | None = None, cuda_capable_devices_count: int | None = None)
Root message model.
- name
The name of the server.
- Type:
str
- tags
The server tags.
- Type:
list[str]
- group
The group.
- Type:
str
- organization
The organization.
- Type:
str
- domain
The domain.
- Type:
str
- version
The version.
- Type:
str
- cuda_available
If cuda is available.
- Type:
bool | None
- cuda_capable_devices_count
The number of cuda capable devices.
- Type:
int | None
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.S3Bucket(*, bucket_name: str)
S3 bucket model.
- bucket_name
The name of the bucket.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class pydantic_models.S3ModelFile(*, runner_path: str, algorithm_path: str, algorithm_name: str, algorithm_major_version: str, algorithm_minor_version: str)
S3 model file model.
- runner_path
The path to the runner file.
- Type:
str
- algorithm_path
The path to the algorithm file.
- Type:
str
- algorithm_name
The name of the algorithm.
- Type:
str
- algorithm_major_version
The major version of the algorithm.
- Type:
str
- algorithm_minor_version
The minor version of the algorithm.
- Type:
str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].