""".. Ignore pydocstyle D400.

===============
Slurm Connector
===============

"""
import logging
import os
import shlex
import subprocess
from django.conf import settings
from resolwe.flow.models import Data
from resolwe.storage import settings as storage_settings
from resolwe.utils import BraceMessage as __
from .base import BaseConnector
# Extra memory (in megabytes -- it is added to the ``--mem=...M`` request)
# reserved on top of the process limit, because the executor runs in the same
# environment as the process and consumes memory there too.
EXECUTOR_MEMORY_OVERHEAD = 200

logger = logging.getLogger(name=__name__)
class Connector(BaseConnector):
    """Slurm-based connector for job execution."""

    def submit(self, data: Data, argv):
        """Run process with SLURM.

        Render a batch script ``slurm-<data.pk>.sh`` in the runtime volume
        directory and submit it with ``sbatch``. The script requests the
        Data object's memory limit (plus ``EXECUTOR_MEMORY_OVERHEAD``
        megabytes) and core limit. It also requests a partition when one is
        configured, either through ``FLOW_SLURM_PARTITION_DEFAULT`` or, per
        process slug, through ``FLOW_SLURM_PARTITION_OVERRIDES``.

        Errors are logged rather than raised. This covers an ``OSError``
        while writing the script or launching ``sbatch``, and a non-zero
        ``sbatch`` exit status.

        For details, see
        :meth:`~resolwe.flow.managers.workload_connectors.base.BaseConnector.submit`.

        :param data: the Data object being processed
        :param argv: argument vector to run inside the Slurm job; every
            element is shell-quoted before it is written to the script
        """
        limits = data.get_resource_limits()
        logger.debug(
            __(
                "Connector '{}' running for Data with id {} ({}).",
                self.__class__.__module__,
                data.id,
                repr(argv),
            )
        )

        # Compute target partition: a per-process-slug override wins over
        # the global default.
        partition = getattr(settings, "FLOW_SLURM_PARTITION_OVERRIDES", {}).get(
            data.process.slug,
            getattr(settings, "FLOW_SLURM_PARTITION_DEFAULT", None),
        )

        # Build the whole script up front so that a failure while rendering
        # it does not leave a half-written file behind.
        script_lines = [
            "#!/bin/bash",
            "#SBATCH --mem={}M".format(limits["memory"] + EXECUTOR_MEMORY_OVERHEAD),
            "#SBATCH --cpus-per-task={}".format(limits["cores"]),
        ]
        if partition:
            script_lines.append("#SBATCH --partition={}".format(partition))
        script_lines.append(
            "#SBATCH --output slurm-url-{}-job-%j.out".format(data.location.subpath)
        )
        # Render the argument vector into a command line; quoting keeps each
        # argument intact when bash re-parses it.
        script_lines.append(" ".join(map(shlex.quote, argv)))
        script_content = "\n".join(script_lines) + "\n"

        try:
            runtime_dir = storage_settings.FLOW_VOLUMES["runtime"]["config"]["path"]
            script_path = os.path.join(runtime_dir, "slurm-{}.sh".format(data.pk))
            # Make sure the resulting file is executable on creation (0o555).
            # O_TRUNC drops stale content if a script for this Data already
            # exists.
            # NOTE(review): an existing script is read-only (0o555), so
            # reopening it for writing fails for non-root users; that
            # surfaces through the OSError branch below.
            file_descriptor = os.open(
                script_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=0o555
            )
            with os.fdopen(file_descriptor, "wt") as script:
                script.write(script_content)

            command = ["/usr/bin/env", "sbatch", script_path]
            completed = subprocess.run(
                command, cwd=runtime_dir, stdin=subprocess.DEVNULL
            )
        except OSError as err:
            logger.error(
                __(
                    "OSError occurred while preparing SLURM script for Data {}: {}",
                    data.id,
                    err,
                )
            )
            return

        # Without this check a rejected submission would go unnoticed and
        # the job would simply never run.
        if completed.returncode != 0:
            logger.error(
                __(
                    "sbatch exited with status {} while submitting SLURM script "
                    "for Data {}.",
                    completed.returncode,
                    data.id,
                )
            )

    def cleanup(self, data_id: int):
        """Cleanup (nothing to do for the Slurm connector)."""