Reference

Permissions shortcuts

resolwe.permissions.shortcuts.get_object_perms(obj: Model, user: User | None = None, mock_superuser_permissions: bool = False) → List[Dict][source]

Return permissions for given object in Resolwe specific format.

Function returns permissions for given object obj in the following format:

{
    "type": "group"/"user"/"public",
    "id": <group_or_user_id>,
    "name": <group_or_user_name>,
    "username": <username>,
    "permissions": [<first_permission>, <second_permission>,...]
}

For the public type, the id and name keys are omitted.

If the user parameter is given, permissions are limited to that user, the groups the user belongs to, and public permissions.

This function should only be used from Resolwe views: since permissions for the current user (or users, when the user has share permission on the given object) are prefetched, we only iterate through objects here and filter them in Python. Using the filter method would result in a new database query.

Parameters:
  • obj – Resolwe’s DB model’s instance

  • user – Django user

  • mock_superuser_permissions – when True return all permissions for users that are superusers

Returns:

list of permission objects in the described format
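As a rough illustration of the format and of the in-Python filtering described above, the sketch below narrows a list of permission entries down to one user. The helper name limit_to_user, the permission names and the sample data are hypothetical; this is not the Resolwe implementation.

```python
from typing import Dict, List, Set


def limit_to_user(perms: List[Dict], user_id: int, group_ids: Set[int]) -> List[Dict]:
    """Keep entries for the user, the user's groups and the public.

    Hypothetical helper that only mimics the filtering described above.
    """
    kept = []
    for entry in perms:
        if entry["type"] == "public":
            kept.append(entry)
        elif entry["type"] == "user" and entry["id"] == user_id:
            kept.append(entry)
        elif entry["type"] == "group" and entry["id"] in group_ids:
            kept.append(entry)
    return kept


# Sample entries in the format described above (all values are made up).
perms = [
    {"type": "user", "id": 1, "name": "Ann", "username": "ann", "permissions": ["view", "edit"]},
    {"type": "user", "id": 2, "name": "Bob", "username": "bob", "permissions": ["view"]},
    {"type": "group", "id": 7, "name": "lab", "permissions": ["view"]},
    {"type": "public", "permissions": ["view"]},
]

visible = limit_to_user(perms, user_id=1, group_ids={7})
print([entry["type"] for entry in visible])
```

Because the list is already in memory, the loop issues no additional database queries, which is the point made above about prefetching.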

Permissions utils

resolwe.permissions.utils.copy_permissions(src_obj: Model, dest_obj: Model)[source]

Copy permissions from src_obj to dest_obj.

Warning

Existing permissions in dest_obj will be deleted.
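The replace-not-merge behaviour from the warning can be pictured with plain dictionaries. This is only a sketch: the function name copy_permissions_sketch and the mapping of principals to permission sets are assumptions made for illustration, not how Resolwe stores permissions.

```python
from typing import Dict, Set


def copy_permissions_sketch(src: Dict[str, Set[str]], dest: Dict[str, Set[str]]) -> None:
    """Overwrite dest with a copy of src (hypothetical illustration)."""
    # Existing destination permissions are dropped first.
    dest.clear()
    for principal, permissions in src.items():
        # Copy each set so later changes to src do not leak into dest.
        dest[principal] = set(permissions)


src_perms = {"ann": {"view", "edit"}}
dest_perms = {"bob": {"view"}}
copy_permissions_sketch(src_perms, dest_perms)
print(dest_perms)
```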

Flow Managers

Workflow workload managers.

resolwe.flow.managers.manager

The global manager instance.

Type:

Manager

Dispatcher

class resolwe.flow.managers.dispatcher.Manager(*args, **kwargs)[source]

The manager handles process job dispatching.

Each Data object that’s still waiting to be resolved is dispatched to a concrete workload management system (such as Celery or SLURM). The specific manager for that system (descended from BaseConnector) then handles actual job setup and submission. The job itself is an executor invocation; the executor then in turn sets up a safe and well-defined environment within the workload manager’s task in which the process is finally run.

async communicate(data_id=None, run_sync=False)[source]

Scan database for resolving Data objects and process them.

This is submitted as a task to the manager’s channel workers.

Parameters:
  • data_id – Optional id of the Data object which (together with its children) should be processed. If it is not given, all resolving objects are processed.

  • run_sync – If True, wait until all processes spawned from this point on have finished processing.

discover_engines()[source]

Discover configured engines.

Parameters:

executor – Optional executor module override

async drain_messages()[source]

Drain Django Channel messages.

async execution_barrier()[source]

Wait for executors to finish.

At least one must finish after this point to avoid a deadlock.

get_execution_engine(name: str)[source]

Return an execution engine instance.

get_executor()[source]

Return an executor instance.

get_expression_engine(name: str)[source]

Return an expression engine instance.

async handle_control_event(message: dict)[source]

Handle the control event.

The method is called from the channels layer when there is some change in the state of the Data object or when the executors have finished processing.

When running in sync state, check that all database objects are in a final state before raising the execution_barrier.

Channels layer callback, do not call directly.

load_execution_engines(engines: List[dict | str])[source]

Load execution engines.

load_executor(executor_name: str)[source]

Load process executor.

load_expression_engines(engines: List[dict | str])[source]

Load expression engines.

run(data: Data, argv: List)[source]

Select a concrete connector and run the process through it.

Parameters:
  • data – The Data object that is to be run.

  • argv – The argument vector used to spawn the executor.

class resolwe.flow.managers.dispatcher.SettingsJSONifier(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Customized JSON encoder, coercing all unknown types into strings.

Needed due to the class hierarchy coming out of the database, which can’t be serialized using the vanilla json encoder.

default(o)[source]

Try default; otherwise, coerce the object into a string.
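The "try default, otherwise coerce to string" behaviour can be sketched with the standard json module. The class name StringFallbackEncoder and the sample settings are hypothetical; the real SettingsJSONifier may differ in detail.

```python
import json
from datetime import datetime
from pathlib import PurePosixPath


class StringFallbackEncoder(json.JSONEncoder):
    """Hypothetical encoder: try the default, else coerce to str."""

    def default(self, o):
        try:
            # The stock implementation raises TypeError for unknown types.
            return super().default(o)
        except TypeError:
            return str(o)


# Values the vanilla encoder cannot serialize (sample data, made up).
settings = {
    "path": PurePosixPath("/data/upload"),
    "started": datetime(2024, 1, 2, 3, 4, 5),
}
encoded = json.dumps(settings, cls=StringFallbackEncoder, sort_keys=True)
print(encoded)
```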

Workload Connectors

The workload management system connectors are used as glue between the Resolwe Manager and various concrete workload management systems that might be used by it. Since the only functional requirement is job submission, they can be simple and nearly contextless.

Base Class

class resolwe.flow.managers.workload_connectors.base.BaseConnector[source]

The abstract base class for workload manager connectors.

The main Manager instance in manager uses connectors to handle communication with concrete backend workload management systems, such as Celery and SLURM. The connectors need not worry about how jobs are discovered or how they’re prepared for execution; this is all done by the manager.

cleanup(data_id: int)[source]

Perform final cleanup after data object is finished processing.

submit(data: Data, argv)[source]

Submit the job to the workload management system.

Parameters:
  • data – The Data object that is to be run.

  • argv – The argument vector used to spawn the executor.
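To show how small a connector can be, here is a self-contained sketch of the submit/cleanup contract. SketchConnector and RecordingConnector are hypothetical stand-ins, and a plain integer id replaces the Data object so that the example runs without Resolwe.

```python
from abc import ABC, abstractmethod
from typing import List, Tuple


class SketchConnector(ABC):
    """Hypothetical stand-in for a workload connector base class."""

    @abstractmethod
    def submit(self, data_id: int, argv: List[str]) -> None:
        """Hand the job over to the workload management system."""

    def cleanup(self, data_id: int) -> None:
        """Optional final cleanup; does nothing by default."""


class RecordingConnector(SketchConnector):
    """Toy connector that records submissions instead of running them."""

    def __init__(self) -> None:
        self.submitted: List[Tuple[int, List[str]]] = []

    def submit(self, data_id: int, argv: List[str]) -> None:
        self.submitted.append((data_id, list(argv)))


connector = RecordingConnector()
connector.submit(42, ["python", "-m", "executor"])
connector.cleanup(42)
print(connector.submitted)
```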

Local Connector

class resolwe.flow.managers.workload_connectors.local.Connector[source]

Local connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: Data, argv)[source]

Run process locally.

For details, see BaseConnector.submit().

Celery Connector

class resolwe.flow.managers.workload_connectors.celery.Connector[source]

Celery-based connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: Data, argv)[source]

Run process.

For details, see BaseConnector.submit().

Slurm Connector

class resolwe.flow.managers.workload_connectors.slurm.Connector[source]

Slurm-based connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: Data, argv)[source]

Run process with SLURM.

For details, see BaseConnector.submit().

Kubernetes Connector

class resolwe.flow.managers.workload_connectors.kubernetes.ConfigLocation(value)[source]

The enum specifying where to read the configuration from.

class resolwe.flow.managers.workload_connectors.kubernetes.Connector[source]

Kubernetes-based connector for job execution.

cleanup(data_id: int)[source]

Remove the persistent volume claims created by the executor.

optimize_job_scheduling(data: Data, job_description: dict)[source]

Optimize the scheduling by modifying the job description.

Modify the job description to be a better fit on the scheduler. Currently this is used to schedule based on the process scheduling class.

start(data: Data, listener_connection: Tuple[str, str, str])[source]

Start process execution.

Construct the Kubernetes job description and pass it to Kubernetes.

submit(data: Data, argv)[source]

Run process.

For details, see BaseConnector.submit().

resolwe.flow.managers.workload_connectors.kubernetes.get_mountable_connectors() → Iterable[Tuple[str, BaseStorageConnector]][source]

Iterate through all the storages and find mountable connectors.

Returns:

list of tuples (storage_name, connector).

resolwe.flow.managers.workload_connectors.kubernetes.get_upload_dir() → str[source]

Get the upload path.

Returns:

the path of the first mountable connector for storage ‘upload’.

Raises:

RuntimeError – if no applicable connector is found.

resolwe.flow.managers.workload_connectors.kubernetes.sanitize_kubernetes_label(label: str, trim_end: bool = True) → str[source]

Make sure the Kubernetes label complies with the rules.

See the URL below for details.

https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
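For orientation, the rules behind that link say that a label value may be at most 63 characters long and, if not empty, must begin and end with an alphanumeric character, with dashes, underscores, dots and alphanumerics in between. The sketch below enforces just those rules. The function name sanitize_label_sketch is hypothetical, the replacement strategy is an assumption, and the trim_end parameter of the real function is deliberately not modelled.

```python
import re

MAX_LABEL_LENGTH = 63  # limit stated in the Kubernetes label rules


def sanitize_label_sketch(label: str) -> str:
    """Hypothetical sanitizer for a Kubernetes label value."""
    # Replace every character outside the allowed set with a dash.
    cleaned = re.sub(r"[^0-9A-Za-z_.-]", "-", label)
    # Enforce the length limit before trimming the edges.
    cleaned = cleaned[:MAX_LABEL_LENGTH]
    # The value must start and end with an alphanumeric character.
    return cleaned.strip("-_.")


print(sanitize_label_sketch("  My process: v1.0! "))
```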

resolwe.flow.managers.workload_connectors.kubernetes.unique_volume_name(base_name: str, data_id: int, postfix: str) → str[source]

Get unique persistent volume claim name.

Listener

Consumer

Manager Channels consumer.

class resolwe.flow.managers.consumer.HealtCheckConsumer[source]

Channels consumer for handling health-check events.

check_database() → bool[source]

Perform a simple database check.

async health_check(message: dict)[source]

Perform health check.

We are testing the channels layer and the database layer. The channels layer is already functioning if this method is called, so we only have to perform the database check.

If the check is successful, touch the file specified in the channels message.

class resolwe.flow.managers.consumer.ManagerConsumer(*args, **kwargs)[source]

Channels consumer for handling manager events.

async control_event(message)[source]

Forward control events to the manager dispatcher.

async resolwe.flow.managers.consumer.exit_consumer()[source]

Cause the synchronous consumer to exit cleanly.

async resolwe.flow.managers.consumer.run_consumer(timeout=None)[source]

Run the consumer until it finishes processing.

Parameters:

timeout – Set maximum execution time before cancellation, or None (default) for unlimited.
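The timeout semantics (a limit in seconds, or None for no limit) match what asyncio.wait_for offers. The sketch below is an assumption about how such a wrapper could look, not the actual run_consumer code; consume_forever and run_with_timeout are hypothetical names.

```python
import asyncio


async def consume_forever():
    """Hypothetical consumer loop that never finishes on its own."""
    while True:
        await asyncio.sleep(0.01)


async def run_with_timeout(timeout=None):
    """Run the consumer; cancel it once the timeout (in seconds) expires."""
    try:
        # timeout=None makes wait_for wait without any limit.
        await asyncio.wait_for(consume_forever(), timeout=timeout)
    except asyncio.TimeoutError:
        return "cancelled"
    return "finished"


result = asyncio.run(run_with_timeout(timeout=0.05))
print(result)
```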

async resolwe.flow.managers.consumer.send_event(message)[source]

Construct a Channels event packet with the given message.

Parameters:

message – The message to send to the manager workers.

Utilities

Utilities for using global manager features.

resolwe.flow.managers.utils.disable_auto_calls()[source]

Decorator/context manager which stops automatic manager calls.

When entered, automatic communicate() calls from the Django transaction signal are not done.
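An object that works both as a decorator and as a context manager can be built with contextlib.ContextDecorator. The following sketch shows the pattern only; AutoCallState, disable_auto_calls_sketch and on_commit are hypothetical names, and the real utility may track its state differently.

```python
from contextlib import ContextDecorator


class AutoCallState:
    """Hypothetical global switch consulted before automatic calls."""

    enabled = True


class disable_auto_calls_sketch(ContextDecorator):
    """Turn automatic calls off while the block or function runs."""

    def __enter__(self):
        self._previous = AutoCallState.enabled
        AutoCallState.enabled = False
        return self

    def __exit__(self, *exc):
        # Restore the previous state and do not swallow exceptions.
        AutoCallState.enabled = self._previous
        return False


calls = []


def on_commit():
    """Stand-in for the signal handler that triggers communicate()."""
    if AutoCallState.enabled:
        calls.append("communicate")


on_commit()  # recorded
with disable_auto_calls_sketch():
    on_commit()  # suppressed


@disable_auto_calls_sketch()
def bulk_import():
    on_commit()  # suppressed


bulk_import()
on_commit()  # recorded again
print(calls)
```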

Flow Executors

Base Class

class resolwe.flow.executors.run.BaseFlowExecutor(data_id: int, communicator: ZMQCommunicator, listener_connection: Tuple[str, str, str], *args, **kwargs)[source]

Represents a workflow executor.

get_tools_paths()[source]

Get tools paths.

async run()[source]

Execute the script and save results.

async start()[source]

Start process execution.

Flow Executor Preparer

Framework for the manager-resident executor preparation facilities.

class resolwe.flow.executors.prepare.BaseFlowExecutorPreparer[source]

Represents the preparation functionality of the executor.

extend_settings(data_id, files, secrets)[source]

Extend the settings the manager will serialize.

Parameters:
  • data_id – The Data object id being prepared for.

  • files – The settings dictionary to be serialized. Keys are filenames, values are the objects that will be serialized into those files. Standard filenames are listed in resolwe.flow.managers.protocol.ExecutorFiles.

  • secrets – Secret files dictionary describing additional secret file content that should be created and made available to processes with special permissions. Keys are filenames, values are the raw strings that should be written into those files.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

get_tools_paths(from_applications=False)[source]

Get tools’ paths.

post_register_hook(verbosity=1)[source]

Run hook after the ‘register’ management command finishes.

Subclasses may implement this hook to e.g. pull Docker images at this point. By default, it does nothing.

prepare_for_execution(data)[source]

Prepare the data object for the execution.

This is mostly needed for the null executor to change the status of the data and worker object to done.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance

  • filename – Filename to resolve

Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:

filename – Filename to resolve

Returns:

Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Raises:

RuntimeError – when no storage connectors are configured for upload storage or path could not be resolved.

Docker Flow Executor

Preparation

class resolwe.flow.executors.docker.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the docker executor.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

post_register_hook(verbosity=1)[source]

Pull Docker images needed by processes after registering.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance

  • filename – Filename to resolve

Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

Raises:

RuntimeError – when data path can not be resolved.

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:

filename – Filename to resolve

Returns:

Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Local Flow Executor

class resolwe.flow.executors.local.run.FlowExecutor(*args, **kwargs)[source]

Local dataflow executor proxy.

Preparation

class resolwe.flow.executors.local.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the local executor.

extend_settings(data_id, files, secrets)[source]

Prevent processes requiring access to secrets from being run.

Null Flow Executor

class resolwe.flow.executors.null.run.FlowExecutor(data_id: int, communicator: ZMQCommunicator, listener_connection: Tuple[str, str, str], *args, **kwargs)[source]

Null dataflow executor proxy.

This executor is intended to be used in tests where you want to save the object to the database but don’t need to run it.

Flow Models

Base Model

Base model for all other models.

class resolwe.flow.models.base.BaseModel(*args, **kwargs)[source]

Abstract model that includes common fields for other models.

class Meta[source]

BaseModel Meta options.

contributor

user that created the entry

created

creation date and time

modified

modified date and time

name

object name

save(*args, **kwargs)[source]

Save the model.

slug

URL slug

version

process version

Collection Model

Postgres ORM model for the organization of collections.

class resolwe.flow.models.collection.BaseCollection(*args, **kwargs)[source]

Template for Postgres model for storing a collection.

class Meta[source]

BaseCollection Meta options.

contributor

user that created the entry

created

creation date and time

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

modified

modified date and time

name

object name

save(*args, **kwargs)[source]

Perform descriptor validation and save object.

search

field used for full-text search

slug

URL slug

tags

tags for categorizing objects

version

process version

class resolwe.flow.models.Collection(*args, **kwargs)[source]

Postgres model for storing a collection.

exception DoesNotExist
exception MultipleObjectsReturned
annotation_fields

annotation fields available to samples in this collection

contributor

user that created the entry

created

creation date and time

delete_background()[source]

Delete the object in the background.

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

duplicate(contributor) → BackgroundTask[source]

Duplicate (make a copy) object in the background.

duplicated

duplication date and time

is_duplicate()[source]

Return True if collection is a duplicate.

modified

modified date and time

name

object name

objects = <django.db.models.manager.ManagerFromCollectionQuerySet object>

manager

permission_group

permission group for the object

save(*args, **kwargs)[source]

Add required annotation fields to the collection.

search

field used for full-text search

slug

URL slug

tags

tags for categorizing objects

version

process version

Data model

Postgres ORM model for keeping the data structured.

class resolwe.flow.models.Data(*args, **kwargs)[source]

Postgres model for storing data.

exception DoesNotExist
exception MultipleObjectsReturned
STATUS_DIRTY = 'DR'

data object is in dirty state

STATUS_DONE = 'OK'

data object is done

STATUS_ERROR = 'ER'

data object is in error state

STATUS_PREPARING = 'PP'

data object is preparing

STATUS_PROCESSING = 'PR'

data object is processing

STATUS_RESOLVING = 'RE'

data object is being resolved

STATUS_UPLOADING = 'UP'

data object is uploading

STATUS_WAITING = 'WT'

data object is waiting

checksum

checksum field calculated on inputs

collection

collection

contributor

user that created the entry

created

creation date and time

delete(*args, **kwargs)[source]

Delete the data model.

delete_background()[source]

Delete the object in the background.

dependency_status() → str | None[source]

Return abstracted status of instance IO dependencies.

Returns:

  • STATUS_ERROR .. one dependency has error status or was deleted

  • STATUS_DONE .. all dependencies have done status

  • None .. other
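The three outcomes listed above amount to a small reduction over the statuses of the dependencies. The sketch below restates that rule in plain Python; the function name is hypothetical, and representing a deleted dependency as None is an assumption made for the example.

```python
from typing import Iterable, Optional

STATUS_DONE = "OK"
STATUS_ERROR = "ER"


def abstract_dependency_status(statuses: Iterable[Optional[str]]) -> Optional[str]:
    """Hypothetical reduction of dependency statuses.

    A deleted dependency is represented by None (an assumption).
    """
    statuses = list(statuses)
    # One failed or deleted dependency is enough to report an error.
    if any(status is None or status == STATUS_ERROR for status in statuses):
        return STATUS_ERROR
    # Every dependency has finished successfully.
    if all(status == STATUS_DONE for status in statuses):
        return STATUS_DONE
    # Anything else (for example a dependency still processing).
    return None


print(abstract_dependency_status(["OK", "PR"]))
```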

descriptor

actual descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

data descriptor schema

duplicate(contributor) → BackgroundTask[source]

Duplicate (make a copy) object in the background.

duplicated

duplication date and time

entity

entity

finished

process finished date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

get_resource_limits()[source]

Return the resource limits for this data.

input

actual inputs used by the process

is_duplicate()[source]

Return True if data object is a duplicate.

location

data location

modified

modified date and time

name

object name

named_by_user

track if user set the data name explicitly

objects = <django.db.models.manager.ManagerFromDataQuerySet object>

manager

output

actual outputs of the process

parents

dependencies between data objects

permission_group

permission group for the object

process

process used to compute the data object

process_cores

actual allocated cores

process_error

error log message

process_info

info log message

process_memory

actual allocated memory

process_pid

process id

process_progress

progress

process_rc

return code

process_resources

process requirements overrides

process_warning

warning log message

resolve_secrets()[source]

Retrieve handles for all basic:secret: fields on input.

The process must have the secrets resource requirement specified in order to access any secrets. Otherwise this method will raise a PermissionDenied exception.

Returns:

A dictionary of secrets where key is the secret handle and value is the secret value.

restart(resource_overrides: dict = {})[source]

Restart the data object and all its children.

The status of the data object must be ERROR and the status of its dependencies must be DONE.

Parameters:

resource_overrides – dictionary mapping ids of data objects to resource overrides.

Raises:
  • RuntimeError – if the object is not in the right state.

  • RuntimeError – when object dependencies are not in the status DONE.

save(render_name=False, *args, **kwargs)[source]

Save the data model.

save_dependencies(instance, schema)[source]

Save data: and list:data: references as parents.

scheduled

date and time when process was dispatched to the scheduling system (set by resolwe.flow.managers.dispatcher.Manager.run)

search

field used for full-text search

size

total size of data’s outputs in bytes

slug

URL slug

started

process started date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

status

Data status

It can be one of the STATUS_* constants defined on this class.

tags

tags for categorizing objects

validate_change_collection(collection)[source]

Raise validation error if data object cannot change collection.

version

process version

class resolwe.flow.models.DataDependency(*args, **kwargs)[source]

Dependency relation between data objects.

exception DoesNotExist
KIND_IO = 'io'

child uses parent’s output as its input

KIND_SUBPROCESS = 'subprocess'

child was spawned by the parent

exception MultipleObjectsReturned
child

child data object

kind

kind of dependency

parent

parent data object

Entity–relationship model

Postgres ORM to define the entity–relationship model that describes how data objects are related in a specific domain.

class resolwe.flow.models.Entity(*args, **kwargs)[source]

Postgres model for storing entities.

exception DoesNotExist
exception MultipleObjectsReturned
collection

collection to which entity belongs

contributor

user that created the entry

copy_annotations(destination: Entity) → List[AnnotationValue][source]

Copy annotation from this entity to the destination.

Raises:

ValidationError – when some of the annotation fields are missing on the destination entity.

created

creation date and time

delete_background()[source]

Delete the object in the background.

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

duplicate(contributor) → BackgroundTask[source]

Duplicate (make a copy) object in the background.

duplicated

duplication date and time

get_annotation(path: str, default: Any | None = None) → Any[source]

Get the annotation for the given path.

Attr path:

the path to the annotation in the format ‘group.field’.

Attr default:

default value when annotation is not found.

Returns:

value of the annotation or default if not found.
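The ‘group.field’ path convention and the default fallback can be illustrated on a nested dictionary. This is a sketch only: get_annotation_sketch is a hypothetical function, the group and field names are made up, and the real method reads annotation values from the database.

```python
from typing import Any, Dict, Optional


def get_annotation_sketch(
    annotations: Dict[str, Dict[str, Any]], path: str, default: Optional[Any] = None
) -> Any:
    """Look up 'group.field' in a nested dict (hypothetical illustration)."""
    # Raises ValueError when the path does not contain a dot.
    group, field = path.split(".", maxsplit=1)
    return annotations.get(group, {}).get(field, default)


# Made-up annotations keyed by group name, then by field name.
annotations = {"general": {"species": "Homo sapiens"}}

print(get_annotation_sketch(annotations, "general.species"))
print(get_annotation_sketch(annotations, "general.age", default=0))
```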

invalid_annotation_fields(annotation_fields=None)[source]

Get the Queryset of invalid annotation fields.

An invalid annotation field is a field that has an annotation but is not allowed in the collection this entity belongs to.

Attr annotation_fields:

the iterable containing annotations fields to be checked. When None is given the annotation fields belonging to the entity are checked.

is_duplicate()[source]

Return True if entity is a duplicate.

modified

modified date and time

name

object name

objects = <django.db.models.manager.ManagerFromEntityQuerySet object>

manager

permission_group

permission group for the object

search

field used for full-text search

set_annotation(path: str, value: Any)[source]

Set the annotation for the given path.

Attr path:

the path to the annotation in the format ‘group.field’.

Attr value:

the annotation value.

slug

URL slug

tags

tags for categorizing objects

type

entity type

update_annotations(annotations: dict[str, Any], update=True)[source]

Update annotations with the given values.

When an annotation value is set to None, it is deleted.

Attr annotations:

the dictionary with annotation values. Keys are annotation paths.
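The "None means delete" rule can be sketched on a flat dictionary keyed by annotation path. The function name update_annotations_sketch and the sample paths are hypothetical, and the update flag of the real method is not modelled here.

```python
from typing import Any, Dict


def update_annotations_sketch(current: Dict[str, Any], changes: Dict[str, Any]) -> Dict[str, Any]:
    """Apply changes keyed by annotation path (hypothetical illustration)."""
    updated = dict(current)
    for path, value in changes.items():
        if value is None:
            # A None value removes the annotation, if present.
            updated.pop(path, None)
        else:
            updated[path] = value
    return updated


before = {"general.species": "Homo sapiens", "general.age": 30}
after = update_annotations_sketch(before, {"general.age": None, "general.sex": "F"})
print(after)
```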

validate_annotations()[source]

Perform streamlined descriptor validation.

Raises:

ValidationError – when annotations do not pass validation. All fields are validated and error messages aggregated into single exception.

version

process version

class resolwe.flow.models.Relation(*args, **kwargs)[source]

Relations between entities.

The Relation model defines the associations and dependencies between entities in a given collection:

{
    "collection": "<collection_id>",
    "type": "comparison",
    "category": "case-control study",
    "entities": [
        {"entity": "<entity1_id>", "label": "control"},
        {"entity": "<entity2_id>", "label": "case"},
        {"entity": "<entity3_id>", "label": "case"}
    ]
}

Relation type defines a specific set of associations among entities. It can be something like group, comparison or series. The relation type is an instance of RelationType and should be defined in any Django app that uses relations (e.g., as a fixture). Multiple relations of the same type are allowed on the collection.

Relation category defines a specific use case. The relation category must be unique in a collection, so that users can distinguish between different relations. In the example above, we could add another comparison relation of category, say Case-case study to compare <entity2> with <entity3>.

Relation is linked to resolwe.flow.models.Collection to enable defining different relation structures in different collections. This also greatly speeds up retrieval of relations, as they are envisioned to be mainly used on a collection level.

unit defines units used in partitions where it is applicable, e.g. in relations of type series.

exception DoesNotExist
exception MultipleObjectsReturned
category

category of the relation

collection

collection to which relation belongs

contributor

user that created the entry

created

creation date and time

descriptor

relation descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

relation descriptor schema

entities

partitions of entities in the relation

modified

modified date and time

name

object name

objects = <django.db.models.manager.ManagerFromPermissionQuerySet object>

custom manager with permission filtering methods

permission_group

permission group for the object

save(*args, **kwargs)[source]

Perform descriptor validation and save object.

slug

URL slug

type

type of the relation

unit

unit used in the partitions’ positions (where applicable, e.g. for series)

version

process version

class resolwe.flow.models.RelationType(*args, **kwargs)[source]

Model for storing relation types.

exception DoesNotExist
exception MultipleObjectsReturned
name

relation type name

ordered

indicates if order of entities in relation is important or not

DescriptorSchema model

Postgres ORM model for storing descriptors.

class resolwe.flow.models.DescriptorSchema(*args, **kwargs)[source]

Postgres model for storing descriptors.

exception DoesNotExist
exception MultipleObjectsReturned
contributor

user that created the entry

created

creation date and time

description

detailed description

modified

modified date and time

name

object name

permission_group

permission group for the object

schema

user descriptor schema represented as a JSON object

slug

URL slug

version

process version

Process model

Postgres ORM model for storing processes.

class resolwe.flow.models.Process(*args, **kwargs)[source]

Postgres model for storing processes.

exception DoesNotExist
exception MultipleObjectsReturned
PERSISTENCE_CACHED = 'CAC'

cached persistence

PERSISTENCE_RAW = 'RAW'

raw persistence

PERSISTENCE_TEMP = 'TMP'

temp persistence

category

category

contributor

user that created the entry

created

creation date and time

data_name

template for name of Data object created with Process

description

detailed description

entity_always_create

Create new entity, regardless of entity_input or entity_descriptor_schema fields.

entity_descriptor_schema

Slug of the descriptor schema assigned to the Entity created with entity_type.

entity_input

Limit the entity selection in entity_type to a single input.

entity_type

Automatically add Data object created with this process to an Entity object representing a data-flow. If all input Data objects belong to the same entity, add newly created Data object to it, otherwise create a new one.
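The rule in the last sentence (reuse the entity when all inputs share one, otherwise create a new one) can be written as a small decision function. This is a sketch under assumptions: choose_entity is a hypothetical name, entities are represented by integer ids, and entity_always_create and entity_input are not modelled.

```python
from typing import Iterable, Optional


def choose_entity(input_entity_ids: Iterable[Optional[int]]) -> Optional[int]:
    """Return the shared entity id, or None when a new entity is needed.

    Hypothetical helper; None in the input means "input has no entity".
    """
    distinct = set(input_entity_ids)
    if len(distinct) == 1 and None not in distinct:
        # All inputs belong to the same entity, so reuse it.
        (entity_id,) = distinct
        return entity_id
    # Mixed, missing or no entities: the caller creates a new one.
    return None


print(choose_entity([5, 5]))
print(choose_entity([5, 6]))
```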

get_resource_limits(data: Data | None = None)[source]

Get the core count and memory usage limits for this process.

Returns:

A dictionary with the resource limits, containing the following keys:

  • memory: Memory usage limit, in MB. Defaults to 4096 if not otherwise specified in the resource requirements.

  • cores: Core count limit. Defaults to 1.

  • storage: Size (in gibibytes) of temporary volume used for processing in kubernetes. Defaults to 200.

Return type:

dict
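A minimal sketch of how the documented defaults could be combined with process requirements is shown below. The function name resource_limits_sketch and the {"resources": {...}} layout of the requirements are assumptions; the real method may consult further sources, such as the optional data argument.

```python
from typing import Any, Dict

# Defaults as documented above: memory in MB, storage in gibibytes.
DEFAULT_LIMITS = {"memory": 4096, "cores": 1, "storage": 200}


def resource_limits_sketch(requirements: Dict[str, Any]) -> Dict[str, int]:
    """Overlay requested resources on the defaults (hypothetical)."""
    limits = dict(DEFAULT_LIMITS)
    # Assumed layout: requirements = {"resources": {"cores": 4, ...}}
    limits.update(requirements.get("resources", {}))
    return limits


print(resource_limits_sketch({"resources": {"cores": 4}}))
```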

input_schema

process input schema (describes input parameters, form layout “Inputs” for Data.input)

Handling:

  • schema defined by: dev

  • default by: user

  • changeable by: none

is_active

designates whether this process should be treated as active

modified

modified date and time

name

object name

output_schema

process output schema (describes output JSON, form layout “Results” for Data.output)

Handling:

  • schema defined by: dev

  • default by: dev

  • changeable by: dev

Implicitly defined fields (by resolwe.flow.management.commands.register() or resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives):

  • progress of type basic:float (from 0.0 to 1.0)

  • proc of type basic:group containing:

    • stdout of type basic:text

    • rc of type basic:integer

    • task of type basic:string (celery task id)

    • worker of type basic:string (celery worker hostname)

    • runtime of type basic:string (runtime instance hostname)

    • pid of type basic:integer (process ID)

permission_group

permission group for the object

persistence

Persistence of Data objects created with this process. It can be one of PERSISTENCE_RAW, PERSISTENCE_CACHED or PERSISTENCE_TEMP.

Note

If persistence is set to PERSISTENCE_CACHED or PERSISTENCE_TEMP, the process must be idempotent.

requirements

process requirements

run

process command and environment description for internal use

Handling:

  • schema defined by: dev

  • default by: dev

  • changeable by: dev

scheduling_class

process scheduling class

slug

URL slug

type

data type

version

process version

Storage model

Postgres ORM model for storing JSON.

class resolwe.flow.models.Storage(*args, **kwargs)[source]

Postgres model for storing storages.

exception DoesNotExist
exception MultipleObjectsReturned
contributor

user that created the entry

created

creation date and time

data

corresponding data objects

json

actual JSON stored

modified

modified date and time

name

object name

objects = <django.db.models.manager.StorageManagerFromPermissionQuerySet object>

storage manager

slug

URL slug

version

process version

Secret model

Postgres ORM model for storing secrets.

class resolwe.flow.models.Secret(*args, **kwargs)[source]

Postgres model for storing secrets.

ProcessMigrationHistory model

Postgres ORM model for storing process migration history.

class resolwe.flow.models.ProcessMigrationHistory(*args, **kwargs)[source]

Model for storing process migration history.

exception DoesNotExist
exception MultipleObjectsReturned
created

creation date and time

metadata

migration-specific metadata

migration

migration identifier

DataMigrationHistory model

Postgres ORM model for storing data migration history.

class resolwe.flow.models.DataMigrationHistory(*args, **kwargs)[source]

Model for storing data migration history.

exception DoesNotExist
exception MultipleObjectsReturned
created

creation date and time

metadata

migration-specific metadata

migration

migration identifier

AnnotationGroup model

Postgres ORM model for storing annotation group data.

class resolwe.flow.models.annotations.AnnotationGroup(*args, **kwargs)[source]

Group of annotation fields.

exception DoesNotExist
exception MultipleObjectsReturned
label

the label of the annotation group

name

the name of the annotation group

sort_order

the sorting order among annotation groups

AnnotationField model

Postgres ORM model for storing annotation field data.

class resolwe.flow.models.annotations.AnnotationField(*args, **kwargs)[source]

Annotation field.

exception DoesNotExist
exception MultipleObjectsReturned
static add_to_collection(source: Collection, destination: Collection)[source]

Add fields from the source to the destination collection.

property annotation_type: AnnotationType

Get the field type as enum.

description

user visible field description

classmethod field_from_path(path: str) → AnnotationField[source]

Get the field from the field path.

Raises:

ValidationError – when field does not exist.

group

the annotation group this field belongs to

static group_field_from_path(path: str) List[str][source]

Return the group and field name from path.

classmethod id_from_path(path: str) int | None[source]

Get the field id from the field path.

label

user visible field name

label_by_value(label: str) str[source]

Get the value by label.

When no value is found the label is returned.

name

the name of the annotation field

required

is this field required

revalidate_values()[source]

Revalidate all annotation values.

Raises:

ValidationError – when validation fails.

save(*args, **kwargs)[source]

Recompute the labels for annotation values if vocabulary changes.

Raises:

ValidationError – when vocabulary changes so that annotation values are no longer valid.

sort_order

the sorting order among annotation fields

type

the type of the annotation field

validator_regex

optional regular expression for validation

vocabulary

optional map of valid values to labels

AnnotationPreset model

Postgres ORM model for storing annotation presets data.

class resolwe.flow.models.annotations.AnnotationPreset(*args, **kwargs)[source]

The named set of annotation fields.

The presets have permissions.

exception DoesNotExist
exception MultipleObjectsReturned
contributor

user that created the entry

created

creation date and time

fields

the fields belonging to this preset

modified

modified date and time

name

object name

permission_group

permission group for the object

slug

URL slug

version

process version

AnnotationValue model

Postgres ORM model for storing annotation values.

class resolwe.flow.models.annotations.AnnotationValue(*args, **kwargs)[source]

The value of the annotation.

exception DoesNotExist
exception MultipleObjectsReturned
entity: Entity

the entity this value belongs to

field: AnnotationField

the field this value belongs to

static from_path(entity_id: int, path: str) AnnotationValue | None[source]

Get the annotation value from the path.

has_permission(permission: Permission, user) bool[source]

Return whether the user has the given permission on this object.

The permission is checked on the entity.

property label: Any

Return the cached label.

modified

the date when field was last modified

recompute_label()[source]

Recompute label from value and set it to the model instance.

save(*args, **kwargs)[source]

Save the annotation value after validation has passed.

validate()[source]

Validate the given value.

The validation is always done in full, all errors are gathered and sent in the response.

Raises:

ValidationError – when the validation fails.

property value: str | int | float | date

Get the actual value.

History model

Basic model for history tracking.

DataHistory model

Model for tracking changes in Data objects.

class resolwe.flow.models.history.DataHistory(*args, **kwargs)[source]

Track Data history.

exception DoesNotExist
exception MultipleObjectsReturned
datum

Pointer to the data object, can be null

deleted

When the object was deleted

valid

When this object is valid for

CollectionHistory model

Model for tracking changes in Collection objects.

class resolwe.flow.models.history.CollectionHistory(*args, **kwargs)[source]

Track Collection history.

exception DoesNotExist
exception MultipleObjectsReturned
datum

Pointer to the collection object, can be null

deleted

When the object was deleted

valid

When this object is valid for

TrackChange model

Basic model to track changes to particular model fields.

class resolwe.flow.models.history.TrackChange(*args, **kwargs)[source]

Base class to track the changes in the model.

class Meta[source]

TrackChange meta options.

get_value(field_name: str) Any[source]

Return the value used to track changes.

objects: Manager

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

static preprocess_value(instance: Model, field_name: str) Any[source]

Preprocess the value before saving.

classmethod process_change(instance: HistoryMixin, field_name: str)[source]

Create new track change object.

timestamp

Timestamp of the change

DataSlugChange model

Track data slug changes.

class resolwe.flow.models.history.DataSlugChange(*args, **kwargs)[source]

Track data slug changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

timestamp

Timestamp of the change

value: Any

New value.

CollectionSlugChange model

Track collection slug changes.

class resolwe.flow.models.history.CollectionSlugChange(*args, **kwargs)[source]

Track collection slug changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

timestamp

Timestamp of the change

value: Any

New value.

DataNameChange model

Track data name changes.

class resolwe.flow.models.history.DataNameChange(*args, **kwargs)[source]

Track data name changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

timestamp

Timestamp of the change

value: Any

New value.

CollectionNameChange model

Track collection name changes.

class resolwe.flow.models.history.CollectionNameChange(*args, **kwargs)[source]

Track collection name changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

timestamp

Timestamp of the change

value: Any

New value.

CollectionChange model

Track collection changes.

class resolwe.flow.models.history.CollectionChange(*args, **kwargs)[source]

Track collection changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

static preprocess_value(instance: Model, field_name: str) Any[source]

Preprocess the value before saving.

timestamp

Timestamp of the change

value: Any

New value.

SizeChange model

Track size changes.

class resolwe.flow.models.history.SizeChange(*args, **kwargs)[source]

Track size changes.

exception DoesNotExist
exception MultipleObjectsReturned
history

Reference to the history object

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

timestamp

Timestamp of the change

value: Any

New value

ProcessingHistory model

Track processing data objects.

class resolwe.flow.models.history.ProcessingHistory(*args, **kwargs)[source]

Track processing changes.

exception DoesNotExist
exception MultipleObjectsReturned
allocated_cpu

Number of CPUs allocated to the process

allocated_memory

Amount of memory (in bytes) allocated to the process

get_value(field_name: str) Any[source]

Return the value used to track changes.

history

Reference to the history object

interval

Processing interval

node_cpu

Number of CPUs on the machine running the process

node_memory

Amount of memory on machine running the process

objects: Manager = <django.db.models.manager.Manager object>

Defined to keep mypy from reporting errors (objects do not exist since class is abstract).

classmethod process_change(instance: HistoryMixin, field_name: str)[source]

Preprocess the value before saving.

timestamp

Timestamp of the change

Utility functions

Flow Utilities

Resolwe Exceptions Utils

Utils functions for working with exceptions.

resolwe.flow.utils.exceptions.resolwe_exception_handler(exc, context)[source]

Handle exceptions raised in API and make them nicer.

To enable this, you have to add it to the settings:

REST_FRAMEWORK = {
    'EXCEPTION_HANDLER': 'resolwe.flow.utils.exceptions.resolwe_exception_handler',
}

Statistics

Various statistical utilities, used mostly for manager load tracking.

class resolwe.flow.utils.stats.NumberSeriesShape[source]

Helper class for computing characteristics for numerical data.

Given a series of numerical data, the class will keep a record of the extremes seen, arithmetic mean and standard deviation.

to_dict()[source]

Pack the stats computed into a dictionary.

update(num)[source]

Update metrics with the new number.
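The bookkeeping described above (extremes, arithmetic mean and standard deviation, updated one number at a time) can be sketched with Welford's online algorithm. This is only an illustration under assumptions: the class name RunningShape, the dictionary keys and the choice of population standard deviation are hypothetical and are not taken from NumberSeriesShape itself.

```python
import math


class RunningShape:
    """Hypothetical stand-in; not Resolwe's NumberSeriesShape."""

    def __init__(self):
        self.count = 0
        self.low = None
        self.high = None
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, num):
        """Fold one number into the running statistics (Welford's method)."""
        self.count += 1
        self.low = num if self.low is None else min(self.low, num)
        self.high = num if self.high is None else max(self.high, num)
        delta = num - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (num - self.mean)

    def to_dict(self):
        """Pack the statistics; the key names here are assumptions."""
        # Population standard deviation; 0.0 before any sample is seen.
        std = math.sqrt(self._m2 / self.count) if self.count else 0.0
        return {"min": self.low, "max": self.high, "mean": self.mean, "std": std}


shape = RunningShape()
for value in (2, 4, 4, 4, 5, 5, 7, 9):
    shape.update(value)
stats = shape.to_dict()
```

Updating incrementally keeps memory use constant regardless of how many numbers are seen, which suits long-running load tracking.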

class resolwe.flow.utils.stats.SimpleLoadAvg(intervals)[source]

Helper class for a sort of load average based on event times.

Given a series of queue depth events, it will compute the average number of events for three different window lengths, emulating a form of ‘load average’. The calculation itself is modelled after the Linux scheduler, with a 5-second sampling rate. Because we don’t get consistent (time-wise) samples, the sample taken is the average of a simple moving window for the last 5 seconds; this is to avoid numerical errors if actual time deltas were used to compute the scaled decay.

add(count, timestamp=None)[source]

Add a value at the specified time to the series.

Parameters:
  • count – The number of work items ready at the specified time.

  • timestamp – The timestamp to add. Defaults to None, meaning current time. It should be strictly greater (newer) than the last added timestamp.

to_dict()[source]

Pack the load averages into a nicely-keyed dictionary.
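The decay calculation mentioned in the class description can be sketched as follows. This is a minimal illustration of an exponentially decayed average with a fixed 5-second sampling period, not the code used by SimpleLoadAvg; the window lengths (1, 5 and 15 minutes) and the names decay_step and loads are assumptions.

```python
import math

SAMPLE_SECONDS = 5  # sampling period named in the description above


def decay_step(load, sample, window_seconds):
    """Fold one 5-second sample into an exponentially decayed average."""
    factor = math.exp(-SAMPLE_SECONDS / window_seconds)
    return load * factor + sample * (1.0 - factor)


# Hypothetical window lengths in seconds (1, 5 and 15 minutes).
windows = (60, 300, 900)
loads = {window: 0.0 for window in windows}

# Feed a constant queue depth of 4 for ten minutes' worth of samples.
for _ in range(600 // SAMPLE_SECONDS):
    for window in windows:
        loads[window] = decay_step(loads[window], 4.0, window)
```

Shorter windows converge to the current queue depth faster, so after a sustained load the shortest window reports the highest value.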

Flow Management

Register Processes

class resolwe.flow.management.commands.register.Command(stdout=None, stderr=None, no_color=False, force_color=False)[source]

Register processes.

add_arguments(parser)[source]

Command arguments.

find_descriptor_schemas(schema_file)[source]

Find descriptor schemas in given path.

find_schemas(schema_path, schema_type='process', verbosity=1)[source]

Find schemas in packages that match filters.

handle(*args, **options)[source]

Register processes.

register_descriptors(descriptor_schemas, user, force=False, verbosity=1)[source]

Read and register descriptors.

register_processes(process_schemas, user, force=False, verbosity=1)[source]

Read and register processors.

retire(process_schemas)[source]

Retire obsolete processes.

Remove old process versions without data. Find processes that have been registered but do not exist in the code anymore, then:

  • If they do not have data: remove them

  • If they have data: flag them not active (is_active=False)

valid(instance, schema)[source]

Validate schema.

Resolwe Test Framework

Resolwe Test Cases

class resolwe.test.TestCaseHelpers(methodName='runTest')[source]

Mixin for test case helpers.

assertAlmostEqualGeneric(actual, expected, msg=None)[source]

Assert almost equality for common types of objects.

This is the same as assertEqual(), but using assertAlmostEqual() when floats are encountered inside common containers (currently this includes dict, list and tuple types).

Parameters:
  • actual – object to compare

  • expected – object to compare against

  • msg – optional message printed on failures
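The comparison strategy described above can be sketched with plain unittest. The class and method names below are hypothetical, and the recursion is only one plausible way to descend into dict, list and tuple containers; it is not the implementation found in TestCaseHelpers.

```python
import unittest


class AlmostEqualSketch(unittest.TestCase):
    """Hypothetical illustration; not Resolwe's TestCaseHelpers."""

    def assert_almost_equal_generic(self, actual, expected, msg=None):
        if isinstance(expected, dict):
            self.assertIsInstance(actual, dict, msg)
            self.assertCountEqual(actual.keys(), expected.keys(), msg)
            for key in expected:
                self.assert_almost_equal_generic(actual[key], expected[key], msg)
        elif isinstance(expected, (list, tuple)):
            self.assertEqual(len(actual), len(expected), msg)
            for got, want in zip(actual, expected):
                self.assert_almost_equal_generic(got, want, msg)
        elif isinstance(expected, float):
            # Floats are compared approximately (7 decimal places by default).
            self.assertAlmostEqual(actual, expected, msg=msg)
        else:
            self.assertEqual(actual, expected, msg)

    def runTest(self):
        self.assert_almost_equal_generic(
            {"scores": [0.1 + 0.2, 1], "name": ("a", 2.0)},
            {"scores": [0.3, 1], "name": ("a", 2.0000000001)},
        )


result = unittest.TestResult()
AlmostEqualSketch().run(result)
```

A plain assertEqual() would fail on this input because 0.1 + 0.2 is not exactly 0.3 in binary floating point.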

assertAnnotation(entity, path, value)[source]

Compare the entity annotation with the given value.

keep_data(mock_purge=True)[source]

Do not delete output files after tests.

setUp()[source]

Prepare environment for test.

class resolwe.test.TransactionTestCase(methodName='runTest')[source]

Base class for writing Resolwe tests not enclosed in a transaction.

It is based on Django’s TransactionTestCase. Use it if you need to access the test’s database from another thread/process.

setUp()[source]

Initialize test data.

class resolwe.test.TestCase(methodName='runTest')[source]

Base class for writing Resolwe tests.

It is based on TransactionTestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

class resolwe.test.ProcessTestCase(methodName='runTest')[source]

Base class for writing process tests.

It is a subclass of TransactionTestCase with some specific functions used for testing processes.

To write a process test, use Django’s standard syntax for writing tests and follow these steps:

  1. Put input files (if any) in tests/files directory of a Django application.

  2. Run the process using run_process().

  3. Check if the process has the expected status using assertStatus().

  4. Check process’s output using assertFields(), assertFile(), assertFileExists(), assertFiles() and assertJSON().

Note

When creating a test case for a custom Django application, subclass this class and override self.files_path with:

self.files_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'files')

Danger

If output files don’t exist in the tests/files directory of a Django application, they are created automatically. You must verify that they are correct before using them for further runs.

assertDir(obj, field_path, fn)[source]

Compare process output directory to correct compressed directory.

Parameters:
  • obj (Data) – object that includes the directory to compare

  • field_path (str) – path to Data object’s field with the file name

  • fn (str) – file name (and relative path) of the correct compressed directory to compare against. Path should be relative to the tests/files directory of a Django application. Compressed directory needs to be in tar.gz format.

assertDirExists(obj, field_path)[source]

Assert that a directory in the output field of the given object exists.

Parameters:
  • obj – object that includes the directory for which to check if it exists

  • field_path – directory name/path

assertDirStructure(obj, field_path, dir_struct, exact=True)[source]

Assert correct tree structure in output field of given object.

Only names of directories and files are asserted. Content of files is not compared.

Parameters:
  • obj (Data) – object that includes the directory to compare

  • field_path (str) – path to the directory to compare

  • dir_struct (dict) – correct tree structure of the directory. Dictionary keys are directory and file names with the correct nested structure. Dictionary value associated with each directory is a new dictionary which lists the content of the directory. Dictionary value associated with each file name is None

  • exact (bool) – if True tested directory structure must exactly match dir_struct. If False dir_struct must be a partial structure of the directory to compare
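The dir_struct format described above can be illustrated with the standard library alone. The helpers describe_tree and is_partial are hypothetical names introduced for this sketch, and representing an empty directory as an empty dictionary is an assumption; neither helper is part of ProcessTestCase.

```python
import os
import tempfile


def describe_tree(root):
    """Map a directory to the nested-dict form described above."""
    tree = {}
    for entry in os.scandir(root):
        if entry.is_dir():
            tree[entry.name] = describe_tree(entry.path)
        else:
            tree[entry.name] = None  # files map to None
    return tree


def is_partial(expected, actual):
    """Check that ``expected`` is a partial structure of ``actual``."""
    for name, content in expected.items():
        if name not in actual:
            return False
        if content is None:
            if actual[name] is not None:
                return False
        elif actual[name] is None or not is_partial(content, actual[name]):
            return False
    return True


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "results", "plots"))
    for parts in (("report.txt",), ("results", "table.csv")):
        open(os.path.join(root, *parts), "w").close()
    actual = describe_tree(root)

expected = {"report.txt": None, "results": {"table.csv": None, "plots": {}}}
```

Comparing actual == expected corresponds to the exact=True case, while is_partial(expected, actual) corresponds to exact=False.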

assertFields(obj, path, value)[source]

Compare object’s field to the given value.

The file size is ignored. Use assertFile to validate file contents.

Parameters:
  • obj (Data) – object with the field to compare

  • path (str) – path to Data object’s field

  • value (str) – desired value of Data object’s field

assertFile(obj, field_path, fn, **kwargs)[source]

Compare a process’s output file to the given correct file.

Parameters:
  • obj (Data) – object that includes the file to compare

  • field_path (str) – path to Data object’s field with the file name

  • fn (str) – file name (and relative path) of the correct file to compare against. Path should be relative to the tests/files directory of a Django application.

  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.

  • filter (FunctionType) – function for filtering the contents of output files. It is used in itertools.filterfalse() function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.

  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.
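How the filter and sort options interact can be sketched as follows. The helper digest_lines, the filter is_timestamp and the use of SHA-256 are assumptions made for illustration; the documentation above only states that filtering uses itertools.filterfalse() and that a hash value is computed.

```python
import hashlib
import itertools


def digest_lines(lines, line_filter=None, sort=False):
    """Hash lines after optionally dropping and sorting them."""
    if line_filter is not None:
        # filterfalse keeps the lines for which line_filter returns False.
        lines = itertools.filterfalse(line_filter, lines)
    lines = sorted(lines) if sort else list(lines)
    return hashlib.sha256(b"".join(lines)).hexdigest()


def is_timestamp(line):
    """Hypothetical filter: exclude lines that start with '# generated'."""
    return line.startswith(b"# generated")


output = [b"# generated 2024-01-01\n", b"b\t2\n", b"a\t1\n"]
reference = [b"# generated 1999-12-31\n", b"a\t1\n", b"b\t2\n"]

same = digest_lines(output, is_timestamp, sort=True) == digest_lines(
    reference, is_timestamp, sort=True
)
```

A filter of this kind is useful when the output contains run-specific lines, such as dates, that would otherwise make every comparison fail.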

assertFileExists(obj, field_path)[source]

Ensure a file in the given object’s field exists.

Parameters:
  • obj (Data) – object that includes the file for which to check if it exists

  • field_path (str) – path to Data object’s field with the file name/path

assertFiles(obj, field_path, fn_list, **kwargs)[source]

Compare a process’s output files to the given correct files.

Parameters:
  • obj (Data) – object which includes the files to compare

  • field_path (str) – path to Data object’s field with the list of file names

  • fn_list (list) – list of file names (and relative paths) of files to compare against. Paths should be relative to the tests/files directory of a Django application.

  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.

  • filter (FunctionType) – Function for filtering the contents of output files. It is used in itertools.filterfalse function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.

  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.

assertFilesExist(obj, field_path)[source]

Ensure files in the given object’s field exist.

Parameters:
  • obj (Data) – object that includes the list of files for which to check existence

  • field_path (str) – path to Data object’s field with the file name/path

assertJSON(obj, storage, field_path, file_name)[source]

Compare JSON in Storage object to the given correct JSON.

Parameters:
  • obj (Data) – object to which the Storage object belongs

  • storage (Storage or str) – object or id which contains JSON to compare

  • field_path (str) – path to JSON subset in the Storage’s object to compare against. If it is empty, the entire object will be compared.

  • file_name (str) –

    file name (and relative path) of the file with the correct JSON to compare against. Path should be relative to the tests/files directory of a Django application.

    Note

    The given JSON file should be compressed with gzip and have the .gz extension.
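A gzip-compressed JSON reference file of the kind described above can be produced and read back with the standard library. This is a minimal sketch: the file name expected_result.json.gz and the sample content are hypothetical, and a temporary directory stands in for the tests/files directory.

```python
import gzip
import json
import os
import tempfile

reference = {"values": [1, 2, 3], "threshold": 0.05}

with tempfile.TemporaryDirectory() as files_dir:
    # Hypothetical file name; real tests keep the file under tests/files.
    path = os.path.join(files_dir, "expected_result.json.gz")

    # Write the reference JSON as gzip-compressed text.
    with gzip.open(path, "wt", encoding="utf-8") as handle:
        json.dump(reference, handle)

    # Read it back the way a comparison helper might.
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        loaded = json.load(handle)
```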

assertStatus(obj, status)[source]

Check if object’s status is equal to the given status.

Parameters:
  • obj (Data) – object for which to check the status

  • status (str) – desired value of object’s status attribute

property files_path

Path to test files.

get_json(file_name, storage)[source]

Return JSON saved in file and test JSON to compare it to.

The method returns a tuple of the saved JSON and the test JSON. In your test you should then compare the test JSON to the saved JSON that is committed to the repository.

The storage argument can be a Storage object, a Storage ID or a Python dictionary. The test JSON is taken from the json field of the Storage object, or is the complete Python dictionary if a dict is given.

If the file does not exist, it is created, the test JSON is written to the new file and an exception is raised.

Parameters:
  • file_name (str) – file name (and relative path) of a JSON file. Path should be relative to the tests/files directory of a Django app. The file name must have a .gz extension.

  • storage (Storage, str or dict) – Storage object, Storage ID or a dict.

Returns:

(reference JSON, test JSON)

Return type:

tuple

preparation_stage()[source]

Context manager to mark input preparation stage.

run_process(process_slug, input_={}, assert_status='OK', descriptor=None, descriptor_schema=None, verbosity=0, tags=None, contributor=None, collection=None, process_resources=None)[source]

Run the specified process with the given inputs.

If input is a file, file path should be given relative to the tests/files directory of a Django application. If assert_status is given, check if Data object’s status matches it after the process has finished.

Note

If you need to delay calling the manager, you must put the desired code in a with transaction.atomic() block.

Parameters:
  • process_slug (str) – slug of the Process to run

  • input_ (dict) –

    Process’s input parameters

    Note

    You don’t have to specify parameters with defined default values.

  • assert_status (str) – desired status of the Data object

  • descriptor (dict) – descriptor to set on the Data object

  • descriptor_schema (dict) – descriptor schema to set on the Data object

  • tags (list) – list of tags that will be added to the created Data object

Returns:

object created by Process

Return type:

Data

run_processor(*args, **kwargs)[source]

Run process.

Deprecated method: use run_process.

setUp()[source]

Initialize test data.

tearDown()[source]

Clean up after the test.

class resolwe.test.TransactionResolweAPITestCase(methodName='runTest')[source]

Base class for testing Resolwe REST API.

This class is derived from Django REST Framework’s APITransactionTestCase class and implements some basic features that make testing the Resolwe API easier. These features include the following functions:

_get_list(user=None, query_params={})[source]

Make GET request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:

user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

_get_detail(pk, user=None, query_params={})[source]

Make GET request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object

  • user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

_post(data={}, user=None, query_params={})[source]

Make POST request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

_patch(pk, data={}, user=None, query_params={})[source]

Make PATCH request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object

  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

_delete(pk, user=None, query_params={})[source]

Make DELETE request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object

  • user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

_detail_permissions(pk, data={}, user=None)[source]

Make POST request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object

  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns:

API response object

Return type:

Response

It also includes two views built from the referenced DRF ViewSet. The first mimics a list view and links request methods to ViewSet methods as follows:

  • GET -> list

  • POST -> create

The second mimics a detail view and links request methods to ViewSet methods as follows:

  • GET -> retrieve

  • PUT -> update

  • PATCH -> partial_update

  • DELETE -> destroy

  • POST -> permissions

If any of the listed methods is not defined in the ViewSet, the corresponding link is omitted.
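The rule for omitting links can be sketched in plain Python. The two mappings mirror the lists above, written with lowercase HTTP method names as DRF's ViewSet.as_view() expects; the helper available_links and the class ReadOnlyViewSet are hypothetical and only stand in for the real test case and a real ViewSet.

```python
LIST_LINKS = {"get": "list", "post": "create"}
DETAIL_LINKS = {
    "get": "retrieve",
    "put": "update",
    "patch": "partial_update",
    "delete": "destroy",
    "post": "permissions",
}


def available_links(viewset, links):
    """Keep only the links whose target method the viewset defines."""
    return {
        http_method: action
        for http_method, action in links.items()
        if hasattr(viewset, action)
    }


class ReadOnlyViewSet:
    """Hypothetical stand-in for a DRF ViewSet with two actions."""

    def list(self, request):
        return []

    def retrieve(self, request, pk=None):
        return {}


list_links = available_links(ReadOnlyViewSet, LIST_LINKS)
detail_links = available_links(ReadOnlyViewSet, DETAIL_LINKS)
```

For this read-only stand-in, only the GET links survive in both views.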

Note

self.viewset (instance of DRF’s ViewSet) and self.resource_name (string) must be defined before calling the super setUp method for the class to work properly.

self.factory is an instance of DRF’s APIRequestFactory.

assertKeys(data, wanted)[source]

Assert dictionary keys.

detail_permissions(pk)[source]

Get detail permissions url.

detail_url(pk)[source]

Get detail url.

property list_url

Get list url.

setUp()[source]

Prepare data.

class resolwe.test.ResolweAPITestCase(methodName='runTest')[source]

Base class for writing Resolwe API tests.

It is based on TransactionResolweAPITestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

Resolwe Test Helpers and Decorators

resolwe.test.utils.check_docker()[source]

Check if Docker is installed and working.

Returns:

(indicator of the availability of Docker, reason for unavailability)

Return type:

tuple(bool, str)

resolwe.test.utils.check_installed(command)[source]

Check if the given command is installed.

Parameters:

command (str) – name of the command

Returns:

(indicator of the availability of the command, message saying command is not available)

Return type:

tuple(bool, str)
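A (bool, str) return value of this shape pairs naturally with unittest.skipUnless(), which takes a condition and a reason. The sketch below uses a hypothetical helper, command_available, built on shutil.which(); it illustrates the pattern only and is not the code of check_installed(). The command name is deliberately one that should not exist.

```python
import shutil
import unittest


def command_available(command):
    """Hypothetical helper returning (available, reason)."""
    if shutil.which(command):
        return True, ""
    return False, "Command '{}' is not available.".format(command)


class ToolTest(unittest.TestCase):
    """Skipped as a whole test when the command is missing."""

    @unittest.skipUnless(*command_available("surely-missing-tool-xyz"))
    def test_needs_tool(self):
        self.fail("should have been skipped")


result = unittest.TestResult()
ToolTest("test_needs_tool").run(result)
```

Unpacking the tuple with * passes the availability flag as the condition and the message as the skip reason.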

resolwe.test.utils.create_data_location(subpath=None)[source]

Create equivalent of old DataLocation object.

When argument is None, store the ID of the file storage object in the subpath.

resolwe.test.utils.is_testing()[source]

Return current testing status.

This assumes that the Resolwe test runner is being used.

resolwe.test.utils.with_custom_executor(wrapped=None, **custom_executor_settings)[source]

Decorate unit test to run processes with a custom executor.

Parameters:

custom_executor_settings (dict) – custom FLOW_EXECUTOR settings with which you wish to override the current settings

resolwe.test.utils.with_docker_executor(wrapped=None)[source]

Decorate unit test to run processes with the Docker executor.

resolwe.test.utils.with_null_executor(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

Decorate unit test to run processes with the Null executor.

resolwe.test.utils.with_resolwe_host(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

Decorate unit test to give it access to a live Resolwe host.

Set the RESOLWE_HOST_URL setting to the address on which the live Resolwe test host listens.

Note

This decorator must be used with a (sub)class of LiveServerTestCase which starts a live Django server in the background.

Resolwe Utilities

class resolwe.utils.BraceMessage(fmt, *args, **kwargs)[source]

Log messages with the new {}-string formatting syntax.

Note

When using this helper class, one pays no significant performance penalty since the actual formatting only happens when (and if) the logged message is actually outputted to a log by a handler.

Example of usage:

from resolwe.utils import BraceMessage as __

logger.error(__("Message with {0} {name}", 2, name="placeholders"))

Source: https://docs.python.org/3/howto/logging-cookbook.html#use-of-alternative-formatting-styles.
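The deferred formatting noted above can be seen in a self-contained sketch that follows the pattern from the Python logging cookbook. The class below is a local re-creation for illustration, not an import of resolwe.utils.BraceMessage, and the logger name and in-memory stream are assumptions.

```python
import io
import logging


class BraceMessage:
    """Deferred str.format() message, after the logging cookbook."""

    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        # Runs only when a handler actually renders the record.
        return self.fmt.format(*self.args, **self.kwargs)


stream = io.StringIO()
logger = logging.getLogger("brace_message_sketch")
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.ERROR)
logger.propagate = False

# Below the ERROR threshold: the message object is never formatted.
logger.debug(BraceMessage("never formatted: {0}", object()))
logger.error(BraceMessage("Message with {0} {name}", 2, name="placeholders"))
```

Because the logger only calls str() on the message when a record is emitted, the debug call above costs little more than constructing the object.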