""".. Ignore pydocstyle D400.
.. autoclass:: resolwe.test.ProcessTestCase
:members:
"""
import contextlib
import filecmp
import gzip
import hashlib
import io
import json
import os
import shutil
import sys
import tarfile
import tempfile
import time
import uuid
import zipfile
from itertools import filterfalse
from django.conf import settings
from django.core import management
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import transaction
from django.utils.text import slugify
from resolwe.flow.models import Data, DescriptorSchema, Process, Storage
from resolwe.flow.utils import dict_dot, iterate_fields, iterate_schema
from resolwe.permissions.models import PermissionGroup
from resolwe.storage.connectors import connectors
from resolwe.test import TransactionTestCase
from ..utils import get_processes_from_tags, has_process_tag
SCHEMAS_FIXTURE_CACHE = None
class TestProfiler:
"""Simple test profiler."""
def __init__(self, test):
"""Initialize test profiler.
:param test: Unit test instance
"""
self._test = test
self._start = time.time()
if getattr(settings, "TEST_PROCESS_PROFILE", False):
self._file = open(
"profile-resolwe-process-tests-{}.json".format(os.getpid()), "a"
)
else:
self._file = None
# Automatically cleanup when test completes.
test.addCleanup(self.close)
def add(self, data):
"""Add output to profile log.
:param data: Arbitrary data dictionary
"""
if not self._file:
return
data.update({"test": self._test.id()})
self._file.write(json.dumps(data))
self._file.write("\n")
@contextlib.contextmanager
def add_block(self, name):
"""Profile a named block of code.
:param name: Block name
"""
block_start = time.time()
try:
yield
finally:
block_end = time.time()
self.add({name: block_end - block_start})
def close(self):
"""Close profiler log."""
if not self._file:
return
self.add({"total": time.time() - self._start})
self._file.close()
[docs]class ProcessTestCase(TransactionTestCase):
"""Base class for writing process tests.
It is a subclass of :class:`.TransactionTestCase` with some specific
functions used for testing processes.
To write a process test use standard Django's syntax for writing
tests and follow the next steps:
#. Put input files (if any) in ``tests/files`` directory of a
Django application.
#. Run the process using
:meth:`.run_process`.
#. Check if the process has the expected status using
:meth:`.assertStatus`.
#. Check process's output using :meth:`.assertFields`,
:meth:`.assertFile`, :meth:`.assertFileExists`,
:meth:`.assertFiles` and :meth:`.assertJSON`.
.. note::
When creating a test case for a custom Django application,
subclass this class and over-ride the ``self.files_path`` with:
.. code-block:: python
self.files_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'files')
.. DANGER::
If output files don't exist in ``tests/files`` directory of a
Django application, they are created automatically.
But you have to check that they are correct before using them
for further runs.
"""
def _update_schema_relations(self, schemas):
"""Update foreign keys on process and descriptor schema.
The field contributor is updated.
"""
for schema in schemas:
schema.contributor = self.admin
def _create_permission_groups(self, schemas):
"""Create permission groups."""
for schema in schemas:
schema.permission_group = PermissionGroup.objects.create()
def _register_schemas(
self, path=None, processes_paths=None, descriptors_paths=None
):
"""Register process and descriptor schemas.
If ``processes_paths`` or ``descriptor_path`` arguments are
given, only processes/descriptors from that path are registered.
``path`` argument is supported for backward compatibility and
applies to both arguments above.
Process and DescriptorSchema are cached to SCHEMAS_FIXTURE_CACHE
global variable based on ``processes_paths`` and
``descriptors_paths`` keys in ``kwargs``.
"""
def remove_pks(schemas):
"""Remove primary keys from the given schemas."""
for s in schemas:
s.pk = None
return schemas
if path:
if processes_paths or descriptors_paths:
raise ValueError(
"processes_paths and descriptors_paths arguments must be None if path is defined."
)
processes_paths = descriptors_paths = path
schemas_types = [
{
"name": "descriptor_schemas",
"model": DescriptorSchema,
"cache_key": descriptors_paths,
},
{
"name": "processes",
"model": Process,
"cache_key": processes_paths,
},
]
for schemas in schemas_types:
schemas["model"].objects.all().delete()
cache_key = str(
"processes_paths={};descriptors_paths={}".format(
processes_paths, descriptors_paths
)
)
global SCHEMAS_FIXTURE_CACHE
if not SCHEMAS_FIXTURE_CACHE:
SCHEMAS_FIXTURE_CACHE = {}
stdout, stderr = io.StringIO(), io.StringIO()
if cache_key in SCHEMAS_FIXTURE_CACHE:
for schemas in schemas_types:
# NOTE: Schemas' current primary keys may not be unique on the next runs of
# processes' tests, therefore we must remove them and let the DB re-create them
# properly.
# WARNING: Cached schemas' primary keys will be set on every call to bulk_create(),
# therefore we need to remove them and let the DB re-create them every time. For
# more details, see:
# https://github.com/django/django/blob/1.10.7/django/db/models/query.py#L455-L456
schemas_cache = remove_pks(
SCHEMAS_FIXTURE_CACHE[cache_key][schemas["name"]]
)
self._update_schema_relations(schemas_cache)
self._create_permission_groups(schemas_cache)
schemas["model"].objects.bulk_create(schemas_cache)
else:
if processes_paths is None and descriptors_paths is None:
management.call_command(
"register", force=True, stdout=stdout, stderr=stderr
)
else:
settings_overrides = {
"FLOW_PROCESSES_FINDERS": [
"resolwe.flow.finders.FileSystemProcessesFinder"
]
}
if processes_paths is not None:
settings_overrides["FLOW_PROCESSES_DIRS"] = processes_paths
if descriptors_paths is not None:
settings_overrides["FLOW_DESCRIPTORS_DIRS"] = descriptors_paths
with self.settings(**settings_overrides):
management.call_command(
"register", force=True, stdout=stdout, stderr=stderr
)
if cache_key not in SCHEMAS_FIXTURE_CACHE:
SCHEMAS_FIXTURE_CACHE[cache_key] = {}
# NOTE: list() forces DB query execution
for schemas in schemas_types:
SCHEMAS_FIXTURE_CACHE[cache_key][schemas["name"]] = list(
schemas["model"].objects.all()
)
return stdout, stderr
[docs] def setUp(self):
"""Initialize test data."""
super().setUp()
_, stderr = self._register_schemas()
stderr = stderr.getvalue()
if stderr:
raise RuntimeError(stderr)
upload_connectors = [
connector
for connector in connectors.for_storage("upload")
if connector.mountable
]
assert upload_connectors, "No upload connector defined on filesystem"
self.upload_dir = upload_connectors[0].path
self._profiler = TestProfiler(self)
self._preparation_stage = 0
self._executed_processes = set()
self._files_path = None
self._upload_files = []
# create upload dir if it doesn't exist
if not os.path.isdir(self.upload_dir):
os.mkdir(self.upload_dir)
[docs] def tearDown(self):
"""Clean up after the test."""
# delete Data objects and their files unless keep_data
for d in Data.objects.all():
if self._keep_data:
print("KEEPING DATA: {}".format(d.pk))
elif d.location:
data_dir = d.location.get_path()
shutil.rmtree(data_dir, ignore_errors=True)
d.delete()
# remove uploaded files
if not self._keep_data:
for fn in self._upload_files:
shutil.rmtree(fn, ignore_errors=True)
super().tearDown()
# Check test outcome to prevent failing the test twice.
# Adapted from: https://stackoverflow.com/a/39606065
def list2reason(exc_list):
"""Error reason conversion helper."""
if exc_list and exc_list[-1][0] is self:
return exc_list[-1][1]
result = self.defaultTestResult()
# Python 3.11 has no feedErrorsToResult method.
if sys.version_info.major == 3 and sys.version_info.minor <= 10:
self._feedErrorsToResult(result, self._outcome.errors)
error = list2reason(result.errors)
failure = list2reason(result.failures)
# Ensure all tagged processes were tested.
if (
not error
and not failure
and getattr(settings, "TEST_PROCESS_REQUIRE_TAGS", False)
):
test = getattr(self, self._testMethodName)
for slug in get_processes_from_tags(test):
if slug not in self._executed_processes:
self.fail(
'Test was tagged with process "{}", but this process was not '
"executed during test. Remove the tag or test the process.".format(
slug
)
)
[docs] @contextlib.contextmanager
def preparation_stage(self):
"""Context manager to mark input preparation stage."""
with self._profiler.add_block("preparation"):
self._preparation_stage += 1
try:
yield
finally:
self._preparation_stage -= 1
# TODO: Handle automatic caching.
@property
def files_path(self):
"""Path to test files."""
if self._files_path is None:
raise NotImplementedError
return self._files_path
@files_path.setter
def files_path(self, value):
self._files_path = value
[docs] def run_processor(self, *args, **kwargs):
"""Run process.
Deprecated method: use run_process.
"""
return self.run_process(*args, **kwargs)
# TODO: warning
[docs] def run_process(
self,
process_slug,
input_={},
assert_status=Data.STATUS_DONE,
descriptor=None,
descriptor_schema=None,
verbosity=0,
tags=None,
contributor=None,
collection=None,
process_resources=None,
):
"""Run the specified process with the given inputs.
If input is a file, file path should be given relative to the
``tests/files`` directory of a Django application.
If ``assert_status`` is given, check if
:class:`~resolwe.flow.models.Data` object's status matches
it after the process has finished.
.. note::
If you need to delay calling the manager, you must put the
desired code in a ``with transaction.atomic()`` block.
:param str process_slug: slug of the
:class:`~resolwe.flow.models.Process` to run
:param dict ``input_``: :class:`~resolwe.flow.models.Process`'s
input parameters
.. note::
You don't have to specify parameters with defined
default values.
:param str ``assert_status``: desired status of the
:class:`~resolwe.flow.models.Data` object
:param dict descriptor: descriptor to set on the
:class:`~resolwe.flow.models.Data` object
:param dict descriptor_schema: descriptor schema to set on the
:class:`~resolwe.flow.models.Data` object
:param list tags: list of tags that will be added to the created
:class:`~resolwe.flow.models.Data` object
:return: object created by
:class:`~resolwe.flow.models.Process`
:rtype: ~resolwe.flow.models.Data
"""
# Copy input_, to avoid mutation that would occur in ``mock_upload``
input_ = input_.copy()
# backward compatibility
process_slug = slugify(process_slug.replace(":", "-"))
# Enforce correct process tags.
if (
getattr(settings, "TEST_PROCESS_REQUIRE_TAGS", False)
and not self._preparation_stage
):
test = getattr(self, self._testMethodName)
if not has_process_tag(test, process_slug):
self.fail(
'Tried to run process with slug "{0}" outside of preparation_stage\n'
"block while test is not tagged for this process. Either tag the\n"
"test using tag_process decorator or move this under the preparation\n"
"stage block if this process is only used to prepare upstream inputs.\n"
"\n"
"To tag the test you can add the following decorator:\n"
" @tag_process('{0}')\n"
"".format(process_slug)
)
self._executed_processes.add(process_slug)
process = Process.objects.filter(slug=process_slug).order_by("-version").first()
if process is None:
self.fail('No process with slug "{}"'.format(process_slug))
def mock_upload(file_path):
"""Mock file upload."""
def is_url(path):
"""Check if path is a URL."""
validate = URLValidator()
try:
validate(path)
except (ValueError, ValidationError):
return False
return True
if is_url(file_path):
return {
"file": file_path,
"file_temp": file_path,
"is_remote": True,
}
else:
old_path = os.path.join(self.files_path, file_path)
if not os.path.isfile(old_path):
raise RuntimeError("Missing file: {}".format(old_path))
file_basename = os.path.basename(file_path)
file_temp = "{}_{}".format(file_basename, uuid.uuid4())
upload_file_path = os.path.join(self.upload_dir, file_temp)
# create directories needed by new_path
upload_file_dir = os.path.dirname(upload_file_path)
if not os.path.exists(upload_file_dir):
os.makedirs(upload_file_dir)
shutil.copy2(old_path, upload_file_path)
self._upload_files.append(upload_file_path)
return {
"file": file_basename,
"file_temp": file_temp,
}
for field_schema, fields in iterate_fields(input_, process.input_schema):
# copy referenced files to upload dir
if field_schema["type"] == "basic:file:":
fields[field_schema["name"]] = mock_upload(fields[field_schema["name"]])
elif field_schema["type"] == "list:basic:file:":
file_list = [
mock_upload(file_path) for file_path in fields[field_schema["name"]]
]
fields[field_schema["name"]] = file_list
# convert primary keys to strings
if field_schema["type"].startswith("data:"):
fields[field_schema["name"]] = fields[field_schema["name"]]
if field_schema["type"].startswith("list:data:"):
fields[field_schema["name"]] = [
obj for obj in fields[field_schema["name"]]
]
data = Data.objects.create(
input=input_,
contributor=contributor or self.admin,
process=process,
tags=tags or [],
descriptor_schema=descriptor_schema,
descriptor=descriptor or {},
collection=collection,
process_resources=process_resources or {},
)
# Fetch latest Data object from database
data = Data.objects.get(pk=data.pk)
if assert_status:
if not transaction.get_autocommit() and assert_status == Data.STATUS_DONE:
# We are in an atomic transaction block, hence the data object will not be done
# until after the block. Therefore the expected status is resolving.
assert_status = Data.STATUS_RESOLVING
self.assertStatus(data, assert_status)
return data
[docs] def get_json(self, file_name, storage):
"""Return JSON saved in file and test JSON to compare it to.
The method returns a tuple of the saved JSON and the test JSON.
In your test you should then compare the test JSON to the saved
JSON that is commited to the repository.
The storage argument could be a Storage object, Storage ID or a
Python dictionary. The test JSON is assigned a json field of
the Storage object or the complete Python dictionary
(if a dict is given).
If the file does not exist it is created, the test JSON is
written to the new file and an exception is rased.
:param str file_name: file name (and relative path) of a JSON
file. Path should be relative to the ``tests/files``
directory of a Django app. The file name must have a ``.gz`` extension.
:param storage: Storage object, Storage ID or a dict.
:type storage: :class:`~resolwe.flow.models.Storage`,
:class:`str` or :class:`dict`
:return: (reference JSON, test JSON)
:rtype: tuple
"""
self.assertEqual(
os.path.splitext(file_name)[1], ".gz", msg="File extension must be .gz"
)
if isinstance(storage, Storage):
json_dict = storage.json
elif isinstance(storage, int):
json_dict = Storage.objects.get(pk=storage).json
elif isinstance(storage, dict):
json_dict = storage
else:
raise ValueError("Argument storage should be of type Storage, int or dict.")
file_path = os.path.join(self.files_path, file_name)
if not os.path.isfile(file_path):
with gzip.open(file_path, mode="wt") as f:
json.dump(json_dict, f)
self.fail(msg="Output file {} missing so it was created.".format(file_name))
with gzip.open(file_path, mode="rt") as f:
return json.load(f), json_dict
[docs] def assertStatus(self, obj, status):
"""Check if object's status is equal to the given status.
:param obj: object for which to check the status
:type obj: ~resolwe.flow.models.Data
:param str status: desired value of object's
:attr:`~resolwe.flow.models.Data.status` attribute
"""
self.assertEqual(
obj.status,
status,
msg="Data status is '{}', not '{}'".format(obj.status, status)
+ self._debug_info(obj),
)
def _get_output_field(self, obj, path):
"""Return object's output field schema and field dict.
:param obj: object with the output field
:type obj: ~resolwe.flow.models.Data
:param str path: path to :class:`~resolwe.flow.models.Data`
object's output field
"""
for field_schema, field, field_path in iterate_fields(
obj.output, obj.process.output_schema, ""
):
if path == field_path:
return field_schema, field
self.fail("Field not found in path {}.".format(path))
[docs] def assertFields(self, obj, path, value):
"""Compare object's field to the given value.
The file size is ignored. Use assertFile to validate
file contents.
:param obj: object with the field to compare
:type obj: ~resolwe.flow.models.Data
:param str path: path to
:class:`~resolwe.flow.models.Data` object's field
:param str value: desired value of
:class:`~resolwe.flow.models.Data` object's field
"""
field_schema, field = None, None
for field_schema, field, field_path in iterate_schema(
obj.output, obj.process.output_schema, ""
):
if path == field_path:
break
else:
self.fail("Field not found in path {}".format(path))
field_name = field_schema["name"]
field_value = field[field_name]
def remove_file_size(field_value):
"""Remove size value from file field."""
if "size" in field_value:
del field_value["size"]
# Ignore size in file and dir fields
if field_schema["type"].startswith("basic:file:") or field_schema[
"type"
].startswith("basic:dir:"):
remove_file_size(field_value)
remove_file_size(value)
elif field_schema["type"].startswith("list:basic:file:") or field_schema[
"type"
].startswith("list:basic:dir:"):
for val in field_value:
remove_file_size(val)
for val in value:
remove_file_size(val)
self.assertEqual(
field_value,
value,
msg="Field 'output.{}' mismatch: {} != {}".format(path, field_value, value)
+ self._debug_info(obj),
)
def _assert_file(
self,
obj,
fn_tested,
fn_correct,
compression=None,
file_filter=lambda _: False,
sort=False,
):
"""Compare files."""
open_kwargs = {}
if compression is None:
open_fn = open
# by default, open() will open files as text and return str
# objects, but we need bytes objects
open_kwargs["mode"] = "rb"
elif compression == "gzip":
open_fn = gzip.open
elif compression == "zip":
open_fn = zipfile.ZipFile.open
else:
raise ValueError("Unsupported compression format.")
def get_sha256(filename, **kwargs):
"""Get sha256 for a given file."""
with open_fn(filename, **kwargs) as handle:
contents = [line for line in filterfalse(file_filter, handle)]
if sort:
contents = sorted(contents)
contents = b"".join(contents)
return hashlib.sha256(contents).hexdigest()
output = obj.location.get_path(filename=fn_tested)
output_hash = get_sha256(output, **open_kwargs)
correct_path = os.path.join(self.files_path, fn_correct)
if not os.path.isfile(correct_path):
shutil.copyfile(output, correct_path)
self.fail(
msg="Output file {} missing so it was created.".format(fn_correct)
)
correct_hash = get_sha256(correct_path, **open_kwargs)
self.assertEqual(
correct_hash,
output_hash,
msg=(
f"File contents hash mismatch: {correct_hash} != {output_hash}"
f" comparing '{output}' with '{correct_path}'."
)
+ self._debug_info(obj),
)
[docs] def assertFile(self, obj, field_path, fn, **kwargs):
"""Compare a process's output file to the given correct file.
:param obj: object that includes the file to compare
:type obj: ~resolwe.flow.models.Data
:param str field_path: path to
:class:`~resolwe.flow.models.Data` object's field with the
file name
:param str fn: file name (and relative path) of the correct
file to compare against. Path should be relative to the
``tests/files`` directory of a Django application.
:param str compression: if not ``None``, files will be
uncompressed with the appropriate compression library
before comparison.
Currently supported compression formats are *gzip* and
*zip*.
:param filter: function for filtering the contents of output
files. It is used in :func:`itertools.filterfalse` function
and takes one parameter, a line of the output file. If it
returns ``True``, the line is excluded from comparison of
the two files.
:type filter: ~types.FunctionType
:param bool sort: if set to ``True``, basic sort will be performed
on file contents before computing hash value.
"""
field = dict_dot(obj.output, field_path)
self._assert_file(obj, field["file"], fn, **kwargs)
[docs] def assertFiles(self, obj, field_path, fn_list, **kwargs):
"""Compare a process's output file to the given correct file.
:param obj: object which includes the files to compare
:type obj: ~resolwe.flow.models.Data
:param str field_path: path to
:class:`~resolwe.flow.models.Data` object's field with the
list of file names
:param list fn_list: list of file names (and relative paths) of
files to compare against. Paths should be relative to the
``tests/files`` directory of a Django application.
:param str compression: if not ``None``, files will be
uncompressed with the appropriate compression library
before comparison.
Currently supported compression formats are *gzip* and
*zip*.
:param filter: Function for filtering the contents of output
files. It is used in :obj:`itertools.filterfalse` function
and takes one parameter, a line of the output file. If it
returns ``True``, the line is excluded from comparison of
the two files.
:type filter: ~types.FunctionType
:param bool sort: if set to ``True``, basic sort will be performed
on file contents before computing hash value.
"""
field = dict_dot(obj.output, field_path)
if len(field) != len(fn_list):
self.fail(
msg="Lengths of list:basic:file field and files list are not equal."
)
for fn_tested, fn_correct in zip(field, fn_list):
self._assert_file(obj, fn_tested["file"], fn_correct, **kwargs)
[docs] def assertFileExists(self, obj, field_path):
"""Ensure a file in the given object's field exists.
:param obj: object that includes the file for which to check if
it exists
:type obj: ~resolwe.flow.models.Data
:param str field_path: path to
:class:`~resolwe.flow.models.Data` object's field with the
file name/path
"""
field = dict_dot(obj.output, field_path)
output = obj.location.get_path(filename=field["file"])
if not os.path.isfile(output):
self.fail(msg="File {} does not exist.".format(field_path))
[docs] def assertFilesExist(self, obj, field_path):
"""Ensure files in the given object's field exists.
:param obj: object that includes list of files for which to check
existance
:type obj: ~resolwe.flow.models.Data
:param str field_path: path to
:class:`~resolwe.flow.models.Data` object's field with the
file name/path
"""
field = dict_dot(obj.output, field_path)
for item in field:
output_file = obj.location.get_path(filename=item["file"])
if not os.path.isfile(output_file):
self.fail(
msg="File {} in output field {} does not exist.".format(
item["file"], field_path
)
)
[docs] def assertDirExists(self, obj, field_path):
"""Assert that a directory in the output field of the given object exists.
:param obj: object that includes the file for which to check if
it exists
:param field_path: directory name/path
"""
schema, field = self._get_output_field(obj, field_path)
if not schema["type"].startswith("basic:dir:"):
self.fail(msg="Field {} is not of type basic:dir:".format(field_path))
dir_path = obj.location.get_path(filename=field[field_path]["dir"])
if not os.path.isdir(dir_path):
self.fail(
msg="Directory {} in output field {} does not exist.".format(
dir_path, field_path
)
)
def _assert_dir_structure(self, dir_path, dir_struct, exact=True):
"""Compare tree structure of directory `dir_path` to `dir_struct`."""
test_dirs = set()
test_files = set()
for root, dirs, files in os.walk(dir_path):
for test_dir in dirs:
test_dirs.add(os.path.relpath(os.path.join(root, test_dir), dir_path))
for test_file in files:
test_files.add(os.path.relpath(os.path.join(root, test_file), dir_path))
def get_dirs(indict, root=""):
"""Generate directory names from dict."""
for key, value in indict.items():
if isinstance(value, dict):
yield os.path.join(root, key)
yield from get_dirs(value, os.path.join(root, key))
elif value is not None:
self.fail(msg="Directory structure specification is incorrect")
def get_files(indict, root=""):
"""Generate file names from dict."""
for key, value in indict.items():
if value is None:
yield os.path.join(root, key)
elif isinstance(value, dict) and value:
yield from get_files(value, os.path.join(root, key))
elif not isinstance(value, dict):
self.fail(msg="Directory structure specification is incorrect.")
correct_files = {file for file in get_files(dir_struct)}
correct_dirs = {correct_dir for correct_dir in get_dirs(dir_struct)}
if exact and (test_dirs != correct_dirs or test_files != correct_files):
self.fail(msg="Directory structure mismatch (exact check).")
if not exact and (correct_dirs - test_dirs or correct_files - test_files):
self.fail(msg="Directory structure mismatch (partial structure check).")
[docs] def assertDirStructure(self, obj, field_path, dir_struct, exact=True):
"""Assert correct tree structure in output field of given object.
Only names of directories and files are asserted. Content of files is
not compared.
:param obj: object that includes the directory to compare
:type obj: ~resolwe.flow.models.Data
:param str dir_path: path to the directory to compare
:param dict dir_struct: correct tree structure of the directory.
Dictionary keys are directory and file names with the correct nested
structure. Dictionary value associated with each directory is a new
dictionary which lists the content of the directory. Dictionary
value associated with each file name is ``None``
:param bool exact: if ``True`` tested directory structure must exactly
match `dir_struct`. If ``False`` `dir_struct` must be a partial
structure of the directory to compare
"""
self.assertDirExists(obj, field_path)
field = dict_dot(obj.output, field_path)
dir_path = obj.location.get_path(filename=field["dir"])
self._assert_dir_structure(dir_path, dir_struct, exact)
def _assert_dir(self, dir_path, fn_correct, fail_on_funny=True):
"""Compare directory `dir_path` to compressed directory `fn_correct`."""
correct_path = os.path.join(self.files_path, fn_correct)
if not os.path.isfile(correct_path):
with tarfile.open(correct_path, "w:gz") as f:
for content in os.listdir(dir_path):
f.add(os.path.join(dir_path, content), arcname=content)
self.fail(
msg="Compressed output directory {} missing so it was created.".format(
fn_correct
)
)
if not tarfile.is_tarfile(correct_path):
self.fail(msg="{} is not a tar file.".format(fn_correct))
with tempfile.TemporaryDirectory() as temp_dir:
with tarfile.open(correct_path) as tar:
tar.extractall(temp_dir)
cmp = filecmp.dircmp(dir_path, temp_dir)
if (
cmp.left_only
or cmp.right_only
or cmp.diff_files
or (fail_on_funny and cmp.funny_files)
):
self.fail(
msg="Directory {} content mismatch: {}.".format(
fn_correct, cmp.report()
)
)
[docs] def assertDir(self, obj, field_path, fn):
"""Compare process output directory to correct compressed directory.
:param obj: object that includes the directory to compare
:type obj: ~resolwe.flow.models.Data
:param str field_path: path to
:class:`~resolwe.flow.models.Data` object's field with the
file name
:param str fn: file name (and relative path) of the correct compressed
directory to compare against. Path should be relative to the
``tests/files`` directory of a Django application. Compressed
directory needs to be in ``tar.gz`` format.
"""
self.assertDirExists(obj, field_path)
field = dict_dot(obj.output, field_path)
dir_path = obj.location.get_path(filename=field["dir"])
self._assert_dir(dir_path, fn)
[docs] def assertJSON(self, obj, storage, field_path, file_name):
"""Compare JSON in Storage object to the given correct JSON.
:param obj: object to which the
:class:`~resolwe.flow.models.Storage` object belongs
:type obj: ~resolwe.flow.models.Data
:param storage: object or id which contains JSON to compare
:type storage: :class:`~resolwe.flow.models.Storage` or
:class:`str`
:param str field_path: path to JSON subset in the
:class:`~resolwe.flow.models.Storage`'s object to compare
against. If it is empty, the entire object will be
compared.
:param str file_name: file name (and relative path) of the file
with the correct JSON to compare against. Path should be
relative to the ``tests/files`` directory of a Django
application.
.. note::
The given JSON file should be compresed with *gzip* and
have the ``.gz`` extension.
"""
self.assertEqual(
os.path.splitext(file_name)[1], ".gz", msg="File extension must be .gz"
)
if not isinstance(storage, Storage):
storage = Storage.objects.get(pk=storage)
storage_obj = dict_dot(storage.json, field_path)
file_path = os.path.join(self.files_path, file_name)
if not os.path.isfile(file_path):
with gzip.open(file_path, mode="wt") as f:
json.dump(storage_obj, f)
self.fail(msg="Output file {} missing so it was created.".format(file_name))
with gzip.open(file_path, mode="rt") as f:
file_obj = json.load(f)
self.assertAlmostEqualGeneric(
storage_obj,
file_obj,
msg="Storage {} field '{}' does not match file {}".format(
storage.id, field_path, file_name
)
+ self._debug_info(obj),
)
def _debug_info(self, data):
"""Return data's debugging information."""
msg_header = "Debugging information for data object {}".format(data.pk)
msg = (
"\n\n"
+ len(msg_header) * "="
+ "\n"
+ msg_header
+ "\n"
+ len(msg_header) * "="
+ "\n"
)
path = data.location.get_path(filename="stdout.txt") if data.location else None
if path and os.path.isfile(path):
msg += "\nstdout.txt:\n" + 11 * "-" + "\n"
with io.open(path, mode="rt") as fn:
msg += fn.read()
if data.process_error:
msg += "\nProcess' errors:\n" + 16 * "-" + "\n"
msg += "\n".join(data.process_error)
if data.process_warning:
msg += "\nProcess' warnings:\n" + 18 * "-" + "\n"
msg += "\n".join(data.process_warning)
return msg