import io
import logging
import mimetypes
import tempfile
import uuid
from hashlib import md5
import os

import requests
import bson.tz_util
import datetime
import eve.utils
import pymongo
import werkzeug.exceptions as wz_exceptions
from bson import ObjectId
from flask import Blueprint
from flask import current_app
from flask import g
from flask import jsonify
from flask import request
from flask import send_from_directory
from flask import url_for, helpers

from pillar.api import utils
from pillar.api.utils.imaging import generate_local_thumbnails
from pillar.api.utils import remove_private_keys, authentication
from pillar.api.utils.authorization import require_login, user_has_role, \
    user_matches_roles
from pillar.api.utils.cdn import hash_file_path
from pillar.api.utils.encoding import Encoder
from pillar.api.utils.gcs import GoogleCloudStorageBucket

log = logging.getLogger(__name__)

file_storage = Blueprint('file_storage', __name__,
                         template_folder='templates',
                         static_folder='../../static/storage',
                         )

# Overrides for browser-specified mimetypes
OVERRIDE_MIMETYPES = {
    # We don't want to thumbnail EXR files right now, so don't handle as image/...
    'image/x-exr': 'application/x-exr',
}

# Add our own extensions to the mimetypes package
mimetypes.add_type('application/x-blender', '.blend')
mimetypes.add_type('application/x-radiance-hdr', '.hdr')
mimetypes.add_type('application/x-exr', '.exr')
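
# Illustrative sketch (not referenced elsewhere in this module): after the
# add_type() registrations above, the stdlib mimetypes module can resolve our
# custom extensions. The helper name and sample filename are assumptions.
def _example_registered_mimetypes():
    guessed, _encoding = mimetypes.guess_type('scene.blend')
    assert guessed == 'application/x-blender'
    return guessed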


@file_storage.route('/gcs/<bucket_name>/<subdir>/')
@file_storage.route('/gcs/<bucket_name>/<subdir>/<path:file_path>')
def browse_gcs(bucket_name, subdir, file_path=None):
    """Browse the content of a Google Cloud Storage bucket"""

    # Initialize storage client
    storage = GoogleCloudStorageBucket(bucket_name, subdir=subdir)
    if file_path:
        # If we provided a file_path, we try to fetch it
        file_object = storage.Get(file_path)
        if file_object:
            # If it exists, return file properties in a dictionary
            return jsonify(file_object)
        else:
            listing = storage.List(file_path)
            return jsonify(listing)
            # We always return an empty listing even if the directory does not
            # exist. This can be changed later.
            # return abort(404)
    else:
        listing = storage.List('')
        return jsonify(listing)


@file_storage.route('/file', methods=['POST'])
@file_storage.route('/file/<path:file_name>', methods=['GET', 'POST'])
def index(file_name=None):
    # GET file -> read it
    if request.method == 'GET':
        return send_from_directory(current_app.config['STORAGE_DIR'],
                                   file_name)

    # POST file -> save it

    # Sanitize the filename; source: http://stackoverflow.com/questions/7406102/
    file_name = request.form['name']
    keepcharacters = {' ', '.', '_'}
    file_name = ''.join(
        c for c in file_name if c.isalnum() or c in keepcharacters).strip()
    file_name = file_name.lstrip('.')

    # Determine & create storage directory
    folder_name = file_name[:2]
    file_folder_path = helpers.safe_join(current_app.config['STORAGE_DIR'],
                                         folder_name)
    if not os.path.exists(file_folder_path):
        log.info('Creating folder path %r', file_folder_path)
        os.mkdir(file_folder_path)

    # Save uploaded file
    file_path = helpers.safe_join(file_folder_path, file_name)
    log.info('Saving file %r', file_path)
    request.files['data'].save(file_path)

    # TODO: possibly nicer to just return a redirect to the file's URL.
    return jsonify({'url': url_for('file_storage.index', file_name=file_name)})
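

# A minimal client-side sketch of the endpoint above, assuming the blueprint
# is reachable under server_url. The field names 'name' and 'data' mirror the
# handler; this helper itself is not part of the original module.
def _example_upload_via_requests(server_url, local_path):
    with open(local_path, 'rb') as fileobj:
        resp = requests.post(server_url + '/file',
                             data={'name': os.path.basename(local_path)},
                             files={'data': fileobj})
    resp.raise_for_status()
    return resp.json()['url']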


def _process_image(gcs, file_id, local_file, src_file):
    from PIL import Image

    im = Image.open(local_file)
    res = im.size
    src_file['width'] = res[0]
    src_file['height'] = res[1]

    # Generate previews
    log.info('Generating thumbnails for file %s', file_id)
    src_file['variations'] = generate_local_thumbnails(src_file['name'],
                                                       local_file.name)

    # Send those previews to Google Cloud Storage.
    log.info('Uploading %i thumbnails for file %s to Google Cloud Storage '
             '(GCS)', len(src_file['variations']), file_id)

    # TODO: parallelize this at some point.
    for variation in src_file['variations']:
        fname = variation['file_path']
        if current_app.config['TESTING']:
            log.warning(' - NOT sending thumbnail %s to GCS', fname)
        else:
            log.debug(' - Sending thumbnail %s to GCS', fname)
            blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2)
            blob.upload_from_filename(variation['local_path'],
                                      content_type=variation['content_type'])

            if variation.get('size') == 't':
                blob.make_public()

        try:
            os.unlink(variation['local_path'])
        except OSError:
            log.warning('Unable to unlink %s, ignoring this but it will need '
                        'cleanup later.', variation['local_path'])

        del variation['local_path']

    log.info('Done processing file %s', file_id)
    src_file['status'] = 'complete'


def _process_video(gcs, file_id, local_file, src_file):
    """Video is processed by Zencoder; the file isn't even stored locally."""

    log.info('Processing video for file %s', file_id)

    # Create variations
    root, _ = os.path.splitext(src_file['file_path'])
    src_file['variations'] = []

    # Most of these properties will be available after encode.
    v = 'mp4'
    file_variation = dict(
        format=v,
        content_type='video/{}'.format(v),
        file_path='{}-{}.{}'.format(root, v, v),
        size='',
        duration=0,
        width=0,
        height=0,
        length=0,
        md5='',
    )
    # Append file variation. Originally mp4 and webm were the available options,
    # that's why we build a list.
    src_file['variations'].append(file_variation)

    if current_app.config['TESTING']:
        log.warning('_process_video: NOT sending out encoding job due to '
                    'TESTING=%r', current_app.config['TESTING'])
        j = type('EncoderJob', (), {'process_id': 'fake-process-id',
                                    'backend': 'fake'})
    else:
        j = Encoder.job_create(src_file)
        if j is None:
            log.warning('_process_video: unable to create encoder job for file '
                        '%s.', file_id)
            return

    log.info('Created asynchronous Zencoder job %s for file %s',
             j['process_id'], file_id)

    # Add the processing status to the file object
    src_file['processing'] = {
        'status': 'pending',
        'job_id': str(j['process_id']),
        'backend': j['backend']}


def process_file(gcs, file_id, local_file):
    """Process the file by creating thumbnails, sending to Zencoder, etc.

    :param file_id: '_id' key of the file
    :type file_id: ObjectId or str
    :param local_file: locally stored file, or None if no local processing is
        needed.
    :type local_file: file
    """

    file_id = ObjectId(file_id)

    # Fetch the src_file document from MongoDB.
    files = current_app.data.driver.db['files']
    src_file = files.find_one(file_id)
    if not src_file:
        log.warning('process_file(%s): no such file document found, ignoring.',
                    file_id)
        return
    src_file = utils.remove_private_keys(src_file)

    # Update the 'format' field from the content type.
    # TODO: overrule the content type based on file extension & magic numbers.
    mime_category, src_file['format'] = src_file['content_type'].split('/', 1)

    # Prevent video handling for non-admins.
    if not user_has_role(u'admin') and mime_category == 'video':
        if src_file['format'].startswith('x-'):
            xified = src_file['format']
        else:
            xified = 'x-' + src_file['format']

        src_file['content_type'] = 'application/%s' % xified
        mime_category = 'application'
        log.info('Not processing video file %s for non-admin user', file_id)

    # Run the required processor, based on the MIME category.
    processors = {
        'image': _process_image,
        'video': _process_video,
    }

    try:
        processor = processors[mime_category]
    except KeyError:
        log.info("POSTed file %s was of type %r, which isn't "
                 "thumbnailed/encoded.", file_id, mime_category)
        src_file['status'] = 'complete'
    else:
        log.debug('process_file(%s): marking file status as "processing"',
                  file_id)
        src_file['status'] = 'processing'
        update_file_doc(file_id, status='processing')

        try:
            processor(gcs, file_id, local_file, src_file)
        except Exception:
            log.warning('process_file(%s): error when processing file, '
                        'resetting status to '
                        '"queued_for_processing"',
                        file_id, exc_info=True)
            update_file_doc(file_id, status='queued_for_processing')
            return

    # Update the original file with additional info, e.g. image resolution
    r, _, _, status = current_app.put_internal('files', src_file, _id=file_id)
    if status not in (200, 201):
        log.warning('process_file(%s): status %i when saving processed file '
                    'info to MongoDB: %s',
                    file_id, status, r)


def delete_file(file_item):
    def process_file_delete(file_item):
        """Given a file item, delete the actual file from the storage backend.

        This function can probably be made self-calling."""
        if file_item['backend'] == 'gcs':
            storage = GoogleCloudStorageBucket(str(file_item['project']))
            storage.Delete(file_item['file_path'])
            # Delete any file variation found in the file_item document
            if 'variations' in file_item:
                for v in file_item['variations']:
                    storage.Delete(v['file_path'])
            return True
        elif file_item['backend'] == 'pillar':
            pass
        elif file_item['backend'] == 'cdnsun':
            pass
        else:
            pass

    files_collection = current_app.data.driver.db['files']
    # Collect children (variations) of the original file
    children = files_collection.find({'parent': file_item['_id']})
    for child in children:
        process_file_delete(child)
    # Finally remove the original file
    process_file_delete(file_item)


def generate_link(backend, file_path, project_id=None, is_public=False):
    """Hook to check the backend of a file resource, to build an appropriate link
    that can be used by the client to retrieve the actual file.
    """

    if backend == 'gcs':
        if current_app.config['TESTING']:
            log.info('Skipping GCS link generation, and returning a fake link '
                     'instead.')
            return '/path/to/testing/gcs/%s' % file_path

        storage = GoogleCloudStorageBucket(project_id)
        blob = storage.Get(file_path)
        if blob is None:
            log.warning('generate_link(%r, %r): unable to find blob for file path,'
                        ' returning empty link.', backend, file_path)
            return ''

        if is_public:
            return blob['public_url']
        return blob['signed_url']

    if backend == 'pillar':
        return url_for('file_storage.index', file_name=file_path,
                       _external=True, _scheme=current_app.config['SCHEME'])
    if backend == 'cdnsun':
        return hash_file_path(file_path, None)
    if backend == 'unittest':
        return 'https://unit.test/%s' % md5(file_path).hexdigest()

    log.warning('generate_link(): Unknown backend %r, returning empty string '
                'as new link.', backend)
    return ''
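

# Illustrative sketch of the dispatch above for the 'unittest' backend, which
# needs no application context or GCS access. The sample path is an assumption.
def _example_unittest_link():
    file_path = 'some/path/image.png'
    link = generate_link('unittest', file_path)
    assert link == 'https://unit.test/%s' % md5(file_path).hexdigest()
    return link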


def before_returning_file(response):
    ensure_valid_link(response)

    # Enable this call later, when we have implemented the is_public field on
    # files.
    # strip_link_and_variations(response)


def strip_link_and_variations(response):
    # Check the access level of the user.
    if g.current_user is None:
        has_full_access = False
    else:
        user_roles = g.current_user['roles']
        access_roles = current_app.config['FULL_FILE_ACCESS_ROLES']
        has_full_access = bool(user_roles.intersection(access_roles))

    # Strip all file variations (unless image) and link to the actual file.
    if not has_full_access:
        response.pop('link', None)
        response.pop('link_expires', None)

        # Image files have public variations, other files don't.
        if not response.get('content_type', '').startswith('image/'):
            if response.get('variations') is not None:
                response['variations'] = []


def before_returning_files(response):
    for item in response['_items']:
        ensure_valid_link(item)


def ensure_valid_link(response):
    """Ensures the file item has valid file links using generate_link(...)."""

    # Log to function-specific logger, so we can easily turn it off.
    log_link = logging.getLogger('%s.ensure_valid_link' % __name__)
    # log.debug('Inspecting link for file %s', response['_id'])

    # Check link expiry.
    now = datetime.datetime.now(tz=bson.tz_util.utc)
    if 'link_expires' in response:
        link_expires = response['link_expires']
        if now < link_expires:
            # Not expired yet, so don't bother regenerating anything.
            log_link.debug('Link expires at %s, which is in the future, so not '
                           'generating new link', link_expires)
            return

        log_link.debug('Link expired at %s, which is in the past; generating '
                       'new link', link_expires)
    else:
        log_link.debug('No expiry date for link; generating new link')

    _generate_all_links(response, now)


def _generate_all_links(response, now):
    """Generate a new link for the file and all its variations.

    :param response: the file document that should be updated.
    :param now: datetime that reflects 'now', for consistent expiry generation.
    """

    project_id = str(response['project']) if 'project' in response else None
    # TODO: add project id to all files
    backend = response['backend']
    response['link'] = generate_link(backend, response['file_path'], project_id)

    variations = response.get('variations')
    if variations:
        for variation in variations:
            variation['link'] = generate_link(backend, variation['file_path'],
                                              project_id)

    # Construct the new expiry datetime.
    validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
    response['link_expires'] = now + datetime.timedelta(seconds=validity_secs)

    patch_info = remove_private_keys(response)
    file_id = ObjectId(response['_id'])
    (patch_resp, _, _, _) = current_app.patch_internal('files', patch_info,
                                                       _id=file_id)
    if patch_resp.get('_status') == 'ERR':
        log.warning('Unable to save new links for file %s: %r',
                    response['_id'], patch_resp)
        # TODO: raise a snag.
        response['_updated'] = now
    else:
        response['_updated'] = patch_resp['_updated']

    # Be silly and re-fetch the etag ourselves. TODO: handle this better.
    etag_doc = current_app.data.driver.db['files'].find_one({'_id': file_id},
                                                            {'_etag': 1})
    response['_etag'] = etag_doc['_etag']
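

# Tiny sketch of the expiry arithmetic used above: a link generated 'now' with
# a validity of validity_secs expires that many seconds later. The 3600-second
# default is only an example, not a configured FILE_LINK_VALIDITY value.
def _example_link_expiry(validity_secs=3600):
    now = datetime.datetime.now(tz=bson.tz_util.utc)
    link_expires = now + datetime.timedelta(seconds=validity_secs)
    assert link_expires - now == datetime.timedelta(seconds=validity_secs)
    return link_expires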


def before_deleting_file(item):
    delete_file(item)


def on_pre_get_files(_, lookup):
    # Override the HTTP header, we always want to fetch the document from
    # MongoDB.
    parsed_req = eve.utils.parse_request('files')
    parsed_req.if_modified_since = None

    # Only fetch it if the date got expired.
    now = datetime.datetime.now(tz=bson.tz_util.utc)
    lookup_expired = lookup.copy()
    lookup_expired['link_expires'] = {'$lte': now}

    cursor = current_app.data.find('files', parsed_req, lookup_expired)
    for file_doc in cursor:
        # log.debug('Updating expired links for file %r.', file_doc['_id'])
        _generate_all_links(file_doc, now)


def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
    if chunk_size:
        log.info('Refreshing the first %i links for project %s',
                 chunk_size, project_uuid)
    else:
        log.info('Refreshing all links for project %s', project_uuid)

    # Retrieve expired links.
    files_collection = current_app.data.driver.db['files']

    now = datetime.datetime.now(tz=bson.tz_util.utc)
    expire_before = now + datetime.timedelta(seconds=expiry_seconds)
    log.info('Limiting to links that expire before %s', expire_before)

    to_refresh = files_collection.find(
        {'project': ObjectId(project_uuid),
         'link_expires': {'$lt': expire_before},
         }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)

    if to_refresh.count() == 0:
        log.info('No links to refresh.')
        return

    for file_doc in to_refresh:
        log.debug('Refreshing links for file %s', file_doc['_id'])
        _generate_all_links(file_doc, now)

    log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))


def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
    import gcloud.exceptions

    # Retrieve expired links.
    files_collection = current_app.data.driver.db['files']
    proj_coll = current_app.data.driver.db['projects']

    now = datetime.datetime.now(tz=bson.tz_util.utc)
    expire_before = now + datetime.timedelta(seconds=expiry_seconds)
    log.info('Limiting to links that expire before %s', expire_before)

    to_refresh = files_collection.find(
        {'$or': [{'backend': backend_name, 'link_expires': None},
                 {'backend': backend_name, 'link_expires': {'$lt': expire_before}},
                 {'backend': backend_name, 'link': None}]
         }).sort([('link_expires', pymongo.ASCENDING)]).limit(
        chunk_size).batch_size(5)

    if to_refresh.count() == 0:
        log.info('No links to refresh.')
        return

    refreshed = 0
    for file_doc in to_refresh:
        try:
            file_id = file_doc['_id']
            project_id = file_doc.get('project')
            if project_id is None:
                log.debug('Skipping file %s, it has no project.', file_id)
                continue

            count = proj_coll.count({'_id': project_id, '$or': [
                {'_deleted': {'$exists': False}},
                {'_deleted': False},
            ]})

            if count == 0:
                log.debug('Skipping file %s, project %s does not exist.',
                          file_id, project_id)
                continue

            if 'file_path' not in file_doc:
                log.warning("Skipping file %s, missing 'file_path' property.",
                            file_id)
                continue

            log.debug('Refreshing links for file %s', file_id)

            try:
                _generate_all_links(file_doc, now)
            except gcloud.exceptions.Forbidden:
                log.warning('Skipping file %s, GCS forbids us access to '
                            'project %s bucket.', file_id, project_id)
                continue
            refreshed += 1
        except KeyboardInterrupt:
            log.warning('Aborting due to KeyboardInterrupt after refreshing %i '
                        'links', refreshed)
            return

    log.info('Refreshed %i links', refreshed)


@require_login()
def create_file_doc(name, filename, content_type, length, project,
                    backend='gcs', **extra_fields):
    """Creates a minimal File document for storage in MongoDB.

    Doesn't save it to MongoDB yet.
    """

    current_user = g.get('current_user')

    file_doc = {'name': name,
                'filename': filename,
                'file_path': '',
                'user': current_user['user_id'],
                'backend': backend,
                'md5': '',
                'content_type': content_type,
                'length': length,
                'project': project}
    file_doc.update(extra_fields)
    return file_doc


def override_content_type(uploaded_file):
    """Overrides the content type based on file extensions.

    :param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    """

    # Possibly use the browser-provided mime type
    mimetype = uploaded_file.mimetype

    try:
        mimetype = OVERRIDE_MIMETYPES[mimetype]
    except KeyError:
        pass

    if '/' in mimetype:
        mimecat = mimetype.split('/')[0]
        if mimecat in {'video', 'audio', 'image'}:
            # The browser's mime type is probably ok, just use it.
            return

    # And then use it to set the mime type.
    (mimetype, encoding) = mimetypes.guess_type(uploaded_file.filename)

    # Only override the mime type if we can detect it, otherwise just
    # keep whatever the browser gave us.
    if mimetype:
        # content_type property can't be set directly
        uploaded_file.headers['content-type'] = mimetype

        # It has this attribute because we used uploaded_file.mimetype earlier
        # in this function.
        del uploaded_file._parsed_content_type
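

# Sketch of the override table consulted above: a browser-reported
# 'image/x-exr' is remapped so EXRs never enter the image-thumbnailing path.
# The dict-lookup-with-fallback mirrors the try/except KeyError in the function.
def _example_override_exr():
    reported = 'image/x-exr'
    effective = OVERRIDE_MIMETYPES.get(reported, reported)
    assert effective == 'application/x-exr'
    return effective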


def assert_file_size_allowed(file_size):
    """Asserts that the current user is allowed to upload a file of the given size.

    :raises wz_exceptions.RequestEntityTooLarge: if the file is too large for
        the current user.
    """

    roles = current_app.config['ROLES_FOR_UNLIMITED_UPLOADS']
    if user_matches_roles(require_roles=roles):
        return

    filesize_limit = current_app.config['FILESIZE_LIMIT_BYTES_NONSUBS']
    if file_size < filesize_limit:
        return

    filesize_limit_mb = filesize_limit / 2.0 ** 20
    log.info('User %s tried to upload a %.3f MiB file, but is only allowed '
             '%.3f MiB.',
             authentication.current_user_id(), file_size / 2.0 ** 20,
             filesize_limit_mb)
    raise wz_exceptions.RequestEntityTooLarge(
        'To upload files larger than %i MiB, subscribe to Blender Cloud' %
        filesize_limit_mb)
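

# Worked example of the MiB conversion used in the message above: a limit
# stored in bytes divides back to whole mebibytes with 2.0 ** 20. The 32 MiB
# figure is illustrative, not the configured FILESIZE_LIMIT_BYTES_NONSUBS.
def _example_mib_conversion():
    filesize_limit = 32 * 2 ** 20  # 33554432 bytes
    filesize_limit_mb = filesize_limit / 2.0 ** 20
    assert filesize_limit_mb == 32.0
    return filesize_limit_mb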


@file_storage.route('/stream/<string:project_id>', methods=['POST', 'OPTIONS'])
@require_login()
def stream_to_storage(project_id):
    project_oid = utils.str2id(project_id)

    projects = current_app.data.driver.db['projects']
    project = projects.find_one(project_oid, projection={'_id': 1})
    if not project:
        raise wz_exceptions.NotFound('Project %s does not exist' % project_id)

    log.info('Streaming file to bucket for project=%s user_id=%s',
             project_id, authentication.current_user_id())
    log.info('request.headers[Origin] = %r', request.headers.get('Origin'))

    uploaded_file = request.files['file']

    # Not every upload has a Content-Length header. If it was passed, we might
    # as well check for its value before we require the user to upload the
    # entire file. (At least I hope that this part of the code is processed
    # before the body is read in its entirety)
    if uploaded_file.content_length:
        assert_file_size_allowed(uploaded_file.content_length)

    override_content_type(uploaded_file)
    if not uploaded_file.content_type:
        log.warning('File uploaded to project %s without content type.',
                    project_oid)
        raise wz_exceptions.BadRequest('Missing content type.')

    if uploaded_file.content_type.startswith('image/'):
        # We need to do local thumbnailing, so we have to write the stream
        # both to Google Cloud Storage and to local storage.
        local_file = tempfile.NamedTemporaryFile(
            dir=current_app.config['STORAGE_DIR'])
        uploaded_file.save(local_file)
        local_file.seek(0)  # Make sure that a re-read starts from the beginning.
        stream_for_gcs = local_file
    else:
        local_file = None
        stream_for_gcs = uploaded_file.stream

    # Figure out the file size, as we need to pass this in explicitly to GCloud.
    # Otherwise it always uses os.fstat(file_obj.fileno()).st_size, which isn't
    # supported by a BytesIO object (even though it does have a fileno
    # attribute).
    if isinstance(stream_for_gcs, io.BytesIO):
        file_size = len(stream_for_gcs.getvalue())
    else:
        file_size = os.fstat(stream_for_gcs.fileno()).st_size

    # Check the file size again, now that we know its size for sure.
    assert_file_size_allowed(file_size)

    # Create file document in MongoDB.
    file_id, internal_fname, status = create_file_doc_for_upload(project_oid,
                                                                 uploaded_file)

    if current_app.config['TESTING']:
        log.warning('NOT streaming to GCS because TESTING=%r',
                    current_app.config['TESTING'])
        # Fake a Blob object.
        gcs = None
        blob = type('Blob', (), {'size': file_size})
    else:
        blob, gcs = stream_to_gcs(file_id, file_size, internal_fname,
                                  project_id, stream_for_gcs,
                                  uploaded_file.mimetype)

    log.debug('Marking uploaded file id=%s, fname=%s, '
              'size=%i as "queued_for_processing"',
              file_id, internal_fname, blob.size)
    update_file_doc(file_id,
                    status='queued_for_processing',
                    file_path=internal_fname,
                    length=blob.size,
                    content_type=uploaded_file.mimetype)

    log.debug('Processing uploaded file id=%s, fname=%s, size=%i',
              file_id, internal_fname, blob.size)
    process_file(gcs, file_id, local_file)

    # Local processing is done, we can close the local file so it is removed.
    if local_file is not None:
        local_file.close()

    log.debug('Handled uploaded file id=%s, fname=%s, size=%i, status=%i',
              file_id, internal_fname, blob.size, status)

    # Status is 200 if the file already existed, and 201 if it was newly
    # created.
    # TODO: add a link to a thumbnail in the response.
    resp = jsonify(status='ok', file_id=str(file_id))
    resp.status_code = status

    add_access_control_headers(resp)
    return resp


def stream_to_gcs(file_id, file_size, internal_fname, project_id,
                  stream_for_gcs, content_type):
    # Upload the file to GCS.
    from gcloud.streaming import transfer

    log.debug('Streaming file to GCS bucket; id=%s, fname=%s, size=%i',
              file_id, internal_fname, file_size)
    # Files larger than this many bytes will be streamed directly from disk,
    # smaller ones will be read into memory and then uploaded.
    transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400
    try:
        gcs = GoogleCloudStorageBucket(project_id)
        blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
        blob.upload_from_file(stream_for_gcs, size=file_size,
                              content_type=content_type)
    except Exception:
        log.exception('Error uploading file to Google Cloud Storage (GCS),'
                      ' aborting handling of uploaded file (id=%s).', file_id)
        update_file_doc(file_id, status='failed')
        raise wz_exceptions.InternalServerError(
            'Unable to stream file to Google Cloud Storage')

    # Reload the blob to get the file size according to Google.
    blob.reload()
    return blob, gcs


def add_access_control_headers(resp):
    """Allows cross-site requests from the configured domain."""

    if 'Origin' not in request.headers:
        return resp

    resp.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
    resp.headers['Access-Control-Allow-Credentials'] = 'true'

    return resp


def update_file_doc(file_id, **updates):
    files = current_app.data.driver.db['files']
    res = files.update_one({'_id': ObjectId(file_id)},
                           {'$set': updates})
    log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
              file_id, updates, res.matched_count, res.modified_count)
    return res


def create_file_doc_for_upload(project_id, uploaded_file):
    """Creates a secure filename and a document in MongoDB for the file.

    The (project_id, filename) tuple should be unique. If such a document already
    exists, it is updated with the new file.

    :param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    :returns: a tuple (file_id, filename, status), where 'filename' is the internal
        filename used on GCS.
    """

    project_id = ObjectId(project_id)

    # Hash the filename with path info to get the internal name. This should
    # be unique for the project.
    # internal_filename = uploaded_file.filename
    _, ext = os.path.splitext(uploaded_file.filename)
    internal_filename = uuid.uuid4().hex + ext

    # For now, we don't support overwriting files, and create a new one every time.
    # # See if we can find a pre-existing file doc.
    # files = current_app.data.driver.db['files']
    # file_doc = files.find_one({'project': project_id,
    #                            'name': internal_filename})
    file_doc = None

    # TODO: at some point do name-based and content-based content-type sniffing.
    new_props = {'filename': uploaded_file.filename,
                 'content_type': uploaded_file.mimetype,
                 'length': uploaded_file.content_length,
                 'project': project_id,
                 'status': 'uploading'}

    if file_doc is None:
        # Create a file document on MongoDB for this file.
        file_doc = create_file_doc(name=internal_filename, **new_props)
        file_fields, _, _, status = current_app.post_internal('files', file_doc)
    else:
        file_doc.update(new_props)
        file_fields, _, _, status = current_app.put_internal(
            'files', remove_private_keys(file_doc))

    if status not in (200, 201):
        log.error('Unable to create new file document in MongoDB, status=%i: %s',
                  status, file_fields)
        raise wz_exceptions.InternalServerError()

    log.debug('Created file document %s for uploaded file %s; internal name %s',
              file_fields['_id'], uploaded_file.filename, internal_filename)

    return file_fields['_id'], internal_filename, status
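

# Sketch of the internal-name scheme used above: the original extension is
# kept, but the basename is replaced by a random UUID hex string so uploads
# never collide. The default sample filename is an assumption.
def _example_internal_filename(filename='croissant.blend'):
    _, ext = os.path.splitext(filename)
    internal_filename = uuid.uuid4().hex + ext
    assert internal_filename.endswith(ext)
    assert len(internal_filename) == 32 + len(ext)
    return internal_filename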


def compute_aggregate_length(file_doc, original=None):
    """Computes the total length (in bytes) of the file and all variations.

    Stores the result in file_doc['length_aggregate_in_bytes']
    """

    # Compute total size of all variations.
    variations = file_doc.get('variations', ())
    var_length = sum(var.get('length', 0) for var in variations)

    file_doc['length_aggregate_in_bytes'] = file_doc.get('length', 0) + var_length
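

# Worked example for the aggregation above, on a hypothetical file document:
# the original length plus every variation's length ends up in
# 'length_aggregate_in_bytes'.
def _example_aggregate_length():
    file_doc = {'length': 1000,
                'variations': [{'length': 200}, {'length': 300}]}
    compute_aggregate_length(file_doc)
    assert file_doc['length_aggregate_in_bytes'] == 1500
    return file_doc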


def compute_aggregate_length_items(file_docs):
    for file_doc in file_docs:
        compute_aggregate_length(file_doc)


def setup_app(app, url_prefix):
    app.on_pre_GET_files += on_pre_get_files

    app.on_fetched_item_files += before_returning_file
    app.on_fetched_resource_files += before_returning_files

    app.on_delete_item_files += before_deleting_file

    app.on_update_files += compute_aggregate_length
    app.on_replace_files += compute_aggregate_length
    app.on_insert_files += compute_aggregate_length_items

    app.register_api_blueprint(file_storage, url_prefix=url_prefix)
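
# Usage sketch: the application that owns this module calls setup_app() once
# during startup, e.g. setup_app(app, url_prefix='/storage'). The '/storage'
# prefix is an assumption for illustration; the real prefix is whatever the
# registering application passes in.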