Reference

Permissions shortcuts

resolwe.permissions.shortcuts.get_object_perms(obj: django.db.models.base.Model, user: Optional[django.contrib.auth.models.User] = None, mock_superuser_permissions: bool = False) List[Dict][source]

Return permissions for given object in Resolwe specific format.

Function returns permissions for given object obj in the following format:

{
    "type": "group"/"user"/"public",
    "id": <group_or_user_id>,
    "name": <group_or_user_name>,
    "permissions": [<first_permission>, <second_permission>,...]
}

For public type id and name keys are omitted.

If user parameter is given, permissions are limited only to given user, groups he belongs to and public permissions.

This function should be only used from Resolwe views: since permissions for the current user (users when user has share permission on the given object) are prefetched, we only iterate through objects here and filter them in Python. Using filter method would result in a new database query.

Parameters
  • obj – Resolwe’s DB model’s instance

  • user – Django user

  • mock_superuser_permissions – when True return all permissions for users that are superusers

Returns

list of permissions object in described format

Permissions utils

resolwe.permissions.utils.copy_permissions(src_obj: django.db.models.base.Model, dest_obj: django.db.models.base.Model)[source]

Copy permissions form src_obj to dest_obj.

Warning

Existing permissions in dest_obj will we deleted.

Flow Managers

Workflow workload managers.

resolwe.flow.managers.manager

The global manager instance.

Type

Manager

Dispatcher

class resolwe.flow.managers.dispatcher.Manager(*args, **kwargs)[source]

The manager handles process job dispatching.

Each Data object that’s still waiting to be resolved is dispatched to a concrete workload management system (such as Celery or SLURM). The specific manager for that system (descended from BaseConnector) then handles actual job setup and submission. The job itself is an executor invocation; the executor then in turn sets up a safe and well-defined environment within the workload manager’s task in which the process is finally run.

async communicate(data_id=None, run_sync=False)[source]

Scan database for resolving Data objects and process them.

This is submitted as a task to the manager’s channel workers.

Parameters
  • data_id – Optional id of Data object which (+ its children) should be processes. If it is not given, all resolving objects are processed.

  • run_sync – If True, wait until all processes spawned from this point on have finished processing.

discover_engines()[source]

Discover configured engines.

Parameters

executor – Optional executor module override

drain_messages()[source]

Drain Django Channel messages.

async execution_barrier()[source]

Wait for executors to finish.

At least one must finish after this point to avoid a deadlock.

get_execution_engine(name: str)[source]

Return an execution engine instance.

get_executor()[source]

Return an executor instance.

get_expression_engine(name: str)[source]

Return an expression engine instance.

async handle_control_event(message: dict)[source]

Handle the control event.

The method is called from the channels layer when there is nome change either in the state of the Data object of the executors have finished with processing.

When running in sync state check that all database objects are in final state before raising the execution_barrier.

Channels layer callback, do not call directly.

load_execution_engines(engines: List[Union[dict, str]])[source]

Load execution engines.

load_executor(executor_name: str)[source]

Load process executor.

load_expression_engines(engines: List[Union[dict, str]])[source]

Load expression engines.

run(data: resolwe.flow.models.data.Data, argv: List)[source]

Select a concrete connector and run the process through it.

Parameters
  • data – The Data object that is to be run.

  • argv – The argument vector used to spawn the executor.

class resolwe.flow.managers.dispatcher.SettingsJSONifier(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Customized JSON encoder, coercing all unknown types into strings.

Needed due to the class hierarchy coming out of the database, which can’t be serialized using the vanilla json encoder.

default(o)[source]

Try default; otherwise, coerce the object into a string.

resolwe.flow.managers.dispatcher.dependency_status(data)[source]

Return abstracted status of dependencies.

  • STATUS_ERROR .. one dependency has error status or was deleted

  • STATUS_DONE .. all dependencies have done status

  • None .. other

Workload Connectors

The workload management system connectors are used as glue between the Resolwe Manager and various concrete workload management systems that might be used by it. Since the only functional requirement is job submission, they can be simple and nearly contextless.

Base Class

class resolwe.flow.managers.workload_connectors.base.BaseConnector[source]

The abstract base class for workload manager connectors.

The main Manager instance in manager uses connectors to handle communication with concrete backend workload management systems, such as Celery and SLURM. The connectors need not worry about how jobs are discovered or how they’re prepared for execution; this is all done by the manager.

cleanup(data_id: int)[source]

Perform final cleanup after data object is finished processing.

submit(data: resolwe.flow.models.data.Data, argv)[source]

Submit the job to the workload management system.

Parameters
  • data – The Data object that is to be run.

  • argv – The argument vector used to spawn the executor.

Local Connector

class resolwe.flow.managers.workload_connectors.local.Connector[source]

Local connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: resolwe.flow.models.data.Data, argv)[source]

Run process locally.

For details, see submit().

Celery Connector

class resolwe.flow.managers.workload_connectors.celery.Connector[source]

Celery-based connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: resolwe.flow.models.data.Data, argv)[source]

Run process.

For details, see submit().

Slurm Connector

class resolwe.flow.managers.workload_connectors.slurm.Connector[source]

Slurm-based connector for job execution.

cleanup(data_id: int)[source]

Cleanup.

submit(data: resolwe.flow.models.data.Data, argv)[source]

Run process with SLURM.

For details, see submit().

Kubernetes Connector

class resolwe.flow.managers.workload_connectors.kubernetes.Connector[source]

Kubernetes-based connector for job execution.

cleanup(data_id: int)[source]

Remove the persistent volume claims created by the executor.

start(data: resolwe.flow.models.data.Data, listener_connection: Tuple[str, str, str])[source]

Start process execution.

Construct kubernetes job description and pass it to the kubernetes.

submit(data: resolwe.flow.models.data.Data, argv)[source]

Run process.

For details, see submit().

resolwe.flow.managers.workload_connectors.kubernetes.get_mountable_connectors() Iterable[Tuple[str, resolwe.storage.connectors.baseconnector.BaseStorageConnector]][source]

Iterate through all the storages and find mountable connectors.

Returns

list of tuples (storage_name, connector).

resolwe.flow.managers.workload_connectors.kubernetes.get_upload_dir() str[source]

Get the upload path.

: returns: the path of the first mountable connector for storage

‘upload’.

Raises

RuntimeError – if no applicable connector is found.

resolwe.flow.managers.workload_connectors.kubernetes.sanitize_kubernetes_label(label: str, trim_end: bool = True) str[source]

Make sure kubernetes label complies with the rules.

See the URL bellow for details.

https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/

resolwe.flow.managers.workload_connectors.kubernetes.unique_volume_name(base_name: str, data_id: int) str[source]

Get unique persistent volume claim name.

Listener

Consumer

Manager Channels consumer.

class resolwe.flow.managers.consumer.HealtCheckConsumer[source]

Channels consumer for handling health-check events.

async health_check(message: dict)[source]

Perform health check.

We are testing the channels layer and database layer. The channels layer is already functioning if this method is called so we have to perform database check.

If the check is successfull touch the file specified in the channels message.

class resolwe.flow.managers.consumer.ManagerConsumer(*args, **kwargs)[source]

Channels consumer for handling manager events.

async control_event(message)[source]

Forward control events to the manager dispatcher.

async resolwe.flow.managers.consumer.exit_consumer()[source]

Cause the synchronous consumer to exit cleanly.

async resolwe.flow.managers.consumer.run_consumer(timeout=None)[source]

Run the consumer until it finishes processing.

Parameters

timeout – Set maximum execution time before cancellation, or None (default) for unlimited.

async resolwe.flow.managers.consumer.send_event(message)[source]

Construct a Channels event packet with the given message.

Parameters

message – The message to send to the manager workers.

Utilities

Utilities for using global manager features.

resolwe.flow.managers.utils.disable_auto_calls()[source]

Decorator/context manager which stops automatic manager calls.

When entered, automatic communicate() calls from the Django transaction signal are not done.

Flow Executors

Base Class

class resolwe.flow.executors.run.BaseFlowExecutor(data_id: int, communicator: resolwe.flow.executors.zeromq_utils.ZMQCommunicator, listener_connection: Tuple[str, str, str], *args, **kwargs)[source]

Represents a workflow executor.

get_tools_paths()[source]

Get tools paths.

async run()[source]

Execute the script and save results.

async start()[source]

Start process execution.

Flow Executor Preparer

Framework for the manager-resident executor preparation facilities.

class resolwe.flow.executors.prepare.BaseFlowExecutorPreparer[source]

Represents the preparation functionality of the executor.

extend_settings(data_id, files, secrets)[source]

Extend the settings the manager will serialize.

Parameters
  • data_id – The Data object id being prepared for.

  • files – The settings dictionary to be serialized. Keys are filenames, values are the objects that will be serialized into those files. Standard filenames are listed in resolwe.flow.managers.protocol.ExecutorFiles.

  • secrets – Secret files dictionary describing additional secret file content that should be created and made available to processes with special permissions. Keys are filenames, values are the raw strings that should be written into those files.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

get_tools_paths(from_applications=False)[source]

Get tools’ paths.

post_register_hook(verbosity=1)[source]

Run hook after the ‘register’ management command finishes.

Subclasses may implement this hook to e.g. pull Docker images at this point. By default, it does nothing.

prepare_for_execution(data)[source]

Prepare the data object for the execution.

This is mostly needed for the null executor to change the status of the data and worker object to done.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters
  • data – Data object instance

  • filename – Filename to resolve

Returns

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters

filename – Filename to resolve

Returns

Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Raises

RuntimeError – when no storage connectors are configured for upload storage or path could not be resolved.

Docker Flow Executor

Preparation

class resolwe.flow.executors.docker.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the docker executor.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

post_register_hook(verbosity=1)[source]

Pull Docker images needed by processes after registering.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters
  • data – Data object instance

  • filename – Filename to resolve

Returns

Resolved filename, which can be used to access the given data file in programs executed using this executor

Raises

RuntimeError – when data path can not be resolved.

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters

filename – Filename to resolve

Returns

Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Local Flow Executor

class resolwe.flow.executors.local.run.FlowExecutor(*args, **kwargs)[source]

Local dataflow executor proxy.

Preparation

class resolwe.flow.executors.local.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the local executor.

extend_settings(data_id, files, secrets)[source]

Prevent processes requiring access to secrets from being run.

Null Flow Executor

class resolwe.flow.executors.null.run.FlowExecutor(data_id: int, communicator: resolwe.flow.executors.zeromq_utils.ZMQCommunicator, listener_connection: Tuple[str, str, str], *args, **kwargs)[source]

Null dataflow executor proxy.

This executor is intended to be used in tests where you want to save the object to the database but don’t need to run it.

Flow Models

Base Model

Base model for all other models.

class resolwe.flow.models.base.BaseModel(*args, **kwargs)[source]

Abstract model that includes common fields for other models.

class Meta[source]

BaseModel Meta options.

contributor

user that created the entry

created

creation date and time

modified

modified date and time

name

object name

save(*args, **kwargs)[source]

Save the model.

slug

URL slug

version

process version

Collection Model

Postgres ORM model for the organization of collections.

class resolwe.flow.models.collection.BaseCollection(*args, **kwargs)[source]

Template for Postgres model for storing a collection.

class Meta[source]

BaseCollection Meta options.

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

save(*args, **kwargs)[source]

Perform descriptor validation and save object.

search

field used for full-text search

tags

tags for categorizing objects

class resolwe.flow.models.Collection(*args, **kwargs)[source]

Postgres model for storing a collection.

exception DoesNotExist
exception MultipleObjectsReturned
duplicate(contributor)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if collection is a duplicate.

objects = <django.db.models.manager.ManagerFromCollectionQuerySet object>

manager

Data model

Postgres ORM model for keeping the data structured.

class resolwe.flow.models.Data(*args, **kwargs)[source]

Postgres model for storing data.

exception DoesNotExist
exception MultipleObjectsReturned
STATUS_DIRTY = 'DR'

data object is in dirty state

STATUS_DONE = 'OK'

data object is done

STATUS_ERROR = 'ER'

data object is in error state

STATUS_PREPARING = 'PP'

data object is preparing

STATUS_PROCESSING = 'PR'

data object is processing

STATUS_RESOLVING = 'RE'

data object is being resolved

STATUS_UPLOADING = 'UP'

data object is uploading

STATUS_WAITING = 'WT'

data object is waiting

checksum

checksum field calculated on inputs

collection

collection

delete(*args, **kwargs)[source]

Delete the data model.

descriptor

actual descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

data descriptor schema

duplicate(contributor, inherit_entity=False, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

entity

entity

finished

process finished date date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

get_resource_limits()[source]

Return the resource limits for this data.

input

actual inputs used by the process

is_duplicate()[source]

Return True if data object is a duplicate.

location

data location

move_to_collection(collection)[source]

Move data object to collection.

move_to_entity(entity)[source]

Move data object to entity.

named_by_user

track if user set the data name explicitly

objects = <django.db.models.manager.ManagerFromDataQuerySet object>

manager

output

actual outputs of the process

parents

dependencies between data objects

process

process used to compute the data object

process_cores

actual allocated cores

process_error

error log message

process_info

info log message

process_memory

actual allocated memory

process_pid

process id

process_progress

progress

process_rc

return code

process_resources

process requirements overrides

process_warning

warning log message

resolve_secrets()[source]

Retrieve handles for all basic:secret: fields on input.

The process must have the secrets resource requirement specified in order to access any secrets. Otherwise this method will raise a PermissionDenied exception.

Returns

A dictionary of secrets where key is the secret handle and value is the secret value.

save(render_name=False, *args, **kwargs)[source]

Save the data model.

save_dependencies(instance, schema)[source]

Save data: and list:data: references as parents.

scheduled

date and time when process was dispatched to the scheduling system (set by``resolwe.flow.managers.dispatcher.Manager.run``

search

field used for full-text search

size

total size of data’s outputs in bytes

started

process started date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

status

Data status

It can be one of the following:

tags

tags for categorizing objects

validate_change_collection(collection)[source]

Raise validation error if data object cannot change collection.

class resolwe.flow.models.DataDependency(*args, **kwargs)[source]

Dependency relation between data objects.

exception DoesNotExist
KIND_IO = 'io'

child uses parent’s output as its input

KIND_SUBPROCESS = 'subprocess'

child was spawned by the parent

exception MultipleObjectsReturned
child

child data object

kind

kind of dependency

parent

parent data object

Entity–relationship model

Postgres ORM to define the entity–relationship model that describes how data objects are related in a specific domain.

class resolwe.flow.models.Entity(*args, **kwargs)[source]

Postgres model for storing entities.

exception DoesNotExist
exception MultipleObjectsReturned
collection

collection to which entity belongs

duplicate(contributor, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if entity is a duplicate.

move_to_collection(collection)[source]

Move entity from the source to the destination collection.

Args collection

the collection to move entity into.

objects = <django.db.models.manager.ManagerFromEntityQuerySet object>

manager

type

entity type

class resolwe.flow.models.Relation(*args, **kwargs)[source]

Relations between entities.

The Relation model defines the associations and dependencies between entities in a given collection:

{
    "collection": "<collection_id>",
    "type": "comparison",
    "category": "case-control study",
    "entities": [
        {"enetity": "<entity1_id>", "label": "control"},
        {"enetity": "<entity2_id>", "label": "case"},
        {"enetity": "<entity3_id>", "label": "case"}
    ]
}

Relation type defines a specific set of associations among entities. It can be something like group, comparison or series. The relation type is an instance of RelationType and should be defined in any Django app that uses relations (e.g., as a fixture). Multiple relations of the same type are allowed on the collection.

Relation category defines a specific use case. The relation category must be unique in a collection, so that users can distinguish between different relations. In the example above, we could add another comparison relation of category, say Case-case study to compare <entity2> with <entity3>.

Relation is linked to resolwe.flow.models.Collection to enable defining different relations structures in different collections. This also greatly speed up retrieving of relations, as they are envisioned to be mainly used on a collection level.

unit defines units used in partitions where it is applicable, e.g. in relations of type series.

exception DoesNotExist
exception MultipleObjectsReturned
category

category of the relation

collection

collection to which relation belongs

entities

partitions of entities in the relation

objects = <django.db.models.manager.ManagerFromPermissionQuerySet object>

custom manager with permission filtering methods

type

type of the relation

unit

unit used in the partitions’ positions (where applicable, e.g. for serieses)

class resolwe.flow.models.RelationType(*args, **kwargs)[source]

Model for storing relation types.

exception DoesNotExist
exception MultipleObjectsReturned
name

relation type name

ordered

indicates if order of entities in relation is important or not

DescriptorSchema model

Postgres ORM model for storing descriptors.

class resolwe.flow.models.DescriptorSchema(*args, **kwargs)[source]

Postgres model for storing descriptors.

exception DoesNotExist
exception MultipleObjectsReturned
description

detailed description

schema

user descriptor schema represented as a JSON object

Process model

Postgres ORM model for storing processes.

class resolwe.flow.models.Process(*args, **kwargs)[source]

Postgres model for storing processes.

exception DoesNotExist
exception MultipleObjectsReturned
PERSISTENCE_CACHED = 'CAC'

cached persistence

PERSISTENCE_RAW = 'RAW'

raw persistence

PERSISTENCE_TEMP = 'TMP'

temp persistence

category

category

data_name

template for name of Data object created with Process

description

detailed description

entity_always_create

Create new entity, regardless of entity_input or entity_descriptor_schema fields.

entity_descriptor_schema

Slug of the descriptor schema assigned to the Entity created with entity_type.

entity_input

Limit the entity selection in entity_type to a single input.

entity_type

Automatically add Data object created with this process to an Entity object representing a data-flow. If all input Data objects belong to the same entity, add newly created Data object to it, otherwise create a new one.

get_resource_limits(data: Optional[resolwe.flow.models.data.Data] = None)[source]

Get the core count and memory usage limits for this process.

Returns

A dictionary with the resource limits, containing the following keys:

  • memory: Memory usage limit, in MB. Defaults to 4096 if not otherwise specified in the resource requirements.

  • cores: Core count limit. Defaults to 1.

  • storage: Size (in gibibytes) of temporary volume used for processing in kubernetes. Defaults to 200.

Return type

dict

input_schema

process input schema (describes input parameters, form layout “Inputs” for Data.input)

Handling:

  • schema defined by: dev

  • default by: user

  • changable by: none

is_active

designates whether this process should be treated as active

output_schema

process output schema (describes output JSON, form layout “Results” for Data.output)

Handling:

  • schema defined by: dev

  • default by: dev

  • changable by: dev

Implicitly defined fields (by resolwe.flow.management.commands.register() or resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives):

  • progress of type basic:float (from 0.0 to 1.0)

  • proc of type basic:group containing:

    • stdout of type basic:text

    • rc of type basic:integer

    • task of type basic:string (celery task id)

    • worker of type basic:string (celery worker hostname)

    • runtime of type basic:string (runtime instance hostname)

    • pid of type basic:integer (process ID)

persistence

Persistence of Data objects created with this process. It can be one of the following:

Note

If persistence is set to PERSISTENCE_CACHED or PERSISTENCE_TEMP, the process must be idempotent.

requirements

process requirements

run

process command and environment description for internal use

Handling:

  • schema defined by: dev

  • default by: dev

  • changable by: dev

scheduling_class

process scheduling class

type

data type

Storage model

Postgres ORM model for storing JSON.

class resolwe.flow.models.Storage(*args, **kwargs)[source]

Postgres model for storing storages.

exception DoesNotExist
exception MultipleObjectsReturned
data

corresponding data objects

json

actual JSON stored

objects = <django.db.models.manager.StorageManagerFromPermissionQuerySet object>

storage manager

Secret model

Postgres ORM model for storing secrets.

class resolwe.flow.models.Secret(*args, **kwargs)[source]

Postgres model for storing secrets.

ProcessMigrationHistory model

Postgres ORM model for storing proces migration history.

class resolwe.flow.models.ProcessMigrationHistory(*args, **kwargs)[source]

Model for storing process migration history.

exception DoesNotExist
exception MultipleObjectsReturned

DataMigrationHistory model

Postgres ORM model for storing data migration history.

class resolwe.flow.models.DataMigrationHistory(*args, **kwargs)[source]

Model for storing data migration history.

exception DoesNotExist
exception MultipleObjectsReturned

Utility functions

resolwe.flow.models.utils.duplicate.bulk_duplicate(collections=None, entities=None, data=None, contributor=None, inherit_collection=False, inherit_entity=False, name_prefix=None)[source]

Make a copy of given collection, entity or data queryset.

Exactly one of collections, entities and data parameters should be passed to the function and should respectively represent a queryset of Collection, Entity and Data objects to be copied.

When copying Collections or Entities, also the contained objects (Entities and Data) are also copied.

Copied objects are transformed in the following ways:

  • name_prefix (“Copy of ” by default) string is prepend to names of all copied objects

  • Collection and/or entity of top-most copied objects are preserved only if inherit_collection and/or inherit_entity values are set to True

  • all contained objects are attached to new collections and entities

  • input fields of all copied Data objects are processed and all inputs are replaced with their copies if they exist

  • permissions are copied from original objects

  • Data migration history is copied and linked to the new Data objects

Parameters
  • collections (~resolwe.flow.models.collection.CollectionQuerySet) – A collection queryset to duplicate.

  • entities – An entity queryset to duplicate.

  • data (~resolwe.flow.models.data.DataQuerySet) – A data queryset to duplicate.

  • contributor (~django.contrib.auth.models.User) – A Django user that will be assigned to copied objects as contributor.

  • inherit_collection (bool) – Indicates whether copied entities and data objects are added to the same collection as originals or not.

  • inherit_entity (bool) – Indicates whether copied data objects are added to the same collection as originals or not.

  • name_prefix (str) – A prefix that will be prepend to the name of all copied objects.

Return type

~resolwe.flow.models.collection.CollectionQuerySet or ~resolwe.flow.models.entity.EntityQuerySet or ~resolwe.flow.models.data.DataQuerySet

Flow Utilities

Resolwe Exceptions Utils

Utils functions for working with exceptions.

resolwe.flow.utils.exceptions.resolwe_exception_handler(exc, context)[source]

Handle exceptions raised in API and make them nicer.

To enable this, you have to add it to the settings:

REST_FRAMEWORK = {
    'EXCEPTION_HANDLER': 'resolwe.flow.utils.exceptions.resolwe_exception_handler',
}

Statistics

Various statistical utilities, used mostly for manager load tracking.

class resolwe.flow.utils.stats.NumberSeriesShape[source]

Helper class for computing characteristics for numerical data.

Given a series of numerical data, the class will keep a record of the extremes seen, arithmetic mean and standard deviation.

to_dict()[source]

Pack the stats computed into a dictionary.

update(num)[source]

Update metrics with the new number.

class resolwe.flow.utils.stats.SimpleLoadAvg(intervals)[source]

Helper class for a sort of load average based on event times.

Given a series of queue depth events, it will compute the average number of events for three different window lengths, emulating a form of ‘load average’. The calculation itself is modelled after the Linux scheduler, with a 5-second sampling rate. Because we don’t get consistent (time-wise) samples, the sample taken is the average of a simple moving window for the last 5 seconds; this is to avoid numerical errors if actual time deltas were used to compute the scaled decay.

add(count, timestamp=None)[source]

Add a value at the specified time to the series.

Parameters
  • count – The number of work items ready at the specified time.

  • timestamp – The timestamp to add. Defaults to None, meaning current time. It should be strictly greater (newer) than the last added timestamp.

to_dict()[source]

Pack the load averages into a nicely-keyed dictionary.

Flow Management

Register Processes

class resolwe.flow.management.commands.register.Command(stdout=None, stderr=None, no_color=False, force_color=False)[source]

Register processes.

add_arguments(parser)[source]

Command arguments.

find_descriptor_schemas(schema_file)[source]

Find descriptor schemas in given path.

find_schemas(schema_path, schema_type='process', verbosity=1)[source]

Find schemas in packages that match filters.

handle(*args, **options)[source]

Register processes.

register_descriptors(descriptor_schemas, user, force=False, verbosity=1)[source]

Read and register descriptors.

register_processes(process_schemas, user, force=False, verbosity=1)[source]

Read and register processors.

retire(process_schemas)[source]

Retire obsolete processes.

Remove old process versions without data. Find processes that have been registered but do not exist in the code anymore, then:

  • If they do not have data: remove them

  • If they have data: flag them not active (is_active=False)

valid(instance, schema)[source]

Validate schema.

Resolwe Test Framework

Resolwe Test Cases

class resolwe.test.TestCaseHelpers(methodName='runTest')[source]

Mixin for test case helpers.

assertAlmostEqualGeneric(actual, expected, msg=None)[source]

Assert almost equality for common types of objects.

This is the same as assertEqual(), but using assertAlmostEqual() when floats are encountered inside common containers (currently this includes dict, list and tuple types).

Parameters
  • actual – object to compare

  • expected – object to compare against

  • msg – optional message printed on failures

keep_data(mock_purge=True)[source]

Do not delete output files after tests.

setUp()[source]

Prepare environment for test.

class resolwe.test.TransactionTestCase(methodName='runTest')[source]

Base class for writing Resolwe tests not enclosed in a transaction.

It is based on Django’s TransactionTestCase. Use it if you need to access the test’s database from another thread/process.

setUp()[source]

Initialize test data.

class resolwe.test.TestCase(methodName='runTest')[source]

Base class for writing Resolwe tests.

It is based on TransactionTestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

class resolwe.test.ProcessTestCase(methodName='runTest')[source]

Base class for writing process tests.

It is a subclass of TransactionTestCase with some specific functions used for testing processes.

To write a process test use standard Django’s syntax for writing tests and follow the next steps:

  1. Put input files (if any) in tests/files directory of a Django application.

  2. Run the process using run_process().

  3. Check if the process has the expected status using assertStatus().

  4. Check process’s output using assertFields(), assertFile(), assertFileExists(), assertFiles() and assertJSON().

Note

When creating a test case for a custom Django application, subclass this class and over-ride the self.files_path with:

self.files_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'files')

Danger

If output files don’t exist in tests/files directory of a Django application, they are created automatically. But you have to check that they are correct before using them for further runs.

assertDir(obj, field_path, fn)[source]

Compare process output directory to correct compressed directory.

Parameters
  • obj (Data) – object that includes the directory to compare

  • field_path (str) – path to Data object’s field with the file name

  • fn (str) – file name (and relative path) of the correct compressed directory to compare against. Path should be relative to the tests/files directory of a Django application. Compressed directory needs to be in tar.gz format.

assertDirExists(obj, field_path)[source]

Assert that a directory in the output field of the given object exists.

Parameters
  • obj – object that includes the file for which to check if it exists

  • field_path – directory name/path

assertDirStructure(obj, field_path, dir_struct, exact=True)[source]

Assert correct tree structure in output field of given object.

Only names of directories and files are asserted. Content of files is not compared.

Parameters
  • obj (Data) – object that includes the directory to compare

  • dir_path (str) – path to the directory to compare

  • dir_struct (dict) – correct tree structure of the directory. Dictionary keys are directory and file names with the correct nested structure. Dictionary value associated with each directory is a new dictionary which lists the content of the directory. Dictionary value associated with each file name is None

  • exact (bool) – if True tested directory structure must exactly match dir_struct. If False dir_struct must be a partial structure of the directory to compare

assertFields(obj, path, value)[source]

Compare object’s field to the given value.

The file size is ignored. Use assertFile to validate file contents.

Parameters
  • obj (Data) – object with the field to compare

  • path (str) – path to Data object’s field

  • value (str) – desired value of Data object’s field

assertFile(obj, field_path, fn, **kwargs)[source]

Compare a process’s output file to the given correct file.

Parameters
  • obj (Data) – object that includes the file to compare

  • field_path (str) – path to Data object’s field with the file name

  • fn (str) – file name (and relative path) of the correct file to compare against. Path should be relative to the tests/files directory of a Django application.

  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.

  • filter (FunctionType) – function for filtering the contents of output files. It is used in itertools.filterfalse() function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.

  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.

assertFileExists(obj, field_path)[source]

Ensure a file in the given object’s field exists.

Parameters
  • obj (Data) – object that includes the file for which to check if it exists

  • field_path (str) – path to Data object’s field with the file name/path

assertFiles(obj, field_path, fn_list, **kwargs)[source]

Compare a process’s output file to the given correct file.

Parameters
  • obj (Data) – object which includes the files to compare

  • field_path (str) – path to Data object’s field with the list of file names

  • fn_list (list) – list of file names (and relative paths) of files to compare against. Paths should be relative to the tests/files directory of a Django application.

  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.

  • filter (FunctionType) – Function for filtering the contents of output files. It is used in itertools.filterfalse function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.

  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.

assertFilesExist(obj, field_path)[source]

Ensure files in the given object’s field exists.

Parameters
  • obj (Data) – object that includes list of files for which to check existance

  • field_path (str) – path to Data object’s field with the file name/path

assertJSON(obj, storage, field_path, file_name)[source]

Compare JSON in Storage object to the given correct JSON.

Parameters
  • obj (Data) – object to which the Storage object belongs

  • storage (Storage or str) – object or id which contains JSON to compare

  • field_path (str) – path to JSON subset in the Storage’s object to compare against. If it is empty, the entire object will be compared.

  • file_name (str) –

    file name (and relative path) of the file with the correct JSON to compare against. Path should be relative to the tests/files directory of a Django application.

    Note

    The given JSON file should be compresed with gzip and have the .gz extension.

assertStatus(obj, status)[source]

Check if object’s status is equal to the given status.

Parameters
  • obj (Data) – object for which to check the status

  • status (str) – desired value of object’s status attribute

property files_path

Path to test files.

get_json(file_name, storage)[source]

Return JSON saved in file and test JSON to compare it to.

The method returns a tuple of the saved JSON and the test JSON. In your test you should then compare the test JSON to the saved JSON that is commited to the repository.

The storage argument could be a Storage object, Storage ID or a Python dictionary. The test JSON is assigned a json field of the Storage object or the complete Python dictionary (if a dict is given).

If the file does not exist it is created, the test JSON is written to the new file and an exception is rased.

Parameters
  • file_name (str) – file name (and relative path) of a JSON file. Path should be relative to the tests/files directory of a Django app. The file name must have a .gz extension.

  • storage (Storage, str or dict) – Storage object, Storage ID or a dict.

Returns

(reference JSON, test JSON)

Return type

tuple

preparation_stage()[source]

Context manager to mark input preparation stage.

run_process(process_slug, input_={}, assert_status='OK', descriptor=None, descriptor_schema=None, verbosity=0, tags=None, contributor=None, collection=None, process_resources=None)[source]

Run the specified process with the given inputs.

If input is a file, file path should be given relative to the tests/files directory of a Django application. If assert_status is given, check if Data object’s status matches it after the process has finished.

Note

If you need to delay calling the manager, you must put the desired code in a with transaction.atomic() block.

Parameters
  • process_slug (str) – slug of the Process to run

  • input_ (dict) –

    Process’s input parameters

    Note

    You don’t have to specify parameters with defined default values.

  • assert_status (str) – desired status of the Data object

  • descriptor (dict) – descriptor to set on the Data object

  • descriptor_schema (dict) – descriptor schema to set on the Data object

  • tags (list) – list of tags that will be added to the created Data object

Returns

object created by Process

Return type

Data

run_processor(*args, **kwargs)[source]

Run process.

Deprecated method: use run_process.

setUp()[source]

Initialize test data.

tearDown()[source]

Clean up after the test.

class resolwe.test.TransactionResolweAPITestCase(methodName='runTest')[source]

Base class for testing Resolwe REST API.

This class is derived from Django REST Framework’s APITransactionTestCase class and has implemented some basic features that makes testing Resolwe API easier. These features includes following functions:

_get_list(user=None, query_params={})[source]

Make GET request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters

user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

_get_detail(pk, user=None, query_params={})[source]

Make GET request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters
  • pk (int) – Primary key of the coresponding object

  • user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

_post(data={}, user=None, query_params={})[source]

Make POST request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters
  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

_patch(pk, data={}, user=None, query_params={})[source]

Make PATCH request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters
  • pk (int) – Primary key of the coresponding object

  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

_delete(pk, user=None, query_params={})[source]

Make DELETE request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters
  • pk (int) – Primary key of the coresponding object

  • user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

_detail_permissions(pk, data={}, user=None)[source]

Make POST request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters
  • pk (int) – Primary key of the coresponding object

  • data (dict) – data for posting in request’s body

  • user (User or None) – User to authenticate in request

Returns

API response object

Return type

Response

It also has included 2 views made from referenced DRF’s ViewSet. First mimic list view and has following links between request’s methods and ViewSet’s methods:

  • GET -> list

  • POST -> create

Second mimic detail view and has following links between request’s methods and ViewSet’s methods:

  • GET -> retrieve

  • PUT -> update

  • PATCH -> partial_update

  • DELETE -> destroy

  • POST -> permissions

If any of the listed methods is not defined in the VievSet, corresponding link is omitted.

Note

self.viewset (instance of DRF’s Viewset) and self.resource_name (string) must be defined before calling super setUp method to work properly.

self.factory is instance of DRF’s APIRequestFactory.

assertKeys(data, wanted)[source]

Assert dictionary keys.

detail_permissions(pk)[source]

Get detail permissions url.

detail_url(pk)[source]

Get detail url.

property list_url

Get list url.

setUp()[source]

Prepare data.

class resolwe.test.ResolweAPITestCase(methodName='runTest')[source]

Base class for writing Resolwe API tests.

It is based on TransactionResolweAPITestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

Resolwe Test Helpers and Decorators

resolwe.test.utils.check_docker()[source]

Check if Docker is installed and working.

Returns

(indicator of the availability of Docker, reason for unavailability)

Return type

tuple(bool, str)

resolwe.test.utils.check_installed(command)[source]

Check if the given command is installed.

Parameters

command (str) – name of the command

Returns

(indicator of the availability of the command, message saying command is not available)

Return type

tuple(bool, str)

resolwe.test.utils.create_data_location(subpath=None)[source]

Create equivalent of old DataLocation object.

When argument is None, store the ID of the file storage object in the subpath.

resolwe.test.utils.is_testing()[source]

Return current testing status.

This assumes that the Resolwe test runner is being used.

resolwe.test.utils.with_custom_executor(wrapped=None, **custom_executor_settings)[source]

Decorate unit test to run processes with a custom executor.

Parameters

custom_executor_settings (dict) – custom FLOW_EXECUTOR settings with which you wish to override the current settings

resolwe.test.utils.with_docker_executor(wrapped=None)[source]

Decorate unit test to run processes with the Docker executor.

resolwe.test.utils.with_null_executor(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

Decorate unit test to run processes with the Null executor.

resolwe.test.utils.with_resolwe_host(wrapper=None, enabled=None, adapter=None, proxy=<class 'FunctionWrapper'>)[source]

Decorate unit test to give it access to a live Resolwe host.

Set RESOLWE_HOST_URL setting to the address where the testing live Resolwe host listens to.

Note

This decorator must be used with a (sub)class of LiveServerTestCase which starts a live Django server in the background.

Resolwe Utilities

class resolwe.utils.BraceMessage(fmt, *args, **kwargs)[source]

Log messages with the new {}-string formatting syntax.

Note

When using this helper class, one pays no significant performance penalty since the actual formatting only happens when (and if) the logged message is actually outputted to a log by a handler.

Example of usage:

from resolwe.utils import BraceMessage as __

logger.error(__("Message with {0} {name}", 2, name="placeholders"))

Source: https://docs.python.org/3/howto/logging-cookbook.html#use-of-alternative-formatting-styles.