Page MenuHome
No OneTemporary

"""Task runner."""
import asyncio
import json
import typing
import attr
from . import attrs_extra
from . import json_encoder, timing, worker
# NOTE(review): no class decorator is visible in the reviewed text, but the
# attr.ib() fields and the __attrs_post_init__ hook below only take effect on
# an attrs-decorated class, so @attr.s is applied here. Confirm against the
# repository version.
@attr.s
class TaskRunner:
    """Runs tasks, sending updates back to the worker.

    A task is a dict with an '_id' and a list of 'commands'. The commands are
    executed in order; the first unsuccessful command aborts the task.
    """

    # Must be an asyncio.Future (enforced by the validator). Not used by the
    # methods of this class.
    shutdown_future = attr.ib(validator=attr.validators.instance_of(asyncio.Future))
    # Must be a str (enforced by the validator). Not used by the methods of
    # this class.
    subprocess_pid_file = attr.ib(validator=attr.validators.instance_of(str))
    # Index of the command most recently started by _execute(); not an
    # __init__ argument.
    last_command_idx = attr.ib(default=0, init=False)

    _log = attrs_extra.log('%s.TaskRunner' % __name__)

    def __attrs_post_init__(self):
        """Initialise the state that is not an attrs field."""
        # The command handler that is currently running, or None.
        self.current_command = None
        # Timing info summed over the commands of the current task.
        self._aggr_timing_info = timing.Timing()

    async def execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
        """Execute a task; return True iff the entire task ran successfully.

        The aggregated timing info is reset before the task starts and is
        sent to the task log afterwards, whether the task succeeded, failed,
        or raised an exception.

        :param task: task document with '_id' and 'commands' keys.
        :param fworker: worker that receives the timing log line.
        :raises ValueError: when a command is malformed or unknown.
        """
        self._aggr_timing_info = timing.Timing()
        try:
            return await self._execute(task, fworker)
        finally:
            # In a 'finally' so that the timings are also logged on failure;
            # placed after the 'return' it would never run.
            await self.log_recorded_timings(fworker)

    async def _execute(self, task: dict, fworker: worker.FlamencoWorker) -> bool:
        """Run the commands of the task in order, stopping at the first failure.

        :returns: False as soon as a command reports failure, True when all
            commands succeeded.
        :raises ValueError: when a command has no name, has settings that are
            not a dict, or names an unknown command.
        """
        # Imported here rather than at module level, as in the original.
        from .commands import command_handlers

        task_id = task['_id']

        for cmd_idx, cmd_info in enumerate(task['commands']):
            self.last_command_idx = cmd_idx

            # Figure out the command name
            cmd_name = cmd_info.get('name')
            if not cmd_name:
                raise ValueError('Command %i of task %s has no name' % (cmd_idx, task_id))

            cmd_settings = cmd_info.get('settings')
            if cmd_settings is None or not isinstance(cmd_settings, dict):
                raise ValueError('Command %i of task %s has malformed settings %r' %
                                 (cmd_idx, task_id, cmd_settings))

            # Find the handler class
            try:
                cmd_cls = command_handlers[cmd_name]
            except KeyError:
                raise ValueError('Command %i of task %s has unknown command name %r' %
                                 (cmd_idx, task_id, cmd_name))

            # Construct & execute the handler.
            # NOTE(review): the constructor arguments and the awaited call were
            # cut off in the reviewed text. The keyword arguments and
            # run(cmd_settings) are reconstructed from the values this method
            # prepares but does not otherwise use; confirm against the command
            # handler base class.
            cmd = cmd_cls(
                worker=fworker,
                task_id=task_id,
                command_idx=cmd_idx,
            )
            self.current_command = cmd
            success = await cmd.run(cmd_settings)

            # Add the timings of this command to the aggregated timing info.
            self._aggr_timing_info += cmd.timing

            if not success:
                self._log.warning('Command %i of task %s was not succesful, aborting task.',
                                  cmd_idx, task_id)
                return False

        # NOTE(review): the start of this logging call was lost; the 'info'
        # level is an assumption.
        self._log.info('Task %s completed succesfully.', task_id)
        self.current_command = None
        return True

    async def abort_current_task(self) -> None:
        """Aborts the current task by aborting the currently running command.

        Asynchronous, because a subprocess has to be wait()ed upon before returning.
        """
        if self.current_command is None:
            # NOTE(review): the start of this logging call was lost; the
            # 'info' level is an assumption.
            self._log.info('abort_current_task: no command running, nothing to abort.')
            # Without this early return, abort() would be called on None.
            return

        self._log.warning('abort_current_task: Aborting command %s', self.current_command)
        await self.current_command.abort()

    async def log_recorded_timings(self, fworker: worker.FlamencoWorker) -> None:
        """Send the aggregated timing info to the task log as one JSON line.

        Does nothing when the timing info is falsy.
        """
        timing_info = self._aggr_timing_info
        if not timing_info:
            return

        as_json = json.dumps(timing_info, cls=json_encoder.JSONEncoder)
        log_line = f'Aggregated task timing info: {as_json}'
        await fworker.register_log(log_line)

    # NOTE(review): this reads like a read-only accessor; a @property
    # decorator may have been lost along with other lines. Kept as a plain
    # method because that is what the reviewed text shows.
    def aggr_timing_info(self) -> timing.Timing:
        """Return the timing info aggregated over the commands run so far."""
        return self._aggr_timing_info

File Metadata

Mime Type
Sat, Dec 10, 12:58 PM (1 d, 23 h)
Storage Engine
Storage Format
Raw Data
Storage Handle

Event Timeline