upstream_update_queue.py
"""Queues task updates to Flamenco Manager.
Task updates are pickled and stored in a SQLite database. Pickling allows
for efficient conversion of Python objects into a binary data blob.
"""
import
asyncio
import
pickle
import
sqlite3
import
typing
import
attr
from
.
import
attrs_extra
,
upstream
BACKOFF_TIME
=
5
# seconds
SHUTDOWN_RECHECK_TIME
=
0.5
# seconds
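
# The SQLite file doubles as a persistent queue: queue() only inserts a row,
# and flush() deletes it again once the Manager has accepted (or definitively
# rejected) the update, so pending updates survive a Worker restart.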


@attr.s
class TaskUpdateQueue:
    manager = attr.ib(validator=attr.validators.instance_of(upstream.FlamencoManager))
    shutdown_future = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(asyncio.Future)))
    db_fname = attr.ib(validator=attr.validators.instance_of(str))

    backoff_time = attr.ib(default=BACKOFF_TIME)
    shutdown_recheck_time = attr.ib(default=SHUTDOWN_RECHECK_TIME)

    _stuff_queued = attr.ib(default=attr.Factory(asyncio.Event), init=False)
    _db = attr.ib(default=None, init=False)
    _queue_lock = attr.ib(default=attr.Factory(asyncio.Lock), init=False)
    _log = attrs_extra.log('%s.TaskUpdateQueue' % __name__)
    def _connect_db(self):
        self._log.info('Connecting to database %s', self.db_fname)
        self._db = sqlite3.connect(self.db_fname, isolation_level=None)

        # We don't need to create a primary key; we use the implicit 'rowid' column.
        self._db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')

        # Start with a more-or-less compact database.
        self._db.execute('VACUUM')

        # Now that that is out of the way, we can use the default SQLite behaviour again.
        self._db.isolation_level = ''

    def _disconnect_db(self):
        self._log.info('Disconnecting from database %s', self.db_fname)
        self._db.close()
        self._db = None
    def queue(self, url, payload, *, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
        """Push some payload onto the queue."""
        if self._db is None:
            self._connect_db()

        # Store the pickled payload in the SQLite database.
        pickled = pickle.dumps(payload)

        async def do_db_push():
            self._db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)',
                             (url, pickled))
            self._db.commit()

            # Notify the work loop that stuff has been queued.
            self._stuff_queued.set()

        return asyncio.ensure_future(do_db_push(), loop=loop)
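
    # Note: work() below wakes up whenever _stuff_queued is set, and
    # additionally every shutdown_recheck_time seconds, so that
    # shutdown_future is polled even while the queue stays empty.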
    async def work(self, *, loop=None):
        """Loop that pushes queued payloads to the Flamenco Manager.

        Keeps running until shutdown_future.done() returns True.
        """
        import requests

        # Always start by inspecting the persisted queue, so act as if something
        # was just queued.
        self._stuff_queued.set()

        while not self.shutdown_future.done():
            try:
                await asyncio.wait_for(self._stuff_queued.wait(),
                                       self.shutdown_recheck_time,
                                       loop=loop)
            except asyncio.TimeoutError:
                # This is normal; it just means that nothing was queued within
                # SHUTDOWN_RECHECK_TIME seconds.
                continue
            except asyncio.CancelledError:
                # We're being shut down.
                break

            self._log.debug('Inspecting queued task updates.')
            await self.flush_and_catch(loop=loop)
        self._log.warning('Stopping work loop')
    def _queue(self) -> typing.Iterable[typing.Tuple[int, str, object]]:
        """Yields (rowid, url, unpickled payload) tuples from the database."""
        if self._db is None:
            self._connect_db()

        result = self._db.execute('''
            SELECT rowid, url, payload
            FROM fworker_queue
            ORDER BY rowid ASC
        ''')
        for row in result:
            rowid = row[0]
            url = row[1]
            payload = pickle.loads(row[2])
            yield rowid, url, payload
    def _unqueue(self, rowid: int):
        """Removes a queued payload from the database."""
        # TODO Sybren: every once in a while, run 'vacuum' on the database.
        self._db.execute('DELETE FROM fworker_queue WHERE rowid=?', (rowid,))
        self._db.commit()
    async def flush(self, *, loop: asyncio.AbstractEventLoop) -> bool:
        """Tries to flush the queue to the Manager.

        Returns True iff the queue was empty, even before flushing.
        """
        with (await self._queue_lock):
            queue_is_empty = True
            for rowid, url, payload in self._queue():
                queue_is_empty = False
                self._log.info('Pushing task update to Manager')
                resp = await self.manager.post(url, json=payload, loop=loop)
                if resp.status_code == 409:
                    # The task was assigned to another worker, so we're not allowed to
                    # push updates for it. We have to un-queue this update, as it will
                    # never be accepted.
                    self._log.warning('discarding update, Manager says %s', resp.text)
                else:
                    resp.raise_for_status()
                    self._log.debug('Manager accepted pushed update.')
                self._unqueue(rowid)

            if queue_is_empty:
                # Only clear the flag once the queue has really been cleared.
                self._stuff_queued.clear()

            return queue_is_empty
    async def flush_and_report(self, *, loop: asyncio.AbstractEventLoop):
        """Flushes the queue; just reports errors, doesn't wait nor retry."""
        import requests

        self._log.info('flush_and_report: trying one last push to get updates to Manager')

        try:
            await self.flush(loop=loop)
        except requests.ConnectionError:
            self._log.warning('flush_and_report: Unable to connect to Manager, '
                              'some items are still queued.')
        except requests.HTTPError as ex:
            self._log.warning('flush_and_report: Manager did not accept our updates (%s), '
                              'some items are still queued.', ex)
        except Exception:
            self._log.exception('flush_and_report: Unexpected exception, '
                                'some items are still queued.')
    async def flush_and_catch(self, *, loop: asyncio.AbstractEventLoop):
        """Flushes the queue, reports errors, and waits before returning for another try."""
        import requests

        try:
            await self.flush(loop=loop)
        except requests.ConnectionError:
            self._log.warning('Unable to connect to Manager, will retry later.')
            await asyncio.sleep(self.backoff_time)
        except requests.HTTPError as ex:
            self._log.warning('Manager did not accept our updates (%s), will retry later.', ex)
            await asyncio.sleep(self.backoff_time)
        except Exception:
            self._log.exception('Unexpected exception in work loop. '
                                'Backing off and retrying later.')
            await asyncio.sleep(self.backoff_time)
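

# A minimal, hypothetical demonstration (not part of the original module) of
# the persistence mechanism used above: payloads are pickled into a BLOB
# column and unpickled on the way out, exactly as queue() and _queue() do.
# It uses only the standard library; note that running this file directly
# still requires the package context, because of the relative imports above.
if __name__ == '__main__':
    demo_db = sqlite3.connect(':memory:')
    demo_db.execute('CREATE TABLE IF NOT EXISTS fworker_queue(url TEXT, payload BLOB)')
    # The URL and payload below are made-up example values.
    demo_db.execute('INSERT INTO fworker_queue (url, payload) values (?, ?)',
                    ('/tasks/1234/update', pickle.dumps({'status': 'active'})))
    for rowid, url, blob in demo_db.execute(
            'SELECT rowid, url, payload FROM fworker_queue ORDER BY rowid ASC'):
        print(rowid, url, pickle.loads(blob))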