Page MenuHome
No OneTemporary

"""Task management."""
import attr
import flask
import flask_login
import pillarsdk
from pillar import attrs_extra
from pillar.web.system_util import pillar_api
from attract import subquery
from attract.node_types.task import node_type_task
# NOTE(review): this class was rebuilt from a copy that had lost its
# indentation and several whole lines. Everything tagged "restored" below is
# absent from that copy and was inferred from the surrounding code; confirm
# each one against the upstream file before relying on it.
@attr.s  # restored: `attr` is imported but otherwise unused, and `_log` is
# presumably an attrs field (via attrs_extra.log) that needs this decorator.
class TaskManager(object):
    """Creates, edits, deletes and queries task nodes through the Pillar API."""

    _log = attrs_extra.log('%s.TaskManager' % __name__)

    def task_logged_in_svn(self, sender, task_id, log_entry):
        """Blinker signal receiver; connects the logged commit with the task.

        Currently this only logs the event.

        :param sender: sender of the signal
        :type sender: attract_server.subversion.CommitLogObserver
        :param task_id: ID of the task mentioned in the commit, e.g. '123'.
        :param log_entry: the commit's log entry.
        """

        # restored: the `self._log.info(` prefix; the log level is a guess.
        self._log.info("Task '%s' logged in SVN: %s", task_id, log_entry)

    def create_task(self, project, task_type=None, parent=None):
        """Creates a new task, owned by the current user.

        :param project: the project to create the task in; must have the
            task node type set up.
        :param task_type: optional task type; when given it is stored in the
            task properties and also determines the task's name.
        :param parent: optional ID of the parent node.
        :raises ValueError: if the project has no task node type.
        :rtype: pillarsdk.Node
        """

        # Function-level import, as in the original.
        from pillar.web.jinja import format_undertitle

        api = pillar_api()
        node_type = project.get_node_type(node_type_task['name'])
        if not node_type:
            raise ValueError('Project %s not set up for Attract' % project._id)

        node_props = dict(
            name='New task',
            # restored: the next three keys. Assumes the current user exposes
            # its ID as `objectid` -- TODO confirm.
            project=project._id,
            user=flask_login.current_user.objectid,
            node_type=node_type['name'],
            # The 'properties' wrapper is required by the
            # node_props['properties'] access below.
            properties={
                'status': node_type['dyn_schema']['status']['default'],
            },
        )

        if task_type:
            node_props['name'] = format_undertitle(task_type)
            node_props['properties']['task_type'] = task_type
        if parent:
            node_props['parent'] = parent

        task = pillarsdk.Node(node_props)
        # restored: without this call `api` is unused and nothing is created.
        task.create(api=api)
        return task

    def edit_task(self, task_id, **fields):
        """Edits a task.

        Expects the fields '_etag', 'name', 'description' and 'status';
        'task_type' and 'users' are optional. Any other field is ignored,
        with a warning.

        :type task_id: str
        :type fields: dict
        :raises KeyError: if one of the expected fields is missing.
        :rtype: pillarsdk.Node
        """

        api = pillar_api()
        task = pillarsdk.Node.find(task_id, api=api)

        # restored: the assignment targets other than `task._etag` and
        # `task.description`; they mirror the property names used elsewhere
        # in this class.
        task._etag = fields.pop('_etag')
        task.name = fields.pop('name')
        task.description = fields.pop('description')
        task.properties.status = fields.pop('status')
        # An empty or whitespace-only task type is stored as None.
        task.properties.task_type = fields.pop('task_type', '').strip() or None

        users = fields.pop('users', None)
        task.properties.assigned_to = {'users': users or []}

        self._log.info('Saving task %s', task.to_dict())

        if fields:
            self._log.warning('edit_task(%r, ...) called with unknown fields %r; ignoring them.',
                              task_id, fields)

        # restored: without this call `api` is only used for the lookup and
        # the edits are never saved.
        task.update(api=api)
        return task

    def delete_task(self, task_id, etag):
        """Deletes the task with the given ID.

        :param task_id: ID of the task to delete.
        :param etag: the task's current '_etag'.
        """

        api = pillar_api()

        self._log.info('Deleting task %s', task_id)
        task = pillarsdk.Node({'_id': task_id, '_etag': etag})
        # restored: without this call `api` is unused and nothing is deleted.
        task.delete(api=api)

    def tasks_for_user(self, user_id):
        """Returns the tasks for the given user.

        :returns: {'_items': [task, task, ...], '_meta': {Eve metadata}}
        """

        api = pillar_api()

        # TODO: also include tasks assigned to any of the user's groups.
        tasks = pillarsdk.Node.all({
            'where': {
                'properties.assigned_to.users': user_id,
                'node_type': node_type_task['name'],
            }
        }, api=api)

        return tasks

    def tasks_for_project(self, project_id):
        """Returns the tasks for the given project.

        :returns: {'_items': [task, task, ...], '_meta': {Eve metadata}}
        """

        api = pillar_api()
        tasks = pillarsdk.Node.all({
            'where': {
                'project': project_id,
                'node_type': node_type_task['name'],
            }}, api=api)

        return tasks

    def activities_for_task(self, task_id, max_results=20, page=1):
        """Returns a page of activities for the given task.

        Each activity's `actor_user` is replaced by the result of
        `subquery.get_user_info()`.

        :param max_results: maximum number of activities per page.
        :param page: page number to fetch.
        :returns: {'_items': [activity, activity, ...], '_meta': {Eve metadata}}
        """

        api = pillar_api()
        activities = pillarsdk.Activity.all({
            'where': {
                'object_type': 'node',
                'object': task_id,
            },
            # Sorted on '_created' with direction -1.
            'sort': [('_created', -1)],
            'max_results': max_results,
            'page': page,
        }, api=api)

        # Fetch more info for each activity.
        for act in activities['_items']:
            act.actor_user = subquery.get_user_info(act.actor_user)

        return activities
def setup_app(app):
    """Imports this package's Eve hooks module.

    :param app: the application being set up; not used by the code below.
    """

    # NOTE(review): only this import survives in the copy this was rebuilt
    # from, which leaves `app` unused. A call that hands `app` to `eve_hooks`
    # presumably followed -- confirm against the upstream file.
    from . import eve_hooks

File Metadata

Mime Type
Fri, Jan 21, 10:31 PM (2 d)
Storage Engine
Storage Format
Raw Data
Storage Handle

Event Timeline