"""Resolwe entity model."""
from typing import Any, List, Optional, Union
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.indexes import GinIndex
from django.core.exceptions import ValidationError
from django.db import models, transaction
from resolwe.flow.models.annotations import (
AnnotationField,
AnnotationValue,
HandleMissingAnnotations,
)
from resolwe.observers.consumers import BackgroundTaskType
from resolwe.observers.decorators import move_to_container
from resolwe.observers.models import BackgroundTask
from resolwe.observers.utils import start_background_task
from resolwe.permissions.models import PermissionObject, PermissionQuerySet
from .base import BaseModel, BaseQuerySet
from .collection import BaseCollection, Collection
from .utils import DirtyError, validate_schema
class EntityQuerySet(BaseQuerySet, PermissionQuerySet):
    """Query set for ``Entity`` objects."""

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) ``Entity`` objects in the background.

        :attr contributor: handed to the background task as its contributor.
        :return: the value returned by ``start_background_task``.
        """
        task_data = {
            "entity_ids": list(self.values_list("pk", flat=True)),
            "inherit_collection": True,
        }
        return start_background_task(
            BackgroundTaskType.DUPLICATE_ENTITY,
            "Duplicate entity",
            task_data,
            contributor,
        )

    def delete_background(self, contributor) -> BackgroundTask:
        """Delete the ``Entity`` objects in the background.

        :attr contributor: handed to the background task as its contributor.
        :return: the value returned by ``start_background_task``.
        """
        task_data = {
            "object_ids": list(self.values_list("pk", flat=True)),
            "content_type_id": ContentType.objects.get_for_model(self.model).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete entities", task_data, contributor
        )

    @transaction.atomic
    def move_to_collection(
        self,
        destination_collection: Collection,
        missing_annotations: HandleMissingAnnotations = HandleMissingAnnotations.ADD,
    ):
        """Move entities to destination collection.

        Every entity in the queryset is moved one by one inside a single
        transaction.

        :attr destination_collection: the collection to move the entities into.
        :attr missing_annotations: forwarded to ``Entity.move_to_collection``
            to decide how to handle annotation fields missing on the
            destination collection.
        """
        for entity in self:
            entity.move_to_collection(destination_collection, missing_annotations)

    def annotate_all(self, add_labels: bool = False):
        """Annotate with all metadata on the entities.

        The returned dataset has one entry for every annotation on the entity.
        It is annotated by "annotation_value" and the "annotation_field" in the
        form of f"{group_name}.{field_name}".

        :attr add_labels: when True and vocabulary is present, the annotation
            "annotation_label" contains the value of the label.
        """
        # The JSON column on the annotation value is referenced as "_value"
        # everywhere else in this module, so the same path is used here for
        # both the value and the label lookup.
        annotation_data = {
            "annotation_value": models.F("annotations___value__value"),
            "annotation_field": models.functions.Concat(
                models.F("annotations__field__group__name"),
                models.Value("."),
                models.F("annotations__field__name"),
            ),
        }
        if add_labels:
            # Look the value up in the field vocabulary; fall back to the raw
            # value when the lookup yields NULL.
            annotation_data["annotation_label"] = models.functions.Coalesce(
                models.Func(
                    models.F("annotations__field__vocabulary"),
                    models.Func(
                        models.F("annotations___value"),
                        models.Value("value"),
                        function="jsonb_extract_path_text",
                    ),
                    function="jsonb_extract_path",
                    output_field=models.JSONField(),
                ),
                models.functions.Cast("annotations___value__value", models.JSONField()),
            )
        return self.annotate(**annotation_data)

    @classmethod
    def _prepare_annotation_data(
        cls,
        path: str,
        annotation_name: Optional[str] = None,
        value_to_label: bool = False,
        outerref_entity_pk_path: str = "pk",
    ):
        """Build the keyword arguments for annotating a single path.

        :attr path: the path in the form 'group_name.field_name'.
        :attr annotation_name: the name of the annotation. When empty the name
            f"{group_name}_{field_name}" is used.
        :attr value_to_label: when True the value is looked up in the field
            vocabulary, falling back to the value itself.
        :attr outerref_entity_pk_path: the path (relative to the outer query)
            that resolves to the entity primary key.
        :return: dictionary with a single entry mapping the annotation name to
            the subquery producing its value.
        :raises ValueError: when the path does not contain a dot.
        """
        group_name, field_name = path.split(".", maxsplit=1)
        subquery = AnnotationValue.objects.filter(
            field__name=field_name,
            field__group__name=group_name,
            entity_id=models.OuterRef(outerref_entity_pk_path),
        )
        if value_to_label:
            subquery = subquery.annotate(
                final_value=models.functions.Coalesce(
                    models.Func(
                        models.F("field__vocabulary"),
                        models.Func(
                            models.F("_value"),
                            models.Value("value"),
                            function="jsonb_extract_path_text",
                        ),
                        function="jsonb_extract_path_text",
                        output_field=models.JSONField(),
                    ),
                    models.Func(
                        models.F("_value"),
                        models.Value("value"),
                        function="jsonb_extract_path_text",
                        output_field=models.JSONField(),
                    ),
                )
            )
        else:
            subquery = subquery.annotate(final_value=models.F("_value__value"))
        annotation_name = annotation_name or f"{group_name}_{field_name}"
        # Only the first matching annotation value is used.
        return {annotation_name: models.Subquery(subquery.values("final_value")[:1])}

    def annotate_path(
        self,
        paths: Union[str, List[str]],
        annotation_name: Optional[str] = None,
        value_to_label=False,
    ):
        """Add annotation to the Entity QuerySet.

        The annotation is the value of the field with the given name (in the
        given group).

        :attr paths: the path in the form 'group_name.field_name' or list of
            such paths.
        :attr annotation_name: the name under which annotation will be stored.
            When empty the name f"{group_name}_{field_name}" will be used. The
            annotation_name can only be used when path is a single string.
        :attr value_to_label: optionally annotate with label instead of the
            value. Only applicable when the vocabulary on the field is given.
        :return: the queryset annotated with one entry per given path.
        """
        if isinstance(paths, str):
            paths = [paths]
        queryset = self
        for path in paths:
            queryset = queryset.annotate(
                **self._prepare_annotation_data(path, annotation_name, value_to_label)
            )
        return queryset
class Entity(BaseCollection, PermissionObject):
    """Postgres model for storing entities."""

    class Meta(BaseCollection.Meta):
        """Entity Meta options."""

        permissions = (
            ("view", "Can view entity"),
            ("edit", "Can edit entity"),
            ("share", "Can share entity"),
            ("owner", "Is owner of the entity"),
        )
        indexes = [
            models.Index(name="idx_entity_name", fields=["name"]),
            GinIndex(
                name="idx_entity_name_trgm", fields=["name"], opclasses=["gin_trgm_ops"]
            ),
            models.Index(name="idx_entity_slug", fields=["slug"]),
            GinIndex(name="idx_entity_tags", fields=["tags"]),
            GinIndex(name="idx_entity_search", fields=["search"]),
        ]

    #: manager
    objects = EntityQuerySet.as_manager()

    #: collection to which entity belongs
    collection = models.ForeignKey(
        "Collection", blank=True, null=True, on_delete=models.CASCADE
    )

    #: entity type
    type = models.CharField(max_length=100, db_index=True, null=True, blank=True)

    #: duplication date and time
    duplicated = models.DateTimeField(blank=True, null=True)

    def get_annotation(self, path: str, default: Any = None) -> Any:
        """Get the annotation for the given path.

        :attr path: the path to the annotation in the format 'group.field'.
        :attr default: default value when annotation is not found.
        :return: value of the annotation or default if not found.
        :raises ValueError: when the path does not contain a dot.
        """
        group_name, field_name = path.split(".", maxsplit=1)
        annotation: Optional[AnnotationValue] = self.annotations.filter(
            field__group__name=group_name, field__name=field_name
        ).first()
        return annotation.value if annotation else default

    def set_annotation(self, path: str, value: Any):
        """Set the annotation for the given path.

        A new annotation value is created for this entity.

        :attr path: the path to the annotation in the format 'group.field'.
        :attr value: the annotation value.
        :raises RuntimeError: when no annotation field exists for the path.
        """
        field_id = AnnotationField.id_from_path(path)
        if field_id is None:
            raise RuntimeError(f"No annotation field for path {path}.")
        AnnotationValue.objects.create(field_id=field_id, entity=self, value=value)

    def is_duplicate(self):
        """Return True if entity is a duplicate."""
        return bool(self.duplicated)

    def duplicate(self, contributor) -> BackgroundTask:
        """Duplicate (make a copy) object in the background.

        :attr contributor: handed to the background task as its contributor.
        :return: the value returned by ``start_background_task``.
        """
        task_data = {"entity_ids": [self.pk], "inherit_collection": True}
        return start_background_task(
            BackgroundTaskType.DUPLICATE_ENTITY,
            "Duplicate entity",
            task_data,
            contributor,
        )

    def delete_background(self):
        """Delete the object in the background.

        The contributor of the entity is used as the contributor of the task.

        :return: the value returned by ``start_background_task``.
        """
        task_data = {
            "object_ids": [self.pk],
            "content_type_id": ContentType.objects.get_for_model(self).pk,
        }
        return start_background_task(
            BackgroundTaskType.DELETE, "Delete entitiy", task_data, self.contributor
        )

    @transaction.atomic
    @move_to_container
    def move_to_collection(
        self,
        collection: Collection,
        handle_missing_annotations: HandleMissingAnnotations = HandleMissingAnnotations.ADD,
    ):
        """Move entity from the source to the destination collection.

        :args collection: the collection to move entity into.
        :args handle_missing_annotations: how to handle missing annotations fields in
            the new collection
        :raises ValidationError: when collection is set to None.
        """
        # Nothing to do when the entity is already in the target collection.
        if self.collection == collection:
            return
        if collection is None:
            raise ValidationError(
                f"Entity {self}({self.pk}) can only be moved to another container."
            )

        # Annotation fields of the source collection that the destination
        # collection does not have.
        missing_annotations = AnnotationField.objects.none()
        if self.collection is not None:
            missing_annotations = self.collection.annotation_fields.exclude(
                pk__in=collection.annotation_fields.values_list("pk", flat=True)
            )
        if missing_annotations:
            if handle_missing_annotations == HandleMissingAnnotations.ADD:
                collection.annotation_fields.add(*missing_annotations)
            elif handle_missing_annotations == HandleMissingAnnotations.REMOVE:
                self.annotations.filter(field__in=missing_annotations).delete()

        self.collection = collection
        self.tags = collection.tags
        self.permission_group = collection.permission_group
        self.save(update_fields=["collection", "tags", "permission_group"])
        # Move the data objects of the entity as well.
        self.data.move_to_collection(collection)

    def invalid_annotation_fields(self, annotation_fields=None):
        """Get the Queryset of invalid annotation fields.

        The invalid annotation field is a field that has annotation but it is
        not allowed in the collection this entity belongs to.

        :attr annotation_fields: the iterable containing annotation fields to
            be checked. When None is given the annotation fields that have a
            value on this entity are checked.
        :return: queryset with the checked fields that are not attached to the
            collection of this entity. When the entity has no collection all
            the checked fields are returned.
        """
        if annotation_fields is None:
            candidates = AnnotationField.objects.filter(values__entity=self)
        else:
            candidates = AnnotationField.objects.filter(
                pk__in=[field.pk for field in annotation_fields]
            )
        # Entity without collection can not have annotations, so every
        # candidate is invalid in that case.
        if self.collection is None:
            return candidates
        return candidates.exclude(collection=self.collection)

    def copy_annotations(self, destination: "Entity") -> List[AnnotationValue]:
        """Copy annotation from this entity to the destination.

        The annotation values are only instantiated, they are not saved to the
        database.

        :attr destination: the entity the created annotation values belong to.
        :return: the list of (unsaved) annotation values for the destination.
        :raises ValidationError: when the destination has no collection or when
            some of the annotation fields are missing on the destination
            collection.
        """
        # Entity without collection can not have annotations.
        if self.collection is None:
            return []
        if destination.collection is None:
            raise ValidationError(
                f"The entity '{destination.slug}' must belong to the collection."
            )

        destination_annotations = []
        annotation_field_ids = set()
        # Create AnnotationValue objects from annotations and collect the set of
        # annotation field ids for them. This set must also be available on the
        # destination entity, else exception is raised.
        for source_annotation in self.annotations.all():
            annotation_value = AnnotationValue(
                entity=destination,
                field_id=source_annotation.field_id,
                value=source_annotation.value,
            )
            destination_annotations.append(annotation_value)
            annotation_field_ids.add(source_annotation.field_id)

        destination_fields_qs = destination.collection.annotation_fields.filter(
            pk__in=annotation_field_ids
        )
        if destination_fields_qs.count() < len(annotation_field_ids):
            missing_annotation_field_ids = annotation_field_ids - set(
                destination_fields_qs.values_list("pk", flat=True)
            )
            missing_fields_qs = AnnotationField.objects.filter(
                pk__in=missing_annotation_field_ids
            ).values_list("group__name", "name")
            missing_field_paths = [".".join(entry) for entry in missing_fields_qs]
            missing_fields = ", ".join(missing_field_paths)
            raise ValidationError(
                f"The collection of entity '{destination}' is missing annotation fields {missing_fields}."
            )
        return destination_annotations

    def validate_annotations(self):
        """Validate the annotations on this entity.

        :raises ValidationError: when annotations do not pass validation. All
            fields are validated and error messages aggregated into single
            exception.
        """
        invalid_annotation_fields = self.invalid_annotation_fields()
        if invalid_annotation_fields:
            msg = ",".join(field.label for field in invalid_annotation_fields)
            collection_id = getattr(self, "collection_id", None)
            raise ValidationError(
                f"The entity '{self.pk}' is annotated with fields '{msg}' "
                f"that are not allowed on the collection '{collection_id}'."
            )

        # Validate all the annotation values on this entity. The related
        # manager itself is not iterable, so iterate over its queryset.
        validation_errors = []
        for annotation in self.annotations.all():
            try:
                annotation.validate()
            except ValidationError as validation_error:
                validation_errors.append(validation_error)
        if validation_errors:
            raise ValidationError(validation_errors)

    def update_annotations(self, annotations: dict[str, Any], update=True):
        """Update annotations with the given values.

        When annotation value is set to None it is deleted.

        :attr annotations: the dictionary with annotation values. Keys are annotation
            paths.
        :attr update: when True only the annotations whose given value is None
            are deleted. When False all the existing annotations on the entity
            are deleted before the given ones are stored.
        :raises ValidationError: when some of the values do not pass
            validation. Nothing is changed in the database in such case.
        """
        field_paths = annotations.keys()
        field_map = {
            field_path: AnnotationField.field_from_path(field_path)
            for field_path in field_paths
        }

        # Create annotation values and prepare list of fields which annotations should
        # be deleted.
        annotation_values: list[AnnotationValue] = []
        validation_errors: list[ValidationError] = []
        fields_to_delete: list[int] = []
        for field_path, field_value in annotations.items():
            if field_value is None:
                fields_to_delete.append(field_map[field_path].pk)
            else:
                value = AnnotationValue(
                    field=field_map[field_path], value=field_value, entity=self
                )
                annotation_values.append(value)
                try:
                    value.validate()
                except ValidationError as e:
                    validation_errors.append(e)
        if validation_errors:
            raise ValidationError(validation_errors)

        # Delete and update annotations in a transaction.
        with transaction.atomic():
            to_delete = AnnotationValue.objects.filter(entity=self)
            if update:
                to_delete = to_delete.filter(field_id__in=fields_to_delete)
            to_delete.delete()
            # Existing values for the same (entity, field) pair are updated.
            AnnotationValue.objects.bulk_create(
                annotation_values,
                update_conflicts=True,
                update_fields=["_value"],
                unique_fields=["entity", "field"],
            )
class RelationType(models.Model):
    """Model for storing relation types."""

    #: relation type name
    name = models.CharField(max_length=100, unique=True)

    #: indicates if order of entities in relation is important or not
    ordered = models.BooleanField(default=False)

    def __str__(self) -> str:
        """Format the object representation (the relation type name)."""
        return self.name
class Relation(BaseModel, PermissionObject):
    """Relations between entities.

    The Relation model defines the associations and dependencies between
    entities in a given collection:

    .. code-block:: json

        {
            "collection": "<collection_id>",
            "type": "comparison",
            "category": "case-control study",
            "entities": [
                {"entity": "<entity1_id>", "label": "control"},
                {"entity": "<entity2_id>", "label": "case"},
                {"entity": "<entity3_id>", "label": "case"}
            ]
        }

    Relation ``type`` defines a specific set of associations among
    entities. It can be something like ``group``, ``comparison`` or
    ``series``. The relation ``type`` is an instance of
    :class:`.RelationType` and should be defined in any Django app that
    uses relations (e.g., as a fixture). Multiple relations of the same
    type are allowed on the collection.

    Relation ``category`` defines a specific use case. The relation
    category must be unique in a collection, so that users can
    distinguish between different relations. In the example above, we
    could add another ``comparison`` relation of ``category``, say
    ``Case-case study`` to compare ``<entity2>`` with ``<entity3>``.

    Relation is linked to :class:`resolwe.flow.models.Collection` to
    enable defining different relations structures in different
    collections. This also greatly speeds up retrieving of relations, as
    they are envisioned to be mainly used on a collection level.

    ``unit`` defines units used in partitions where it is applicable,
    e.g. in relations of type ``series``.
    """

    UNIT_SECOND = "s"
    UNIT_MINUTE = "min"
    UNIT_HOUR = "hr"
    UNIT_DAY = "d"
    UNIT_WEEK = "wk"

    UNIT_CHOICES = (
        (UNIT_SECOND, "Second"),
        (UNIT_MINUTE, "Minute"),
        (UNIT_HOUR, "Hour"),
        (UNIT_DAY, "Day"),
        (UNIT_WEEK, "Week"),
    )

    #: type of the relation
    type = models.ForeignKey("RelationType", on_delete=models.PROTECT)

    #: collection to which relation belongs
    collection = models.ForeignKey("Collection", on_delete=models.CASCADE)

    #: partitions of entities in the relation
    entities = models.ManyToManyField(Entity, through="RelationPartition")

    #: category of the relation
    category = models.CharField(max_length=100)

    #: unit used in the partitions' positions (where applicable, e.g. for serieses)
    unit = models.CharField(max_length=3, choices=UNIT_CHOICES, null=True, blank=True)

    #: relation descriptor schema
    descriptor_schema = models.ForeignKey(
        "flow.DescriptorSchema", blank=True, null=True, on_delete=models.PROTECT
    )

    #: relation descriptor
    descriptor = models.JSONField(default=dict)

    #: indicate whether `descriptor` doesn't match `descriptor_schema` (is dirty)
    descriptor_dirty = models.BooleanField(default=False)

    #: custom manager with permission filtering methods
    objects = PermissionQuerySet.as_manager()

    class Meta(BaseModel.Meta):
        """Relation Meta options."""

        unique_together = ("collection", "category")
        permissions = (
            ("view", "Can view relation"),
            ("edit", "Can edit relation"),
            ("share", "Can share relation"),
            ("owner", "Is owner of the relation"),
        )

    def save(self, *args, **kwargs):
        """Perform descriptor validation and save object.

        The ``descriptor_dirty`` flag is recomputed from the descriptor and its
        schema before saving. All the arguments are forwarded to the parent
        ``save`` method.

        :raises ValueError: when the descriptor is given without the
            descriptor schema.
        """
        if self.descriptor_schema:
            try:
                validate_schema(self.descriptor, self.descriptor_schema.schema)
            except DirtyError:
                self.descriptor_dirty = True
            else:
                self.descriptor_dirty = False
        elif self.descriptor:
            raise ValueError(
                "`descriptor_schema` must be defined if `descriptor` is given"
            )

        # When only selected fields are saved and the descriptor (or its
        # schema) is among them, persist the recomputed dirty flag as well.
        update_fields = kwargs.get("update_fields")
        if update_fields is not None:
            update_fields = set(update_fields)
            if update_fields & {"descriptor", "descriptor_schema"}:
                kwargs["update_fields"] = update_fields | {"descriptor_dirty"}

        super().save(*args, **kwargs)
class RelationPartition(models.Model):
    """Through model linking an ``Entity`` to a ``Relation``."""

    #: the entity taking part in the relation
    entity = models.ForeignKey(
        Entity,
        on_delete=models.CASCADE,
    )

    #: the relation the entity takes part in
    relation = models.ForeignKey(
        Relation,
        on_delete=models.CASCADE,
    )

    #: optional number used to order the entities inside the relation
    position = models.PositiveSmallIntegerField(
        null=True,
        blank=True,
        db_index=True,
    )

    #: optional human-readable label of the position
    label = models.CharField(
        max_length=30,
        null=True,
        blank=True,
        db_index=True,
    )

    class Meta:
        """RelationPartition Meta options."""

        # An entity can appear in a given relation at most once.
        unique_together = ("entity", "relation")