Source code for resolwe.flow.managers.workload_connectors.celery

""".. Ignore pydocstyle D400.

================
Celery Connector
================

"""
import logging
import os
import sys

from django.conf import settings

from resolwe.flow.models import Data, Process
from resolwe.flow.tasks import celery_run
from resolwe.storage import settings as storage_settings
from resolwe.utils import BraceMessage as __

from .base import BaseConnector

logger = logging.getLogger(__name__)

# Sphinx directly imports the modules it's documenting, so we need to
# guard from importing celery on installations which are configured to
# not use celery and thus don't have it available.
if "sphinx" not in sys.modules:
    try:
        import celery  # noqa: F401
    except ImportError:
        logger.error(
            "Please install Celery using 'pip install celery'", file=sys.stderr
        )
        sys.exit(1)


[docs]class Connector(BaseConnector): """Celery-based connector for job execution."""
[docs] def submit(self, data: Data, argv): """Run process. For details, see :meth:`~resolwe.flow.managers.workload_connectors.base.BaseConnector.submit`. """ queue = "ordinary" if data.process.scheduling_class == Process.SCHEDULING_CLASS_INTERACTIVE: queue = "hipri" logger.debug( __( "Connector '{}' running for Data with id {} ({}) in celery queue {}, EAGER is {}.", self.__class__.__module__, data.id, repr(argv), queue, getattr(settings, "CELERY_ALWAYS_EAGER", None), ) ) runtime_dir = storage_settings.FLOW_VOLUMES["runtime"]["config"]["path"] celery_run.apply_async((data.id, os.fspath(runtime_dir), argv), queue=queue)
[docs] def cleanup(self, data_id: int): """Cleanup."""