# Source code for resolwe.flow.models.annotations

"""Resolwe annotations model."""
import abc
import copy
import datetime
import re
from collections import defaultdict
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Iterable,
    List,
    Mapping,
    MutableSequence,
    Optional,
    Sequence,
    Type,
)

from django.core.exceptions import ValidationError
from django.db import models

from resolwe.flow.models.base import BaseModel
from resolwe.permissions.models import Permission, PermissionObject

if TYPE_CHECKING:
    from resolwe.flow.models import Collection, Entity

from .base import AuditModel

# Maximum lengths of the character columns declared on the models in this module.
# Limit for AnnotationField.validator_regex.
VALIDATOR_LENGTH = 128
# Limit for the ``name`` column of AnnotationGroup and AnnotationField.
NAME_LENGTH = 128
# Limit for the ``label`` column of AnnotationGroup and AnnotationField.
LABEL_LENGTH = 128
# Limit for AnnotationField.description.
DESCRIPTION_LENGTH = 256


class HandleMissingAnnotations(Enum):
    """How to handle missing annotations.

    Each member's value is equal to its name.
    """

    # NOTE(review): nothing in the visible part of this module reads these members;
    # their exact semantics are defined by the callers - confirm there.
    ADD = "ADD"
    REMOVE = "REMOVE"


class AnnotationType(Enum):
    """Supported annotation types.

    The member values are the strings accepted in ``AnnotationField.type`` (see the
    ``annotation_type`` check constraint on that model) and the members are the keys
    under which validators register themselves.
    """

    # String in the YYYY-MM-DD format (see AnnotationFieldDateValidator).
    DATE = "DATE"
    # Python int or float (see AnnotationFieldDecimalValidator).
    DECIMAL = "DECIMAL"
    # Python int (see AnnotationFieldIntegerValidator).
    INTEGER = "INTEGER"
    # Python str (see AnnotationFieldStringValidator).
    STRING = "STRING"


class AnnotationValueValidator:
    """Registry and runner of annotation value validators.

    Validators are plugin objects grouped by the annotation type they handle. When
    a value is validated, every validator registered for the type of its field is
    executed.
    """

    def __init__(self):
        """Create an empty registry keyed by annotation type."""
        self._validators: Mapping[
            AnnotationType, MutableSequence["AnnotationFieldBaseValidator"]
        ] = defaultdict(list)

    def _add_validator(self, validator: "AnnotationFieldBaseValidator"):
        """Register the validator under every annotation type it declares."""
        for handled_type in validator.annotation_field_types:
            self._validators[handled_type].append(validator)

    def validate(
        self, annotation_value: "AnnotationValue", raise_exception: bool = False
    ) -> Sequence[ValidationError]:
        """Run all validators registered for the type of the value's field.

        A failing validator does not stop the run: all errors are collected.

        :param annotation_value: the AnnotationValue object to check.
        :param raise_exception: when True, raise instead of returning the errors.
        :return: the list of collected validation errors (possibly empty).
        :raises ValidationError: if validation fails and raise_exception is True.
            The individual errors are nested inside the raised ValidationError.
        """
        field_type = annotation_value.field.annotation_type
        collected = []
        for checker in self._validators[field_type]:
            try:
                checker.validate(annotation_value.value, annotation_value.field)
            except ValidationError as failure:
                collected.append(failure)
        if raise_exception and collected:
            raise ValidationError(collected)
        return collected


# Shared module-level validator. Subclasses of AnnotationFieldBaseValidator register
# an instance of themselves with it when they are defined, and AnnotationValue uses
# it to validate itself.
annotation_value_validator = AnnotationValueValidator()


class AnnotationFieldBaseValidator(metaclass=abc.ABCMeta):
    """Base class for annotation value validators.

    Defining a subclass is enough to activate it: an instance of the subclass is
    created and registered with the module-level ``annotation_value_validator`` at
    class creation time.
    """

    # Annotation types the validator applies to. It is read during registration, so
    # every subclass has to define it in its class body.
    annotation_field_types: Iterable[AnnotationType]

    @classmethod
    def __init_subclass__(cls: Type["AnnotationFieldBaseValidator"], **kwargs):
        """Register instance of the class with the validator.

        The subclass is instantiated without arguments.
        """
        super().__init_subclass__(**kwargs)
        annotation_value_validator._add_validator(cls())

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Validate the value.

        :param value: the raw annotation value to check.
        :param annotation_field: the field the value belongs to.
        :raises ValidationError: on validation failure.
        :raises NotImplementedError: when the subclass does not override the method.
        """
        raise NotImplementedError("Subclass should implement the 'validate' method.")


class AnnotationFieldStringValidator(AnnotationFieldBaseValidator):
    """Type check for the values of STRING annotation fields."""

    annotation_field_types = (AnnotationType.STRING,)

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Accept the value only when it is an instance of str.

        :raises ValidationError: when the value is of any other type.
        """
        if isinstance(value, str):
            return
        raise ValidationError(
            f"The value '{value}' is not of the expected type 'str'."
        )


class AnnotationFieldIntegerValidator(AnnotationFieldBaseValidator):
    """Validates the integer field."""

    annotation_field_types = (AnnotationType.INTEGER,)

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Validate value to be of type int.

        Booleans are rejected explicitly: bool is a subclass of int, so a plain
        isinstance check would accept True and False as integers.

        :raises ValidationError: when the value is not an integer.
        """
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValidationError(
                f"The value '{value}' is not of the expected type 'int'."
            )


class AnnotationFieldDecimalValidator(AnnotationFieldBaseValidator):
    """Validates the decimal field."""

    annotation_field_types = (AnnotationType.DECIMAL,)

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Validate value to be of type int or float.

        Booleans are rejected explicitly: bool is a subclass of int, so a plain
        isinstance check would accept True and False as numbers.

        :raises ValidationError: when the value is not a number.
        """
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise ValidationError(
                f"The value '{value}' is not of the expected types 'int' or 'float'."
            )


class AnnotationFieldDateValidator(AnnotationFieldBaseValidator):
    """Validates the date field."""

    annotation_field_types = (AnnotationType.DATE,)

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Validate value represents a date in the format YYYY-MM-DD.

        The month and day part can contain a single character.

        :raises ValidationError: when the value is not a string or it can not be
            parsed as a date.
        """
        if not isinstance(value, str):
            raise ValidationError(
                f"The value '{value}' is not of the expected type 'str'."
            )
        try:
            datetime.datetime.strptime(value, "%Y-%m-%d")
        except ValueError as error:
            # Chain the parser error so the reason of the failure is not lost.
            raise ValidationError(
                f"Value {value} has incorrect format, use YYYY-MM-DD."
            ) from error


class AnnotationFieldRegexValidator(AnnotationFieldBaseValidator):
    """Checks the annotation value against the regex set on its field."""

    annotation_field_types = (
        AnnotationType.INTEGER,
        AnnotationType.DATE,
        AnnotationType.DECIMAL,
        AnnotationType.STRING,
    )

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Search the string form of the value for the field's regex.

        Fields without a regex (validator_regex is None) accept every value.

        :raises ValidationError: when the regex is not found in the value.
        """
        pattern = annotation_field.validator_regex
        if pattern is None:
            return

        # The regex is applied to the textual representation of the value.
        text = str(value)
        if re.search(pattern, text) is not None:
            return
        raise ValidationError(
            f"The value '{text}' for the field '{annotation_field.pk}' does "
            f"not match the regex '{pattern}'."
        )


class AnnotationFieldVocabularyValidator(AnnotationFieldBaseValidator):
    """Validates that the value matches vocabulary."""

    annotation_field_types = (
        AnnotationType.INTEGER,
        AnnotationType.DATE,
        AnnotationType.DECIMAL,
        AnnotationType.STRING,
    )

    def validate(self, value: Any, annotation_field: "AnnotationField"):
        """Validate that the value matches vocabulary on the field.

        Fields without a vocabulary accept every value.

        :raises ValidationError: when the value is not in the vocabulary.
        """
        vocabulary = annotation_field.vocabulary
        if vocabulary is None:
            return

        try:
            known = value in vocabulary
        except TypeError:
            # Unhashable values (lists, dicts) can not be looked up in a mapping.
            # Report them as invalid instead of leaking the TypeError, which the
            # validation runner does not catch.
            known = False

        if not known:
            raise ValidationError(
                f"The value '{value}' is not valid for the field {annotation_field}."
            )


class AnnotationGroup(models.Model):
    """A named, ordered group of annotation fields."""

    class Meta:
        """Order the groups by their sort order by default."""

        ordering = ["sort_order"]

    #: the name of the annotation group
    name = models.CharField(max_length=NAME_LENGTH)

    #: the label of the annotation group
    label = models.CharField(max_length=LABEL_LENGTH)

    #: the sorting order among annotation groups
    sort_order = models.PositiveSmallIntegerField()

    def __str__(self) -> str:
        """Represent the group by its name."""
        return str(self.name)
class AnnotationField(models.Model):
    """Annotation field.

    The field belongs to an annotation group and describes the type and the
    optional constraints (regular expression, vocabulary) of its values.
    """

    #: the name of the annotation fields
    name = models.CharField(max_length=NAME_LENGTH)

    #: user visible field name
    label = models.CharField(max_length=LABEL_LENGTH)

    #: user visible field description
    description = models.CharField(max_length=DESCRIPTION_LENGTH)

    #: the type of the annotation field
    type = models.CharField(max_length=16)

    #: the annotation group this field belongs to
    group = models.ForeignKey(
        AnnotationGroup, on_delete=models.CASCADE, related_name="fields"
    )

    #: the sorting order among annotation fields
    sort_order = models.PositiveSmallIntegerField()

    #: optional regular expression for validation
    validator_regex = models.CharField(
        max_length=VALIDATOR_LENGTH, null=True, blank=True
    )

    #: optional map of valid values to labels
    vocabulary = models.JSONField(null=True, blank=True)

    #: is this field required
    required = models.BooleanField(default=False)

    def __init__(self, *args, **kwargs):
        """Store the original vocabulary and type to private variables.

        The stored values are compared with the current ones on save to detect
        changes.
        """
        super().__init__(*args, **kwargs)
        # Keep an independent snapshot: the vocabulary is a mutable JSON value and
        # an alias would make in-place changes invisible to the check in save().
        self._original_vocabulary = copy.deepcopy(self.vocabulary)
        self._original_type = self.type

    @staticmethod
    def add_to_collection(source: "Collection", destination: "Collection"):
        """Add fields from the source to the destination collection."""
        destination.annotation_fields.add(*source.annotation_fields.all())

    @property
    def annotation_type(self) -> AnnotationType:
        """Get the field type as enum.

        The stored type is upper-cased before the lookup.

        :raises ValueError: when the stored type is not a supported annotation type.
        """
        return AnnotationType(self.type.upper())

    @staticmethod
    def group_field_from_path(path: str) -> List[str]:
        """Return the group and field name from path.

        The path has the form '<group name>.<field name>'. Both names must start
        with an ASCII letter followed by ASCII word characters or spaces.

        :raises ValidationError: when the path does not have the expected form.
        """
        # NOTE(review): the pattern requires at least two characters in each name;
        # confirm that single-character names are meant to be rejected.
        match = re.fullmatch(
            r"(?P<group>[a-zA-Z][\w ]+)\.(?P<field>[a-zA-Z][\w ]+)", path, re.ASCII
        )
        if match is None:
            raise ValidationError(f"Invalid path '{path}'.")
        return [match["group"], match["field"]]

    @classmethod
    def id_from_path(cls, path: str) -> Optional[int]:
        """Get the field id from the field path.

        :return: the id of the field or None when the field does not exist.
        :raises ValidationError: when the path is not valid.
        """
        group_name, field_name = cls.group_field_from_path(path)
        return (
            cls.objects.filter(group__name=group_name, name=field_name)
            .values_list("id", flat=True)
            .first()
        )

    @classmethod
    def field_from_path(cls, path: str) -> "AnnotationField":
        """Get the field from the field path.

        :raises ValidationError: when the path is not valid or the field does not
            exist.
        """
        group_name, field_name = cls.group_field_from_path(path)
        field = cls.objects.filter(group__name=group_name, name=field_name).first()
        if field is None:
            raise ValidationError(f"Field '{path}' does not exist.")
        return field

    def save(self, *args, **kwargs):
        """Recompute the labels for annotation values if vocabulary changes.

        When the type of the field changes all its values are revalidated. Both
        checks run after the field itself has been saved.

        :raises ValidationError: when vocabulary or type changes so that annotation
            values are no longer valid.
        """
        super().save(*args, **kwargs)
        if self.vocabulary != self._original_vocabulary:
            updated_fields = []
            for annotation_value_field in self.values.all():
                annotation_value_field.recompute_label()
                annotation_value_field.validate()
                updated_fields.append(annotation_value_field)
            AnnotationValue.objects.bulk_update(updated_fields, ["_value"])
            # Snapshot (not alias) the vocabulary so later in-place changes are
            # detected by the next save.
            self._original_vocabulary = copy.deepcopy(self.vocabulary)
        if self.type != self._original_type:
            self.revalidate_values()
            self._original_type = self.type

    def revalidate_values(self):
        """Revalidate all annotation values.

        :raises ValidationError: when validation fails.
        """
        for annotation_value in self.values.iterator():
            annotation_value.validate()

    def label_by_value(self, label: str) -> str:
        """Get the value by label.

        The vocabulary maps values to labels; the first value whose label equals
        the given one is returned. When no value is found (or the field has no
        vocabulary) the label itself is returned.
        """
        if self.vocabulary is not None:
            for vocabulary_value, vocabulary_label in self.vocabulary.items():
                if label == vocabulary_label:
                    return vocabulary_value
        return label

    def __str__(self) -> str:
        """Return user-friendly string representation."""
        return f"{self.group.name}.{self.name}"

    class Meta:
        """Set the constraints and the default ordering."""

        constraints = [
            # Accept only supported annotation types.
            models.constraints.CheckConstraint(
                check=models.Q(type__in=[e.value for e in AnnotationType]),
                name="annotation_type",
            ),
            models.constraints.UniqueConstraint(
                fields=["name", "group"], name="uniquetogether_name_group"
            ),
        ]
        ordering = ["group__sort_order", "sort_order"]
class AnnotationPreset(BaseModel, PermissionObject):
    """A named collection of annotation fields.

    Permissions can be set on presets.
    """

    class Meta:
        """Override parent meta."""

    #: the fields belonging to this preset
    fields = models.ManyToManyField(AnnotationField, related_name="presets")

    def __str__(self):
        """Represent the preset by its name."""
        return self.name
class AnnotationValueQuerySet(models.QuerySet):
    """QuerySet of annotation values with permission-aware filtering."""

    def filter_for_user(self, user) -> "AnnotationValueQuerySet":
        """Keep only the annotation values visible to the user.

        Annotation values are filtered through their entities: only the values on
        entities returned by the entity's own filter_for_user are kept.
        """
        # Imported here to avoid circular import.
        from resolwe.flow.models import Entity

        referenced_entities = Entity.objects.filter(pk__in=self.values("entity"))
        visible_entities = referenced_entities.filter_for_user(user)
        return self.filter(entity__in=visible_entities)
class AnnotationValue(AuditModel):
    """The value of the annotation.

    The value and its label are stored together in the JSON field '_value' under
    the keys 'value' and 'label'.
    """

    class Meta:
        """Set the unique constraints."""

        constraints = [
            models.constraints.UniqueConstraint(
                fields=["entity", "field"], name="uniquetogether_entity_field"
            ),
        ]
        ordering = ["field__group__sort_order", "field__sort_order"]

    objects = AnnotationValueQuerySet.as_manager()

    #: the entity this field belongs to
    entity: "Entity" = models.ForeignKey(
        "Entity", related_name="annotations", on_delete=models.CASCADE
    )

    #: the field this field belongs to
    field: AnnotationField = models.ForeignKey(
        AnnotationField, related_name="values", on_delete=models.PROTECT
    )

    #: the date when field was last modified
    modified = models.DateTimeField(auto_now=True)

    #: value is stored under key 'value' in the json field to simplify lookups
    _value: Any = models.JSONField(default=dict)

    def __init__(self, *args, **kwargs):
        """Allow us to set the 'value' in the constructor.

        This is a little tricky, as value can be passed in args or kwargs. We only
        intercept the value when given as kwarg, since args are used by Django when
        constructing class from database value.
        """
        if "value" in kwargs:
            kwargs["_value"] = {"value": kwargs.pop("value")}
        super().__init__(*args, **kwargs)
        if "label" not in self._value:
            self.recompute_label()

    @property
    def value(self) -> str | int | float | datetime.date:
        """Get the actual value.

        :raises KeyError: when no value is set.
        """
        return self._value["value"]

    @value.setter
    def value(self, value: str | int | float | datetime.date):
        """Set the value.

        The label is recomputed when the value is set.
        """
        self._value["value"] = value
        self.recompute_label()

    @property
    def _computed_label(self) -> Any:
        """Return computed label from value.

        The label is looked up in the vocabulary of the field. The value itself is
        returned when the field has no vocabulary or the value is not in it.
        """
        vocabulary = self.field.vocabulary
        if vocabulary is None:
            return self.value
        # Always return a value even if it is not in the vocabulary. The validation
        # step will take care of the wrong values.
        try:
            return vocabulary.get(self.value, self.value)
        except TypeError:
            # Unhashable values (lists, dicts) can not be looked up in the
            # vocabulary: treat them as values that are not in it.
            return self.value

    @property
    def label(self) -> Any:
        """Return the cached label."""
        return self._value["label"]

    def validate(self):
        """Validate the given value.

        The validation is always done in full, all errors are gathered and sent in
        the response.

        :raises ValidationError: when the validation fails.
        """
        annotation_value_validator.validate(self, raise_exception=True)

    def save(self, *args, **kwargs):
        """Save the annotation value after validation has passed.

        :raises ValidationError: when the validation fails.
        """
        annotation_value_validator.validate(self, raise_exception=True)
        # Make sure the label is always set.
        if "label" not in self._value:
            self.recompute_label()
        super().save(*args, **kwargs)

    def recompute_label(self):
        """Recompute label from value and set it to the model instance."""
        self._value["label"] = self._computed_label

    @staticmethod
    def from_path(entity_id: int, path: str) -> Optional["AnnotationValue"]:
        """Get the annotation value from the path.

        :return: the annotation value of the given entity for the field on the
            path or None when it does not exist.
        :raises ValidationError: when the path is not valid.
        """
        field_id = AnnotationField.id_from_path(path)
        return AnnotationValue.objects.filter(
            entity_id=entity_id, field_id=field_id
        ).first()

    def has_permission(self, permission: Permission, user) -> bool:
        """Return if the user has the permission on this object.

        The permission is checked on the entity.
        """
        return self.entity.has_permission(permission, user)

    def __str__(self) -> str:
        """Return user-friendly string representation."""
        return f"{self.label}"