Reference

Permissions shortcuts

resolwe.permissions.shortcuts._group_groups(perm_list)[source]

Group permissions by group.

The input is a list of tuples of length 3, where each tuple has the following format:

(<group_id>, <group_name>, <single_permission>)

Permissions are regrouped and returned so that there is exactly one tuple for each group:

(<group_id>, <group_name>, [<first_permission>, <second_permission>,...])
Parameters:perm_list (list) – list of tuples of length 3
Returns:list of tuples with grouped permissions
Return type:list
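
The regrouping described above can be sketched in a few lines. This is an illustration only, not the Resolwe implementation: the name group_permissions, the sample group names and permission names are hypothetical, and the output order (first appearance of each group) is an assumption.

```python
from collections import OrderedDict


def group_permissions(perm_list):
    """Regroup (group_id, group_name, permission) triples by group."""
    grouped = OrderedDict()
    for group_id, group_name, permission in perm_list:
        # One entry per (id, name) pair; permissions accumulate in input order.
        grouped.setdefault((group_id, group_name), []).append(permission)
    return [(gid, name, perms) for (gid, name), perms in grouped.items()]


# Hypothetical sample input: two permissions for one group, one for another.
flat = [
    (1, "editors", "view"),
    (2, "guests", "view"),
    (1, "editors", "edit"),
]
result = group_permissions(flat)
```

Each group appears once in result, with its permissions collected into a single list.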
resolwe.permissions.shortcuts.get_object_perms(obj, user=None)[source]

Return permissions for the given object in a Resolwe-specific format.

The function returns permissions for the given object obj in the following format:

{
    "type": "group"/"user"/"public",
    "id": <group_or_user_id>,
    "name": <group_or_user_name>,
    "permissions": [<first_permission>, <second_permission>,...]
}

For the public type, the id and name keys are omitted.

If the user parameter is given, permissions are limited to that user, the groups the user belongs to, and public permissions.

Parameters:
  • obj (a subclass of BaseModel) – Resolwe’s DB model’s instance
  • user (User or None) – Django user
Returns:

list of permission objects in the described format

Return type:

list
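
To make the entry format concrete, here is a small sketch that builds entries of the shape described above. The helper make_perm_entry is hypothetical (it is not part of Resolwe), and the permission names, ids and names in the sample are made up for illustration.

```python
def make_perm_entry(kind, permissions, entity_id=None, name=None):
    """Build one permissions entry; "public" entries carry no id or name."""
    if kind not in ("user", "group", "public"):
        raise ValueError("unknown permission type: {}".format(kind))
    entry = {"type": kind}
    if kind != "public":
        entry["id"] = entity_id
        entry["name"] = name
    entry["permissions"] = list(permissions)
    return entry


# Hypothetical result list in the documented format.
perms = [
    make_perm_entry("user", ["view", "edit"], entity_id=7, name="alice"),
    make_perm_entry("group", ["view"], entity_id=3, name="lab"),
    make_perm_entry("public", ["view"]),
]
```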

Permissions utils

resolwe.permissions.utils.copy_permissions(src_obj, dest_obj)[source]

Copy permissions from src_obj to dest_obj.

Flow Managers

Workflow workload managers.

resolwe.flow.managers.manager

The global manager instance.

Type:Manager

Dispatcher

class resolwe.flow.managers.dispatcher.Manager(*args, **kwargs)[source]

The manager handles process job dispatching.

Each Data object that’s still waiting to be resolved is dispatched to a concrete workload management system (such as Celery or SLURM). The specific manager for that system (descended from BaseConnector) then handles actual job setup and submission. The job itself is an executor invocation; the executor then in turn sets up a safe and well-defined environment within the workload manager’s task in which the process is finally run.

communicate(data_id=None, run_sync=False, save_settings=True)[source]

Scan database for resolving Data objects and process them.

This is submitted as a task to the manager’s channel workers.

Parameters:
  • data_id – Optional id of the Data object which (together with its children) should be processed. If it is not given, all resolving objects are processed.
  • run_sync – If True, wait until all processes spawned from this point on have finished processing. If no processes are spawned, this results in a deadlock, since counts are handled on process finish.
  • save_settings – If True, save the current Django settings context to the global state. This should never be True for “automatic” calls, such as from Django signals, which can be invoked from inappropriate contexts (such as in the listener). For user code, it should be left at the default value. The saved settings are in effect until the next such call.
discover_engines(executor=None)[source]

Discover configured engines.

Parameters:executor – Optional executor module override
execution_barrier()[source]

Wait for executors to finish.

At least one must finish after this point to avoid a deadlock.

get_execution_engine(name)[source]

Return an execution engine instance.

get_executor()[source]

Return an executor instance.

get_expression_engine(name)[source]

Return an expression engine instance.

handle_control_event(message)[source]

Handle an event from the Channels layer.

Channels layer callback, do not call directly.

load_execution_engines(engines)[source]

Load execution engines.

load_executor(executor_name)[source]

Load process executor.

load_expression_engines(engines)[source]

Load expression engines.

override_settings(**kwargs)[source]

Override global settings within the calling context.

Parameters:kwargs – The settings overrides. Same use as for django.test.override_settings().
reset(keep_state=False)[source]

Reset the shared state and drain Django Channels.

Parameters:keep_state – If True, do not reset the shared manager state (useful in tests, where the settings overrides need to be kept). Defaults to False.
run(data, runtime_dir, argv)[source]

Select a concrete connector and run the process through it.

Parameters:
  • data – The Data object that is to be run.
  • runtime_dir – The directory the executor is run from.
  • argv – The argument vector used to spawn the executor.
class resolwe.flow.managers.dispatcher.SettingsJSONifier(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Customized JSON encoder, coercing all unknown types into strings.

Needed due to the class hierarchy coming out of the database, which can’t be serialized using the vanilla json encoder.

default(o)[source]

Try default; otherwise, coerce the object into a string.
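
The fallback behaviour can be sketched with the standard library alone. The class name StringFallbackEncoder and the sample values are hypothetical; the sketch only assumes that the base json.JSONEncoder.default() raises TypeError for types it cannot serialize, which is standard Python behaviour.

```python
import json
from decimal import Decimal
from pathlib import PurePosixPath


class StringFallbackEncoder(json.JSONEncoder):
    """Encoder that coerces otherwise unserializable objects into strings."""

    def default(self, o):
        try:
            # The stock implementation raises TypeError for unknown types.
            return super().default(o)
        except TypeError:
            return str(o)


encoded = json.dumps(
    {"path": PurePosixPath("/data/run"), "ratio": Decimal("0.5"), "n": 3},
    cls=StringFallbackEncoder,
    sort_keys=True,
)
```

Natively supported values such as the integer are encoded as usual; only the unknown types end up as strings.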

resolwe.flow.managers.dispatcher.dependency_status(data)[source]

Return abstracted status of dependencies.

  • STATUS_ERROR .. one dependency has error status or was deleted
  • STATUS_DONE .. all dependencies have done status
  • None .. other
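
The three outcomes listed above can be sketched as follows. This is a simplified model, not the Resolwe code: the function takes a plain list of parent status strings instead of a Data object, a deleted parent is represented by None (an assumption of this sketch), and the status values are the ones documented for the Data model.

```python
STATUS_DONE = "OK"   # value of Data.STATUS_DONE
STATUS_ERROR = "ER"  # value of Data.STATUS_ERROR


def abstract_dependency_status(parent_statuses):
    """Collapse parent statuses into STATUS_ERROR, STATUS_DONE or None.

    ``parent_statuses`` is a list of status strings; ``None`` marks a
    deleted parent (representation chosen for this sketch only).
    """
    if any(s is None or s == STATUS_ERROR for s in parent_statuses):
        return STATUS_ERROR
    # Note: with no parents at all, all() is True and the result is done.
    if all(s == STATUS_DONE for s in parent_statuses):
        return STATUS_DONE
    return None
```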

Workload Connectors

The workload management system connectors are used as glue between the Resolwe Manager and various concrete workload management systems that might be used by it. Since the only functional requirement is job submission, they can be simple and nearly contextless.

Base Class

class resolwe.flow.managers.workload_connectors.base.BaseConnector[source]

The abstract base class for workload manager connectors.

The main Manager instance in manager uses connectors to handle communication with concrete backend workload management systems, such as Celery and SLURM. The connectors need not worry about how jobs are discovered or how they’re prepared for execution; this is all done by the manager.

submit(data, runtime_dir, argv)[source]

Submit the job to the workload management system.

Parameters:
  • data – The Data object that is to be run.
  • runtime_dir – The directory the executor is run from.
  • argv – The argument vector used to spawn the executor.
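
Because job submission is the only functional requirement, a connector can be very small. The sketch below uses a locally defined stand-in for the base class so that it runs without Resolwe installed; BaseConnectorStandIn and RecordingConnector are hypothetical names, and a real connector would subclass BaseConnector and hand the job to an actual workload management system instead of recording it.

```python
from abc import ABC, abstractmethod


class BaseConnectorStandIn(ABC):
    """Local stand-in mirroring the documented submit() signature."""

    @abstractmethod
    def submit(self, data, runtime_dir, argv):
        """Submit the job to the workload management system."""


class RecordingConnector(BaseConnectorStandIn):
    """Toy connector that only records what it was asked to submit."""

    def __init__(self):
        self.submitted = []

    def submit(self, data, runtime_dir, argv):
        self.submitted.append((data, runtime_dir, list(argv)))


connector = RecordingConnector()
connector.submit(
    data={"id": 42},  # placeholder for a Data object
    runtime_dir="/tmp/runtime/42",  # hypothetical directory
    argv=["/venv/bin/python", "-m", "executors", ".docker"],
)
```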

Local Connector

class resolwe.flow.managers.workload_connectors.local.Connector[source]

Local connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process locally.

For details, see BaseConnector.submit().

Celery Connector

class resolwe.flow.managers.workload_connectors.celery.Connector[source]

Celery-based connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process.

For details, see BaseConnector.submit().

Slurm Connector

class resolwe.flow.managers.workload_connectors.slurm.Connector[source]

Slurm-based connector for job execution.

submit(data, runtime_dir, argv)[source]

Run process with SLURM.

For details, see BaseConnector.submit().

Listener

Standalone Redis client used as a contact point for executors.

class resolwe.flow.managers.listener.ExecutorListener(*args, **kwargs)[source]

The contact point implementation for executors.

check_critical_load()[source]

Check for critical load and log an error if necessary.

clear_queue()[source]

Reset the executor queue channel to an empty state.

handle_abort(obj)[source]

Handle an incoming Data abort processing request.

Important

This only makes the manager’s state consistent and doesn’t affect the Data object in any way. Any changes to the Data object must be applied through the handle_update method.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'abort',
    'data_id': [id of the :class:`~resolwe.flow.models.Data` object
               this command was triggered by],
}
handle_annotate(obj)[source]

Handle an incoming Data object annotate request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'annotate',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object this command annotates],
    'annotations': {
        [annotations to be added/updated]
    }
}
handle_download_aborted(obj)[source]

Handle an incoming download aborted request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'download_aborted',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_location_id': id of storage location
}
handle_download_finished(obj)[source]

Handle an incoming download finished request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'download_finished',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_location_id': id of storage location
}
handle_download_started(obj)[source]

Handle an incoming request to start downloading data.

We have to check if the download for given StorageLocation object has already started.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'download_started',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_location_id': id of storage location,
    'download_started_lock': obtain lock, defaults to False
}
handle_finish(obj)[source]

Handle an incoming Data finished processing request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'finish',
    'data_id': [id of the :class:`~resolwe.flow.models.Data` object
               this command changes],
    'process_rc': [exit status of the processing],
    'spawn_processes': [optional; list of spawn dictionaries],
    'exported_files_mapper': [if spawn_processes present]
}
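
As an illustration of the command object format, the following sketch assembles a finish command as a plain dictionary. The helper build_finish_command is hypothetical and not part of Resolwe; the contents of the spawn dictionaries and of the file mapping are not specified here, so the sample uses an opaque placeholder.

```python
def build_finish_command(data_id, process_rc, spawn_processes=None,
                         exported_files_mapper=None):
    """Assemble a 'finish' command dict in the documented shape."""
    command = {
        "command": "finish",
        "data_id": data_id,
        "process_rc": process_rc,
    }
    if spawn_processes is not None:
        # The mapper accompanies spawn_processes whenever the latter is present.
        if exported_files_mapper is None:
            raise ValueError("exported_files_mapper is required with spawn_processes")
        command["spawn_processes"] = spawn_processes
        command["exported_files_mapper"] = exported_files_mapper
    return command


plain = build_finish_command(data_id=42, process_rc=0)
with_spawn = build_finish_command(
    data_id=42,
    process_rc=0,
    spawn_processes=[{"placeholder": "spawn dictionary"}],  # opaque placeholder
    exported_files_mapper={},
)
```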
handle_get_files_to_download(obj)[source]

Get a list of files belonging to a given storage location object.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'get_files_to_download',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_location_id': id of the :class:`~resolwe.storage.models.StorageLocation` object.
}
handle_get_referenced_files(obj)[source]

Get a list of files referenced by the data object.

To get the entire output this request must be sent after processing is finished.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'get_referenced_data',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
}
handle_log(obj)[source]

Handle an incoming log processing request.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'log',
    'message': [log message]
}
handle_missing_data_locations(obj)[source]

Handle an incoming request to get missing data locations.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'missing_data_locations',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object]
}
handle_referenced_files(obj)[source]

Store a list of files and directories produced by the worker.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'referenced_data',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'referenced_files': list of referenced file names relative
        to the DATA_DIR.
    'referenced_dirs': list of referenced directory paths
        relative to the DATA_DIR.
}
handle_storage_location_lock(obj)[source]

Handle an incoming request to lock StorageLocation object.

Lock is implemented by creating AccessLog object with finish date set to None.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'storage_location_lock',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_location_id': id of storage location,
    'storage_location_lock_reason': reason for lock.
}
handle_storage_location_unlock(obj)[source]

Handle an incoming request to unlock StorageLocation object.

Parameters:obj

The Channels message object. Command object format:

{
    'command': 'storage_location_unlock',
    'data_id': [id of the :class:`~resolwe.flow.models.Data`
               object],
    'storage_access_log_id': id of access log to close.
}
handle_update(obj, internal_call=False)[source]

Handle an incoming Data object update request.

Parameters:
  • obj

    The Channels message object. Command object format:

    {
        'command': 'update',
        'data_id': [id of the :class:`~resolwe.flow.models.Data`
                   object this command changes],
        'changeset': {
            [keys to be changed]
        }
    }
    
  • internal_call – If True, this is an internal delegate call, so a reply to the executor won’t be sent.
hydrate_spawned_files(exported_files_mapper, filename, data_id)[source]

Pop the given file’s map from the exported files mapping.

Parameters:
  • exported_files_mapper – The dict of file mappings this process produced.
  • filename – The filename to format and remove from the mapping.
  • data_id – The id of the Data() object owning the mapping.
Returns:

The formatted mapping between the filename and temporary file path.

Return type:

dict

push_stats()[source]

Push current stats to Redis.

run()[source]

Run the main listener run loop.

Doesn’t return until terminate() is called.

terminate()[source]

Stop the standalone manager.

State

Synchronized singleton state container for the manager.

resolwe.flow.managers.state.update_constants()[source]

Recreate channel name constants with changed settings.

This kludge is mostly needed due to the way Django settings are patched for testing and how modules need to be imported throughout the project. On import time, settings are not patched yet, but some of the code needs static values immediately. Updating functions such as this one are then needed to fix dummy values.

class resolwe.flow.managers.state.ManagerState(key_prefix)[source]

State interface implementation.

This holds variables required to be shared between all manager workers and takes care of operation atomicity and synchronization. Redis facilitates storage shared between workers, whereas atomicity needs to be dealt with explicitly; this interface hides the Redis and Python details required to achieve syntax-transparent atomicity (such as being able to do executor_count += 1, a load-modify-store operation sequence).
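
One way to get syntax-transparent atomicity for += is to have the property return a proxy object whose in-place addition performs a single atomic increment. The sketch below illustrates that idea with an in-memory dictionary and a threading.Lock standing in for Redis; the class names SharedState and _AtomicInt are hypothetical, and this is not necessarily how ManagerState is implemented.

```python
import threading


class _AtomicInt:
    """Proxy returned by the property; ``+=`` maps to one atomic increment."""

    def __init__(self, store, key, lock):
        self._store, self._key, self._lock = store, key, lock

    def __iadd__(self, amount):
        with self._lock:  # load-modify-store happens under one lock
            self._store[self._key] = self._store.get(self._key, 0) + amount
        return self

    def __int__(self):
        with self._lock:
            return self._store.get(self._key, 0)


class SharedState:
    """In-memory stand-in for a shared state container."""

    def __init__(self):
        self._store = {}
        self._lock = threading.Lock()

    @property
    def executor_count(self):
        return _AtomicInt(self._store, "executor_count", self._lock)

    @executor_count.setter
    def executor_count(self, value):
        if isinstance(value, _AtomicInt):
            return  # result of ``+=``: the increment was already applied
        with self._lock:
            self._store["executor_count"] = int(value)


state = SharedState()


def worker():
    for _ in range(1000):
        state.executor_count += 1  # reads like a plain increment


threads = [threading.Thread(target=worker) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
total = int(state.executor_count)
```

The statement state.executor_count += 1 first calls the getter, then the proxy's __iadd__, and finally the setter, which ignores the proxy because the increment has already been stored.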

Consumer

Manager Channels consumer.

class resolwe.flow.managers.consumer.ManagerConsumer(*args, **kwargs)[source]

Channels consumer for handling manager events.

control_event(message)[source]

Forward control events to the manager dispatcher.

resolwe.flow.managers.consumer.exit_consumer()[source]

Cause the synchronous consumer to exit cleanly.

resolwe.flow.managers.consumer.run_consumer(timeout=None, dry_run=False)[source]

Run the consumer until it finishes processing.

Parameters:
  • timeout – Set maximum execution time before cancellation, or None (default) for unlimited.
  • dry_run – If True, don’t actually dispatch messages, just dequeue them. Defaults to False.
resolwe.flow.managers.consumer.send_event(message)[source]

Construct a Channels event packet with the given message.

Parameters:message – The message to send to the manager workers.

Utilities

Utilities for using global manager features.

resolwe.flow.managers.utils.disable_auto_calls()[source]

Decorator/context manager which stops automatic manager calls.

When entered, automatic communicate() calls from the Django transaction signal are not done.
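
The dual decorator/context manager usage can be sketched with contextlib.ContextDecorator from the standard library. Everything named here (AutoCallState, disable_auto_calls_sketch, on_commit_signal) is hypothetical and only models the behaviour described above; it is not the Resolwe implementation.

```python
from contextlib import ContextDecorator


class AutoCallState:
    """Module-level switch consulted by the (simulated) signal handler."""
    enabled = True


class disable_auto_calls_sketch(ContextDecorator):
    """Turn automatic calls off while the block or function runs."""

    def __enter__(self):
        self._previous = AutoCallState.enabled
        AutoCallState.enabled = False
        return self

    def __exit__(self, *exc):
        AutoCallState.enabled = self._previous
        return False  # never swallow exceptions


calls = []


def on_commit_signal():
    # Stand-in for the transaction signal that would trigger communicate().
    if AutoCallState.enabled:
        calls.append("communicate")


on_commit_signal()                 # recorded
with disable_auto_calls_sketch():  # context manager form
    on_commit_signal()             # suppressed


@disable_auto_calls_sketch()       # decorator form
def bulk_import():
    on_commit_signal()             # suppressed


bulk_import()
on_commit_signal()                 # recorded again
```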

Flow Executors

Main standalone execution stub, used when the executor is run.

It should be run as a module with one argument: the relative module name of the concrete executor class to use. The current working directory should be where the executors module directory is, so that it can be imported with python’s -m <module> interpreter option.

Usage format:

/path/to/python -m executors .executor_type

Concrete example, run from the directory where ./executors/ is:

/venv/bin/python -m executors .docker

using the python from the venv virtualenv.

Note

The startup code adds the concrete class name as needed, so that in the example above, what’s actually instantiated is .docker.run.FlowExecutor.
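
The name expansion mentioned in the note can be illustrated as follows. The helper concrete_executor_path is hypothetical; it only reproduces the mapping from the example above and does not import anything.

```python
def concrete_executor_path(relative_module):
    """Expand a relative executor module name to its concrete class path."""
    if not relative_module.startswith("."):
        raise ValueError("expected a relative module name such as '.docker'")
    return relative_module + ".run.FlowExecutor"


# Argument vector matching the usage format shown above.
argv = ["/venv/bin/python", "-m", "executors", ".docker"]
class_path = concrete_executor_path(argv[-1])
```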

Base Class

class resolwe.flow.executors.run.BaseFlowExecutor(*args, **kwargs)[source]

Represents a workflow executor.

annotate_data(**kwargs)[source]

Annotate Data object.

Parameters:kwargs – The dictionary of annotations to be added/updated.
end()[source]

End process execution.

get_stdout()[source]

Get process’ standard output.

get_tools_paths()[source]

Get tools paths.

run(data_id, script, log_file, json_file)[source]

Execute the script and save results.

run_script(script)[source]

Run process script.

start()[source]

Start process execution.

terminate()[source]

Terminate a running script.

update_data_status(**kwargs)[source]

Update (PATCH) Data object.

Parameters:kwargs – The dictionary of Data attributes to be changed.

Flow Executor Preparer

Framework for the manager-resident executor preparation facilities.

class resolwe.flow.executors.prepare.BaseFlowExecutorPreparer[source]

Represents the preparation functionality of the executor.

extend_settings(data_id, files, secrets)[source]

Extend the settings the manager will serialize.

Parameters:
  • data_id – The Data object id being prepared for.
  • files – The settings dictionary to be serialized. Keys are filenames, values are the objects that will be serialized into those files. Standard filenames are listed in resolwe.flow.managers.protocol.ExecutorFiles.
  • secrets – Secret files dictionary describing additional secret file content that should be created and made available to processes with special permissions. Keys are filenames, values are the raw strings that should be written into those files.
get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

get_tools_paths()[source]

Get tools’ paths.

post_register_hook(verbosity=1)[source]

Run hook after the ‘register’ management command finishes.

Subclasses may implement this hook to e.g. pull Docker images at this point. By default, it does nothing.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance
  • filename – Filename to resolve
Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:filename – Filename to resolve
Returns:Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Docker Flow Executor

class resolwe.flow.executors.docker.run.FlowExecutor(*args, **kwargs)[source]

Docker executor.

end()[source]

End process execution.

run_script(script)[source]

Execute the script and save results.

start()[source]

Start process execution.

terminate()[source]

Terminate a running script.

Preparation

class resolwe.flow.executors.docker.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the docker executor.

get_environment_variables()[source]

Return dict of environment variables that will be added to executor.

post_register_hook(verbosity=1)[source]

Pull Docker images needed by processes after registering.

resolve_data_path(data=None, filename=None)[source]

Resolve data path for use with the executor.

Parameters:
  • data – Data object instance
  • filename – Filename to resolve
Returns:

Resolved filename, which can be used to access the given data file in programs executed using this executor

resolve_upload_path(filename=None)[source]

Resolve upload path for use with the executor.

Parameters:filename – Filename to resolve
Returns:Resolved filename, which can be used to access the given uploaded file in programs executed using this executor

Local Flow Executor

class resolwe.flow.executors.local.run.FlowExecutor(*args, **kwargs)[source]

Local dataflow executor proxy.

Preparation

class resolwe.flow.executors.local.prepare.FlowExecutorPreparer[source]

Specialized manager assist for the local executor.

extend_settings(data_id, files, secrets)[source]

Prevent processes requiring access to secrets from being run.

Null Flow Executor

class resolwe.flow.executors.null.run.FlowExecutor(*args, **kwargs)[source]

Null dataflow executor proxy.

This executor is intended to be used in tests where you want to save the object to the database but don’t need to run it.

Flow Models

Base Model

Base model for all other models.

class resolwe.flow.models.base.BaseModel(*args, **kwargs)[source]

Abstract model that includes common fields for other models.

class Meta[source]

BaseModel Meta options.

contributor

user that created the entry

created

creation date and time

modified

modified date and time

name

object name

save(*args, **kwargs)[source]

Save the model.

slug

URL slug

version

process version

Collection Model

Postgres ORM model for the organization of collections.

class resolwe.flow.models.collection.BaseCollection(*args, **kwargs)[source]

Template for Postgres model for storing a collection.

class Meta[source]

BaseCollection Meta options.

description

detailed description

descriptor

collection descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

collection descriptor schema

save(*args, **kwargs)[source]

Perform descriptor validation and save object.

search

field used for full-text search

tags

tags for categorizing objects

class resolwe.flow.models.Collection(*args, **kwargs)[source]

Postgres model for storing a collection.

duplicate(contributor=None)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if collection is a duplicate.

objects = <django.db.models.manager.ManagerFromCollectionQuerySet object>

manager

Data model

Postgres ORM model for keeping the data structured.

class resolwe.flow.models.Data(*args, **kwargs)[source]

Postgres model for storing data.

STATUS_DIRTY = 'DR'

data object is in dirty state

STATUS_DONE = 'OK'

data object is done

STATUS_ERROR = 'ER'

data object is in error state

STATUS_PREPARING = 'PP'

data object is preparing

STATUS_PROCESSING = 'PR'

data object is processing

STATUS_RESOLVING = 'RE'

data object is being resolved

STATUS_UPLOADING = 'UP'

data object is uploading

STATUS_WAITING = 'WT'

data object is waiting

checksum

checksum field calculated on inputs

collection

collection

delete(*args, **kwargs)[source]

Delete the data model.

descriptor

actual descriptor

descriptor_dirty

indicate whether descriptor doesn’t match descriptor_schema (is dirty)

descriptor_schema

data descriptor schema

duplicate(contributor=None, inherit_entity=False, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

entity

entity

finished

process finished date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

get_runtime_path(filename=None)[source]

Get the runtime directory of the executor.

That is the script that created this object. When filename is not None return the path to the filename in the working directory of the executor.

input

actual inputs used by the process

is_duplicate()[source]

Return True if data object is a duplicate.

location

data location

move_to_collection(destination_collection)[source]

Move data object to collection.

named_by_user

track if user set the data name explicitly

objects = <django.db.models.manager.ManagerFromDataQuerySet object>

manager

output

actual outputs of the process

parents

dependencies between data objects

process

process used to compute the data object

process_cores

actual allocated cores

process_error

error log message

process_info

info log message

process_memory

actual allocated memory

process_pid

process id

process_progress

progress

process_rc

return code

process_warning

warning log message

resolve_secrets()[source]

Retrieve handles for all basic:secret: fields on input.

The process must have the secrets resource requirement specified in order to access any secrets. Otherwise this method will raise a PermissionDenied exception.

Returns:A dictionary of secrets where key is the secret handle and value is the secret value.
save(render_name=False, *args, **kwargs)[source]

Save the data model.

save_dependencies(instance, schema)[source]

Save data: and list:data: references as parents.

save_storage(instance, schema)[source]

Save basic:json values to a Storage collection.

scheduled

date and time when the process was dispatched to the scheduling system (set by resolwe.flow.managers.dispatcher.Manager.run)

search

field used for full-text search

size

total size of data’s outputs in bytes

started

process started date and time (set by resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives)

status

Data status

It can be one of the STATUS_* constants documented above for this class.

tags

tags for categorizing objects

validate_change_collection(destination_collection)[source]

Raise validation error if data object cannot change collection.

class resolwe.flow.models.DataDependency(*args, **kwargs)[source]

Dependency relation between data objects.

KIND_IO = 'io'

child uses parent’s output as its input

KIND_SUBPROCESS = 'subprocess'

child was spawned by the parent

child

child data object

kind

kind of dependency

parent

parent data object

Entity–relationship model

Postgres ORM to define the entity–relationship model that describes how data objects are related in a specific domain.

class resolwe.flow.models.Entity(*args, **kwargs)[source]

Postgres model for storing entities.

collection

collection to which entity belongs

duplicate(contributor=None, inherit_collection=False)[source]

Duplicate (make a copy).

duplicated

duplication date and time

is_duplicate()[source]

Return True if entity is a duplicate.

move_to_collection(source_collection, destination_collection)[source]

Move entity to destination collection.

objects = <django.db.models.manager.ManagerFromEntityQuerySet object>

manager

type

entity type

class resolwe.flow.models.Relation(*args, **kwargs)[source]

Relations between entities.

The Relation model defines the associations and dependencies between entities in a given collection:

{
    "collection": "<collection_id>",
    "type": "comparison",
    "category": "case-control study",
    "entities": [
        {"entity": "<entity1_id>", "label": "control"},
        {"entity": "<entity2_id>", "label": "case"},
        {"entity": "<entity3_id>", "label": "case"}
    ]
}

Relation type defines a specific set of associations among entities. It can be something like group, comparison or series. The relation type is an instance of RelationType and should be defined in any Django app that uses relations (e.g., as a fixture). Multiple relations of the same type are allowed on the collection.

Relation category defines a specific use case. The relation category must be unique in a collection, so that users can distinguish between different relations. In the example above, we could add another comparison relation with a different category, say Case-case study, to compare <entity2> with <entity3>.

Relation is linked to resolwe.flow.models.Collection to enable defining different relation structures in different collections. This also greatly speeds up retrieval of relations, as they are envisioned to be mainly used on a collection level.

unit defines units used in partitions where it is applicable, e.g. in relations of type series.

category

category of the relation

collection

collection to which relation belongs

entities

partitions of entities in the relation

type

type of the relation

unit

unit used in the partitions’ positions (where applicable, e.g. for series)

class resolwe.flow.models.RelationType(*args, **kwargs)[source]

Model for storing relation types.

name

relation type name

ordered

indicates if order of entities in relation is important or not

DescriptorSchema model

Postgres ORM model for storing descriptors.

class resolwe.flow.models.DescriptorSchema(*args, **kwargs)[source]

Postgres model for storing descriptors.

description

detailed description

schema

user descriptor schema represented as a JSON object

Process model

Postgres ORM model for storing processes.

class resolwe.flow.models.Process(*args, **kwargs)[source]

Postgres model for storing processes.

PERSISTENCE_CACHED = 'CAC'

cached persistence

PERSISTENCE_RAW = 'RAW'

raw persistence

PERSISTENCE_TEMP = 'TMP'

temp persistence

category

category

data_name

template for name of Data object created with Process

description

detailed description

entity_always_create

Create new entity, regardless of entity_input or entity_descriptor_schema fields.

entity_descriptor_schema

Slug of the descriptor schema assigned to the Entity created with entity_type.

entity_input

Limit the entity selection in entity_type to a single input.

entity_type

Automatically add Data object created with this process to an Entity object representing a data-flow. If all input Data objects belong to the same entity, add newly created Data object to it, otherwise create a new one.

get_resource_limits()[source]

Get the core count and memory usage limits for this process.

Returns:A dictionary with the resource limits, containing the following keys:
  • memory: Memory usage limit, in MB. Defaults to 4096 if not otherwise specified in the resource requirements.
  • cores: Core count limit. Defaults to 1.
Return type:dict
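
The defaulting behaviour can be sketched as below. The layout of the requirements dictionary (a "resources" key holding "memory" and "cores") is an assumption made for this illustration, and resource_limits is a hypothetical stand-alone function rather than the model method itself.

```python
# Defaults as documented: 4096 MB of memory and 1 core.
DEFAULT_LIMITS = {"memory": 4096, "cores": 1}


def resource_limits(requirements):
    """Return memory/cores limits, falling back to the documented defaults."""
    # Assumed layout: {"resources": {"memory": <MB>, "cores": <count>}}
    resources = requirements.get("resources", {})
    return {
        key: resources.get(key, default)
        for key, default in DEFAULT_LIMITS.items()
    }


defaults_only = resource_limits({})
more_cores = resource_limits({"resources": {"cores": 4}})
```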
input_schema

process input schema (describes input parameters, form layout “Inputs” for Data.input)

Handling:

  • schema defined by: dev
  • default by: user
  • changeable by: none
is_active

designates whether this process should be treated as active

output_schema

process output schema (describes output JSON, form layout “Results” for Data.output)

Handling:

  • schema defined by: dev
  • default by: dev
  • changeable by: dev

Implicitly defined fields (by resolwe.flow.management.commands.register() or resolwe.flow.executors.run.BaseFlowExecutor.run or its derivatives):

  • progress of type basic:float (from 0.0 to 1.0)
  • proc of type basic:group containing:
    • stdout of type basic:text
    • rc of type basic:integer
    • task of type basic:string (celery task id)
    • worker of type basic:string (celery worker hostname)
    • runtime of type basic:string (runtime instance hostname)
    • pid of type basic:integer (process ID)
persistence

Persistence of Data objects created with this process. It can be one of PERSISTENCE_RAW, PERSISTENCE_CACHED or PERSISTENCE_TEMP.

Note

If persistence is set to PERSISTENCE_CACHED or PERSISTENCE_TEMP, the process must be idempotent.

requirements

process requirements

run

process command and environment description for internal use

Handling:

  • schema defined by: dev
  • default by: dev
  • changeable by: dev
scheduling_class

process scheduling class

type

data type

Storage model

Postgres ORM model for storing JSON.

class resolwe.flow.models.Storage(*args, **kwargs)[source]

Postgres model for storing storages.

data

corresponding data objects

json

actual JSON stored

objects = <django.db.models.manager.StorageManagerFromBaseQuerySet object>

storage manager

Secret model

Postgres ORM model for storing secrets.

class resolwe.flow.models.Secret(*args, **kwargs)[source]

Postgres model for storing secrets.

ProcessMigrationHistory model

Postgres ORM model for storing process migration history.

class resolwe.flow.models.ProcessMigrationHistory(*args, **kwargs)[source]

Model for storing process migration history.

DataMigrationHistory model

Postgres ORM model for storing data migration history.

class resolwe.flow.models.DataMigrationHistory(*args, **kwargs)[source]

Model for storing data migration history.

Flow Utilities

Resolwe Exceptions Utils

Utils functions for working with exceptions.

resolwe.flow.utils.exceptions.resolwe_exception_handler(exc, context)[source]

Handle exceptions raised in API and make them nicer.

To enable this, you have to add it to the settings:

REST_FRAMEWORK = {
    'EXCEPTION_HANDLER': 'resolwe.flow.utils.exceptions.resolwe_exception_handler',
}

Statistics

Various statistical utilities, used mostly for manager load tracking.

class resolwe.flow.utils.stats.NumberSeriesShape[source]

Helper class for computing characteristics for numerical data.

Given a series of numerical data, the class will keep a record of the extremes seen, arithmetic mean and standard deviation.

to_dict()[source]

Pack the stats computed into a dictionary.

update(num)[source]

Update metrics with the new number.
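
The following is a minimal sketch of how such running statistics can be kept without storing the whole series, using Welford's online algorithm. The class name RunningShape, the attribute names and the dictionary keys are illustrative assumptions and not Resolwe's actual implementation.

```python
import math


class RunningShape:
    """Track min, max, mean and standard deviation of a stream of numbers."""

    def __init__(self):
        self.count = 0
        self.low = None
        self.high = None
        self.mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the running mean

    def update(self, num):
        """Fold one new number into the running metrics (Welford's method)."""
        self.count += 1
        self.low = num if self.low is None else min(self.low, num)
        self.high = num if self.high is None else max(self.high, num)
        delta = num - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (num - self.mean)

    def to_dict(self):
        """Pack the stats into a dictionary (key names are hypothetical)."""
        # Population standard deviation; defined as 0.0 for an empty series.
        deviation = math.sqrt(self._m2 / self.count) if self.count else 0.0
        return {
            "min": self.low,
            "max": self.high,
            "mean": self.mean,
            "deviation": deviation,
        }


shape = RunningShape()
for value in (2, 4, 4, 4, 5, 5, 7, 9):
    shape.update(value)
stats = shape.to_dict()
```

For this series the sketch reports a minimum of 2, a maximum of 9, a mean of 5 and a population standard deviation of 2.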

class resolwe.flow.utils.stats.SimpleLoadAvg(intervals)[source]

Helper class for a sort of load average based on event times.

Given a series of queue depth events, it will compute the average number of events for three different window lengths, emulating a form of ‘load average’. The calculation itself is modelled after the Linux scheduler, with a 5-second sampling rate. Because we don’t get consistent (time-wise) samples, the sample taken is the average of a simple moving window for the last 5 seconds; this is to avoid numerical errors if actual time deltas were used to compute the scaled decay.

add(count, timestamp=None)[source]

Add a value at the specified time to the series.

Parameters:
  • count – The number of work items ready at the specified time.
  • timestamp – The timestamp to add. Defaults to None, meaning current time. It should be strictly greater (newer) than the last added timestamp.
to_dict()[source]

Pack the load averages into a nicely-keyed dictionary.
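
As a rough illustration of the decay step described above, the sketch below applies the commonly cited load-average recurrence to samples that are already evenly spaced at 5 seconds. The function name decayed_load and its parameters are assumptions made for this example; it does not reproduce the moving-window sampling or the internals of SimpleLoadAvg.

```python
import math

SAMPLE_PERIOD = 5.0  # seconds between samples, as in the description above


def decayed_load(samples, window):
    """Return the exponentially decayed average of evenly spaced samples.

    `samples` holds one queue-depth value per 5-second period and `window`
    is the averaging window length in seconds (e.g. 60 for one minute).
    """
    decay = math.exp(-SAMPLE_PERIOD / window)
    load = 0.0
    for depth in samples:
        # Old load fades by `decay`; the new sample fills in the rest.
        load = load * decay + depth * (1.0 - decay)
    return load


# Twelve samples (one minute) of a constant queue depth of 4.
one_minute = decayed_load([4] * 12, window=60)
fifteen_minutes = decayed_load([4] * 12, window=900)
```

Starting from zero, the shorter window approaches the constant depth faster than the longer one, which is why several window lengths are reported side by side.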

Flow Management

Register Processes

class resolwe.flow.management.commands.register.Command(stdout=None, stderr=None, no_color=False, force_color=False)[source]

Register processes.

add_arguments(parser)[source]

Command arguments.

find_descriptor_schemas(schema_file)[source]

Find descriptor schemas in given path.

find_schemas(schema_path, schema_type='process', verbosity=1)[source]

Find schemas in packages that match filters.

handle(*args, **options)[source]

Register processes.

register_descriptors(descriptor_schemas, user, force=False, verbosity=1)[source]

Read and register descriptors.

register_processes(process_schemas, user, force=False, verbosity=1)[source]

Read and register processes.

retire(process_schemas)[source]

Retire obsolete processes.

Remove old process versions without data. Find processes that have been registered but do not exist in the code anymore, then:

  • If they do not have data: remove them
  • If they have data: flag them not active (is_active=False)
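
The retirement rule above can be sketched with plain Python objects. The RegisteredProcess class, the data_count attribute and the retire_obsolete function are hypothetical stand-ins for this example; the sketch ignores process versions and does not touch a database.

```python
class RegisteredProcess:
    """Hypothetical stand-in for a registered process."""

    def __init__(self, slug, data_count, is_active=True):
        self.slug = slug
        self.data_count = data_count
        self.is_active = is_active


def retire_obsolete(registered, slugs_in_code):
    """Split obsolete processes into removed and deactivated slugs."""
    removed, deactivated = [], []
    for process in registered:
        if process.slug in slugs_in_code:
            continue  # still defined in the code, nothing to do
        if process.data_count == 0:
            removed.append(process.slug)  # no data: safe to remove
        else:
            process.is_active = False  # has data: keep but flag inactive
            deactivated.append(process.slug)
    return removed, deactivated


registered = [
    RegisteredProcess("align", data_count=3),
    RegisteredProcess("old-unused", data_count=0),
    RegisteredProcess("old-used", data_count=7),
]
removed, deactivated = retire_obsolete(registered, slugs_in_code={"align"})
```
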
valid(instance, schema)[source]

Validate schema.

Resolwe Test Framework

Resolwe Test Cases

class resolwe.test.TestCaseHelpers(methodName='runTest')[source]

Mixin for test case helpers.

assertAlmostEqualGeneric(actual, expected, msg=None)[source]

Assert almost equality for common types of objects.

This is the same as assertEqual(), but using assertAlmostEqual() when floats are encountered inside common containers (currently this includes dict, list and tuple types).

Parameters:
  • actual – object to compare
  • expected – object to compare against
  • msg – optional message printed on failures
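
A minimal sketch of the idea, assuming a recursive walk over dict, list and tuple containers. The mixin and method names are hypothetical and the real helper may treat edge cases differently.

```python
import unittest


class AlmostEqualMixin:
    """Hypothetical mixin; combine it with unittest.TestCase."""

    def assert_almost_equal_generic(self, actual, expected, msg=None):
        if isinstance(actual, dict) and isinstance(expected, dict):
            # Same keys, then compare the values pairwise.
            self.assertEqual(set(actual), set(expected), msg)
            for key in actual:
                self.assert_almost_equal_generic(actual[key], expected[key], msg)
        elif isinstance(actual, (list, tuple)) and isinstance(expected, (list, tuple)):
            self.assertEqual(len(actual), len(expected), msg)
            for left, right in zip(actual, expected):
                self.assert_almost_equal_generic(left, right, msg)
        elif isinstance(actual, float) or isinstance(expected, float):
            # Floats are compared approximately.
            self.assertAlmostEqual(actual, expected, msg=msg)
        else:
            self.assertEqual(actual, expected, msg)


class Example(AlmostEqualMixin, unittest.TestCase):
    def test_nested_floats(self):
        actual = {"scores": [0.1 + 0.2, (1, 2.0)], "name": "x"}
        expected = {"scores": [0.3, (1, 2.0)], "name": "x"}
        self.assert_almost_equal_generic(actual, expected)


result = unittest.TextTestRunner().run(
    unittest.defaultTestLoader.loadTestsFromTestCase(Example)
)
```

A plain assertEqual() would fail here because 0.1 + 0.2 is not exactly 0.3.
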
keep_data(mock_purge=True)[source]

Do not delete output files after tests.

setUp()[source]

Prepare environment for test.

class resolwe.test.TransactionTestCase(methodName='runTest')[source]

Base class for writing Resolwe tests not enclosed in a transaction.

It is based on Django’s TransactionTestCase. Use it if you need to access the test’s database from another thread/process.

setUp()[source]

Initialize test data.

class resolwe.test.TestCase(methodName='runTest')[source]

Base class for writing Resolwe tests.

It is based on TransactionTestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

class resolwe.test.ProcessTestCase(methodName='runTest')[source]

Base class for writing process tests.

It is a subclass of TransactionTestCase with some specific functions used for testing processes.

To write a process test, use Django’s standard syntax for writing tests and follow these steps:

  1. Put input files (if any) in tests/files directory of a Django application.
  2. Run the process using run_process().
  3. Check if the process has the expected status using assertStatus().
  4. Check process’s output using assertFields(), assertFile(), assertFileExists(), assertFiles() and assertJSON().

Note

When creating a test case for a custom Django application, subclass this class and override self.files_path with:

self.files_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'files')

Danger

If output files don’t exist in the tests/files directory of a Django application, they are created automatically. You must check that they are correct before using them for further runs.

assertDir(obj, field_path, fn)[source]

Compare process output directory to correct compressed directory.

Parameters:
  • obj (Data) – object that includes the directory to compare
  • field_path (str) – path to Data object’s field with the file name
  • fn (str) – file name (and relative path) of the correct compressed directory to compare against. Path should be relative to the tests/files directory of a Django application. Compressed directory needs to be in tar.gz format.
assertDirExists(obj, field_path)[source]

Assert that a directory in the output field of the given object exists.

Parameters:
  • obj – object that includes the directory whose existence is checked
  • field_path – directory name/path
assertDirStructure(obj, field_path, dir_struct, exact=True)[source]

Assert correct tree structure in output field of given object.

Only names of directories and files are asserted. Content of files is not compared.

Parameters:
  • obj (Data) – object that includes the directory to compare
  • field_path (str) – path to the directory to compare
  • dir_struct (dict) – correct tree structure of the directory. Dictionary keys are directory and file names with the correct nested structure. Dictionary value associated with each directory is a new dictionary which lists the content of the directory. Dictionary value associated with each file name is None
  • exact (bool) – if True tested directory structure must exactly match dir_struct. If False dir_struct must be a partial structure of the directory to compare
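
To illustrate the dir_struct format, the sketch below checks a temporary directory against a nested dictionary in both exact and partial mode. The function matches_structure and the file names are assumptions for this example and not the test framework's own code.

```python
import os
import tempfile


def matches_structure(path, dir_struct, exact=True):
    """Check that the tree under `path` matches the nested `dir_struct`."""
    entries = set(os.listdir(path))
    if exact and entries != set(dir_struct):
        return False
    for name, content in dir_struct.items():
        full_path = os.path.join(path, name)
        if content is None:
            # A value of None marks a file.
            if not os.path.isfile(full_path):
                return False
        else:
            # A nested dictionary marks a directory and lists its content.
            if not os.path.isdir(full_path):
                return False
            if not matches_structure(full_path, content, exact):
                return False
    return True


with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "results", "plots"))
    for relative in ("report.txt", os.path.join("results", "table.csv")):
        open(os.path.join(root, relative), "w").close()

    full = {"report.txt": None, "results": {"table.csv": None, "plots": {}}}
    partial = {"results": {"table.csv": None}}

    exact_ok = matches_structure(root, full, exact=True)
    partial_ok = matches_structure(root, partial, exact=False)
    partial_as_exact = matches_structure(root, partial, exact=True)
```

The partial structure passes only with exact=False, because the directory contains entries that the dictionary does not list.
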
assertFields(obj, path, value)[source]

Compare object’s field to the given value.

The file size is ignored. Use assertFile to validate file contents.

Parameters:
  • obj (Data) – object with the field to compare
  • path (str) – path to Data object’s field
  • value (str) – desired value of Data object’s field
assertFile(obj, field_path, fn, **kwargs)[source]

Compare a process’s output file to the given correct file.

Parameters:
  • obj (Data) – object that includes the file to compare
  • field_path (str) – path to Data object’s field with the file name
  • fn (str) – file name (and relative path) of the correct file to compare against. Path should be relative to the tests/files directory of a Django application.
  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.
  • filter (FunctionType) – function for filtering the contents of output files. It is used in itertools.filterfalse() function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.
  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.
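
The interplay of filter and sort can be sketched as follows. The helper content_hash, the parameter name line_filter, the choice of SHA-256 and the sample lines are assumptions for illustration; only the use of itertools.filterfalse() is taken from the description above.

```python
import hashlib
import itertools


def content_hash(lines, line_filter=None, sort=False):
    """Hash an iterable of byte lines after optional filtering and sorting."""
    if line_filter is not None:
        # filterfalse keeps the lines for which the filter returns False.
        lines = itertools.filterfalse(line_filter, lines)
    if sort:
        lines = sorted(lines)
    digest = hashlib.sha256()
    for line in lines:
        digest.update(line)
    return digest.hexdigest()


def is_timestamp(line):
    """Hypothetical filter: exclude lines that record the run date."""
    return line.startswith(b"# generated on")


output = [b"# generated on 2020-01-02\n", b"geneB\t2\n", b"geneA\t1\n"]
reference = [b"# generated on 2019-12-31\n", b"geneA\t1\n", b"geneB\t2\n"]

same = content_hash(output, is_timestamp, sort=True) == content_hash(
    reference, is_timestamp, sort=True
)
different = content_hash(output) == content_hash(reference)
```

With the filter and sorting applied the two files compare equal; without them the differing date line and line order make the hashes differ.
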
assertFileExists(obj, field_path)[source]

Ensure a file in the given object’s field exists.

Parameters:
  • obj (Data) – object that includes the file for which to check if it exists
  • field_path (str) – path to Data object’s field with the file name/path
assertFiles(obj, field_path, fn_list, **kwargs)[source]

Compare a process’s output files to the given correct files.

Parameters:
  • obj (Data) – object which includes the files to compare
  • field_path (str) – path to Data object’s field with the list of file names
  • fn_list (list) – list of file names (and relative paths) of files to compare against. Paths should be relative to the tests/files directory of a Django application.
  • compression (str) – if not None, files will be uncompressed with the appropriate compression library before comparison. Currently supported compression formats are gzip and zip.
  • filter (FunctionType) – Function for filtering the contents of output files. It is used in itertools.filterfalse function and takes one parameter, a line of the output file. If it returns True, the line is excluded from comparison of the two files.
  • sort (bool) – if set to True, basic sort will be performed on file contents before computing hash value.
assertFilesExist(obj, field_path)[source]

Ensure files in the given object’s field exist.

Parameters:
  • obj (Data) – object that includes list of files for which to check existence
  • field_path (str) – path to Data object’s field with the file name/path
assertJSON(obj, storage, field_path, file_name)[source]

Compare JSON in Storage object to the given correct JSON.

Parameters:
  • obj (Data) – object to which the Storage object belongs
  • storage (Storage or str) – object or id which contains JSON to compare
  • field_path (str) – path to JSON subset in the Storage’s object to compare against. If it is empty, the entire object will be compared.
  • file_name (str) –

    file name (and relative path) of the file with the correct JSON to compare against. Path should be relative to the tests/files directory of a Django application.

    Note

    The given JSON file should be compressed with gzip and have the .gz extension.
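
A gzip-compressed JSON reference file can be produced with the standard library alone. This is a minimal sketch; the file name expected_output.json.gz and the sample content are hypothetical, and a temporary directory stands in for tests/files.

```python
import gzip
import json
import os
import tempfile

reference = {"genes": ["geneA", "geneB"], "counts": [1, 2]}

with tempfile.TemporaryDirectory() as files_dir:
    # Hypothetical file name; in a real test it lives under tests/files.
    path = os.path.join(files_dir, "expected_output.json.gz")

    # Write the JSON as gzip-compressed text.
    with gzip.open(path, "wt", encoding="utf-8") as handle:
        json.dump(reference, handle)

    # Read it back the same way to confirm the file is valid.
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        loaded = json.load(handle)
```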

assertStatus(obj, status)[source]

Check if object’s status is equal to the given status.

Parameters:
  • obj (Data) – object for which to check the status
  • status (str) – desired value of object’s status attribute
files_path

Path to test files.

get_json(file_name, storage)[source]

Return JSON saved in file and test JSON to compare it to.

The method returns a tuple of the saved JSON and the test JSON. In your test you should then compare the test JSON to the saved JSON that is committed to the repository.

The storage argument can be a Storage object, a Storage ID or a Python dictionary. The test JSON is taken from the json field of the Storage object, or is the complete Python dictionary (if a dict is given).

If the file does not exist, it is created, the test JSON is written to the new file and an exception is raised.

Parameters:
  • file_name (str) – file name (and relative path) of a JSON file. Path should be relative to the tests/files directory of a Django app. The file name must have a .gz extension.
  • storage (Storage, str or dict) – Storage object, Storage ID or a dict.
Returns:

(reference JSON, test JSON)

Return type:

tuple

preparation_stage()[source]

Context manager to mark input preparation stage.

run_process(process_slug, input_={}, assert_status='OK', descriptor=None, descriptor_schema=None, verbosity=0, tags=None)[source]

Run the specified process with the given inputs.

If input is a file, file path should be given relative to the tests/files directory of a Django application. If assert_status is given, check if Data object’s status matches it after the process has finished.

Note

If you need to delay calling the manager, you must put the desired code in a with transaction.atomic() block.

Parameters:
  • process_slug (str) – slug of the Process to run
  • input_ (dict) –

    Process’s input parameters

    Note

    You don’t have to specify parameters with defined default values.

  • assert_status (str) – desired status of the Data object
  • descriptor (dict) – descriptor to set on the Data object
  • descriptor_schema (dict) – descriptor schema to set on the Data object
  • tags (list) – list of tags that will be added to the created Data object
Returns:

object created by Process

Return type:

Data

run_processor(*args, **kwargs)[source]

Run process.

Deprecated method: use run_process.

setUp()[source]

Initialize test data.

tearDown()[source]

Clean up after the test.

class resolwe.test.TransactionResolweAPITestCase(methodName='runTest')[source]

Base class for testing Resolwe REST API.

This class is derived from Django REST Framework’s APITransactionTestCase class and implements some basic features that make testing the Resolwe API easier. These features include the following functions:

_get_list(user=None, query_params={})[source]

Make GET request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:user (User or None) – User to authenticate in request
Returns:API response object
Return type:Response
_get_detail(pk, user=None, query_params={})[source]

Make GET request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_post(data={}, user=None, query_params={})[source]

Make POST request to self.list_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_patch(pk, data={}, user=None, query_params={})[source]

Make PATCH request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_delete(pk, user=None, query_params={})[source]

Make DELETE request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

_detail_permissions(pk, data={}, user=None)[source]

Make POST request to self.detail_view view.

If user is not None, the given user is authenticated before making the request.

Parameters:
  • pk (int) – Primary key of the corresponding object
  • data (dict) – data for posting in request’s body
  • user (User or None) – User to authenticate in request
Returns:

API response object

Return type:

Response

It also includes two views built from the referenced DRF ViewSet. The first mimics a list view and links request methods to ViewSet methods as follows:

  • GET -> list
  • POST -> create

The second mimics a detail view and links request methods to ViewSet methods as follows:

  • GET -> retrieve
  • PUT -> update
  • PATCH -> partial_update
  • DELETE -> destroy
  • POST -> permissions

If any of the listed methods is not defined in the ViewSet, the corresponding link is omitted.
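
The omission rule can be sketched as a filter over the two method maps. The helper available_actions and the ReadOnlyViewSet class are hypothetical; in DRF, a dictionary of this shape is what gets passed to a ViewSet's as_view() to bind request methods to actions.

```python
LIST_ACTIONS = {"get": "list", "post": "create"}
DETAIL_ACTIONS = {
    "get": "retrieve",
    "put": "update",
    "patch": "partial_update",
    "delete": "destroy",
    "post": "permissions",
}


def available_actions(viewset, actions):
    """Keep only the links whose target method exists on the viewset."""
    return {
        method: action
        for method, action in actions.items()
        if hasattr(viewset, action)
    }


class ReadOnlyViewSet:
    """Hypothetical stand-in for a DRF ViewSet that only supports reads."""

    def list(self, request):
        return []

    def retrieve(self, request, pk=None):
        return {}


list_actions = available_actions(ReadOnlyViewSet, LIST_ACTIONS)
detail_actions = available_actions(ReadOnlyViewSet, DETAIL_ACTIONS)
```

For this read-only stand-in only the GET links survive in both maps.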

Note

self.viewset (an instance of DRF’s ViewSet) and self.resource_name (a string) must be defined before calling the superclass’s setUp method for the class to work properly.

self.factory is an instance of DRF’s APIRequestFactory.

assertKeys(data, wanted)[source]

Assert dictionary keys.

detail_permissions(pk)[source]

Get detail permissions url.

detail_url(pk)[source]

Get detail url.

list_url

Get list url.

setUp()[source]

Prepare data.

class resolwe.test.ResolweAPITestCase(methodName='runTest')[source]

Base class for writing Resolwe API tests.

It is based on TransactionResolweAPITestCase and Django’s TestCase. The latter encloses the test code in a database transaction that is rolled back at the end of the test.

Resolwe Test Helpers and Decorators

resolwe.test.utils.check_installed(command)[source]

Check if the given command is installed.

Parameters:command (str) – name of the command
Returns:(indicator of the availability of the command, message saying command is not available)
Return type:tuple(bool, str)
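
A (bool, str) tuple of this shape can be unpacked straight into unittest.skipUnless(). The sketch below uses a stand-in function, command_available, built on shutil.which(); its name, its message text and the command name are assumptions and not Resolwe's implementation.

```python
import shutil
import unittest


def command_available(command):
    """Return (available, reason) in the shape expected by skipUnless."""
    available = shutil.which(command) is not None
    return available, "Command '{}' is not available.".format(command)


# Hypothetical command name chosen so that the lookup fails.
missing = command_available("no-such-command-for-this-sketch")


class ToolTest(unittest.TestCase):
    @unittest.skipUnless(*missing)
    def test_tool(self):
        self.fail("never runs because the command is missing")
```
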
resolwe.test.utils.check_docker()[source]

Check if Docker is installed and working.

Returns:(indicator of the availability of Docker, reason for unavailability)
Return type:tuple(bool, str)
resolwe.test.utils.create_data_location(subpath=None)[source]

Create equivalent of old DataLocation object.

When argument is None, store the ID of the file storage object in the subpath.

resolwe.test.utils.with_custom_executor(wrapped=None, **custom_executor_settings)[source]

Decorate unit test to run processes with a custom executor.

Parameters:custom_executor_settings (dict) – custom FLOW_EXECUTOR settings with which you wish to override the current settings
resolwe.test.utils.with_docker_executor(wrapped=None)[source]

Decorate unit test to run processes with the Docker executor.

resolwe.test.utils.with_null_executor(wrapper=None, enabled=None, adapter=None)[source]

Decorate unit test to run processes with the Null executor.

resolwe.test.utils.with_resolwe_host(wrapper=None, enabled=None, adapter=None)[source]

Decorate unit test to give it access to a live Resolwe host.

Set the RESOLWE_HOST_URL setting to the address on which the live test Resolwe host listens.

Note

This decorator must be used with a (sub)class of LiveServerTestCase which starts a live Django server in the background.

resolwe.test.utils.is_testing()[source]

Return current testing status.

This assumes that the Resolwe test runner is being used.

Resolwe Utilities

class resolwe.utils.BraceMessage(fmt, *args, **kwargs)[source]

Log messages with the new {}-string formatting syntax.

Note

When using this helper class, one pays no significant performance penalty since the actual formatting only happens when (and if) the logged message is actually outputted to a log by a handler.

Example of usage:

from resolwe.utils import BraceMessage as __

logger.error(__("Message with {0} {name}", 2, name="placeholders"))

Source: https://docs.python.org/3/howto/logging-cookbook.html#use-of-alternative-formatting-styles.
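
For reference, a helper of this kind can be as small as the sketch below, which follows the pattern from the logging cookbook linked above. It is an illustration of the deferred-formatting idea and is not copied from resolwe.utils; the logger name is an assumption.

```python
import logging


class BraceMessage:
    """Defer str.format() until the log record is actually rendered."""

    def __init__(self, fmt, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        # Called by the logging machinery only when a handler emits the record.
        return self.fmt.format(*self.args, **self.kwargs)


message = BraceMessage("Message with {0} {name}", 2, name="placeholders")
rendered = str(message)  # "Message with 2 placeholders"

logging.basicConfig(level=logging.ERROR)
logging.getLogger("brace_message_sketch").error(message)
```

Because the format call lives in __str__(), a message logged below the active level is never formatted at all.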