"""Reslowe process model."""
from collections import ChainMap
from typing import Optional
from django.conf import settings
from django.contrib.postgres.indexes import GinIndex
from django.core.validators import RegexValidator
from django.db import models
from resolwe.permissions.models import PermissionObject
from .base import BaseModel
from .data import Data
class Process(BaseModel, PermissionObject):
    """Postgres model for storing processes."""

    class Meta(BaseModel.Meta):
        """Process Meta options."""

        permissions = (
            ("view", "Can view process"),
            ("share", "Can share process"),
            ("owner", "Is owner of the process"),
        )

        indexes = [
            models.Index(name="idx_process_name", fields=["name"]),
            models.Index(name="idx_process_slug", fields=["slug"]),
            models.Index(name="idx_process_type", fields=["type"]),
            # GIN index on ``type`` using the ``gin_trgm_ops`` operator class
            # (provided by the PostgreSQL ``pg_trgm`` extension).
            GinIndex(
                name="idx_process_type_trgm",
                fields=["type"],
                opclasses=["gin_trgm_ops"],
            ),
        ]

    #: raw persistence
    PERSISTENCE_RAW = "RAW"
    #: cached persistence
    PERSISTENCE_CACHED = "CAC"
    #: temp persistence
    PERSISTENCE_TEMP = "TMP"
    PERSISTENCE_CHOICES = (
        (PERSISTENCE_RAW, "Raw"),
        (PERSISTENCE_CACHED, "Cached"),
        (PERSISTENCE_TEMP, "Temp"),
    )

    SCHEDULING_CLASS_INTERACTIVE = "IN"
    SCHEDULING_CLASS_BATCH = "BA"
    SCHEDULING_CLASS_CHOICES = (
        (SCHEDULING_CLASS_INTERACTIVE, "Interactive"),
        (SCHEDULING_CLASS_BATCH, "Batch"),
    )

    #: data type
    type = models.CharField(
        max_length=100,
        validators=[
            RegexValidator(
                regex=r"^data:[a-z0-9:]+:$",
                message="Type may be alphanumerics separated by colon",
                code="invalid_type",
            )
        ],
    )

    #: category
    category = models.CharField(
        max_length=200,
        default="Other:",
        validators=[
            RegexValidator(
                regex=r"^([a-zA-Z0-9]+[:\-])*[a-zA-Z0-9]+:$",
                message="Category may be alphanumerics separated by colon",
                code="invalid_category",
            )
        ],
    )

    persistence = models.CharField(
        max_length=3, choices=PERSISTENCE_CHOICES, default=PERSISTENCE_RAW
    )
    """
    Persistence of :class:`~resolwe.flow.models.Data` objects created
    with this process. It can be one of the following:

    - :attr:`PERSISTENCE_RAW`
    - :attr:`PERSISTENCE_CACHED`
    - :attr:`PERSISTENCE_TEMP`

    .. note::

        If persistence is set to ``PERSISTENCE_CACHED`` or
        ``PERSISTENCE_TEMP``, the process must be idempotent.
    """

    #: designates whether this process should be treated as active
    is_active = models.BooleanField("active", default=True)

    #: detailed description
    description = models.TextField(default="")

    #: template for name of Data object created with Process
    data_name = models.CharField(max_length=200, null=True, blank=True)

    input_schema = models.JSONField(blank=True, default=list)
    """
    process input schema (describes input parameters, form layout **"Inputs"** for :attr:`Data.input`)

    Handling:

    - schema defined by: *dev*
    - default by: *user*
    - changeable by: *none*
    """

    output_schema = models.JSONField(blank=True, default=list)
    """
    process output schema (describes output JSON, form layout **"Results"** for :attr:`Data.output`)

    Handling:

    - schema defined by: *dev*
    - default by: *dev*
    - changeable by: *dev*

    Implicitly defined fields (by
    :func:`resolwe.flow.management.commands.register` or
    ``resolwe.flow.executors.run.BaseFlowExecutor.run`` or its
    derivatives):

    - ``progress`` of type ``basic:float`` (from 0.0 to 1.0)
    - ``proc`` of type ``basic:group`` containing:

      - ``stdout`` of type ``basic:text``
      - ``rc`` of type ``basic:integer``
      - ``task`` of type ``basic:string`` (celery task id)
      - ``worker`` of type ``basic:string`` (celery worker hostname)
      - ``runtime`` of type ``basic:string`` (runtime instance hostname)
      - ``pid`` of type ``basic:integer`` (process ID)
    """

    entity_type = models.CharField(max_length=100, null=True, blank=True)
    """
    Automatically add :class:`~resolwe.flow.models.Data` object created
    with this process to an :class:`~resolwe.flow.models.Entity` object
    representing a data-flow. If all input ``Data`` objects belong to
    the same entity, add newly created ``Data`` object to it, otherwise
    create a new one.
    """

    entity_descriptor_schema = models.CharField(max_length=100, null=True, blank=True)
    """
    Slug of the descriptor schema assigned to the Entity created with
    :attr:`~resolwe.flow.models.Process.entity_type`.
    """

    entity_input = models.CharField(max_length=100, null=True, blank=True)
    """
    Limit the entity selection in
    :attr:`~resolwe.flow.models.Process.entity_type` to a single input.
    """

    entity_always_create = models.BooleanField(default=False)
    """
    Create new entity, regardless of ``entity_input`` or
    ``entity_descriptor_schema`` fields.
    """

    run = models.JSONField(default=dict)
    """
    process command and environment description for internal use

    Handling:

    - schema defined by: *dev*
    - default by: *dev*
    - changeable by: *dev*
    """

    requirements = models.JSONField(default=dict)
    """
    process requirements
    """

    scheduling_class = models.CharField(
        max_length=2, choices=SCHEDULING_CLASS_CHOICES, default=SCHEDULING_CLASS_BATCH
    )
    """
    process scheduling class
    """

    def get_resource_limits(self, data: Optional[Data] = None) -> dict:
        """Get the core count and memory usage limits for this process.

        Each resource is taken from the first source that defines it.
        The sources are listed here in order of priority:

        1. ``data.process_resources`` (only when ``data`` is given),
        2. the entry for this process' slug in the
           ``FLOW_PROCESS_RESOURCE_OVERRIDES`` setting,
        3. ``self.requirements["resources"]``,
        4. the ``FLOW_PROCESS_RESOURCE_DEFAULTS`` setting,
        5. built-in fallback values.

        When the matching ``FLOW_PROCESS_MAX_<RESOURCE>`` setting is
        defined, it caps the selected value (for example
        ``FLOW_PROCESS_MAX_MEMORY``).

        :param data: optional data object. Its ``process_resources``
            take precedence over every other source.

        :return: A dictionary with the resource limits, containing the
            following keys:

            - ``memory``: Memory usage limit, in MB. Defaults to 4096 if
              not otherwise specified in the resource requirements.
            - ``cores``: Core count limit. Defaults to 1.
            - ``storage``: Size (in gibibytes) of temporary volume used for
              processing in kubernetes. Defaults to 200.

        :rtype: dict
        """
        # Known resources.
        resources = ("cores", "memory", "storage")
        # Used for any resource that no other source defines.
        fallback = {"cores": 1, "memory": 4096, "storage": 200}

        # Per-process overrides from settings, looked up as
        # ``overrides[resource][process_slug]``. Falsy values are skipped
        # so that lower-priority sources apply instead.
        environment_settings = getattr(settings, "FLOW_PROCESS_RESOURCE_OVERRIDES", {})
        environment_resources = {}
        for resource in resources:
            override = environment_settings.get(resource, {}).get(self.slug)
            if override:
                environment_resources[resource] = override

        # The order of the mappings determines their priority: the first
        # mapping that contains a key wins. ``or {}`` guards against an
        # explicit ``None`` stored in either JSON source.
        resources_map = ChainMap(
            (data.process_resources or {}) if data is not None else {},
            environment_resources,
            self.requirements.get("resources") or {},
            getattr(settings, "FLOW_PROCESS_RESOURCE_DEFAULTS", {}),
            fallback,
        )

        # Resolve each limit and apply the optional global cap.
        limits = {}
        for resource in resources:
            limit = resources_map[resource]
            cap = getattr(settings, f"FLOW_PROCESS_MAX_{resource.upper()}", None)
            if cap is not None:
                limit = min(limit, cap)
            limits[resource] = limit
        return limits