Source code for resolwe.flow.executors.docker.prepare

""".. Ignore pydocstyle D400.

===========
Preparation
===========

.. autoclass:: resolwe.flow.executors.docker.prepare.FlowExecutorPreparer
    :members:

"""
import os

from django.conf import settings
from django.core.management import call_command

from resolwe.storage.connectors import connectors

from .. import constants
from ..prepare import BaseFlowExecutorPreparer


[docs]class FlowExecutorPreparer(BaseFlowExecutorPreparer): """Specialized manager assist for the docker executor."""
[docs] def post_register_hook(self, verbosity=1): """Pull Docker images needed by processes after registering.""" if not getattr(settings, "FLOW_DOCKER_DONT_PULL", False): call_command("list_docker_images", pull=True, verbosity=verbosity)
[docs] def resolve_data_path(self, data=None, filename=None): """Resolve data path for use with the executor. :param data: Data object instance :param filename: Filename to resolve :return: Resolved filename, which can be used to access the given data file in programs executed using this executor :raises RuntimeError: when data path can not be resolved. """ storage_name = "data" filesystem_connectors = [ connector for connector in connectors.for_storage(storage_name) if connector.mountable ] if data is None: if not filesystem_connectors: return constants.INPUTS_VOLUME else: return f"/{storage_name}_{filesystem_connectors[0].name}" data_connectors = data.location.connectors for connector in filesystem_connectors: if connector in data_connectors: return data.location.get_path( prefix=f"/{storage_name}_{connector.name}", filename=filename ) return data.location.get_path(prefix=constants.INPUTS_VOLUME, filename=filename)
[docs] def resolve_upload_path(self, filename=None): """Resolve upload path for use with the executor. :param filename: Filename to resolve :return: Resolved filename, which can be used to access the given uploaded file in programs executed using this executor """ upload_connectors = [ connector for connector in connectors.for_storage("upload") if connector.mountable ] if not upload_connectors: raise RuntimeError("No connectors are configured for 'upload' storage.") upload_connector = upload_connectors[0] if filename is None: return f"/upload_{upload_connector.name}" return f"/upload_{upload_connector.name}/{filename}"
[docs] def get_environment_variables(self): """Return dict of environment variables that will be added to executor.""" return {"TMPDIR": os.fspath(constants.PROCESSING_VOLUME / constants.TMPDIR)}