Source code for resolwe.flow.models.data

"""Reslowe process model."""
import copy
import enum
import json
import logging
from typing import Any, Optional, Union

import jsonschema

from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import ArrayField
from django.contrib.postgres.indexes import GinIndex
from django.contrib.postgres.search import SearchVectorField
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.validators import RegexValidator
from django.db import models, transaction

from resolwe.flow.expression_engines.exceptions import EvaluationError
from resolwe.flow.models.utils import (
    DirtyError,
    fill_with_defaults,
    validate_schema,
    validation_schema,
)
from resolwe.flow.utils import dict_dot, get_data_checksum, iterate_fields
from resolwe.observers.consumers import BackgroundTaskType
from resolwe.observers.decorators import move_to_container
from resolwe.observers.models import BackgroundTask
from resolwe.observers.utils import start_background_task
from resolwe.permissions.models import PermissionObject, PermissionQuerySet
from resolwe.permissions.utils import assign_contributor_permissions, copy_permissions

from .base import BaseModel, BaseQuerySet
from .descriptor import DescriptorSchema
from .entity import Entity, EntityQuerySet
from .history_manager import HistoryMixin
from .secret import Secret
from .storage import Storage
from .utils import (
    hydrate_input_references,
    hydrate_size,
    render_descriptor,
    render_template,
)
from .worker import Worker

# Compatibility for Python < 3.5.
if not hasattr(json, "JSONDecodeError"):
    json.JSONDecodeError = ValueError

logger = logging.getLogger(__name__)


@enum.unique
class HandleEntityOperation(enum.Enum):
    """Constants for entity handling."""

    CREATE = "CREATE"
    ADD = "ADD"
    PASS = "PASS"


class DataQuerySet(BaseQuerySet, PermissionQuerySet):
    """Query set for Data objects."""

    @staticmethod
    def _handle_entity(obj: "Data"):
        """Create entity if `entity.type` is defined in process.

        The following rules apply when adding a `Data` object to an `Entity`:

        * only add the `Data` object to an `Entity` if the process defines
          the `entity.type` field
        * create a new entity if the parents do not belong to any `Entity`
        * add the object to an existing `Entity` if all parents that are part
          of one (but not necessarily all parents) are part of the same
          `Entity`
        * if the parents belong to different `Entities`, do nothing

        """
        entity_type = obj.process.entity_type
        entity_descriptor_schema = obj.process.entity_descriptor_schema
        entity_input = obj.process.entity_input
        entity_always_create = obj.process.entity_always_create
        operation = HandleEntityOperation.PASS

        if entity_type:
            data_filter: dict[str, Union[list, int]] = {}
            if entity_input:
                input_id = dict_dot(obj.input, entity_input, default=lambda: None)
                if input_id is None:
                    logger.warning("Skipping creation of entity due to missing input.")
                    return
                if isinstance(input_id, int):
                    data_filter["data__pk"] = input_id
                elif isinstance(input_id, list):
                    data_filter["data__pk__in"] = input_id
                else:
                    raise ValueError(
                        "Cannot create entity due to invalid value of field {}.".format(
                            entity_input
                        )
                    )
            else:
                data_filter["data__in"] = obj.parents.all()

            entity_query = Entity.objects.filter(
                type=entity_type, **data_filter
            ).distinct()
            entity_count = entity_query.count()

            if entity_count == 0 or entity_always_create:
                descriptor_schema = DescriptorSchema.objects.filter(
                    slug=entity_descriptor_schema
                ).latest()
                entity = Entity.objects.create(
                    contributor=obj.contributor,
                    descriptor_schema=descriptor_schema,
                    type=entity_type,
                    name=obj.name,
                    tags=obj.tags,
                )
                assign_contributor_permissions(entity)
                obj.move_to_entity(entity)
                operation = HandleEntityOperation.CREATE

            elif entity_count == 1:
                entity = entity_query.first()
                operation = HandleEntityOperation.ADD
                obj.move_to_entity(entity)

            else:
                logger.info(
                    "Skipping creation of entity due to multiple entities found."
                )

            return operation

    @staticmethod
    def _handle_collection(obj: "Data", entity_operation: HandleEntityOperation):
        """Correctly assign Collection to Data and it's Entity.

        There are 2 x 4 possible scenarios how to handle collection
        assignment. One dimension in "decision matrix" is Data.collection:

            1.x Data.collection = None
            2.x Data.collection != None

        Second dimension is about Data.entity:

            x.1 Data.entity is None
            x.2 Data.entity was just created
            x.3 Data.entity already exists and Data.entity.collection = None
            x.4 Data.entity already exists and Data.entity.collection != None
        """
        # 1.2 and 1.3 require no action.

        # 1.1 and 2.1:
        if not obj.entity:
            return
        if entity_operation == HandleEntityOperation.ADD and obj.collection:
            # 2.3
            if not obj.entity.collection:
                raise ValueError(
                    "Created Data has collection {} assigned, but it is added to entity {} that is not "
                    "inside this collection.".format(obj.collection, obj.entity)
                )
            # 2.4
            assert obj.collection == obj.entity.collection

        # 1.4
        if not obj.collection and obj.entity and obj.entity.collection:
            obj.move_to_collection(obj.entity.collection)
        # 2.2
        if entity_operation == HandleEntityOperation.CREATE and obj.collection:
            obj.entity.move_to_collection(obj.collection)

    @transaction.atomic
    def create(self, subprocess_parent=None, **kwargs):
        """Create new object with the given kwargs."""
        obj = super().create(**kwargs)

        # Data dependencies
        obj.save_dependencies(obj.input, obj.process.input_schema)
        if subprocess_parent:
            DataDependency.objects.create(
                parent=subprocess_parent,
                child=obj,
                kind=DataDependency.KIND_SUBPROCESS,
            )
            # Data was from a workflow / spawned process
            if not obj.in_container():
                copy_permissions(subprocess_parent, obj)

        # Entity, Collection assignment
        entity_operation = self._handle_entity(obj)
        self._handle_collection(obj, entity_operation=entity_operation)

        # Assign contributor permission only if Data is not in the container.
        if not obj.in_container():
            assign_contributor_permissions(obj)

        return obj

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) ``Data`` objects in the background."""
        task_data = {
            "data_ids": list(self.values_list("pk", flat=True)),
            "inherit_entity": True,
            "inherit_collection": True,
        }
        return start_background_task(
            BackgroundTaskType.DUPLICATE_DATA, "Duplicate data", task_data, contributor
        )

    def delete_background(self, contributor):
        """Delete the ``Data`` objects in the background."""
        task_data = {
            "object_ids": list(self.values_list("pk", flat=True)),
            "content_type_id": ContentType.objects.get_for_model(self.model).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete data", task_data, contributor
        )

    @transaction.atomic
    def move_to_collection(self, destination_collection):
        """Move data objects to destination collection.

        Note that this method will also copy tags and permissions
        of the destination collection to the data objects.
        """
        for data in self:
            data.move_to_collection(destination_collection)

    def annotate_sample_path(self, path, annotation_name, value_to_label=False):
        """Add annotation to the Entity QuerySet.

        The annotation is the value of the field with the given name (in the
        given group).

        :attr group_name: the name of the group annotation field belongs to.
        :attr field_name: the name of the annotation field.
        :attr annotation_name: the name under which annotation will be stored.
            When empty the name f"{group_name}_{field_name}" will be used.
        :attr value_to_label: optionally annotate with label instead of the
            value. Only applicable when the vocabulary on the field is given.
        """
        annotation_data = EntityQuerySet._prepare_annotation_data(
            path,
            annotation_name,
            value_to_label,
            outerref_entity_pk_path="entity_id",
        )
        return self.annotate(**annotation_data)


class Data(HistoryMixin, BaseModel, PermissionObject):
    """Postgres model for storing data."""

    class Meta(BaseModel.Meta):
        """Data Meta options."""

        permissions = (
            ("view", "Can view data"),
            ("edit", "Can edit data"),
            ("share", "Can share data"),
            ("owner", "Is owner of the data"),
        )

        indexes = [
            models.Index(name="idx_data_name", fields=["name"]),
            GinIndex(
                name="idx_data_name_trgm", fields=["name"], opclasses=["gin_trgm_ops"]
            ),
            models.Index(name="idx_data_slug", fields=["slug"]),
            models.Index(name="idx_data_status", fields=["status"]),
            GinIndex(name="idx_data_tags", fields=["tags"]),
            GinIndex(name="idx_data_search", fields=["search"]),
        ]

    #: data object is uploading
    STATUS_UPLOADING = "UP"
    #: data object is being resolved
    STATUS_RESOLVING = "RE"
    #: data object is waiting
    STATUS_WAITING = "WT"
    #: data object is preparing
    STATUS_PREPARING = "PP"
    #: data object is processing
    STATUS_PROCESSING = "PR"
    #: data object is done
    STATUS_DONE = "OK"
    #: data object is in error state
    STATUS_ERROR = "ER"
    #: data object is in dirty state
    STATUS_DIRTY = "DR"

    # Assumption (in listener): ordered from least to most problematic.
    STATUS_CHOICES = (
        (STATUS_UPLOADING, "Uploading"),
        (STATUS_RESOLVING, "Resolving"),
        (STATUS_WAITING, "Waiting"),
        (STATUS_PREPARING, "Preparing"),
        (STATUS_PROCESSING, "Processing"),
        (STATUS_DONE, "Done"),
        (STATUS_ERROR, "Error"),
        (STATUS_DIRTY, "Dirty"),
    )

    #: manager
    objects = DataQuerySet.as_manager()

    #: date and time when process was dispatched to the scheduling system
    #: (set by ``resolwe.flow.managers.dispatcher.Manager.run``)
    scheduled = models.DateTimeField(blank=True, null=True, db_index=True)

    #: process started date and time (set by
    #: ``resolwe.flow.executors.run.BaseFlowExecutor.run`` or its derivatives)
    started = models.DateTimeField(blank=True, null=True, db_index=True)

    #: process finished date and time (set by
    #: ``resolwe.flow.executors.run.BaseFlowExecutor.run`` or its derivatives)
    finished = models.DateTimeField(blank=True, null=True, db_index=True)

    #: duplication date and time
    duplicated = models.DateTimeField(blank=True, null=True)

    #: checksum field calculated on inputs
    checksum = models.CharField(
        max_length=64,
        db_index=True,
        validators=[
            RegexValidator(
                regex=r"^[0-9a-f]{64}$",
                message="Checksum is exactly 64 hexadecimal characters",
                code="invalid_checksum",
            )
        ],
    )

    status = models.CharField(
        max_length=2, choices=STATUS_CHOICES, default=STATUS_RESOLVING
    )
    """
    :class:`Data` status

    It can be one of the following:

    - :attr:`STATUS_UPLOADING`
    - :attr:`STATUS_RESOLVING`
    - :attr:`STATUS_WAITING`
    - :attr:`STATUS_PREPARING`
    - :attr:`STATUS_PROCESSING`
    - :attr:`STATUS_DONE`
    - :attr:`STATUS_ERROR`
    - :attr:`STATUS_DIRTY`
    """

    #: process used to compute the data object
    process = models.ForeignKey("Process", on_delete=models.PROTECT)

    #: process id
    process_pid = models.PositiveIntegerField(blank=True, null=True)

    #: progress
    process_progress = models.PositiveSmallIntegerField(default=0)

    #: return code
    process_rc = models.SmallIntegerField(blank=True, null=True)

    #: info log message
    process_info = ArrayField(models.CharField(max_length=255), default=list)

    #: warning log message
    process_warning = ArrayField(models.CharField(max_length=255), default=list)

    #: error log message
    process_error = ArrayField(models.CharField(max_length=255), default=list)

    #: actual inputs used by the process
    input = models.JSONField(default=dict)

    #: actual outputs of the process
    output = models.JSONField(default=dict)

    #: total size of data's outputs in bytes
    size = models.BigIntegerField()

    #: data descriptor schema
    descriptor_schema = models.ForeignKey(
        "DescriptorSchema", blank=True, null=True, on_delete=models.PROTECT
    )

    #: actual descriptor
    descriptor = models.JSONField(default=dict)

    #: indicate whether `descriptor` doesn't match `descriptor_schema` (is dirty)
    descriptor_dirty = models.BooleanField(default=False)

    #: track if user set the data name explicitly
    named_by_user = models.BooleanField(default=False)

    #: dependencies between data objects
    parents = models.ManyToManyField(
        "self", through="DataDependency", symmetrical=False, related_name="children"
    )

    #: tags for categorizing objects
    tags = ArrayField(models.CharField(max_length=255), default=list)

    #: actual allocated memory
    process_memory = models.PositiveIntegerField(default=0)

    #: actual allocated cores
    process_cores = models.PositiveSmallIntegerField(default=0)

    #: data location
    location = models.ForeignKey(
        "storage.FileStorage",
        blank=True,
        null=True,
        on_delete=models.PROTECT,
        related_name="data",
    )

    #: entity
    entity = models.ForeignKey(
        "Entity", blank=True, null=True, on_delete=models.CASCADE, related_name="data"
    )

    #: collection
    collection = models.ForeignKey(
        "Collection",
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="data",
    )

    #: field used for full-text search
    search = SearchVectorField(null=True)

    #: process requirements overrides
    process_resources = models.JSONField(default=dict)

    def __init__(self, *args, **kwargs):
        """Initialize attributes."""
        super().__init__(*args, **kwargs)

        self._original_name = self.name
        self._original_output = self.output

    def resolve_secrets(self):
        """Retrieve handles for all basic:secret: fields on input.

        The process must have the ``secrets`` resource requirement
        specified in order to access any secrets. Otherwise this method
        will raise a ``PermissionDenied`` exception.

        :return: A dictionary of secrets where key is the secret handle
            and value is the secret value.
        """
        secrets = {}
        for field_schema, fields in iterate_fields(
            self.input, self.process.input_schema
        ):
            if not field_schema.get("type", "").startswith("basic:secret:"):
                continue

            name = field_schema["name"]
            value = fields[name]
            try:
                handle = value["handle"]
            except KeyError:
                continue

            try:
                secrets[handle] = Secret.objects.get_secret(
                    handle, contributor=self.contributor
                )
            except Secret.DoesNotExist:
                raise PermissionDenied(
                    "Access to secret not allowed or secret does not exist"
                )

        # If the process does not have the right requirements it is not
        # allowed to access any secrets.
        allowed = self.process.requirements.get("resources", {}).get("secrets", False)
        if secrets and not allowed:
            raise PermissionDenied(
                "Process '{}' has secret inputs, but no permission to see secrets".format(
                    self.process.slug
                )
            )

        return secrets

    def save_dependencies(self, instance, schema):
        """Save data: and list:data: references as parents."""

        def add_dependency(value):
            """Add parent Data dependency."""
            try:
                DataDependency.objects.update_or_create(
                    parent=Data.objects.get(pk=value),
                    child=self,
                    defaults={"kind": DataDependency.KIND_IO},
                )
            except Data.DoesNotExist:
                pass

        for field_schema, fields in iterate_fields(instance, schema):
            name = field_schema["name"]
            value = fields[name]

            if field_schema.get("type", "").startswith("data:"):
                add_dependency(value)
            elif field_schema.get("type", "").startswith("list:data:"):
                for data in value:
                    add_dependency(data)

    def save(self, render_name=False, *args, **kwargs):
        """Save the data model."""
        if self.name != self._original_name:
            self.named_by_user = True

        try:
            jsonschema.validate(
                self.process_resources, validation_schema("process_resources")
            )
        except jsonschema.exceptions.ValidationError as exception:
            # Re-raise as Django ValidationError
            raise ValidationError(exception.message)

        create = self.pk is None
        if create:
            fill_with_defaults(self.input, self.process.input_schema)

            if not self.name:
                self._render_name()
            else:
                self.named_by_user = True

            self.checksum = get_data_checksum(
                self.input, self.process.slug, self.process.version
            )

            validate_schema(self.input, self.process.input_schema)

            hydrate_size(self)
            # If only specified fields are updated (e.g. in executor), size needs to be added
            if "update_fields" in kwargs:
                kwargs["update_fields"].append("size")

        elif render_name:
            self._render_name()

        render_descriptor(self)

        if self.descriptor_schema:
            try:
                validate_schema(self.descriptor, self.descriptor_schema.schema)
                self.descriptor_dirty = False
            except DirtyError:
                self.descriptor_dirty = True
        elif self.descriptor and self.descriptor != {}:
            raise ValueError(
                "`descriptor_schema` must be defined if `descriptor` is given"
            )

        with transaction.atomic():
            self._perform_save(*args, **kwargs)

        self._original_output = self.output

    def _perform_save(self, *args, **kwargs):
        """Save the data model."""
        super().save(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Delete the data model."""
        if hasattr(self, "worker"):
            if self.worker.status not in Worker.FINAL_STATUSES:
                self.worker.terminate()

        # Store ids in memory as relations are also deleted with the Data object.
        storage_ids = list(self.storages.values_list("pk", flat=True))
        super().delete(*args, **kwargs)
        Storage.objects.filter(pk__in=storage_ids, data=None).delete()

    def delete_background(self):
        """Delete the object in the background."""
        task_data = {
            "object_ids": [self.pk],
            "content_type_id": ContentType.objects.get_for_model(self).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete data", task_data, self.contributor
        )

    def is_duplicate(self):
        """Return True if data object is a duplicate."""
        return bool(self.duplicated)

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) object in the background."""
        task_data = {
            "data_ids": [self.pk],
            "inherit_entity": True,
            "inherit_collection": True,
        }
        return start_background_task(
            BackgroundTaskType.DUPLICATE_DATA, "Duplicate data", task_data, contributor
        )

    @move_to_container
    def move_to_collection(self, collection):
        """Move data object to collection."""
        self.validate_change_collection(collection)
        self.collection = collection
        self.permission_group = collection.permission_group
        if collection:
            self.tags = collection.tags
        self.save(update_fields=["tags", "permission_group", "collection"])

    @move_to_container
    def move_to_entity(self, entity):
        """Move data object to entity."""
        if entity is None and self.in_container():
            raise ValidationError("Data object must not be removed from container.")
        self.entity = entity
        if entity:
            if entity.collection:
                self.move_to_collection(entity.collection)
            else:
                self.permission_group = entity.permission_group
            self.tags = entity.tags
        self.save(update_fields=["permission_group", "entity", "tags"])

    def validate_change_collection(self, collection):
        """Raise validation error if data object cannot change collection."""
        if self.entity and self.entity.collection != collection:
            raise ValidationError(
                "If Data is in entity, you can only move it to another collection "
                "by moving entire entity."
            )
        if collection is None:
            raise ValidationError("Data object can not be removed from the container.")

    def _render_name(self):
        """Render data name.

        The rendering is based on name template (`process.data_name`) and
        input context.
        """
        if not self.process.data_name or self.named_by_user:
            return

        inputs = copy.deepcopy(self.input)
        hydrate_input_references(
            inputs, self.process.input_schema, hydrate_values=False
        )
        template_context = inputs

        try:
            name = render_template(
                self.process, self.process.data_name, template_context
            )
        except EvaluationError:
            name = "?"

        self.name = name

    def get_resource_limits(self):
        """Return the resource limits for this data."""
        return self.process.get_resource_limits(self)

    def restart(self, resource_overrides: dict = {}):
        """Restart the data object and all its children.

        The status of the data object must be ERROR and the status of its
        dependencies must be DONE.

        :param resource_overrides: dictionary mapping ids of data objects to
            resource overrides.

        :raises RuntimeError: if the object is not in the right state.
        :raises RuntimeError: when object dependencies are not in the status
            DONE.
        """

        def reset_data(data: Data):
            """Prepare the data object for another processing."""
            assert data.status == Data.STATUS_ERROR
            reset_dict: dict[str, Any] = {
                "started": None,
                "finished": None,
                "status": Data.STATUS_RESOLVING,
                "process_info": [],
                "process_warning": [],
                "process_error": [],
                "size": 0,
                "process_progress": 0,
                "process_rc": None,
                "process_pid": None,
                "descriptor": {},
                "descriptor_dirty": False,
                "duplicated": None,
            }
            for attribute, value in reset_dict.items():
                setattr(data, attribute, value)
            if hasattr(data, "worker"):
                # The worker can be saved here as it is not observable.
                data.worker.status = Worker.STATUS_PREPARING
                data.worker.save()

        # Avoid circular dependencies.
        from resolwe.flow.signals import commit_signal

        # Sanity check: only data in status ERROR can be restarted.
        if self.status != Data.STATUS_ERROR:
            raise RuntimeError(
                f"Only data in status {Data.STATUS_ERROR} can be restarted, not {self.status}."
            )

        # Sanity check: all dependencies must be in status DONE.
        if self.dependency_status() != Data.STATUS_DONE:
            raise RuntimeError(
                f"Data dependencies must have status '{Data.STATUS_DONE}'."
            )

        with transaction.atomic():
            # When the data object is restarted we also need to restart all its
            # children: clear them all and reset their statuses so they can be
            # restarted.
            dependencies = Data.objects.filter(
                parents_dependency__parent=self,
                parents_dependency__kind__in=[DataDependency.KIND_IO],
            )
            # Subprocess dependencies must be deleted, they will be recreated.
            Data.objects.filter(
                parents_dependency__parent=self,
                parents_dependency__kind__in=[DataDependency.KIND_SUBPROCESS],
            ).delete()

            # Construct the map id -> data.
            to_process = {data.id: data for data in dependencies}
            to_process[self.pk] = self

            # Prevent circular import.
            from resolwe.flow.managers.listener.listener import cache_manager

            # Clear the Redis cache for objects to be restarted.
            for data_id in to_process.keys():
                cache_manager.clear(Data, (data_id,))

            # Evaluate lazy generator by listing it.
            list(map(reset_data, to_process.values()))

            # Set the overrides.
            for data_pk, override in resource_overrides.items():
                if int(data_pk) in to_process:
                    to_process[int(data_pk)].process_resources = override

            # Save the data objects. This will send out notifications.
            for data in to_process.values():
                data.save()

        # Start processing the restarted data object.
        commit_signal(self, False, update_fields=None)

    def dependency_status(self) -> Optional[str]:
        """Return abstracted status of instance IO dependencies.

        :returns:
            - ``STATUS_ERROR`` .. one dependency has error status or was deleted
            - ``STATUS_DONE`` .. all dependencies have done status
            - ``None`` .. other
        """
        parents_statuses = set(
            DataDependency.objects.filter(child=self, kind=DataDependency.KIND_IO)
            .distinct("parent__status")
            .values_list("parent__status", flat=True)
        )

        if not parents_statuses:
            return Data.STATUS_DONE

        if None in parents_statuses:
            # Some parents have been deleted.
            return Data.STATUS_ERROR

        if Data.STATUS_ERROR in parents_statuses:
            return Data.STATUS_ERROR

        if len(parents_statuses) == 1 and Data.STATUS_DONE in parents_statuses:
            return Data.STATUS_DONE

        return None
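
# Illustrative sketch (not part of the upstream module): restarting a failed
# Data object with a per-object resource override, mirroring the checks that
# ``Data.restart`` performs itself. The memory value is a hypothetical example.
def _example_restart(data: Data) -> None:
    """Restart ``data`` if it failed and all of its IO dependencies are done."""
    if (
        data.status == Data.STATUS_ERROR
        and data.dependency_status() == Data.STATUS_DONE
    ):
        # Keys of ``resource_overrides`` are Data ids, values are resource dicts.
        data.restart(resource_overrides={data.pk: {"memory": 16384}})
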

class DataDependency(models.Model):
    """Dependency relation between data objects."""

    #: child uses parent's output as its input
    KIND_IO = "io"
    #: child was spawned by the parent
    KIND_SUBPROCESS = "subprocess"
    #: child is a duplicate of the parent
    KIND_DUPLICATE = "duplicate"

    KIND_CHOICES = (
        (KIND_IO, "Input/output dependency"),
        (KIND_SUBPROCESS, "Subprocess"),
        (KIND_DUPLICATE, "Duplicate"),
    )

    #: child data object
    child = models.ForeignKey(
        Data, on_delete=models.CASCADE, related_name="parents_dependency"
    )
    #: parent data object
    parent = models.ForeignKey(
        Data, null=True, on_delete=models.SET_NULL, related_name="children_dependency"
    )
    #: kind of dependency
    kind = models.CharField(max_length=16, choices=KIND_CHOICES)
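

# Illustrative sketch (not part of the upstream module): walking the dependency
# graph recorded by DataDependency. The helper name is hypothetical; the
# related names used below are the ones declared on the foreign keys above.
def _example_io_parents(data: Data):
    """Return the Data objects whose outputs ``data`` consumed as inputs."""
    return Data.objects.filter(
        children_dependency__child=data,
        children_dependency__kind=DataDependency.KIND_IO,
    )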