"""Resolwe entity model."""
from typing import Any, List, Optional, Union

from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.indexes import GinIndex
from django.core.exceptions import ValidationError
from django.db import models, transaction

from resolwe.flow.models.annotations import (
    AnnotationField,
    AnnotationValue,
    HandleMissingAnnotations,
)
from resolwe.observers.consumers import BackgroundTaskType
from resolwe.observers.decorators import move_to_container
from resolwe.observers.models import BackgroundTask
from resolwe.observers.utils import start_background_task
from resolwe.permissions.models import PermissionObject, PermissionQuerySet

from .base import BaseModel, BaseQuerySet
from .collection import BaseCollection, Collection
from .utils import DirtyError, validate_schema

class EntityQuerySet(BaseQuerySet, PermissionQuerySet):
    """Query set for ``Entity`` objects."""

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) ``Entity`` objects in the background.

        :attr contributor: passed to ``start_background_task`` together with
            the primary keys of the entities in this queryset.
        :return: the background task returned by ``start_background_task``.
        """
        task_data = {
            "entity_ids": list(self.values_list("pk", flat=True)),
            "inherit_collection": True,
        }
        return start_background_task(
            BackgroundTaskType.DUPLICATE_ENTITY,
            "Duplicate entity",
            task_data,
            contributor,
        )

    def delete_background(self, contributor):
        """Delete the ``Entity`` objects in the background.

        :attr contributor: passed to ``start_background_task`` together with
            the primary keys of the entities in this queryset.
        :return: the value returned by ``start_background_task``.
        """
        task_data = {
            "object_ids": list(self.values_list("pk", flat=True)),
            "content_type_id": ContentType.objects.get_for_model(self.model).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete entities", task_data, contributor
        )

    def move_to_collection(
        self,
        destination_collection: Collection,
        missing_annotations: HandleMissingAnnotations = HandleMissingAnnotations.ADD,
    ):
        """Move entities to destination collection.

        Every entity is moved individually by calling its own
        ``move_to_collection`` method.

        :attr destination_collection: the collection to move the entities into.
        :attr missing_annotations: forwarded unchanged to
            ``Entity.move_to_collection``.
        """
        for entity in self:
            entity.move_to_collection(destination_collection, missing_annotations)

    def annotate_all(self, add_labels: bool = False):
        """Annotate with all metadata on the entities.

        The returned dataset has one entry for every annotation on the entity.
        It is annotated by "annotation_value" and the "annotation_field" in the
        form of f"{group_name}.{field_name}".

        :attr add_labels: when True and vocabulary is present, the annotation
            "annotation_label" contains the value of the label.
        """
        annotation_data = {
            # NOTE(review): the value is read through ``annotations__json`` while
            # the label branch below reads ``annotations___value``; confirm that
            # both lookups exist on the AnnotationValue model.
            "annotation_value": models.F("annotations__json__value"),
            # NOTE(review): the Concat arguments were missing from the reviewed
            # source. They are reconstructed from the documented
            # "group_name.field_name" format and the field lookups used
            # elsewhere in this module; confirm.
            "annotation_field": models.functions.Concat(
                "annotations__field__group__name",
                models.Value("."),
                "annotations__field__name",
            ),
        }
        if add_labels:
            # NOTE(review): only the Cast expression survived in the reviewed
            # source. The label lookup and its position (preferred over the raw
            # value) are an assumption; confirm.
            annotation_data["annotation_label"] = models.functions.Coalesce(
                models.F("annotations___value__label"),
                models.functions.Cast("annotations___value__value", models.JSONField()),
            )
        return self.annotate(**annotation_data)

    def _prepare_annotation_data(
        self,
        path: str,
        annotation_name: Optional[str] = None,
        value_to_label: bool = False,
        outerref_entity_pk_path: str = "pk",
    ):
        """Build the keyword arguments for annotating a single path.

        :attr path: the path in the form 'group_name.field_name'.
        :attr annotation_name: the name of the annotation. When empty the name
            f"{group_name}_{field_name}" is used.
        :attr value_to_label: annotate with the label instead of the value.
        :attr outerref_entity_pk_path: the path (relative to the outer query)
            leading to the entity primary key.
        :return: a single-entry dictionary mapping the annotation name to the
            subquery; it is suitable for passing to ``QuerySet.annotate``.
        """
        group_name, field_name = path.split(".", maxsplit=1)

        # NOTE(review): the filter arguments were missing from the reviewed
        # source. They are reconstructed from the otherwise unused locals
        # (group_name, field_name, outerref_entity_pk_path); confirm.
        subquery = AnnotationValue.objects.filter(
            entity_id=models.OuterRef(outerref_entity_pk_path),
            field__group__name=group_name,
            field__name=field_name,
        )
        if value_to_label:
            # NOTE(review): the annotation expression was missing from the
            # reviewed source. Preferring the label and falling back to the
            # value is an assumption; confirm.
            subquery = subquery.annotate(
                final_value=models.functions.Coalesce(
                    models.F("_value__label"), models.F("_value__value")
                )
            )
        else:
            subquery = subquery.annotate(final_value=models.F("_value__value"))

        annotation_name = annotation_name or f"{group_name}_{field_name}"
        return {annotation_name: models.Subquery(subquery.values("final_value")[:1])}

    def annotate_path(
        self,
        paths: Union[str, List[str]],
        annotation_name: Optional[str] = None,
        value_to_label: bool = False,
    ):
        """Add annotation to the Entity QuerySet.

        The annotation is the value of the field with the given name (in the
        given group).

        :attr paths: the path in the form 'group_name.field_name' or list of
            such paths.
        :attr annotation_name: the name under which annotation will be stored.
            When empty the name f"{group_name}_{field_name}" will be used. The
            annotation_name can only be used when path is a single string.
        :attr value_to_label: optionally annotate with label instead of the
            value. Only applicable when the vocabulary on the field is given.
        :return: the queryset with one annotation added per given path.
        """
        if isinstance(paths, str):
            paths = [paths]
        queryset = self
        for path in paths:
            queryset = queryset.annotate(
                **self._prepare_annotation_data(path, annotation_name, value_to_label)
            )
        return queryset

class Entity(BaseCollection, PermissionObject):
    """Postgres model for storing entities."""

    class Meta(BaseCollection.Meta):
        """Entity Meta options."""

        permissions = (
            ("view", "Can view entity"),
            ("edit", "Can edit entity"),
            ("share", "Can share entity"),
            ("owner", "Is owner of the entity"),
        )

        indexes = [
            models.Index(name="idx_entity_name", fields=["name"]),
            GinIndex(
                name="idx_entity_name_trgm", fields=["name"], opclasses=["gin_trgm_ops"]
            ),
            models.Index(name="idx_entity_slug", fields=["slug"]),
            GinIndex(name="idx_entity_tags", fields=["tags"]),
            GinIndex(name="idx_entity_search", fields=["search"]),
        ]

    #: manager
    objects = EntityQuerySet.as_manager()

    #: collection to which entity belongs
    collection = models.ForeignKey(
        "Collection", blank=True, null=True, on_delete=models.CASCADE
    )

    #: entity type
    type = models.CharField(max_length=100, db_index=True, null=True, blank=True)

    #: duplication date and time
    duplicated = models.DateTimeField(blank=True, null=True)

    def get_annotation(self, path: str, default: Any = None) -> Any:
        """Get the annotation for the given path.

        :attr path: the path to the annotation in the format 'group.field'.
        :attr default: default value when annotation is not found.
        :return: value of the annotation or default if not found.
        """
        group_name, field_name = path.split(".", maxsplit=1)
        annotation: Optional[AnnotationValue] = self.annotations.filter(
            field__group__name=group_name, field__name=field_name
        ).first()
        return annotation.value if annotation else default

    def set_annotation(self, path: str, value: Any):
        """Set the annotation for the given path.

        :attr path: the path to the annotation in the format 'group.field'.
        :attr value: the annotation value.
        :raises RuntimeError: when no annotation field exists for the path.
        """
        field_id = AnnotationField.id_from_path(path)
        if field_id is None:
            raise RuntimeError(f"No annotation field for path {path}.")
        AnnotationValue.objects.create(field_id=field_id, entity=self, value=value)

    def is_duplicate(self):
        """Return True if entity is a duplicate."""
        return bool(self.duplicated)

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) object in the background.

        :attr contributor: passed to ``start_background_task`` together with
            the primary key of this entity.
        :return: the background task returned by ``start_background_task``.
        """
        # The task operates on primary keys: without this entity's key the
        # task would have nothing to duplicate.
        task_data = {"entity_ids": [self.pk], "inherit_collection": True}
        return start_background_task(
            BackgroundTaskType.DUPLICATE_ENTITY,
            "Duplicate entity",
            task_data,
            contributor,
        )

    def delete_background(self):
        """Delete the object in the background.

        The task is started on behalf of the contributor of this entity.

        :return: the value returned by ``start_background_task``.
        """
        # The task operates on primary keys: without this entity's key the
        # task would have nothing to delete.
        task_data = {
            "object_ids": [self.pk],
            "content_type_id": ContentType.objects.get_for_model(self).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete entitiy", task_data, self.contributor
        )

    @transaction.atomic
    @move_to_container
    def move_to_collection(
        self,
        collection: Collection,
        handle_missing_annotations: HandleMissingAnnotations = HandleMissingAnnotations.ADD,
    ):
        """Move entity from the source to the destination collection.

        :args collection: the collection to move entity into.
        :args handle_missing_annotations: how to handle missing annotations
            fields in the new collection

        :raises ValidationError: when collection is set to None.
        """
        # Nothing to do when the entity already is in the given collection.
        if self.collection == collection:
            return

        if collection is None:
            raise ValidationError(
                f"Entity {self}({self.pk}) can only be moved to another container."
            )

        # Annotation fields available on the source collection but not on the
        # destination one.
        missing_annotations = AnnotationField.objects.none()
        if self.collection is not None:
            missing_annotations = self.collection.annotation_fields.exclude(
                pk__in=collection.annotation_fields.values_list("pk", flat=True)
            )

        if missing_annotations:
            if handle_missing_annotations == HandleMissingAnnotations.ADD:
                collection.annotation_fields.add(*missing_annotations)
            elif handle_missing_annotations == HandleMissingAnnotations.REMOVE:
                self.annotations.filter(field__in=missing_annotations).delete()

        self.collection = collection
        self.tags = collection.tags
        self.permission_group = collection.permission_group
        self.save(update_fields=["collection", "tags", "permission_group"])

    def invalid_annotation_fields(self, annotation_fields=None):
        """Get the Queryset of invalid annotation fields.

        The invalid annotation field is a field that has annotation but it is
        not allowed in the collection this entity belongs to.

        :attr annotation_fields: the iterable containing annotations fields to
            be checked. When None is given the annotation fields belonging to
            the entity are checked.
        """
        # NOTE(review): this value is not used by the query below, so the
        # argument currently has no effect on the result. The statement is kept
        # so the behaviour for entities without a collection is unchanged.
        annotation_fields = annotation_fields or self.collection.annotation_fields
        # A related manager can not be used as a lookup value: filter by the
        # queryset of annotation values instead.
        return AnnotationField.objects.filter(
            values__in=self.annotations.all()
        ).exclude(collection=self.collection)

    def copy_annotations(self, destination: "Entity") -> List[AnnotationValue]:
        """Prepare copies of annotations of this entity for the destination.

        The ``AnnotationValue`` objects are only constructed here: this method
        does not call ``save`` on them.

        :attr destination: the entity the new annotation values point to.
        :return: the list of constructed annotation values. The list is empty
            when this entity does not belong to a collection.
        :raises ValidationError: when the destination does not belong to a
            collection or some of the annotation fields are missing on the
            destination collection.
        """
        # Entity without collection can not have annotations.
        if self.collection is None:
            return []

        if destination.collection is None:
            raise ValidationError(
                f"The entity '{destination.slug}' must belong to the collection."
            )

        destination_annotations = []
        annotation_field_ids = set()
        # Create AnnotationValue objects from annotations and collect the set of
        # annotation field ids for them. This set must also be available on the
        # destination entity, else exception is raised.
        for source_annotation in self.annotations.all():
            annotation_value = AnnotationValue(
                entity=destination,
                field_id=source_annotation.field_id,
                value=source_annotation.value,
            )
            destination_annotations.append(annotation_value)
            annotation_field_ids.add(source_annotation.field_id)

        destination_fields_qs = destination.collection.annotation_fields.filter(
            pk__in=annotation_field_ids
        )

        if destination_fields_qs.count() < len(annotation_field_ids):
            missing_annotation_field_ids = annotation_field_ids - set(
                destination_fields_qs.values_list("pk", flat=True)
            )
            missing_fields_qs = AnnotationField.objects.filter(
                pk__in=missing_annotation_field_ids
            ).values_list("group__name", "name")
            missing_field_paths = [".".join(entry) for entry in missing_fields_qs]
            missing_fields = ", ".join(missing_field_paths)
            raise ValidationError(
                f"The collection of entity '{destination}' is missing annotation fields {missing_fields}."
            )
        return destination_annotations

    def validate_annotations(self):
        """Validate the annotations on this entity.

        :raises ValidationError: when the entity is annotated with fields that
            are not allowed on its collection or when annotations do not pass
            validation. All annotation values are validated and their error
            messages aggregated into a single exception.
        """
        invalid_annotation_fields = self.invalid_annotation_fields()
        if invalid_annotation_fields:
            msg = ",".join(field.label for field in invalid_annotation_fields)
            collection_id = getattr(self, "collection_id", None)
            raise ValidationError(
                f"The entity '{self}' is annotated with fields '{msg}' "
                f"that are not allowed on the collection '{collection_id}'."
            )

        # Validate all the annotation values on this entity. The related
        # manager itself is not iterable, so iterate over its queryset.
        validation_errors = []
        for annotation in self.annotations.all():
            try:
                annotation.validate()
            except ValidationError as validation_error:
                validation_errors.append(validation_error)

        if validation_errors:
            raise ValidationError(validation_errors)

    def update_annotations(self, annotations: dict[str, Any], update=True):
        """Update annotations with the given values.

        When annotation value is set to None it is deleted.

        :attr annotations: the dictionary with annotation values. Keys are
            annotation paths.
        :attr update: when True only the annotations whose given value is None
            are deleted. Otherwise all existing annotation values on this
            entity are deleted before the given ones are created.
        :raises ValidationError: when any of the given values does not pass
            validation. Nothing is deleted or created in this case.
        """
        field_paths = annotations.keys()
        field_map = {
            field_path: AnnotationField.field_from_path(field_path)
            for field_path in field_paths
        }

        # Create annotation values and prepare list of fields which annotations
        # should be deleted.
        annotation_values: list[AnnotationValue] = []
        validation_errors: list[ValidationError] = []
        fields_to_delete: list[int] = []

        for field_path, field_value in annotations.items():
            if field_value is None:
                fields_to_delete.append(field_map[field_path].pk)
            else:
                value = AnnotationValue(
                    field=field_map[field_path], value=field_value, entity=self
                )
                annotation_values.append(value)
                try:
                    value.validate()
                except ValidationError as e:
                    validation_errors.append(e)

        if validation_errors:
            raise ValidationError(validation_errors)

        # Delete and update annotations in a transaction.
        with transaction.atomic():
            to_delete = AnnotationValue.objects.filter(entity=self)
            if update:
                to_delete = to_delete.filter(field_id__in=fields_to_delete)
            to_delete.delete()
            AnnotationValue.objects.bulk_create(
                annotation_values,
                update_conflicts=True,
                update_fields=["_value"],
                unique_fields=["entity", "field"],
            )
class RelationType(models.Model):
    """Model for storing relation types."""

    #: relation type name
    name = models.CharField(max_length=100, unique=True)

    #: indicates if order of entities in relation is important or not
    ordered = models.BooleanField(default=False)

    def __str__(self):
        """Format the object representation."""
        # ``__str__`` must return a string: a bare ``return`` yields None and
        # makes ``str(instance)`` raise TypeError.
        return self.name
class Relation(BaseModel, PermissionObject):
    """Relations between entities.

    The Relation model defines the associations and dependencies between
    entities in a given collection:

    .. code-block:: json

        {
            "collection": "<collection_id>",
            "type": "comparison",
            "category": "case-control study",
            "entities": [
                {"entity": "<entity1_id>", "label": "control"},
                {"entity": "<entity2_id>", "label": "case"},
                {"entity": "<entity3_id>", "label": "case"}
            ]
        }

    Relation ``type`` defines a specific set of associations among
    entities. It can be something like ``group``, ``comparison`` or
    ``series``. The relation ``type`` is an instance of
    :class:`.RelationType` and should be defined in any Django app that
    uses relations (e.g., as a fixture). Multiple relations of the same
    type are allowed on the collection.

    Relation ``category`` defines a specific use case. The relation
    category must be unique in a collection, so that users can
    distinguish between different relations. In the example above, we
    could add another ``comparison`` relation of ``category``, say
    ``Case-case study`` to compare ``<entity2>`` with ``<entity3>``.

    Relation is linked to :class:`resolwe.flow.models.Collection` to
    enable defining different relations structures in different
    collections. This also greatly speeds up retrieving of relations, as
    they are envisioned to be mainly used on a collection level.

    ``unit`` defines units used in partitions where it is applicable,
    e.g. in relations of type ``series``.

    """

    UNIT_SECOND = "s"
    UNIT_MINUTE = "min"
    UNIT_HOUR = "hr"
    UNIT_DAY = "d"
    UNIT_WEEK = "wk"
    UNIT_CHOICES = (
        (UNIT_SECOND, "Second"),
        (UNIT_MINUTE, "Minute"),
        (UNIT_HOUR, "Hour"),
        (UNIT_DAY, "Day"),
        (UNIT_WEEK, "Week"),
    )

    #: type of the relation
    type = models.ForeignKey("RelationType", on_delete=models.PROTECT)

    #: collection to which relation belongs
    collection = models.ForeignKey("Collection", on_delete=models.CASCADE)

    #: partitions of entities in the relation
    entities = models.ManyToManyField(Entity, through="RelationPartition")

    #: category of the relation
    category = models.CharField(max_length=100)

    #: unit used in the partitions' positions (where applicable, e.g. for serieses)
    unit = models.CharField(max_length=3, choices=UNIT_CHOICES, null=True, blank=True)

    #: relation descriptor schema
    descriptor_schema = models.ForeignKey(
        "flow.DescriptorSchema", blank=True, null=True, on_delete=models.PROTECT
    )

    #: relation descriptor
    descriptor = models.JSONField(default=dict)

    #: indicate whether `descriptor` doesn't match `descriptor_schema` (is dirty)
    descriptor_dirty = models.BooleanField(default=False)

    #: custom manager with permission filtering methods
    objects = PermissionQuerySet.as_manager()

    class Meta(BaseModel.Meta):
        """Relation Meta options."""

        unique_together = ("collection", "category")

        permissions = (
            ("view", "Can view relation"),
            ("edit", "Can edit relation"),
            ("share", "Can share relation"),
            ("owner", "Is owner of the relation"),
        )

    def save(self, *args, **kwargs):
        """Perform descriptor validation and save object.

        The ``descriptor_dirty`` flag is recomputed from the descriptor and its
        schema before saving. All the arguments are passed to the ``save``
        method of the parent class.

        :raises ValueError: when the descriptor is given without the
            descriptor schema.
        """
        if self.descriptor_schema:
            try:
                validate_schema(self.descriptor, self.descriptor_schema.schema)
            except DirtyError:
                self.descriptor_dirty = True
            else:
                self.descriptor_dirty = False
        elif self.descriptor:
            raise ValueError(
                "`descriptor_schema` must be defined if `descriptor` is given"
            )

        # When only a subset of fields is saved make sure the recomputed dirty
        # flag is stored together with the data it was derived from.
        update_fields = kwargs.get("update_fields")
        if update_fields is not None and {"descriptor", "descriptor_schema"} & set(
            update_fields
        ):
            kwargs["update_fields"] = set(update_fields) | {"descriptor_dirty"}

        # Pass the arguments through, otherwise options such as ``using``,
        # ``force_insert`` or ``update_fields`` are silently ignored.
        super().save(*args, **kwargs)
class RelationPartition(models.Model):
    """Intermediate model to define Relations between Entities."""

    #: entity in the relation
    entity = models.ForeignKey(Entity, on_delete=models.CASCADE)

    #: relation to which entity belongs
    relation = models.ForeignKey(Relation, on_delete=models.CASCADE)

    #: number used for ordering entities in the relation
    position = models.PositiveSmallIntegerField(null=True, blank=True, db_index=True)

    #: Human-readable label of the position
    label = models.CharField(max_length=30, null=True, blank=True, db_index=True)

    class Meta:
        """RelationPartition Meta options."""

        unique_together = ("entity", "relation")