API Reference

Algorithm Utils

Copyright 2024 TESCAN 3DIM, s.r.o. All rights reserved

class algorithm_utils.AlgorithmDeployer.AlgorithmDeployer(algorithm_directory: str)

The AlgorithmDeployer class is used to deploy an algorithm to the algorithm store in the database. The algorithm directory is split automatically: all .py files are detected, zipped as a Python module and stored in the module-store collection, while all other files are stored as assets in the asset-store collection. The algorithm metadata is stored in the algorithm-store collection as a JSON file containing the algorithm name, major and minor version, the module id, the assets dictionary and the timestamp of when the algorithm was stored.

Parameters:

algorithm_directory (str) – The path to the algorithm directory.
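
A minimal deployment sketch, assuming a local algorithm directory containing a pyproject.toml and a runner file in its root; the endpoint and credentials below are placeholders for a locally running MinIO instance:

    from algorithm_utils.AlgorithmDeployer import AlgorithmDeployer
    from database_connection.S3Connection import S3Connection

    # Placeholder endpoint and credentials; substitute your own.
    connection = S3Connection(
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
    )

    deployer = AlgorithmDeployer("path/to/my_algorithm")
    algorithm_id = deployer.store_algorithm(database_connection=connection)
    print(algorithm_id)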

static calculate_etag(file: bytes) str

Calculate the etag hash of a file.

Parameters:

file (bytes) – The file.

Returns:

The etag hash.

Return type:

str

static check_if_zip_is_importable(path_to_zip: str) bool

Check if a zipped module is importable.

Parameters:

path_to_zip (str) – The path to the zip file.

Returns:

True if the module is importable, False otherwise.

Return type:

bool

static find_other_than_py_files(directory: str, ignore_pycache: bool = True, ignore_gitignore: bool = True) list[str]

Find all the files in a directory other than .py files.

Parameters:
  • directory (str) – The directory to search.

  • ignore_pycache (bool, optional) – Whether to ignore the __pycache__ directory. The default is True.

  • ignore_gitignore (bool, optional) – Whether to ignore the .gitignore file. The default is True.

Returns:

The list of files other than .py files.

Return type:

list[str]

static find_py_files(directory: str, ignore_pycache: bool = True) list[str]

Find all the .py files in a directory recursively.

Parameters:
  • directory (str) – The directory to search.

  • ignore_pycache (bool, optional) – Whether to ignore the __pycache__ directory. The default is True.

Returns:

The list of .py files.

Return type:

list[str]

static generate_uuid(version: int = 1) str

Generate a uuid.

Parameters:

version (int, optional) – The version of the uuid. The default is 1.

Returns:

The uuid.

Return type:

str

Raises:

ValueError – If the uuid version is not 1 or 4.

static hash_directory(directory: str) str

Compute the md5 hash of a directory based on its contents.

Parameters:

directory (str) – The path to the directory.

Returns:

The md5 hash.

Return type:

str

static hash_py_file(file: str) str

Compute the md5 hash of a .py file.

Parameters:

file (str) – The path to the file.

Returns:

The md5 hash.

Return type:

str

parse_pyproject_toml(path_to_algorithm_directory: str) dict

Parse the pyproject.toml file in the algorithm directory to get the algorithm name, major version and minor version.

Parameters:

path_to_algorithm_directory (str) – The path to the algorithm directory.

Returns:

The algorithm name, major version and minor version.

Return type:

dict

Raises:

FileNotFoundError – If pyproject.toml is not found in the algorithm directory.

static process_path_to_dict_key(path: str) str

This method takes a file path and substitutes any backslashes and double backslashes with forward slashes. It also removes the leading forward slash if present. This is necessary to store the directory structure as a dictionary key that the server can access independently of the operating system on which the deployment is performed.

Parameters:

path (str) – A path to a file.

Returns:

The processed path with forward slashes and without the leading forward slash.

Return type:

str
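
For illustration, based on the behaviour described above:

    key = AlgorithmDeployer.process_path_to_dict_key("\\files\\weights.pth")
    # -> "files/weights.pth": backslashes normalized, leading slash stripped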

store_algorithm(database_connection: BaseConnection | None = None, separate_runner_path: str | None = None, algorithm_collection_name: str = 'algorithm-store', module_collection_name: str = 'module-store', asset_collection_name: str = 'asset-store') str

Store the algorithm to the algorithm store.

Parameters:
  • database_connection (BaseConnection.BaseConnection | None) – The database connection object. Can be None if the algorithm is not supposed to be stored in the database (e.g. for local testing and development).

  • separate_runner_path (str | None, optional) – The path to the runner file. Use this if the runner file is not in the root of the algorithm directory. The default is None.

  • algorithm_collection_name (str, optional) – The name of the collection to store the algorithm. The default is “algorithm-store”.

  • module_collection_name (str, optional) – The name of the collection to store the module. The default is “module-store”.

  • asset_collection_name (str, optional) – The name of the collection to store the assets. The default is “asset-store”.

Returns:

algorithm id

Return type:

str

Raises:

Exception – If storing the algorithm module or assets fails.


class algorithm_utils.AlgorithmManager.AlgorithmManager(database_connection: BaseConnection, algorithms_collection: str = 'algorithm-store', module_collection: str = 'module-store', assets_collection: str = 'asset-store')

This class is responsible for managing the algorithms, modules and assets in the database. It provides methods to list or delete algorithms, modules and assets. To store the algorithms, modules and assets, use the AlgorithmDeployer class.

Parameters:
  • database_connection (BaseConnection.BaseConnection) – The database connection to use for the operations.

  • algorithms_collection (str) – The name of the collection where the algorithms are stored.

  • module_collection (str) – The name of the collection where the modules are stored.

  • assets_collection (str) – The name of the collection where the assets are stored.
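
A short management sketch, assuming an existing database connection (connection) and a previously deployed algorithm name:

    from algorithm_utils.AlgorithmManager import AlgorithmManager

    manager = AlgorithmManager(database_connection=connection)

    # List every stored version of one algorithm by name.
    for record in manager.list_algorithms(name="my_algorithm"):
        print(record)

    # Deleting requires name, major_version and minor_version.
    manager.delete_algorithms(name="my_algorithm", major_version="1", minor_version="0")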

delete_algorithms(name: str | None = None, major_version: str | None = None, minor_version: str | None = None) None

Delete an algorithm and associated modules and assets.

Parameters:
  • name (str | None) – The name of the algorithm to delete.

  • major_version (str | None) – The major version of the algorithm to delete.

  • minor_version (str | None) – The minor version of the algorithm to delete.

Return type:

None

Raises:

ValueError – If name, major_version or minor_version is not specified.

list_algorithms(name: str | None = None, major_version: str | None = None, minor_version: str | None = None) list[dict]

List all algorithms stored in the database, optionally filtered by name, major version or minor version.

Parameters:
  • name (str | None, optional) – Can be used to filter the algorithms by name.

  • major_version (str | None, optional) – Can be used to filter the algorithms by major version.

  • minor_version (str | None, optional) – Can be used to filter the algorithms by minor version.

Returns:

The list of algorithms, defined by their JSON records.

Return type:

list[dict]


class algorithm_utils.BaseRunner.BaseRunner

Base class for all runners. Specifies the architecture of a runner and the required methods.

When implementing a new runner, the following methods need to be implemented:

  • preprocess – Preprocess the input data.

  • inference – Run the inference on the output of the preprocessing.

  • postprocess – Postprocess the output of the inference.
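
A skeletal runner implementing the three required methods; the asset path is illustrative and the method bodies are placeholders:

    from typing import Any

    from algorithm_utils.BaseRunner import BaseRunner

    class MyRunner(BaseRunner):
        def load_assets(self):
            # Assets must be fetched through fetch_asset, not the file system.
            # "files/weights.pth" is an illustrative asset path.
            self.weights = self.fetch_asset("files/weights.pth")

        def preprocess(self, input_data: dict, args: dict = {}) -> Any:
            return input_data  # e.g. fetch datasets and move tensors to self.device

        def inference(self, data: Any, args: dict = {}) -> Any:
            self.set_progress(0.5)  # report intermediate progress
            return data  # run the actual model here

        def postprocess(self, data: Any, args: dict = {}) -> list[str]:
            # Upload results via self.post_data(...) and return the dataset ids.
            return []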

property device: str

Get the device on which the model and inference will be run. This is set during the initialization of the runner.

Returns:

The device that will be used to run the model and inference

Return type:

str

fetch_asset(asset_path: str) BytesIO

Fetches an asset as bytes from the database by its path relative to the algorithm Runner class.

Parameters:

asset_path (str) – The path to the asset relative to the algorithm Runner class, e.g. “files/weights.pth”.

Returns:

The asset as bytes.

Return type:

io.BytesIO

fetch_data(file_ids: list[dict], pydantic_data_schema: Type[DataSchema], *keys: str, parallel: bool = False) list[dict]

Fetches the data from the database. A pydantic schema must be provided to validate the data. The data is fetched as a list of dictionaries, where each dictionary represents a dataset. Specific keys can be provided to fetch from the HDF5 file; if not provided, all keys will be fetched. This method is a wrapper around the fetch_data method of the TaskHandler class.

Parameters:
  • file_ids (list[dict]) – The identifiers of the data files in the database.

  • pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.

  • *keys (str) – Optional keys to fetch from the HDF5 file, if not provided, all keys will be fetched.

  • parallel (bool, optional) – If True, the data will be fetched in parallel. Default is False.

Returns:

List of the datasets fetched from the database as dictionaries.

Return type:

list[dict]
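
A sketch of fetching only selected HDF5 keys inside a runner method; MyDataSchema is a hypothetical DataSchema subclass and the key names are illustrative:

    # Fetch only the "image" and "mask" keys from each referenced file.
    datasets = self.fetch_data(file_ids, MyDataSchema, "image", "mask", parallel=True)
    first_image = datasets[0]["image"]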

abstractmethod inference(data: Any, args: dict = {}) Any

Run the inference.

Parameters:
  • data (Any) – The input data.

  • args (dict) – Additional arguments.

Returns:

The output data.

Return type:

Any

Raises:

NotImplementedError

inference_base(data: Any, args: dict = {}) Any

Run the inference.

Parameters:
  • data (Any) – The input data.

  • args (dict) – Additional arguments.

Returns:

The output data.

Return type:

Any

initialize(device: str | None = None) None

Initialize the runner with the given device. This method is called by the TaskHandler when the algorithm is fetched. It is used to set the device on which the model and inference will be run.

Parameters:

device (str | None) – The device on which the model and inference will be run. e.g. “cpu”, “cuda:0” or “cuda:1”. This is set during the initialization of the runner.

Return type:

None

load_assets()

This method should be overridden to load all necessary assets for the algorithm, such as trained models, precomputed data, or other resources.

Assets must be loaded using self.fetch_asset() instead of accessing the file system directly. All assets should be stored as attributes on the runner instance.

WARNING: The attributes set in this method will be protected against reassignment in other parts of the code, so they should not be modified after this method is called. However, this protection does not hold for mutating mutable types with in-place operations (e.g., appending to a list or modifying a dictionary). If you need to modify such attributes, consider using a different approach.

load_item_from_session(key: str) Any

Fetch an item from the session cache.

Parameters:

key (str) – The key of the item to fetch.

Returns:

The item fetched from the session cache.

Return type:

Any

log_message(message: str, logging_level: str = 'INFO') None

Log a message.

Parameters:
  • message (str) – The message to log.

  • logging_level (str) – The logging level as defined in the logging module. Default is “INFO”.

Return type:

None

Raises:

ValueError – If an invalid logging level is provided.

post_data(data: list[dict], pydantic_data_schema: Type[DataSchema], parallel: bool = False) list[str]

Uploads a list of datasets to the database. A dataset is a dictionary where the keys are the names of the datasets and the values are the datasets themselves (e.g. numpy arrays). A pydantic schema must be provided to validate the data before uploading. The data is uploaded as HDF5 files. This method is a wrapper around the post_data method of the TaskHandler class.

Parameters:
  • data (list[dict]) – List of the datasets to upload. Each dataset is defined as a dictionary.

  • pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.

  • parallel (bool, optional) – If True, the data will be uploaded in parallel. Default is False.

Returns:

List of the identifiers of the uploaded datasets.

Return type:

list[str]

abstractmethod postprocess(data: Any, args: dict = {}) list[str]

Postprocess the output data.

Parameters:
  • data (Any) – The input data.

  • args (dict) – Additional arguments.

Returns:

The ids of the output datasets.

Return type:

list[str]

Raises:

NotImplementedError

postprocess_base(data: Any, args: dict = {}) list[str]

Postprocess the output data.

Parameters:
  • data (Any) – The input data.

  • args (dict) – Additional arguments.

Returns:

The ids of the output datasets.

Return type:

list[str]

abstractmethod preprocess(input_data: dict, args: dict = {}) Any

Preprocess the input data.

Parameters:
  • input_data (dict) – The input data.

  • args (dict) – Additional arguments.

Returns:

The preprocessed input data.

Return type:

Any

Raises:

NotImplementedError

preprocess_base(input_data: dict, args: dict = {}) Any

Preprocess the input data.

Parameters:
  • input_data (dict) – The input data.

  • args (dict) – The additional arguments

Returns:

The preprocessed input data.

Return type:

Any

remove_item_from_session(key: str) None

Remove an item from the session cache.

Parameters:

key (str) – The key of the item to remove.

Return type:

None

run(input_data: dict, args: dict = {}) None

Run the algorithm.

Parameters:
  • input_data (dict) – The input data.

  • args (dict) – Additional arguments.

Return type:

None

Raises:

Exception – If an error occurs during the execution.

property runner_context: dict

Get the current runner context. This is used to access the runner context methods and attributes.

Returns:

current runner context

Return type:

dict

save_item_to_session(obj: Any, key: str) None

Save an item to the session cache.

Parameters:
  • obj (Any) – The item to save.

  • key (str) – The key under which to save the item.

Return type:

None

set_progress(progress: float) None

Set the progress of the execution. The progress must be a float between 0 and 1.

Parameters:

progress (float) – The progress of the execution.

Raises:

ValueError – If progress is not a float between 0 and 1.

property task_handler: TaskHandler

Get the current task handler. This is used to access the task handler methods and attributes.

Returns:

Current task handler.

Return type:

TaskHandler.TaskHandler

Raises:

ValueError – If task handler is not set.

Tasks


class tasks.TaskHandler.TaskHandler(task_id: str, database_connection: S3Connection, database_update: bool = True, task_session: TaskSession | None = None)

Task handler class for the execution task. This class is used to update the progress, status and log of the execution task. Also contains methods to fetch the algorithm, assets and data from the database server of choice.

Parameters:
  • task_id (str) – The identifier of the task. Typically a UUID.

  • database_connection (S3Connection) – The database connection object instance. Must inherit from the BaseConnection class and implement the required methods.

  • database_update (bool, optional) – Whether to update the execution record in the database, by default True. Can be set to False, for example, when debugging locally.

  • task_session (TaskSession | None, optional) – The task session object instance. Must inherit from the TaskSession class, by default None.
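
A hedged orchestration sketch, assuming an existing S3Connection (connection) and a stored algorithm id; the exact input_data layout depends on the algorithm:

    import uuid

    from tasks.TaskHandler import TaskHandler

    handler = TaskHandler(task_id=str(uuid.uuid4()), database_connection=connection)
    handler.set_as_current_task_handler()

    runner = handler.fetch_algorithm(algorithm_id)  # id from the algorithm store
    try:
        runner.run(input_data={})  # populate according to the algorithm's needs
        handler.mark_as_completed(handler.output_dataset_ids)
    except Exception as exc:
        handler.mark_as_failed(exc)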

fetch_algorithm(algorithm_id: str, execution_device_override: str | None = None) object

Fetches the algorithm from the database and imports its corresponding Python module and runner class.

Parameters:
  • algorithm_id (str) – The id of the algorithm.

  • execution_device_override (str | None, optional) – The computing device override, by default None. If provided, the device will be set to the computing device override (if the device is supported and available).

Returns:

The algorithm Runner object.

Return type:

object

Raises:

ValueError – If fetching the algorithm fails.

fetch_asset(asset_path: str) BytesIO

Fetches an asset as bytes from the database by its path relative to the algorithm Runner class.

Parameters:

asset_path (str) – The path to the asset relative to the algorithm Runner class. e.g. “files/weights.pth”

Returns:

The asset as bytes.

Return type:

io.BytesIO

Raises:

ValueError – If fetching the asset fails.

fetch_data(file_ids: list[dict], pydantic_data_schema: Type[DataSchema], *keys: str, parallel: bool = False) list[dict]

Fetches the data from the database. A pydantic schema must be provided to validate the data. The data is fetched as a list of dictionaries, where each dictionary represents a dataset. Specific keys can be provided to fetch from the HDF5 file; if not provided, all keys will be fetched.

Parameters:
  • file_ids (list[dict]) – The identifiers of the data files in the database.

  • pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.

  • *keys (str) – Optional keys to fetch from the HDF5 file, if not provided, all keys will be fetched.

  • parallel (bool, optional) – If True, the data will be fetched in parallel. Default is False.

Returns:

List of the datasets fetched from the database as dictionaries.

Return type:

list[dict]

Raises:

Exception

load_item_from_session(key: str) Any

Load an object from the task session.

Parameters:

key (str) – The key of the object to load.

Returns:

The object loaded from the task session.

Return type:

Any

Raises:
  • Exception – If the task session is not initialized.

  • ValueError – If task session is not initialized.

mark_as_completed(output_dataset_ids: list[str]) None

Mark the task as completed and update its record in the database. This will set the progress to 1.0, the status to “COMPLETED” and the time completed to the current time.

Parameters:

output_dataset_ids (list[str]) – The output dataset identifiers of the task.

Return type:

None

mark_as_failed(e: Exception | None = None) None

Mark the task as failed and update its record in the database. This will set the progress to 1.0, the status to “FAILED” and the time completed to the current time. The exception that caused the task to fail will be logged in the task log.

Parameters:

e (Exception | None, optional) – The exception that caused the task to fail, by default None. It will be logged in the task log.

Return type:

None

property output_dataset_ids

The output dataset identifiers of the task.

Getter:

Returns the output dataset identifiers of the task.

Setter:

Sets the output dataset identifiers of the task.

Type:

list[str]

post_data(result: list[dict], pydantic_data_schema: Type[DataSchema], parallel: bool = False) list[str]

Uploads a list of datasets to the database. A dataset is a dictionary where the keys are the names of the datasets and the values are the datasets themselves (e.g. numpy arrays). A pydantic schema must be provided to validate the data before uploading. The data is uploaded as HDF5 files.

Parameters:
  • result (list[dict]) – The result to upload to the database.

  • pydantic_data_schema (Type[DataSchema]) – The pydantic schema of the data. Must inherit from the DataSchema class.

  • parallel (bool, optional) – If True, the data will be uploaded in parallel. Default is False.

Returns:

The dataset identifiers of the uploaded datasets.

Return type:

list[str]

Raises:

Exception

property progress

The progress of the task in the range [0., 1.].

Getter:

Returns the progress of the task.

Setter:

Sets the progress of the task.

Type:

float

remove_item_from_session(key: str) None

Remove an object from the task session.

Parameters:

key (str) – The key of the object to remove.

Return type:

None

Raises:
  • Exception

  • ValueError – If task session is not initialized.

save_item_to_session(obj: Any, key: str) None

Save an object to the task session.

Parameters:
  • obj (Any) – The object to save.

  • key (str) – The key to save the object under.

Return type:

None

Raises:
  • Exception

  • ValueError – If task session is not initialized.

property session_token

The identifier of the session. Typically a UUID.

Getter:

Returns the session id.

Setter:

Sets the session id.

Type:

str

set_as_current_task_handler() None

Set this task handler as the current task handler in the current_task_handler context variable. This is used to access the current task handler from anywhere in the code.

Return type:

None

property status

The status of the task. e.g. “RUNNING”, “COMPLETED”, “FAILED”

Getter:

Returns the status of the task.

Setter:

Sets the status of the task.

Type:

str

property task_id

The identifier of the task. Typically a UUID.

Getter:

Returns the task id.

Setter:

Sets the task id.

Type:

str

property time_completed

The time the task was completed.

Getter:

Returns the time the task was completed.

Setter:

Sets the time the task was completed.

Type:

str

update_log() None

Update the log of the task in the database. This method is called automatically when the task is completed or failed. It can also be called manually to update the log during the execution of the task.

Return type:

None

Raises:

Exception


class tasks.DebuggingTaskHandler.DebuggingTaskHandler(task_id: str)

TaskHandler for debugging algorithm runners locally, without the need for a running server. Works with the local filesystem instead of a database server.

Parameters:

task_id (str) – The task id.

fetch_algorithm(path_to_algorithm: str, device: str = 'cpu') object

Fetches the algorithm from the local filesystem.

Parameters:
  • path_to_algorithm (str) – The path to the algorithm.

  • device (str) – The device to run the algorithm on.

Returns:

The algorithm runner instance.

Return type:

object

Raises:

ImportError – If algorithm runner could not be imported.

fetch_asset(path_to_asset: str) BytesIO

Fetches the asset from the local filesystem.

Parameters:

path_to_asset (str) – The path to the asset.

Returns:

The asset as a BytesIO object.

Return type:

io.BytesIO
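
A local debugging sketch along these lines; the path and input layout are illustrative:

    from tasks.DebuggingTaskHandler import DebuggingTaskHandler

    handler = DebuggingTaskHandler(task_id="debug-run")
    runner = handler.fetch_algorithm("path/to/my_algorithm", device="cpu")

    input_data = {}  # populate according to what your runner's preprocess expects
    runner.run(input_data)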

Sessions


class session.TaskSession.TaskSession(session_token: str | None = None, max_number_of_data_caches: int = 5, max_cache_size: int = 5, max_cache_memory_mb: int | None = None, expire_hours: int = 24, not_implemented: bool = False)

The TaskSession class serves as a common interface for individual TaskHandler instances. A session is identified by a session token. The main purpose of a session is to handle in-memory data caches for algorithms. This is useful because some algorithms need to quickly access and modify data without repeatedly storing it to and fetching it from the database. The data is stored in a dictionary-like structure, where the key is the session token and the value is the data cache object. The session token is a unique identifier generated for each session. If the client wishes to continue the session, the session token is returned in the execution response, and the client can pass it in the session_token field of subsequent requests. A new session is then created with that session token and with access to the data stored in the cache under that token.

TODO: this currently only works for a single process. If we want to scale this to multiple processes, we need to use a shared memory object with access across the individual worker nodes.

data_caches

Dictionary storing all session caches. Keys are session tokens.

Type:

dict

Parameters:
  • session_token (str | None) – The identifier of the session. Typically a UUID.

  • max_number_of_data_caches (int) – The maximum number of data caches which will be stored in memory.

  • max_cache_size (int) – The maximum size of the cache.

  • max_cache_memory_mb (int | None) – The maximum memory in MB that the cache can use.

  • expire_hours (int) – The number of hours after which the session expires.

  • not_implemented (bool) – Marks the session as not supported. This is currently used to mark the session as not supported for celery tasks.

add_item(obj: Any, key: str) None

Store the item in the cache.

Parameters:
  • obj (Any) – The item to store.

  • key (str) – The key to store the item with.

Return type:

None

Raises:

NotImplementedError

clear_cache()

Clear the cache.

remove_item(key: str)

Remove the item from the cache.

Parameters:

key (str) – The key to remove.

Raises:

NotImplementedError

property session_token

The identifier of the session. Typically a UUID.

Getter:

Returns the session id.

Setter:

Sets the session id.

Type:

str


class session.DataCache.DataCache(max_size: int = 5, max_memory_mb: int | None = None)

This class serves as a data cache for the task handler. It stores data in memory for quick access and modification. Each item in the cache is identified by a key used to store and retrieve it. The cache has a maximum size and a memory limit: if the cache exceeds the maximum size, the oldest item is removed; if it exceeds the memory limit, the cache is cleared.

Parameters:
  • max_size (int) – The maximum size of the cache.

  • max_memory_mb (int | None) – The maximum memory in MB that the cache can use.
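
A small usage sketch of the cache on its own:

    from session.DataCache import DataCache

    cache = DataCache(max_size=3, max_memory_mb=256)
    cache.add_item(obj={"mask": [0, 1]}, key="scratch")
    # Oldest entries are evicted once more than max_size items are stored;
    # exceeding max_memory_mb clears the whole cache.
    cache.remove_item("scratch")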

add_item(obj: Any, key: str)

Add an item to the cache.

Parameters:
  • obj (Any) – The item to add to the cache.

  • key (str) – The key of the item.

clear()

Clear the cache.

remove_item(key: str)

Remove an item from the cache.

Parameters:

key (str) – The key of the item to remove.

Database Connection


class database_connection.BaseConnection.BaseConnection

A generic database connection class. This class is meant to be inherited by specific database connection classes. It defines the methods for interacting with an object storage database. It assumes that the database is structured as a set of collections, where each collection contains a set of objects. The objects can be any type of data, such as files, images, or other objects. Both objects and collections are accessed by their names. For example, in an S3 database, the collections would be the buckets, and the objects would be the files in the buckets.
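
A minimal in-memory subclass sketch illustrating the contract; only three methods are shown, and the remaining methods keep the base NotImplementedError behaviour:

    from database_connection.BaseConnection import BaseConnection

    class DictConnection(BaseConnection):
        """Toy connection keeping collections as nested dicts."""

        def __init__(self):
            self._store: dict[str, dict[str, bytes]] = {}

        def create_collections(self, collection_names: list[str]) -> None:
            for name in collection_names:
                self._store.setdefault(name, {})

        def put_objects(self, collection_name: str, object_names: list[str],
                        object: list[bytes] | list[str]) -> None:
            for name, blob in zip(object_names, object):
                self._store[collection_name][name] = blob

        def get_objects(self, collection_name: str,
                        object_names: list[str]) -> list[bytes]:
            return [self._store[collection_name][name] for name in object_names]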

check_collections_exists(collection_names: list[str]) list[bool]

Checks if collections exist.

Parameters:

collection_names (list[str]) – The collection names.

Returns:

The list of booleans indicating if the collections exist.

Return type:

list[bool]

Raises:

NotImplementedError

check_objects_exist(collection_name: str, object_names: list[str]) list[bool]

Checks if objects exist in a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

Returns:

The list of booleans indicating if the objects exist.

Return type:

list[bool]

Raises:

NotImplementedError

create_collections(collection_names: list[str]) None

Creates collections.

Parameters:

collection_names (list[str]) – The collection names.

Raises:

NotImplementedError

delete_collections(collection_names: list[str]) None

Deletes collections.

Parameters:

collection_names (list[str]) – The collection names.

Raises:

NotImplementedError

delete_objects(collection_name: str, object_names: list[str]) None

Deletes objects from a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

Return type:

None

Raises:

NotImplementedError

get_objects(collection_name: str, object_names: list[str]) list[bytes]

Gets objects from a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

Returns:

The list of bytes objects.

Return type:

list[bytes]

Raises:

NotImplementedError

list_collections() list

Lists all object collections.

Returns:

The list of object collections.

Return type:

list

Raises:

NotImplementedError

list_objects(collection_name: str) list[dict] | list[str]

Lists all objects in a collection.

Parameters:

collection_name (str) – The collection name.

Returns:

The list of objects in the collection.

Return type:

list[dict] | list[str]

Raises:

NotImplementedError

put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None

Puts objects into a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

  • object (list[bytes] | list[str]) – The byte objects.

Return type:

None

Raises:

NotImplementedError

put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[bool] | list[str]

Puts objects into a collection with a duplicity check. Returns the list of object names, where the names of objects for which duplicates were found are substituted with the names of the existing duplicates.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

  • object (list[bytes]) – The byte objects.

Returns:

The list of object names.

Return type:

list[bool] | list[str]

Raises:

NotImplementedError


class database_connection.S3Connection.S3Connection(endpoint_url: str, aws_access_key_id: str, aws_secret_access_key: str, region_name: str | None = None, data_store_expire_days: int = 1, collection_prefix: str = '')

A connection class for an S3 object storage database. This class inherits from the BaseConnection class and implements the methods for interacting with an S3 object storage database.

Parameters:
  • endpoint_url (str) – The endpoint URL.

  • aws_access_key_id (str) – The AWS access key ID.

  • aws_secret_access_key (str) – The AWS secret access key.

  • region_name (str | None) – The region name.

  • data_store_expire_days (int) – The number of days after which the objects in the data-store bucket expire. Default is 1.

  • collection_prefix (str) – The prefix for the actual bucket names. The bucket names are constructed as {collection_prefix}{collection_name}. Default is an empty string.
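
Constructing the connection and generating a presigned download URL; the endpoint and credentials are placeholders:

    from database_connection.S3Connection import S3Connection

    # Placeholder endpoint and credentials, e.g. a local MinIO instance.
    s3 = S3Connection(
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        collection_prefix="dev-",
    )
    s3.create_collections(["data-store"])  # actual bucket: "dev-data-store"
    url = s3.get_presigned_download_url("data-store", "dataset.h5", expiration=600)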

check_collections_exists(collection_names: list[str]) list[bool]

Checks if buckets exist.

Parameters:

collection_names (list[str]) – The collection names.

Returns:

The list of booleans indicating if the collections exist.

Return type:

list[bool]

check_objects_exist(collection_name: str, object_names: list[str]) list[bool]

Checks if objects exist in a bucket.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object keys.

Returns:

The list of booleans indicating if the objects exist.

Return type:

list[bool]

create_collections(collection_names: list[str]) None

Creates collections.

Parameters:

collection_names (list[str]) – The collection names.

delete_collections(collection_names: list[str]) None

Deletes collections.

Parameters:

collection_names (list[str]) – The collection names.

delete_objects(collection_name: str, object_names: list[str]) None

Deletes objects in a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object keys.

generate_presigned_url(client_method: str, collection_name: str, object_name: str, expiration: int = 3600) str

Generate a generic presigned URL.

Parameters:
  • client_method (str) – The S3 client method to use (e.g., ‘get_object’ or ‘put_object’).

  • collection_name (str) – The name of the bucket where the object will be stored.

  • object_name (str) – The key of the object in the bucket.

  • expiration (int, optional) – Time in seconds until the URL expires.

Returns:

A presigned URL.

Return type:

str

get_objects(collection_name: str, object_names: list[str]) list[bytes]

Gets objects from a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object keys.

Returns:

The list of object bytes.

Return type:

list[bytes]

get_presigned_download_url(collection_name: str, object_name: str, expiration: int = 3600) str

Generate a presigned URL for downloading an object.

Parameters:
  • collection_name (str) – The name of the bucket where the object is stored.

  • object_name (str) – The key of the object in the bucket.

  • expiration (int, optional) – Time in seconds until the URL expires.

Returns:

A presigned URL that can be used to download the object.

Return type:

str

get_presigned_upload_url(collection_name: str, object_name: str, expiration: int = 3600) str

Generate a presigned URL for uploading an object.

Parameters:
  • collection_name (str) – The name of the bucket where the object will be stored.

  • object_name (str) – The key of the object in the bucket.

  • expiration (int, optional) – Time in seconds until the URL expires.

Returns:

A presigned URL that can be used to upload the object.

Return type:

str

list_collections() list

Lists all collections.

Returns:

The list of collections.

Return type:

list

list_objects(collection_name: str) list[dict]

Lists all objects in a collection.

Parameters:

collection_name (str) – The collection name.

Returns:

The list of object keys.

Return type:

list[dict]

put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None

Puts objects into a collection.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object keys.

  • object (list[bytes] | list[str]) – The byte objects.

put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[str]

Puts objects into a collection with a duplicity check. Returns the list of object keys, where the keys of objects for which duplicates were found are substituted with the keys of the existing duplicates. The check is based on the ETag.

Parameters:
  • collection_name (str) – The collection name.

  • object_names (list[str]) – The object names.

  • object (list[bytes]) – The byte objects.

Returns:

The list of object names.

Return type:

list[str]


class database_connection.TempfileConnection.TempfileConnection(temp_folder_name: str = 'pcb_temp')

A connection class for a local file system “database”. This class inherits from the BaseConnection class and implements the methods for interacting with a local tempfile file structure mimicking an object storage database. Can be used for testing and debugging purposes, or when a real database is not available for local deployment of the application.

Parameters:

temp_folder_name (str) – The name of the temporary folder.
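
For local testing without a server, the tempfile-backed connection mirrors the same interface:

    from database_connection.TempfileConnection import TempfileConnection

    local = TempfileConnection(temp_folder_name="pcb_temp")
    local.create_collections(["asset-store"])
    local.put_objects("asset-store", ["weights.pth"], [b"\x00\x01"])
    blobs = local.get_objects("asset-store", ["weights.pth"])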

check_collections_exists(collection_names: list[str]) list[bool]

Check if the subdirectories exist in the temporary folder.

Parameters:

collection_names (list[str]) – The subdirectory names.

Returns:

The list of booleans indicating if the subdirectories exist.

Return type:

list[bool]

check_objects_exist(collection_name: str, object_names: list[str]) list[bool]

Check if files exist in a subdirectory.

Parameters:
  • collection_name (str) – The subdirectory name.

  • object_names (list[str]) – The file names.

Returns:

The list of booleans indicating if the files exist.

Return type:

list[bool]

create_collections(collection_names: list[str]) None

Create subdirectories in the temporary folder.

Parameters:

collection_names (list[str]) – The subdirectory names.

delete_collections(collection_names: list[str]) None

Delete the subdirectories in the temporary folder including all files.

Parameters:

collection_names (list[str]) – The subdirectory names.

delete_objects(collection_name: str, object_names: list[str]) None

Delete files in a subdirectory.

Parameters:
  • collection_name (str) – The subdirectory name.

  • object_names (list[str]) – The file names.

get_objects(collection_name: str, object_names: list[str]) list[bytes]

Get files from a subdirectory.

Parameters:
  • collection_name (str) – The subdirectory name.

  • object_names (list[str]) – The file names.

Returns:

The list of file bytes.

Return type:

list[bytes]

list_collections() list

List all subdirectories in the temporary folder.

Returns:

The list of subdirectories.

Return type:

list

list_objects(collection_name: str) list[str]

List all files in a subdirectory.

Parameters:

collection_name (str) – The subdirectory name.

Returns:

The list of files.

Return type:

list[str]

put_objects(collection_name: str, object_names: list[str], object: list[bytes] | list[str]) None

Put files in a subdirectory.

Parameters:
  • collection_name (str) – The subdirectory name.

  • object_names (list[str]) – The file names.

  • object (list[bytes] | list[str]) – The file bytes.

put_objects_with_duplicity_check(collection_name: str, object_names: list[str], object: list[bytes]) list[bool]

Put files in a subdirectory with a check for existing files.

Parameters:
  • collection_name (str) – The subdirectory name.

  • object_names (list[str]) – The file names.

  • object (list[bytes]) – The file bytes.

Returns:

The list of booleans indicating if the files were put.

Return type:

list[bool]


class database_connection.database_utils.S3FileUploader(s3_client: client, chunk_size: int = 8388608, num_threads: int = 8)

File uploader to S3.

Parameters:
  • s3_client (boto3.client) – The boto3 S3 client.

  • chunk_size (int) – The size of the chunks to upload. The default is 8 * 1024 * 1024.

  • num_threads (int) – The number of threads to use. The default is 8.

upload_file_multipart(bytes: bytes, key: str, bucket: str) None

Upload a file to S3 using multipart upload. This is useful for large files. We use a thread pool to upload the file in parallel.

Parameters:
  • bytes (bytes) – The file bytes.

  • key (str) – The key of the file in the bucket.

  • bucket (str) – The bucket name.

upload_part(part: bytes, key: str, bucket: str, part_number: int, upload_id: str) dict

Upload a part of a file to S3.

Parameters:
  • part (bytes) – The part of the file.

  • key (str) – The key of the file in the bucket.

  • bucket (str) – The bucket name.

  • part_number (int) – The part number.

  • upload_id (str) – The upload id.

Return type:

dict

database_connection.database_utils.calculate_etag(bytes_obj: bytes) str

Calculate the etag hash of a file. The etag should be the same as the etag calculated internally by the boto3/minio client.

Parameters:

bytes_obj (bytes) – The file bytes to calculate the etag hash of.

Returns:

The etag hash.

Return type:

str

database_connection.database_utils.calculate_etag_multipart(bytes_obj: bytes, chunk_size: int) str

Calculate the etag hash of a file uploaded using multipart upload. The etag should be the same as the etag calculated internally by the boto3/minio client.

Parameters:
  • bytes_obj (bytes) – The file bytes to calculate the etag hash of.

  • chunk_size (int) – The chunk size used for the multipart upload.

Returns:

The etag hash.

Return type:

str
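
For reference, the S3 multipart ETag convention is the MD5 of the concatenated per-part MD5 digests, suffixed with the part count. A standalone sketch of that convention (not necessarily the library's exact implementation):

    import hashlib

    def multipart_etag(bytes_obj: bytes, chunk_size: int) -> str:
        # MD5 each chunk, MD5 the concatenated digests, append "-<part count>".
        digests = [
            hashlib.md5(bytes_obj[i:i + chunk_size]).digest()
            for i in range(0, len(bytes_obj), chunk_size)
        ]
        return hashlib.md5(b"".join(digests)).hexdigest() + f"-{len(digests)}"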

Server Utils


server_utils.algorithm_cache(maxsize=None)

A cache decorator for algorithms. The cache is keyed by algorithm_id and device and is implemented as a dictionary with a maximum size. When an algorithm is requested, the cache is checked; if an algorithm with the same algorithm_id and device is found, its Runner object is returned from the cache. If the algorithm is not found in the cache, the wrapped function is executed and its result is stored in the cache. If the cache size limit is reached, the oldest cache entry is invalidated.

Parameters:

maxsize (int, optional) – The maximum size of the cache. The default is None.
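
A usage sketch under the assumption that the decorated function takes the algorithm_id and device that key the cache:

    from server_utils import algorithm_cache

    @algorithm_cache(maxsize=4)
    def load_runner(algorithm_id: str, device: str):
        # task_handler is a placeholder for an existing TaskHandler instance.
        return task_handler.fetch_algorithm(algorithm_id, execution_device_override=device)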

server_utils.calculate_s3_etag(bytes_obj: BytesIO) str

Calculate the etag hash of a file. The etag should be the same as the etag calculated internally by the boto3/minio client.

Parameters:

bytes_obj (io.BytesIO) – The file bytes to calculate the etag hash of.

Returns:

The etag hash.

Return type:

str

server_utils.check_and_create_database_collections(collection_names: list[str], database_connection: BaseConnection) list[str]

Checks if the collections exist in the database and creates them if they do not exist.

Parameters:
  • collection_names (list[str]) – The collection names.

  • database_connection (BaseConnection) – The database connection.

Returns:

The list of newly created collections.

Return type:

list[str]

server_utils.check_mps_availability() bool

Check if macOS MPS (Metal Performance Shaders) is available.

Returns:

True if MPS is available, False otherwise.

Return type:

bool

server_utils.check_system_gpu_availability() tuple[bool | None, int | None]

Check if system has GPU support.

Returns:

  • bool | None – True if CUDA is available, False otherwise.

  • int | None – The number of available GPUs.

server_utils.check_torch_with_cuda_available() bool

Check if PyTorch has CUDA support.

Returns:

True if PyTorch has CUDA support, False otherwise.

Return type:

bool

server_utils.data_cache(maxsize=None)

A cache decorator for data. The cache is keyed by the unique file key and is implemented as a dictionary with a maximum size. When a file is requested, the cache is checked; if a file with the same key is found, it is returned from the cache. If the file is not found in the cache, it is read and the result is stored in the cache. If the cache size limit is reached, the oldest cache entry is invalidated.

Parameters:

maxsize (int, optional) – The maximum size of the cache. The default is None.

server_utils.find_algorithm_by_id(algorithm_id: str, bucket_contents: list[dict], separator: str = '~') tuple

Find an algorithm by its id.

Parameters:
  • algorithm_id (str) – The id of the algorithm.

  • bucket_contents (list[dict]) – The bucket contents.

  • separator (str, optional) – The separator between the fields in the key. The default is “~”.

Returns:

The algorithm key, id, name, major version, minor version.

Return type:

tuple

server_utils.generate_uuid(version: int = 1) str

Generate a uuid.

Parameters:

version (int, optional) – The version of the uuid. The default is 1.

Returns:

The uuid.

Return type:

str

Raises:

ValueError – If uuid version is not 1 or 4.

server_utils.get_subprocess_fn()

Get the subprocess function appropriate for the current operating system.

Returns:

A callable object used to launch subprocesses.

Return type:

partial[JobPOpen.JobPOpen]

Raises:

ValueError – If the operating system is not supported.

server_utils.weak_lru(maxsize=128, typed=False)

LRU cache decorator that keeps a weak reference to “self”.
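
A common way to implement this pattern, shown as a sketch rather than the exact code used:

    import functools
    import weakref

    def weak_lru(maxsize=128, typed=False):
        """LRU cache for methods that does not keep instances alive."""
        def decorator(func):
            @functools.lru_cache(maxsize=maxsize, typed=typed)
            def _cached(self_ref, *args, **kwargs):
                return func(self_ref(), *args, **kwargs)

            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                # Key the cache on a weak reference so the instance
                # can still be garbage collected.
                return _cached(weakref.ref(self), *args, **kwargs)

            return wrapper
        return decorator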

Server Endpoints


routers.algorithms_controller.get_algorithm(algorithm_name: str, algorithm_major_version: str, request: Request) AlgorithmRegisteredResponse | FailedAlgorithmRegisteredResponse | JSONResponse

Returns an algorithm by its name and version.

Parameters:
  • algorithm_name (str) – Algorithm name.

  • algorithm_major_version (str) – The algorithm major version.

  • request (Request) – The request.

Returns:

The algorithm.

Return type:

Union[AlgorithmRegisteredResponse, FailedAlgorithmRegisteredResponse, JSONResponse]

async routers.algorithms_controller.list_model_files(request: Request, positive_tag: List[str] | None = Query([]), negative_tag: List[str] | None = Query([]), algorithm_type: str | None = Query(None), supported_devices: List[str] | None = Query([])) List[S3ModelFileRecord]

Lists all available algorithms.

Parameters:
  • request (Request) – The request.

  • positive_tag (List[str] | None, optional) – A list of tags the algorithm must have.

  • negative_tag (List[str] | None, optional) – A list of tags the algorithm must not have.

  • algorithm_type (str | None, optional) – The type of the algorithm.

  • supported_devices (List[str] | None, optional) – The devices the algorithm is compatible with.

Returns:

The list of algorithms.

Return type:

List[S3ModelFileRecord]


async routers.file_controller.delete_dataset(id: str, request: Request) ResponseMessage

Deletes a dataset from the database.

Parameters:
  • id (str) – The id of the dataset.

  • request (Request) – The request.

Return type:

ResponseMessage

async routers.file_controller.download_dataset(id: str, request: Request) StreamingResponse

Downloads a dataset from the database.

Parameters:
  • id (str) – The id of the dataset.

  • request (Request) – The request.

Returns:

The dataset.

Return type:

StreamingResponse

async routers.file_controller.upload_dataset(request: Request) FileUploadResponse

Uploads an image stack as an HDF5 file to the database.

Parameters:

request (Request) – The request.

Returns:

The file upload response.

Return type:

FileUploadResponse


routers.execution_controller.execute_algorithm(request: Request, incoming_execution_request: IncomingExecutionRequest) ExecutionResponse

Executes an algorithm on a dataset.

Parameters:
  • request (Request) – The request.

  • incoming_execution_request (IncomingExecutionRequest) – The incoming execution request.

Returns:

The execution response.

Return type:

ExecutionResponse

Raises:

Exception – If the server backend is not supported or saving the execution record fails.

async routers.execution_controller.get_execution_record(id: str, request: Request) ExecutionRecord

Get execution record by id.

Parameters:
  • id (str) – The id of the execution record.

  • request (Request) – The request.

Returns:

The execution record.

Return type:

ExecutionRecord

Pydantic Models


class pydantic_models.Algorithm(*, algorithm_name: str, algorithm_major_version: str)

Algorithm model.

algorithm_name

The name of the algorithm.

Type:

str

algorithm_major_version

The major version of the algorithm.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.AlgorithmRegisteredResponse(*, algorithm_id: str, algorithm_name: str, algorithm_version: str, algorithm_minor_version: str, algorithm_input_queue: str, algorithm_type: str, algorithm_tags: list[str], algorithm_description: str, supported_devices: list[str] = [], default_device: str, additional_parameters: list[AdditionalParameterSchema] = [])

Algorithm registered response model.

algorithm_id

The id of the algorithm.

Type:

str

algorithm_name

The name of the algorithm.

Type:

str

algorithm_version

The major version of the algorithm.

Type:

str

algorithm_minor_version

The minor version of the algorithm.

Type:

str

algorithm_input_queue

The input queue of the algorithm.

Type:

str

algorithm_type

The type of the algorithm.

Type:

str

algorithm_tags

The tags of the algorithm.

Type:

list[str]

algorithm_description

Description of the algorithm.

Type:

str

supported_devices

The supported devices.

Type:

list[str]

default_device

The default device.

Type:

str

additional_parameters

The additional parameters.

Type:

list[AdditionalParameterSchema]

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.ExecutionLogRecord(*, log: str)

Execution log record model.

log

The log.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.ExecutionRecord(*, execution_id: str, algorithm_id: str, input_dataset_ids: list[str], execution_device_override: str | None = None, additional_parameters: dict, session_token: str | None, output_dataset_ids: list[str], status: str, progress: float, time_started: str, time_completed: str, log: str)

Execution record model.

execution_id

The id of the execution.

Type:

str

algorithm_id

The id of the algorithm.

Type:

str

input_dataset_ids

The ids of the input datasets.

Type:

list[str]

execution_device_override

The execution device override.

Type:

Optional[str]

additional_parameters

The additional parameters.

Type:

dict

session_token

The string identifier of the session.

Type:

Union[str, None]

output_dataset_ids

The ids of the output datasets.

Type:

list[str]

status

The status of the execution.

Type:

str

progress

The progress of the execution.

Type:

float

time_started

The time the execution started.

Type:

str

time_completed

The time the execution completed.

Type:

str

log

The log of the execution.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.ExecutionResponse(*, execution_id: str)

Execution response model.

execution_id

The id of the execution.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.FailedAlgorithmRegisteredResponse(*, algorithm_name: str, algorithm_version: str, message: str)

Failed algorithm response model.

algorithm_name

The name of the algorithm.

Type:

str

algorithm_version

The version of the algorithm.

Type:

str

message

The message.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.FileUploadBody(*, file_body: List)

File upload body model.

file_body

The file body.

Type:

List

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.FileUploadResponse(*, file_id: str)

File upload response model.

file_id

The id of the file.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.IncomingExecutionRequest(*, algorithm_id: str, input_dataset_ids: list[str], execution_device_override: str = None, additional_parameters: dict = {}, session_token: str | None = None)

Incoming execution request model.

algorithm_id

The id of the algorithm.

Type:

str

input_dataset_ids

The ids of the input datasets.

Type:

list[str]

execution_device_override

The execution device override.

Type:

str

additional_parameters

The additional parameters.

Type:

dict

session_token

The string identifier of the session.

Type:

Union[str, None]

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.
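
Constructing a request from the fields above; the ids are placeholders:

    from pydantic_models import IncomingExecutionRequest

    req = IncomingExecutionRequest(
        algorithm_id="<algorithm-uuid>",
        input_dataset_ids=["<dataset-uuid>"],
        additional_parameters={"threshold": 0.5},
    )
    payload = req.model_dump()  # pydantic v2 serialization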

class pydantic_models.MinioServer(*, executable_path: str, storage_path: str, console_address: str, address: str)

Minio server model.

executable_path

The path to the minio executable.

Type:

str

storage_path

The path to the minio storage.

Type:

str

console_address

The address of the minio console.

Type:

str

address

The address of the minio server.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.MinioServerInfo(*, storage_path: str, console_address: str, address: str)

Minio server info model.

storage_path

The path to the minio storage.

Type:

str

console_address

The address of the minio console.

Type:

str

address

The address of the minio server.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.ResponseMessage(*, detail: str | None = None)

Response message model.

detail

The message.

Type:

str | None

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.RootMessage(*, name: str, tags: list[str], group: str, organization: str, domain: str, version: str, cuda_available: bool | None = None, cuda_capable_devices_count: int | None = None)

Root message model.

name

The name of the server.

Type:

str

tags

The server tags.

Type:

list[str]

group

The group.

Type:

str

organization

The organization.

Type:

str

domain

The domain.

Type:

str

version

The version.

Type:

str

cuda_available

If cuda is available.

Type:

bool | None

cuda_capable_devices_count

The number of cuda capable devices.

Type:

int | None

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.S3Bucket(*, bucket_name: str)

S3 bucket model.

bucket_name

The name of the bucket.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.S3ModelFile(*, runner_path: str, algorithm_path: str, algorithm_name: str, algorithm_major_version: str, algorithm_minor_version: str)

S3 model file model.

runner_path

The path to the runner file.

Type:

str

algorithm_path

The path to the algorithm file.

Type:

str

algorithm_name

The name of the algorithm.

Type:

str

algorithm_major_version

The major version of the algorithm.

Type:

str

algorithm_minor_version

The minor version of the algorithm.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.S3ModelFileRecord(*, algorithm_key: str)

S3 model file record model.

algorithm_key

The key of the algorithm.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.

class pydantic_models.UrlResponse(*, url: str)

Url response model.

url

The url.

Type:

str

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.ConfigDict.