""".. Ignore pydocstyle D400.
========
Consumer
========
Manager Channels consumer.
"""
import asyncio
import logging
import os
from contextlib import suppress
from pathlib import Path
from channels.consumer import AsyncConsumer
from channels.db import database_sync_to_async
from channels.layers import get_channel_layer
from channels.testing import ApplicationCommunicator
from django.db import connection
from resolwe.utils import BraceMessage as __
from . import state
logger = logging.getLogger(__name__)
CHANNEL_HEALTH_CHECK = os.environ.get("HOSTNAME")
[docs]async def send_event(message):
"""Construct a Channels event packet with the given message.
:param message: The message to send to the manager workers.
"""
packet = {
"type": "control_event", # This is used as the method name in the consumer.
"content": message,
}
await get_channel_layer().send(state.MANAGER_CONTROL_CHANNEL, packet)
[docs]async def run_consumer(timeout=None):
"""Run the consumer until it finishes processing.
:param timeout: Set maximum execution time before cancellation, or
``None`` (default) for unlimited.
"""
manager_channel = state.MANAGER_CONTROL_CHANNEL
manager_scope = {
"type": "control_event",
"channel": manager_channel,
}
manager_app = ApplicationCommunicator(ManagerConsumer(), manager_scope)
channel_layer = get_channel_layer()
async def _consume_loop(channel, scope, app):
"""Run a loop to consume messages off the channels layer."""
message = await channel_layer.receive(channel)
while message.get("type", {}) != "_resolwe_manager_quit":
message.update(scope)
await app.send_input(message)
message = await channel_layer.receive(channel)
consume_future = asyncio.ensure_future(
_consume_loop(manager_channel, manager_scope, manager_app)
)
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(consume_future, timeout=timeout)
await manager_app.wait()
[docs]async def exit_consumer():
"""Cause the synchronous consumer to exit cleanly."""
packet = {
"type": "_resolwe_manager_quit",
}
await get_channel_layer().send(state.MANAGER_CONTROL_CHANNEL, packet)
[docs]class ManagerConsumer(AsyncConsumer):
"""Channels consumer for handling manager events."""
def __init__(self, *args, **kwargs):
"""Initialize a consumer instance with the given manager."""
# This import is local in order to avoid startup import loops.
from . import manager
self.manager = manager
[docs] async def control_event(self, message):
"""Forward control events to the manager dispatcher."""
logger.debug("control_event got message %s", message)
try:
await self.manager.handle_control_event(message["content"])
except:
logger.exception("control_event exception.")
[docs]class HealtCheckConsumer(AsyncConsumer):
"""Channels consumer for handling health-check events."""
[docs] async def health_check(self, message: dict):
"""Perform health check.
We are testing the channels layer and database layer. The channels
layer is already functioning if this method is called so we have to
perform database check.
If the check is successfull touch the file specified in the channels
message.
"""
logger.debug(__("Performing health check with message {}.", message))
path = Path(message["file"])
if await database_sync_to_async(self.check_database, thread_sensitive=False)():
logger.debug("Health check passed.")
path.touch(exist_ok=True)
[docs] def check_database(self) -> bool:
"""Perform a simple database check."""
with connection.cursor() as cursor:
cursor.execute("SELECT 1;")
result = cursor.fetchone()[0]
return result == 1