Source code for

""".. Ignore pydocstyle D400.

.. autoclass::

import asyncio
import contextlib
import logging
import os
import shlex
from asyncio import subprocess

from import BaseFlowExecutor

logger = logging.getLogger(__name__)

[docs]class FlowExecutor(BaseFlowExecutor): """Local dataflow executor proxy.""" name = "local" def __init__(self, *args, **kwargs): """Initialize attributes.""" super().__init__(*args, **kwargs) self.kill_delay = 5 self.proc = None self.stdout = None self.command = "/bin/sh" async def start(self): """Start process execution.""" # Workaround for pylint issue #1469 # ( self.proc = await subprocess.create_subprocess_exec( *shlex.split(self.command), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) self.stdout = self.proc.stdout return async def run_script(self, script): """Execute the script and save results.""" script = os.linesep.join(["set -x", "set +B", script, "exit"]) + os.linesep self.proc.stdin.write(script.encode("utf-8")) await self.proc.stdin.drain() self.proc.stdin.close() async def end(self): """End process execution.""" await self.proc.wait() return self.proc.returncode async def terminate(self): """Terminate a running script.""" try: self.proc.terminate() except ProcessLookupError: # Process has already been terminated. pass else: with contextlib.suppress(asyncio.TimeoutError): await asyncio.wait_for(self.proc.wait(), self.kill_delay) if self.proc.returncode is None: self.proc.kill() await self.proc.wait() await super().terminate()