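"""worker.py: the Flamenco Worker's main class.

Fetches tasks from a Flamenco Manager, runs them through the task runner,
and pushes status updates and logs back upstream.
"""
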
import asyncio
import datetime
import enum
import pathlib
import typing

import attr

from . import attrs_extra
from . import documents
from . import upstream
from . import upstream_update_queue

# All durations/delays/etc are in seconds.
REGISTER_AT_MANAGER_FAILED_RETRY_DELAY = 30
FETCH_TASK_FAILED_RETRY_DELAY = 10  # when we failed obtaining a task
FETCH_TASK_EMPTY_RETRY_DELAY = 5  # when there are no tasks to perform
FETCH_TASK_DONE_SCHEDULE_NEW_DELAY = 3  # after a task is completed

PUSH_LOG_MAX_ENTRIES = 1000
PUSH_LOG_MAX_INTERVAL = datetime.timedelta(seconds=30)
PUSH_ACT_MAX_INTERVAL = datetime.timedelta(seconds=15)

ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY = 30

# If there are more than this number of queued task updates, we won't ask
# the Manager for another task to execute. Task execution is delayed until
# the queue size is below this threshold.
QUEUE_SIZE_THRESHOLD = 10

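# The PUSH_*_MAX_INTERVAL values are compared against wall-clock deltas; for
# example, the log-push logic in register_log() below effectively does:
#
#     if datetime.datetime.now() - self.last_log_push > PUSH_LOG_MAX_INTERVAL:
#         await self.push_to_manager()
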
class UnableToRegisterError(Exception):
    """Raised when the worker can't register at the manager.

    Will cause an immediate shutdown.
    """

class WorkerState(enum.Enum):
    STARTING = 'starting'
    AWAKE = 'awake'
    ASLEEP = 'asleep'
    SHUTTING_DOWN = 'shutting-down'

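# The status-change handlers below (change_status() and the go_to_state_*()
# methods) implement a small state machine, roughly:
#
#     STARTING -> AWAKE <-> ASLEEP
#                   |         |
#                   +-> SHUTTING_DOWN <-+
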
@attr.s
class FlamencoWorker:
    manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
    trunner = attr.ib()  # Instance of flamenco_worker.runner.TaskRunner
    tuqueue = attr.ib(validator=attr.validators.instance_of(
        upstream_update_queue.TaskUpdateQueue))
    task_types = attr.ib(validator=attr.validators.instance_of(list))
    worker_id = attr.ib(validator=attr.validators.instance_of(str))
    worker_secret = attr.ib(validator=attr.validators.instance_of(str))

    loop = attr.ib(validator=attr.validators.instance_of(asyncio.AbstractEventLoop))
    shutdown_future = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))

    state = attr.ib(default=WorkerState.STARTING,
                    validator=attr.validators.instance_of(WorkerState))

    # Indicates the state in which the Worker should start
    initial_state = attr.ib(validator=attr.validators.instance_of(str), default='awake')
    run_single_task = attr.ib(validator=attr.validators.instance_of(bool), default=False)

    # When Manager tells us we may no longer run our current task, this is set to True.
    # As a result, the cancelled state isn't pushed to Manager any more. It is reset
    # to False when a new task is started.
    task_is_silently_aborting = attr.ib(default=False, init=False,
                                        validator=attr.validators.instance_of(bool))

    fetch_task_task = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))
    asyncio_execution_task = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))

    # See self.sleeping()
    sleeping_task = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Task)))

    task_id = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str)))
    current_task_status = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(str)))
    _queued_log_entries = attr.ib(default=attr.Factory(list), init=False)
    _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
    last_log_push = attr.ib(
        default=attr.Factory(datetime.datetime.now),
        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))
    last_activity_push = attr.ib(
        default=attr.Factory(datetime.datetime.now),
        validator=attr.validators.optional(attr.validators.instance_of(datetime.datetime)))

    # Kept in sync with the task updates we send to upstream Manager, so that we can send
    # a complete Activity each time.
    last_task_activity = attr.ib(default=attr.Factory(documents.Activity))

    # Configuration
    push_log_max_interval = attr.ib(default=PUSH_LOG_MAX_INTERVAL,
                                    validator=attr.validators.instance_of(datetime.timedelta))
    push_log_max_entries = attr.ib(default=PUSH_LOG_MAX_ENTRIES,
                                   validator=attr.validators.instance_of(int))
    push_act_max_interval = attr.ib(default=PUSH_ACT_MAX_INTERVAL,
                                    validator=attr.validators.instance_of(datetime.timedelta))

    # Futures that represent delayed calls to push_to_manager().
    # They are scheduled when logs & activities are registered but not yet pushed. They are
    # cancelled when a push_to_manager() actually happens for another reason. There are different
    # futures for activity and log pushing, as these can have different max intervals.
    _push_log_to_manager = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
    _push_act_to_manager = attr.ib(
        default=None, init=False,
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))

    # When the worker is shutting down, the currently running task will be
    # handed back to the manager for re-scheduling. In such a situation,
    # an abort is expected and acceptable.
    failures_are_acceptable = attr.ib(default=False, init=False,
                                      validator=attr.validators.instance_of(bool))

    _log = attrs_extra.log('%s.FlamencoWorker' % __name__)

    @property
    def active_task_id(self) -> typing.Optional[str]:
        """Returns the task ID, but only if it is currently executing; returns None otherwise."""

        if self.asyncio_execution_task is None or self.asyncio_execution_task.done():
            return None
        return self.task_id

    async def startup(self, *, may_retry_loop=True):
        self._log.info('Starting up')

        do_register = not self.worker_id or not self.worker_secret
        if do_register:
            await self.register_at_manager(may_retry_loop=may_retry_loop)

        # Once we know our ID and secret, update the manager object so that we
        # don't have to pass our authentication info each and every call.
        self.manager.auth = (self.worker_id, self.worker_secret)

        # We only need to sign on if we didn't just register. However, this
        # can only happen after setting self.manager.auth.
        if not do_register:
            await self.signon(may_retry_loop=may_retry_loop)

        # If we're not supposed to start in 'awake' state, let the Manager know.
        if self.initial_state != 'awake':
            self._log.info('Telling Manager we are in state %r', self.initial_state)
            self.ack_status_change(self.initial_state)

        self.schedule_fetch_task()

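    # A minimal construction-and-startup sketch (commented out; the collaborator
    # objects and argument values are illustrative assumptions, not canonical
    # set-up code from this project):
    #
    #     loop = asyncio.get_event_loop()
    #     worker = FlamencoWorker(
    #         manager=manager,    # an upstream.FlamencoManager
    #         trunner=trunner,    # a flamenco_worker.runner.TaskRunner
    #         tuqueue=tuqueue,    # an upstream_update_queue.TaskUpdateQueue
    #         task_types=['sleep', 'blender-render'],
    #         worker_id='',       # empty ID/secret trigger registration
    #         worker_secret='',
    #         loop=loop,
    #         shutdown_future=loop.create_future(),
    #     )
    #     loop.run_until_complete(worker.startup())
    #     worker.mainloop()
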
    @staticmethod
    def hostname() -> str:
        import socket

        return socket.gethostname()

    async def _keep_posting_to_manager(self, url: str, json: dict, *,
                                       use_auth=True,
                                       may_retry_loop: bool):
        import requests

        post_kwargs = {
            'json': json,
            'loop': self.loop,
        }
        if not use_auth:
            post_kwargs['auth'] = None

        while True:
            try:
                resp = await self.manager.post(url, **post_kwargs)
                resp.raise_for_status()
            except requests.RequestException as ex:
                if not may_retry_loop:
                    self._log.error('Unable to POST to manager %s: %s', url, ex)
                    raise UnableToRegisterError()

                self._log.warning('Unable to POST to manager %s, retrying in %i seconds: %s',
                                  url, REGISTER_AT_MANAGER_FAILED_RETRY_DELAY, ex)
                await asyncio.sleep(REGISTER_AT_MANAGER_FAILED_RETRY_DELAY)
            else:
                return resp

    async def signon(self, *, may_retry_loop: bool):
        """Signs on at the manager.

        Only needed when we didn't just register.
        """

        self._log.info('Signing on at manager.')
        await self._keep_posting_to_manager(
            '/sign-on',
            json={
                'supported_task_types': self.task_types,
                'nickname': self.hostname(),
            },
            may_retry_loop=may_retry_loop,
        )
        self._log.info('Manager accepted sign-on.')

    async def register_at_manager(self, *, may_retry_loop: bool):
        self._log.info('Registering at manager')

        self.worker_secret = generate_secret()
        platform = detect_platform()

        resp = await self._keep_posting_to_manager(
            '/register-worker',
            json={
                'secret': self.worker_secret,
                'platform': platform,
                'supported_task_types': self.task_types,
                'nickname': self.hostname(),
            },
            use_auth=False,  # explicitly do not use authentication
            may_retry_loop=may_retry_loop,
        )

        result = resp.json()
        self._log.info('Response: %s', result)
        self.worker_id = result['_id']

        self.write_registration_info()

    def write_registration_info(self):
        """Writes the current worker ID and secret to the home dir."""

        from . import config

        config.merge_with_home_config({
            'worker_id': self.worker_id,
            'worker_secret': self.worker_secret,
        })

    def mainloop(self):
        self._log.info('Entering main loop')

        # TODO: add "watchdog" task that checks the asyncio loop and ensures there is
        # always either a task being executed or a task fetch scheduled.
        self.loop.run_forever()

    def schedule_fetch_task(self, delay=0):
        """Schedules a task fetch.

        If a task fetch was already queued, that one is cancelled.

        :param delay: delay in seconds, after which the task fetch will be performed.
        """

        # The current task may still be running, as fetch_task() calls schedule_fetch_task() to
        # schedule a future run. This may result in the task not being awaited when we are
        # shutting down.
        if self.shutdown_future.done():
            self._log.warning('Shutting down, not scheduling another fetch-task task.')
            return

        self.fetch_task_task = asyncio.ensure_future(self.fetch_task(delay), loop=self.loop)

    async def stop_current_task(self):
        """Stops the current task by canceling the AsyncIO task.

        This causes a CancelledError in the self.fetch_task() function, which then takes care
        of the task status change and subsequent activity push.
        """

        if not self.asyncio_execution_task or self.asyncio_execution_task.done():
            self._log.warning('stop_current_task() called but no task is running.')
            return

        self._log.warning('Stopping task %s', self.task_id)
        self.task_is_silently_aborting = True

        try:
            await self.trunner.abort_current_task()
        except asyncio.CancelledError:
            self._log.info('asyncio task was canceled for task runner task %s', self.task_id)
        self.asyncio_execution_task.cancel()

        await self.register_log('Worker %s stopped running this task,'
                                ' no longer allowed to run by Manager', self.worker_id)
        await self.push_to_manager()
        await self.tuqueue.flush_and_report(loop=self.loop)

    def shutdown(self):
        """Gracefully shuts down any asynchronous tasks."""

        self._log.warning('Shutting down')
        self.state = WorkerState.SHUTTING_DOWN

        self.failures_are_acceptable = True
        self.stop_fetching_tasks()
        self.stop_sleeping()

        # Stop the task runner
        self.loop.run_until_complete(self.trunner.abort_current_task())

        # Queue anything that should still be pushed to the Manager
        push_act_sched = self._push_act_to_manager is not None \
                         and not self._push_act_to_manager.done()
        push_log_sched = self._push_log_to_manager is not None \
                         and not self._push_log_to_manager.done()
        if push_act_sched or push_log_sched:
            # Try to push queued task updates to manager before shutting down
            self._log.info('shutdown(): pushing queued updates to manager')
            self.loop.run_until_complete(self.push_to_manager())

        # Try to do a final push of queued updates to the Manager.
        self.loop.run_until_complete(self.tuqueue.flush_and_report(loop=self.loop))

        # Let the Manager know we're shutting down
        self._log.info('shutdown(): signing off at Manager')
        try:
            self.loop.run_until_complete(self.manager.post('/sign-off', loop=self.loop))
        except Exception as ex:
            self._log.warning('Error signing off. Continuing with shutdown. %s', ex)
        self.failures_are_acceptable = False

    def stop_fetching_tasks(self):
        """Stops the delayed task-fetching from running.

        Used in shutdown and when we're going to status 'asleep'.
        """

        if self.fetch_task_task is None or self.fetch_task_task.done():
            return

        self._log.info('stopping task fetching task %s', self.fetch_task_task)
        self.fetch_task_task.cancel()

        # This prevents a 'Task was destroyed but it is pending!' warning on the console.
        # Sybren: I've only seen this in unit tests, so maybe this code should be moved
        # there, instead.
        try:
            if not self.loop.is_running():
                self.loop.run_until_complete(self.fetch_task_task)
        except asyncio.CancelledError:
            pass

    async def fetch_task(self, delay: float):
        """Fetches a single task to perform from Flamenco Manager, and executes it.

        :param delay: waits this many seconds before fetching a task.
        """

        import traceback
        import requests

        self.state = WorkerState.AWAKE

        self._cleanup_state_for_new_task()

        self._log.debug('Going to fetch task in %s seconds', delay)
        await asyncio.sleep(delay)

        # Prevent outgoing queue overflowing by waiting until it's below the
        # threshold before starting another task.
        # TODO(sybren): introduce another worker state for this, and handle there.
        with (await self._queue_lock):
            queue_size = self.tuqueue.queue_size()
        if queue_size > QUEUE_SIZE_THRESHOLD:
            self._log.info('Task Update Queue size too large (%d > %d), '
                           'waiting until it shrinks.', queue_size, QUEUE_SIZE_THRESHOLD)
            self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
            return

        # TODO: use exponential backoff instead of retrying every fixed N seconds.
        self._log.debug('Fetching task')
        try:
            resp = await self.manager.post('/task', loop=self.loop)
        except requests.exceptions.RequestException as ex:
            self._log.warning('Error fetching new task, will retry in %i seconds: %s',
                              FETCH_TASK_FAILED_RETRY_DELAY, ex)
            self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
            return

        if resp.status_code == 204:
            self._log.debug('No tasks available, will retry in %i seconds.',
                            FETCH_TASK_EMPTY_RETRY_DELAY)
            self.schedule_fetch_task(FETCH_TASK_EMPTY_RETRY_DELAY)
            return

        if resp.status_code == 423:
            status_change = documents.StatusChangeRequest(**resp.json())
            self._log.info('status change to %r requested when fetching new task',
                           status_change.status_requested)
            self.change_status(status_change.status_requested)
            return

        if resp.status_code != 200:
            self._log.warning('Error %i fetching new task, will retry in %i seconds.',
                              resp.status_code, FETCH_TASK_FAILED_RETRY_DELAY)
            self.schedule_fetch_task(FETCH_TASK_FAILED_RETRY_DELAY)
            return

        task_info = resp.json()
        self.task_id = task_info['_id']
        self._log.info('Received task: %s', self.task_id)
        self._log.debug('Received task: %s', task_info)

        try:
            await self.register_task_update(task_status='active')
            self.asyncio_execution_task = asyncio.ensure_future(
                self.trunner.execute(task_info, self),
                loop=self.loop)
            ok = await self.asyncio_execution_task
            if ok:
                await self.register_task_update(
                    task_status='completed',
                    activity='Task completed',
                )
            elif self.failures_are_acceptable:
                self._log.warning('Task %s failed, but ignoring it since we are shutting down.',
                                  self.task_id)
            else:
                self._log.error('Task %s failed', self.task_id)
                await self.register_task_update(task_status='failed')
        except asyncio.CancelledError:
            if self.failures_are_acceptable:
                self._log.warning('Task %s was cancelled, but ignoring it since '
                                  'we are shutting down.', self.task_id)
            elif self.task_is_silently_aborting:
                self._log.warning('Task %s was cancelled, but ignoring it since '
                                  'we are no longer allowed to run it.', self.task_id)
            else:
                self._log.warning('Task %s was cancelled', self.task_id)
                await self.register_task_update(task_status='canceled',
                                                activity='Task was canceled')
        except Exception as ex:
            self._log.exception('Uncaught exception executing task %s' % self.task_id)
            try:
                # Such a failure will always result in a failed task, even when
                # self.failures_are_acceptable = True; only expected failures are
                # acceptable then.
                with (await self._queue_lock):
                    self._queued_log_entries.append(traceback.format_exc())
                await self.register_task_update(
                    task_status='failed',
                    activity='Uncaught exception: %s %s' % (type(ex).__name__, ex),
                )
            except Exception:
                self._log.exception('While notifying manager of failure, '
                                    'another error happened.')
        finally:
            if self.run_single_task:
                self._log.info('Running in single-task mode, exiting.')
                self.go_to_state_shutdown()
                return

            if self.state == WorkerState.AWAKE:
                # Schedule a new task run unless shutting down or sleeping; after a little delay
                # to not hammer the world when we're in some infinite failure loop.
                self.schedule_fetch_task(FETCH_TASK_DONE_SCHEDULE_NEW_DELAY)

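    # Regarding the exponential-backoff TODO above: one possible shape is the
    # sketch below (a hypothetical helper, not part of this module), which
    # doubles the delay per consecutive failure up to a cap and would replace
    # the fixed FETCH_TASK_FAILED_RETRY_DELAY arguments to schedule_fetch_task():
    #
    #     def next_retry_delay(attempt: int,
    #                          base: float = FETCH_TASK_FAILED_RETRY_DELAY,
    #                          cap: float = 300.0) -> float:
    #         """Delay in seconds before retry number `attempt` (0-based)."""
    #         return min(cap, base * (2 ** attempt))
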
    def _cleanup_state_for_new_task(self):
        """Cleans up internal state to prepare for a new task to be executed."""

        self.last_task_activity = documents.Activity()
        self.task_is_silently_aborting = False
        self.current_task_status = ''

    async def push_to_manager(self, *, delay: datetime.timedelta = None):
        """Updates a task's status and activity.

        Uses the TaskUpdateQueue to handle persistent queueing.
        """

        if delay is not None:
            delay_sec = delay.total_seconds()
            self._log.debug('Scheduled delayed push to manager in %r seconds', delay_sec)
            await asyncio.sleep(delay_sec)

            if self.shutdown_future.done():
                self._log.info('Shutting down, not pushing changes to manager.')
                return

        self._log.info('Updating task %s with status %r and activity %r',
                       self.task_id, self.current_task_status, self.last_task_activity)

        if self.task_is_silently_aborting:
            self._log.info('push_to_manager: task is silently aborting, will only push logs')
            payload = {}
        else:
            payload = attr.asdict(self.last_task_activity)
            if self.current_task_status:
                payload['task_status'] = self.current_task_status

        now = datetime.datetime.now()
        self.last_activity_push = now

        # Cancel any pending push task, as we're pushing activities now.
        if self._push_act_to_manager is not None:
            self._push_act_to_manager.cancel()

        with (await self._queue_lock):
            if self._queued_log_entries:
                payload['log'] = '\n'.join(self._queued_log_entries)
                self._queued_log_entries.clear()
                self.last_log_push = now

                # Cancel any pending push task, as we're pushing logs now.
                if self._push_log_to_manager is not None:
                    self._push_log_to_manager.cancel()

        if not payload:
            self._log.debug('push_to_manager: nothing to push')
            return

        self.tuqueue.queue('/tasks/%s/update' % self.task_id, payload)

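    # The queued payload is a plain dict: the attr.asdict() fields of the
    # Activity document, plus optional 'task_status' and 'log' keys. As an
    # illustration (the exact Activity fields are defined in documents.py):
    #
    #     {'activity': 'Task completed',
    #      'task_status': 'completed',
    #      'log': '2018-01-01T00:00:00+00:00: some log line'}
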
    async def register_task_update(self, *, task_status: str = None, **kwargs):
        """Stores the task status and activity, and possibly sends to Flamenco Manager.

        If the last update to Manager was long enough ago, or the task status changed,
        the info is sent to Manager. This way we can update command progress percentage
        hundreds of times per second, without worrying about network overhead.
        """

        self._log.debug('Task update: task_status=%s, %s', task_status, kwargs)

        # Update the current activity
        for key, value in kwargs.items():
            setattr(self.last_task_activity, key, value)

        if task_status is None:
            task_status_changed = False
        else:
            task_status_changed = self.current_task_status != task_status
            self.current_task_status = task_status

        if task_status_changed:
            self._log.info('Task changed status to %s, pushing to manager', task_status)
            await self.push_to_manager()
        elif datetime.datetime.now() - self.last_activity_push > self.push_act_max_interval:
            self._log.info('More than %s since last activity update, pushing to manager',
                           self.push_act_max_interval)
            await self.push_to_manager()
        elif self._push_act_to_manager is None or self._push_act_to_manager.done():
            # Schedule a future push to manager.
            self._push_act_to_manager = asyncio.ensure_future(
                self.push_to_manager(delay=self.push_act_max_interval))

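    # Because pushes are rate-limited by push_act_max_interval, callers can
    # report progress at a high frequency without flooding the Manager; a
    # hypothetical command runner could do, per rendered sample:
    #
    #     await worker.register_task_update(activity='Rendering sample %i' % sample)
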
    async def register_log(self, log_entry, *fmt_args):
        """Registers a log entry, and possibly sends all queued log entries to upstream Manager.

        Supports variable arguments, just like the logger.{info,warn,error}(...) family
        of methods.
        """

        from . import tz

        if fmt_args:
            log_entry %= fmt_args

        now = datetime.datetime.now(tz.tzutc()).isoformat()
        with (await self._queue_lock):
            self._queued_log_entries.append('%s: %s' % (now, log_entry))
            queue_size = len(self._queued_log_entries)

        if queue_size > self.push_log_max_entries:
            self._log.info('Queued up %i > %i log entries, pushing to manager',
                           queue_size, self.push_log_max_entries)
            await self.push_to_manager()
        elif datetime.datetime.now() - self.last_log_push > self.push_log_max_interval:
            self._log.info('More than %s since last log update, pushing to manager',
                           self.push_log_max_interval)
            await self.push_to_manager()
        elif self._push_log_to_manager is None or self._push_log_to_manager.done():
            # Schedule a future push to manager.
            self._push_log_to_manager = asyncio.ensure_future(
                self.push_to_manager(delay=self.push_log_max_interval))

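    # Example (printf-style formatting, as with the standard logging calls):
    #
    #     await worker.register_log('Frame %i of %i done', frame, frame_count)
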
    def output_produced(self, *paths: typing.Union[str, pathlib.PurePath]):
        """Registers a produced output (e.g. rendered frame) with the manager.

        This performs a HTTP POST in a background task, returning as soon as
        the task is scheduled.
        """

        async def do_post():
            try:
                self._log.info('Sending %i path(s) to Manager', len(paths))
                resp = await self.manager.post('/output-produced',
                                               json={'paths': [str(p) for p in paths]},
                                               loop=self.loop)
                if resp.status_code == 204:
                    self._log.info('Manager accepted our output notification for %s', paths)
                else:
                    self._log.warning('Manager rejected our output notification: %d %s',
                                      resp.status_code, resp.text)
            except Exception:
                self._log.exception('error POSTing to manager /output-produced')

        self.loop.create_task(do_post())

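    # Example: notify the Manager about a rendered frame. Both str and
    # pathlib paths are accepted; the call returns immediately:
    #
    #     worker.output_produced(pathlib.Path('/render/out/frame_000001.png'))
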
    def change_status(self, new_status: str):
        """Called whenever the Flamenco Manager has a change in current status for us."""

        self._log.info('Manager requested we go to status %r', new_status)
        status_change_handlers = {
            'asleep': self.go_to_state_asleep,
            'awake': self.go_to_state_awake,
            'shutdown': self.go_to_state_shutdown,
        }

        try:
            handler = status_change_handlers[new_status]
        except KeyError:
            self._log.error('We have no way to go to status %r, going to sleep instead',
                            new_status)
            handler = self.go_to_state_asleep

        handler()

    def ack_status_change(self, new_status: str) -> asyncio.Task:
        """Confirm that we're now in a certain state.

        This ACK can be given without a request from the server, for example to support
        state changes originating from UNIX signals.
        """

        try:
            post = self.manager.post('/ack-status-change/%s' % new_status, loop=self.loop)
            return self.loop.create_task(post)
        except Exception:
            self._log.exception('unable to notify Manager')

    def go_to_state_asleep(self):
        """Starts polling for wakeup calls."""

        self._log.info('Going to sleep')
        self.state = WorkerState.ASLEEP
        self.stop_fetching_tasks()
        self.sleeping_task = self.loop.create_task(self.sleeping())
        self._log.debug('Created task %s', self.sleeping_task)
        self.ack_status_change('asleep')

    def go_to_state_awake(self):
        """Restarts the task-fetching asyncio task."""

        self._log.info('Waking up')
        self.state = WorkerState.AWAKE
        self.stop_sleeping()
        self.schedule_fetch_task(3)
        self.ack_status_change('awake')

    def go_to_state_shutdown(self):
        """Shuts down the Flamenco Worker.

        Whether it comes back up depends on the environment. For example,
        using systemd on Linux with Restart=always will do this.
        """

        self._log.info('Shutting down by request of the Manager or due to single-task mode')
        self.state = WorkerState.SHUTTING_DOWN

        # Don't bother acknowledging this status, as we'll push an "offline" status anyway.
        # This also makes sure that when we're asleep and told to shut down, the Manager
        # sees an asleep → offline status change, and can remember that we should go back
        # to asleep status when we come back online.
        self.loop.stop()

    def stop_sleeping(self):
        """Stops the asyncio task for sleeping."""

        if self.sleeping_task is None or self.sleeping_task.done():
            return
        self.sleeping_task.cancel()
        try:
            self.sleeping_task.result()
        except (asyncio.CancelledError, asyncio.InvalidStateError):
            pass
        except Exception:
            self._log.exception('Unexpected exception in sleeping() task.')

    async def sleeping(self):
        """Regularly polls the Manager to see if we're allowed to wake up again."""

        while True:
            try:
                await asyncio.sleep(ASLEEP_POLL_STATUS_CHANGE_REQUESTED_DELAY)
                resp = await self.manager.get('/status-change', loop=self.loop)

                if resp.status_code == 204:
                    # No change, don't do anything
                    self._log.debug('status the same, continuing sleeping')
                elif resp.status_code == 200:
                    # There is a status change
                    self._log.debug('/status-change: %s', resp.json())
                    new_status = resp.json()['status_requested']
                    self.change_status(new_status)
                    return
                else:
                    self._log.error('Error %d trying to fetch /status-change on Manager, '
                                    'will retry later.', resp.status_code)
            except asyncio.CancelledError:
                self._log.info('Sleeping ended')
                return
            except:
                self._log.exception('problems while sleeping')

def generate_secret() -> str:
    """Generates a 64-character secret key."""

    import random
    import string

    randomizer = random.SystemRandom()
    tokens = string.ascii_letters + string.digits
    secret = ''.join(randomizer.choice(tokens) for _ in range(64))

    return secret

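# With 62 possible characters per position, a 64-character secret carries
# 64 * log2(62), i.e. roughly 381 bits of entropy. SystemRandom draws from the
# OS CSPRNG (os.urandom), so the result is suitable as a shared secret.
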
def detect_platform() -> str:
    """Detects the platform, returning 'linux', 'windows' or 'darwin'.

    Raises an exception when the current platform cannot be detected
    as one of those three.
    """

    import platform

    plat = platform.system().lower()
    if not plat:
        raise EnvironmentError('Unable to determine platform.')
    if plat in {'linux', 'windows', 'darwin'}:
        return plat
    raise EnvironmentError('Unable to determine platform; unknown platform %r' % plat)