Page MenuHome

task_manager.py
No OneTemporary

File Metadata

Created
Thu, Oct 17, 12:16 AM

task_manager.py

"""Task management."""
import attr
import flask_login
import pillarsdk
from pillar.web.system_util import pillar_api
from . import attrs_extra
from .node_types.task import node_type_task
@attr.s
class TaskManager(object):
    """Creates and edits task nodes through the Pillar API."""

    _log = attrs_extra.log('%s.TaskManager' % __name__)

    def task_logged_in_svn(self, sender, task_id, log_entry):
        """Blinker signal receiver; connects the logged commit with the task.

        At the moment this only writes the task ID and log entry to the log.

        :param sender: sender of the signal
        :type sender: attract_server.subversion.CommitLogObserver
        :param task_id: identifier of the task the commit refers to, e.g. '123'
        :param log_entry: the log entry in which the task was mentioned
        """
        self._log.info("Task '%s' logged in SVN: %s", task_id, log_entry)

    def create_task(self, project, task_type=None, parent=None):
        """Creates a new task, owned by the current user.

        :param project: project to create the task in; it must provide the
            task node type.
        :param task_type: optional value for the 'task_type' property; only
            stored when truthy.
        :param parent: optional value for the node's 'parent' field; only
            stored when truthy.
        :raises ValueError: when the project does not have the task node type.
        :rtype: pillarsdk.Node
        """
        api = pillar_api()

        node_type = project.get_node_type(node_type_task['name'])
        if not node_type:
            raise ValueError('Project %s not set up for Attract' % project._id)

        node_props = dict(
            name='New task',
            project=project['_id'],
            user=flask_login.current_user.objectid,
            node_type=node_type['name'],
            properties={
                # A new task starts out in the default status declared by the
                # node type's dynamic schema.
                'status': node_type['dyn_schema']['status']['default'],
            },
        )

        if task_type:
            node_props['properties']['task_type'] = task_type
        if parent:
            node_props['parent'] = parent

        task = pillarsdk.Node(node_props)
        task.create(api=api)
        return task

    def edit_task(self, task_id, **fields):
        """Edits a task.

        The 'name', 'description' and 'status' fields are required;
        'task_type' and 'users' are optional. Any other field is ignored,
        and a warning is logged about it.

        :type task_id: str
        :type fields: dict
        :raises KeyError: when 'name', 'description' or 'status' is missing.
        :rtype: pillarsdk.Node
        """
        api = pillar_api()
        task = pillarsdk.Node.find(task_id, api=api)

        task.name = fields.pop('name')
        task.description = fields.pop('description')
        task.properties.status = fields.pop('status')

        # Normalise before stripping, so that an explicit task_type=None is
        # treated the same as a missing or blank task type (stored as None)
        # instead of raising AttributeError on None.strip().
        task_type = fields.pop('task_type', None) or ''
        task.properties.task_type = task_type.strip() or None

        # NOTE(review): a missing or empty 'users' value leaves the current
        # assignment untouched, so this method cannot un-assign all users.
        # Confirm whether that is intended.
        users = fields.pop('users', None)
        if users:
            task.properties.assigned_to = {'users': users}

        self._log.info('Saving task %s', task.to_dict())

        # Everything that was not popped above is not understood by this method.
        if fields:
            self._log.warning('edit_task(%r, ...) called with unknown fields %r; ignoring them.',
                              task_id, fields)

        task.update(api=api)
        return task

Event Timeline