"""Task runner."""
import asyncio
import attr
from . import attrs_extra
from . import worker
@attr.s
class TaskRunner:
    """Runs tasks, sending updates back to the worker.

    A task is a dict with an ``'_id'`` and a list of ``'commands'``. The
    commands are executed one after another; execution stops at the first
    command that reports failure.
    """

    # Both are type-validated on construction; they are not read by the
    # methods shown in this class.
    shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
    subprocess_pid_file = attr.ib(validator=attr.validators.instance_of(str))

    # Index of the command most recently started by execute(). Not an
    # __init__() argument.
    last_command_idx = attr.ib(default=0, init=False)

    # NOTE(review): attrs_extra.log() is defined elsewhere; presumably it
    # provides the logger used as self._log below -- confirm.
    _log = attrs_extra.log('%s.TaskRunner' % __name__)

    def __attrs_post_init__(self):
        """Set up state that is not declared as an attrs attribute."""
        # The command handler currently being run, or None when idle.
        self.current_command = None

    async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
        """Execute all commands of a task, in order.

        :param task: the task to run; must contain an ``'_id'`` and a
            ``'commands'`` list. Each command must have a non-empty ``'name'``
            and a dict under ``'settings'``.
        :param fworker: the worker on whose behalf the task is run.
        :returns: True iff the entire task was run successfully; False as
            soon as one command reports failure.
        :raises ValueError: when a command has no name, has malformed
            settings, or names an unknown command.
        """

        # Imported locally rather than at module level; presumably to avoid
        # an import cycle -- TODO confirm.
        from .commands import command_handlers

        task_id = task['_id']

        for cmd_idx, cmd_info in enumerate(task['commands']):
            self.last_command_idx = cmd_idx

            # Figure out the command name
            cmd_name = cmd_info.get('name')
            if not cmd_name:
                raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))

            cmd_settings = cmd_info.get('settings')
            if cmd_settings is None or not isinstance(cmd_settings, dict):
                raise ValueError('Command %i of task %s has malformed settings %r' %
                                 (cmd_idx, task_id, cmd_settings))

            # Find the handler class
            try:
                cmd_cls = command_handlers[cmd_name]
            except KeyError:
                raise ValueError('Command %i of task %s has unknown command name %r' %
                                 (cmd_idx, task_id, cmd_name))

            # Construct & execute the handler.
            # NOTE(review): the constructor keywords and the run() call were
            # reconstructed from truncated statements; confirm them against
            # the command handler base class in the commands module.
            cmd = cmd_cls(
                worker=fworker,
                task_id=task_id,
                command_idx=cmd_idx,
            )
            self.current_command = cmd
            success = await cmd.run(cmd_settings)

            if not success:
                self._log.warning('Command %i of task %s was not succesful, aborting task.',
                                  cmd_idx, task_id)
                return False

        # NOTE(review): log level reconstructed as info -- confirm.
        self._log.info('Task %s completed succesfully.', task_id)
        self.current_command = None
        return True

    async def abort_current_task(self):
        """Aborts the current task by aborting the currently running command.

        Asynchronous, because a subprocess has to be wait()ed upon before returning.
        """

        if self.current_command is None:
            # NOTE(review): log level reconstructed as info -- confirm.
            self._log.info('abort_current_task: no command running, nothing to abort.')
            # Without this return the code below would call abort() on None.
            return

        self._log.warning('abort_current_task: Aborting command %s', self.current_command)
        await self.current_command.abort()

