Page Menu
Home
Search
Configure Global Search
Log In
Files
F13399991
moving.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Size
5 KB
Subscribers
None
moving.py
View Options
"""Code for moving files between backends."""
import
datetime
import
logging
import
os
import
tempfile
from
bson
import
ObjectId
import
bson.tz_util
from
flask
import
current_app
import
requests
import
requests.exceptions
from
.
import
stream_to_gcs
,
generate_all_links
,
ensure_valid_link
# Names exported by `from <module> import *`; the module's public API.
__all__ = [
    'PrerequisiteNotMetError',
    'change_file_storage_backend',
]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class PrerequisiteNotMetError(RuntimeError):
    """Error raised when a file move is refused because a prerequisite is not met."""
def change_file_storage_backend(file_id, dest_backend):
    """Given a file document, move it to the specified backend (if not already
    there) and update the document to reflect that.

    Files on the original backend are not deleted automatically.

    :param file_id: the file document's _id; anything accepted by ObjectId().
    :param dest_backend: name of the destination backend; only 'gcs' is
        actually supported by copy_file_to_backend() at the moment.
    :raises ValueError: if no file document with this _id exists.
    :raises PrerequisiteNotMetError: if the file is already on dest_backend,
        if it has no project (so the destination bucket is unknown), or if
        the main file is gone from storage and there are no variations.
    """

    # NOTE: was `unicode(dest_backend)`, a Python 2-only builtin that raises
    # NameError on Python 3; str() is equivalent for these ASCII backend names.
    dest_backend = str(dest_backend)
    file_id = ObjectId(file_id)

    # Fetch file document
    files_collection = current_app.data.driver.db['files']
    f = files_collection.find_one(file_id)
    if f is None:
        raise ValueError('File with _id: {} not found'.format(file_id))

    # Check that new backend differs from current one
    if dest_backend == f['backend']:
        raise PrerequisiteNotMetError('Destination backend ({}) matches the current backend, we '
                                      'are not moving the file'.format(dest_backend))

    # TODO Check that new backend is allowed (make conf var)

    # Check that the file has a project; without project, we don't know
    # which bucket to store the file into.
    try:
        project_id = f['project']
    except KeyError:
        raise PrerequisiteNotMetError('File document does not have a project')

    # Ensure that all links are up to date before we even attempt a download.
    ensure_valid_link(f)

    # Upload file and variations to the new backend
    variations = f.get('variations', ())

    try:
        copy_file_to_backend(file_id, project_id, f, f['backend'], dest_backend)
    except requests.exceptions.HTTPError as ex:
        # A 404/410 on the main file is tolerable when variations exist:
        # allow the main file to be removed from storage.
        if ex.response.status_code not in {404, 410}:
            raise
        if not variations:
            raise PrerequisiteNotMetError('Main file ({link}) does not exist on server, '
                                          'and no variations exist either'.format(**f))
        log.warning('Main file %s does not exist; skipping main and visiting variations',
                    f['link'])

    for var in variations:
        copy_file_to_backend(file_id, project_id, var, f['backend'], dest_backend)

    # Generate new links for the file & all variations. This also saves
    # the new backend we set here.
    f['backend'] = dest_backend
    now = datetime.datetime.now(tz=bson.tz_util.utc)
    generate_all_links(f, now)
def copy_file_to_backend(file_id, project_id, file_or_var, src_backend, dest_backend):
    """Copy one file (or variation) document's data to the destination backend.

    Mutates file_or_var: its 'file_path' is flattened to a bare filename.
    The fetched local file handle is always closed before returning.

    :param file_id: _id of the main file document (used as GCS object id).
    :param project_id: project _id, determines the destination bucket.
    :param file_or_var: the file document or one of its variations; must
        have a 'file_path' key (and 'link' when fetching from a remote).
    :param src_backend: current backend name; 'pillar' means local storage.
    :param dest_backend: destination backend name; only 'gcs' is supported.
    :raises ValueError: if dest_backend is not 'gcs'.
    """

    # Filenames on GCS do not contain paths, by our convention
    internal_fname = os.path.basename(file_or_var['file_path'])
    file_or_var['file_path'] = internal_fname

    # If the file is not local already, fetch it
    if src_backend == 'pillar':
        local_finfo = fetch_file_from_local(file_or_var)
    else:
        local_finfo = fetch_file_from_link(file_or_var['link'])

    try:
        # Upload to GCS
        if dest_backend != 'gcs':
            raise ValueError('Only dest_backend="gcs" is supported now.')

        if current_app.config['TESTING']:
            log.warning('Skipping actual upload to GCS due to TESTING')
        else:
            # TODO check for name collisions
            stream_to_gcs(file_id, local_finfo['file_size'],
                          internal_fname=internal_fname,
                          project_id=str(project_id),
                          stream_for_gcs=local_finfo['local_file'],
                          content_type=local_finfo['content_type'])
    finally:
        # Close the local file even when the upload raises; previously a
        # failed stream_to_gcs() leaked the (temporary) file handle.
        local_finfo['local_file'].close()
def fetch_file_from_link(link):
    """Utility to download a file from a remote location and return it with
    additional info (for upload to a different storage backend).

    :param link: URL to download from.
    :returns: dict with 'file_size' (bytes), 'content_type' (from the
        response headers, defaulting to application/octet-stream), and
        'local_file' (an open NamedTemporaryFile positioned at offset 0;
        the caller is responsible for closing it).
    :raises requests.exceptions.HTTPError: on a non-OK HTTP status.
    """

    log.info('Downloading %s', link)
    r = requests.get(link, stream=True)
    r.raise_for_status()

    local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
    log.info('Downloading to %s', local_file.name)

    try:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive chunks
                local_file.write(chunk)
        local_file.seek(0)
    except Exception:
        # Closing a NamedTemporaryFile also deletes it; without this, a
        # failed download leaked the temp file in STORAGE_DIR.
        local_file.close()
        raise

    file_dict = {'file_size': os.fstat(local_file.fileno()).st_size,
                 'content_type': r.headers.get('content-type', 'application/octet-stream'),
                 'local_file': local_file}
    return file_dict
def fetch_file_from_local(file_doc):
    """Mimicks fetch_file_from_link(), but just returns the local file.

    :param file_doc: dict with 'file_path' key pointing to a path relative to
        STORAGE_DIR, and 'content_type' key.
        (The docstring previously said 'link', but the code reads 'file_path'.)
    :type file_doc: dict
    :returns: dict with 'file_size', 'content_type' and an open binary
        'local_file'; the caller is responsible for closing it.
    :rtype: dict
    """

    file_path = os.path.join(current_app.config['STORAGE_DIR'], file_doc['file_path'])
    local_file = open(file_path, 'rb')
    local_finfo = {'file_size': os.fstat(local_file.fileno()).st_size,
                   'content_type': file_doc['content_type'],
                   'local_file': local_file}
    return local_finfo
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Sat, Aug 20, 12:26 PM (2 d)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
04/7a/0fba393ee32dd3a7aca5a4ce34dc
Attached To
rPS Pillar
Event Timeline
Log In to Comment