import io
import logging
import mimetypes
import tempfile
import uuid
from hashlib import md5
import bson.tz_util
import datetime
import eve.utils
import os
import pymongo
import werkzeug.exceptions as wz_exceptions
from bson import ObjectId
from flask import Blueprint
from flask import current_app
from flask import g
from flask import jsonify
from flask import request
from flask import send_from_directory
from flask import url_for, helpers
from pillar.api import utils
from pillar.api.utils.imaging import generate_local_thumbnails
from pillar.api.utils import remove_private_keys, authentication
from pillar.api.utils.authorization import require_login, user_has_role, \
    user_matches_roles
from pillar.api.utils.cdn import hash_file_path
from pillar.api.utils.encoding import Encoder
from pillar.api.utils.gcs import GoogleCloudStorageBucket
log = logging.getLogger(__name__)
file_storage = Blueprint('file_storage', __name__,
                         static_folder='../../static/storage')
# Overrides for browser-specified mimetypes
OVERRIDE_MIMETYPES = {
    # We don't want to thumbnail EXR files right now, so don't handle as image/...
    'image/x-exr': 'application/x-exr',
}
# Add our own extensions to the mimetypes package
mimetypes.add_type('application/x-blender', '.blend')
mimetypes.add_type('application/x-radiance-hdr', '.hdr')
mimetypes.add_type('application/x-exr', '.exr')
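# Illustrative only: after the registrations above, the standard-library lookup resolves
# our custom extensions, e.g. mimetypes.guess_type('scene.blend') returns
# ('application/x-blender', None).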
def browse_gcs(bucket_name, subdir, file_path=None):
    """Browse the content of a Google Cloud Storage bucket"""

    # Initialize storage client
    storage = GoogleCloudStorageBucket(bucket_name, subdir=subdir)
    if file_path:
        # If we provided a file_path, we try to fetch it
        file_object = storage.Get(file_path)
        if file_object:
            # If it exists, return file properties in a dictionary
            return jsonify(file_object)

        # We always return an empty listing even if the directory does not
        # exist. This can be changed later.
        # return abort(404)
        listing = storage.List(file_path)
        return jsonify(listing)

    listing = storage.List('')
    return jsonify(listing)
@file_storage.route('/file', methods=['POST'])
@file_storage.route('/file/<path:file_name>', methods=['GET', 'POST'])
def index(file_name=None):
# GET file -> read it
if request.method == 'GET':
return send_from_directory(current_app.config['STORAGE_DIR'], file_name)
# POST file -> save it
# Sanitize the filename; source:
file_name = request.form['name']
keepcharacters = {' ', '.', '_'}
file_name = ''.join(
c for c in file_name if c.isalnum() or c in keepcharacters).strip()
file_name = file_name.lstrip('.')
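    # Illustrative only: the sanitizer keeps alphanumerics plus ' ', '.' and '_' and then
    # strips leading dots, so a hostile name such as '../../etc/passwd' collapses to
    # 'etcpasswd' and cannot traverse out of the storage directory.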
# Determine & create storage directory
folder_name = file_name[:2]
    file_folder_path = helpers.safe_join(current_app.config['STORAGE_DIR'],
                                         folder_name)
    if not os.path.exists(file_folder_path):
        log.info('Creating folder path %r', file_folder_path)
        os.mkdir(file_folder_path)

    # Save uploaded file (assumes the upload is in the 'data' form field).
    file_path = helpers.safe_join(file_folder_path, file_name)
    log.info('Saving file %r', file_path)
    request.files['data'].save(file_path)
# TODO: possibly nicer to just return a redirect to the file's URL.
return jsonify({'url': url_for('file_storage.index', file_name=file_name)})
def _process_image(gcs, file_id, local_file, src_file):
from PIL import Image
    im = Image.open(local_file)
res = im.size
src_file['width'] = res[0]
src_file['height'] = res[1]
    # Generate previews
    log.info('Generating thumbnails for file %s', file_id)
    src_file['variations'] = generate_local_thumbnails(src_file['name'],
                                                       local_file.name)

    # Send those previews to Google Cloud Storage.
    log.info('Uploading %i thumbnails for file %s to Google Cloud Storage '
             '(GCS)', len(src_file['variations']), file_id)
# TODO: parallelize this at some point.
for variation in src_file['variations']:
fname = variation['file_path']
if current_app.config['TESTING']:
            log.warning(' - NOT sending thumbnail %s to GCS', fname)
        else:
            log.debug(' - Sending thumbnail %s to GCS', fname)
            blob = gcs.bucket.blob('_/' + fname, chunk_size=256 * 1024 * 2)
            blob.upload_from_filename(variation['local_path'])
            if variation.get('size') == 't':
                blob.make_public()
        try:
            os.unlink(variation['local_path'])
        except OSError:
            log.warning('Unable to unlink %s, ignoring this but it will need '
                        'cleanup later.', variation['local_path'])

        del variation['local_path']

    log.info('Done processing file %s', file_id)
src_file['status'] = 'complete'
def _process_video(gcs, file_id, local_file, src_file):
"""Video is processed by Zencoder; the file isn't even stored locally."""'Processing video for file %s', file_id)
# Create variations
root, _ = os.path.splitext(src_file['file_path'])
src_file['variations'] = []
# Most of these properties will be available after encode.
v = 'mp4'
    file_variation = dict(
        file_path='{}-{}.{}'.format(root, v, v),
    )
    # Append the file variation. Originally mp4 and webm were the available options,
    # that's why we build a list.
    src_file['variations'].append(file_variation)
if current_app.config['TESTING']:
log.warning('_process_video: NOT sending out encoding job due to '
'TESTING=%r', current_app.config['TESTING'])
        # Fake an encoder job; a plain dict so it can be subscripted like a real job below.
        j = {'process_id': 'fake-process-id',
             'backend': 'fake'}
    else:
        j = Encoder.job_create(src_file)
if j is None:
log.warning('_process_video: unable to create encoder job for file '
'%s.', file_id)
            return

    log.info('Created asynchronous Zencoder job %s for file %s',
             j['process_id'], file_id)
# Add the processing status to the file object
src_file['processing'] = {
'status': 'pending',
'job_id': str(j['process_id']),
'backend': j['backend']}
def process_file(gcs, file_id, local_file):
"""Process the file by creating thumbnails, sending to Zencoder, etc.
:param file_id: '_id' key of the file
:type file_id: ObjectId or str
:param local_file: locally stored file, or None if no local processing is
:type local_file: file
file_id = ObjectId(file_id)
# Fetch the src_file document from MongoDB.
    files = current_app.data.driver.db['files']
    src_file = files.find_one(file_id)
    if not src_file:
        log.warning('process_file(%s): no such file document found, ignoring.',
                    file_id)
        return

    src_file = utils.remove_private_keys(src_file)
# Update the 'format' field from the content type.
# TODO: overrule the content type based on file extention & magic numbers.
mime_category, src_file['format'] = src_file['content_type'].split('/', 1)
# Prevent video handling for non-admins.
if not user_has_role(u'admin') and mime_category == 'video':
        if src_file['format'].startswith('x-'):
            xified = src_file['format']
        else:
            xified = 'x-' + src_file['format']

        src_file['content_type'] = 'application/%s' % xified
        mime_category = 'application'
        log.info('Not processing video file %s for non-admin user', file_id)
# Run the required processor, based on the MIME category.
    processors = {
        'image': _process_image,
        'video': _process_video,
    }

    try:
        processor = processors[mime_category]
    except KeyError:
        log.info("POSTed file %s was of type %r, which isn't "
                 "thumbnailed/encoded.", file_id, mime_category)
        src_file['status'] = 'complete'
    else:
        log.debug('process_file(%s): marking file status as "processing"',
                  file_id)
        src_file['status'] = 'processing'
        update_file_doc(file_id, status='processing')

        try:
            processor(gcs, file_id, local_file, src_file)
        except Exception:
            log.warning('process_file(%s): error when processing file, '
                        'resetting status to '
                        '"queued_for_processing"', file_id, exc_info=True)
            update_file_doc(file_id, status='queued_for_processing')
            return
# Update the original file with additional info, e.g. image resolution
r, _, _, status = current_app.put_internal('files', src_file, _id=file_id)
if status not in (200, 201):
log.warning('process_file(%s): status %i when saving processed file '
'info to MongoDB: %s',
file_id, status, r)
def delete_file(file_item):
def process_file_delete(file_item):
"""Given a file item, delete the actual file from the storage backend.
This function can be probably made self-calling."""
if file_item['backend'] == 'gcs':
storage = GoogleCloudStorageBucket(str(file_item['project']))
# Delete any file variation found in the file_item document
if 'variations' in file_item:
for v in file_item['variations']:
return True
elif file_item['backend'] == 'pillar':
elif file_item['backend'] == 'cdnsun':
files_collection =['files']
# Collect children (variations) of the original file
children = files_collection.find({'parent': file_item['_id']})
for child in children:
# Finally remove the original file
def generate_link(backend, file_path, project_id=None, is_public=False):
"""Hook to check the backend of a file resource, to build an appropriate link
    that can be used by the client to retrieve the actual file.
    """

    if backend == 'gcs':
        if current_app.config['TESTING']:
            log.info('Skipping GCS link generation, and returning a fake link '
                     'instead.')
            return '/path/to/testing/gcs/%s' % file_path
storage = GoogleCloudStorageBucket(project_id)
blob = storage.Get(file_path)
if blob is None:
return ''
if is_public:
return blob['public_url']
return blob['signed_url']
if backend == 'pillar':
return url_for('file_storage.index', file_name=file_path,
_external=True, _scheme=current_app.config['SCHEME'])
if backend == 'cdnsun':
return hash_file_path(file_path, None)
if backend == 'unittest':
return md5(file_path).hexdigest()
return ''
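# Illustrative only (values are made up): generate_link('pillar', 'ab/cdef.png') builds an
# absolute URL to the file_storage.index view above, while
# generate_link('gcs', 'ab/cdef.png', project_id='<project id>') looks the blob up in that
# project's bucket and returns its signed URL (or public URL when is_public=True).
# Unknown backends yield an empty string.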
def before_returning_file(response):
    ensure_valid_link(response)

    # Enable this call later, when we have implemented the is_public field on
    # files.
    # strip_link_and_variations(response)
def strip_link_and_variations(response):
# Check the access level of the user.
if g.current_user is None:
        has_full_access = False
    else:
        user_roles = g.current_user['roles']
access_roles = current_app.config['FULL_FILE_ACCESS_ROLES']
has_full_access = bool(user_roles.intersection(access_roles))
# Strip all file variations (unless image) and link to the actual file.
if not has_full_access:
response.pop('link', None)
response.pop('link_expires', None)
# Image files have public variations, other files don't.
if not response.get('content_type', '').startswith('image/'):
if response.get('variations') is not None:
response['variations'] = []
def before_returning_files(response):
    for item in response['_items']:
        ensure_valid_link(item)
def ensure_valid_link(response):
"""Ensures the file item has valid file links using generate_link(...)."""
# Log to function-specific logger, so we can easily turn it off.
log_link = logging.getLogger('%s.ensure_valid_link' % __name__)
# log.debug('Inspecting link for file %s', response['_id'])
# Check link expiry.
    now = datetime.datetime.now(tz=bson.tz_util.utc)
if 'link_expires' in response:
link_expires = response['link_expires']
        if now < link_expires:
            # Not expired yet, so don't bother regenerating anything.
            log_link.debug('Link expires at %s, which is in the future, so not '
                           'generating new link', link_expires)
            return

        log_link.debug('Link expired at %s, which is in the past; generating '
                       'new link', link_expires)
    else:
        log_link.debug('No expiry date for link; generating new link')
_generate_all_links(response, now)
def _generate_all_links(response, now):
"""Generate a new link for the file and all its variations.
:param response: the file document that should be updated.
    :param now: datetime that reflects 'now', for consistent expiry generation.
    """

project_id = str(
response['project']) if 'project' in response else None
# TODO: add project id to all files
backend = response['backend']
response['link'] = generate_link(backend, response['file_path'], project_id)
variations = response.get('variations')
if variations:
for variation in variations:
            variation['link'] = generate_link(backend, variation['file_path'],
                                              project_id)
# Construct the new expiry datetime.
validity_secs = current_app.config['FILE_LINK_VALIDITY'][backend]
response['link_expires'] = now + datetime.timedelta(seconds=validity_secs)
patch_info = remove_private_keys(response)
file_id = ObjectId(response['_id'])
    (patch_resp, _, _, _) = current_app.patch_internal('files', patch_info,
                                                       _id=file_id)
if patch_resp.get('_status') == 'ERR':
log.warning('Unable to save new links for file %s: %r',
response['_id'], patch_resp)
# TODO: raise a snag.
        response['_updated'] = now
    else:
        response['_updated'] = patch_resp['_updated']
# Be silly and re-fetch the etag ourselves. TODO: handle this better.
    etag_doc = current_app.data.driver.db['files'].find_one({'_id': file_id},
                                                            {'_etag': 1})
response['_etag'] = etag_doc['_etag']
def before_deleting_file(item):
    delete_file(item)
def on_pre_get_files(_, lookup):
# Override the HTTP header, we always want to fetch the document from
# MongoDB.
parsed_req = eve.utils.parse_request('files')
parsed_req.if_modified_since = None
# Only fetch it if the date got expired.
    now = datetime.datetime.now(tz=bson.tz_util.utc)
lookup_expired = lookup.copy()
lookup_expired['link_expires'] = {'$lte': now}
    cursor = current_app.data.find('files', parsed_req, lookup_expired)
for file_doc in cursor:
# log.debug('Updating expired links for file %r.', file_doc['_id'])
_generate_all_links(file_doc, now)
def refresh_links_for_project(project_uuid, chunk_size, expiry_seconds):
    if chunk_size:
        log.info('Refreshing the first %i links for project %s',
                 chunk_size, project_uuid)
    else:
        log.info('Refreshing all links for project %s', project_uuid)
# Retrieve expired links.
    files_collection = current_app.data.driver.db['files']

    now = datetime.datetime.now(tz=bson.tz_util.utc)
    expire_before = now + datetime.timedelta(seconds=expiry_seconds)
    log.info('Limiting to links that expire before %s', expire_before)
to_refresh = files_collection.find(
{'project': ObjectId(project_uuid),
'link_expires': {'$lt': expire_before},
}).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)
    if to_refresh.count() == 0:
        log.info('No links to refresh.')
        return
for file_doc in to_refresh:
log.debug('Refreshing links for file %s', file_doc['_id'])
        _generate_all_links(file_doc, now)

    log.info('Refreshed %i links', min(chunk_size, to_refresh.count()))
def refresh_links_for_backend(backend_name, chunk_size, expiry_seconds):
import gcloud.exceptions
# Retrieve expired links.
    files_collection = current_app.data.driver.db['files']
    proj_coll = current_app.data.driver.db['projects']

    now = datetime.datetime.now(tz=bson.tz_util.utc)
    expire_before = now + datetime.timedelta(seconds=expiry_seconds)
    log.info('Limiting to links that expire before %s', expire_before)
to_refresh = files_collection.find(
{'$or': [{'backend': backend_name, 'link_expires': None},
{'backend': backend_name, 'link_expires': {
'$lt': expire_before}},
{'backend': backend_name, 'link': None}]
         }).sort([('link_expires', pymongo.ASCENDING)]).limit(chunk_size)
    if to_refresh.count() == 0:
        log.info('No links to refresh.')
        return
refreshed = 0
for file_doc in to_refresh:
        try:
            file_id = file_doc['_id']
            project_id = file_doc.get('project')
            if project_id is None:
                log.debug('Skipping file %s, it has no project.', file_id)
                continue
count = proj_coll.count({'_id': project_id, '$or': [
{'_deleted': {'$exists': False}},
                {'_deleted': False},
            ]})

            if count == 0:
                log.debug('Skipping file %s, project %s does not exist.',
                          file_id, project_id)
                continue
if 'file_path' not in file_doc:
log.warning("Skipping file %s, missing 'file_path' property.",
log.debug('Refreshing links for file %s', file_id)
_generate_all_links(file_doc, now)
except gcloud.exceptions.Forbidden:
log.warning('Skipping file %s, GCS forbids us access to '
                            'project %s bucket.', file_id, project_id)
                continue

            refreshed += 1
except KeyboardInterrupt:
log.warning('Aborting due to KeyboardInterrupt after refreshing %i '
'links', refreshed)
            return

    log.info('Refreshed %i links', refreshed)
def create_file_doc(name, filename, content_type, length, project,
backend='gcs', **extra_fields):
"""Creates a minimal File document for storage in MongoDB.
    Doesn't save it to MongoDB yet.
    """

current_user = g.get('current_user')
file_doc = {'name': name,
'filename': filename,
'file_path': '',
'user': current_user['user_id'],
'backend': backend,
'md5': '',
'content_type': content_type,
'length': length,
'project': project}
    file_doc.update(extra_fields)
    return file_doc
def override_content_type(uploaded_file):
"""Overrides the content type based on file extensions.
:param uploaded_file: file from request.files['form-key']
    :type uploaded_file: werkzeug.datastructures.FileStorage
    """

# Possibly use the browser-provided mime type
    mimetype = uploaded_file.mimetype

    try:
        mimetype = OVERRIDE_MIMETYPES[mimetype]
    except KeyError:
        pass
if '/' in mimetype:
mimecat = mimetype.split('/')[0]
if mimecat in {'video', 'audio', 'image'}:
            # The browser's mime type is probably ok, just use it.
            return

    # Use the file's extension to guess the content type,
    # and then use it to set the mime type.
    (mimetype, encoding) = mimetypes.guess_type(uploaded_file.filename)
# Only override the mime type if we can detect it, otherwise just
# keep whatever the browser gave us.
if mimetype:
# content_type property can't be set directly
uploaded_file.headers['content-type'] = mimetype
# It has this, because we used uploaded_file.mimetype earlier this
# function.
del uploaded_file._parsed_content_type
def assert_file_size_allowed(file_size):
"""Asserts that the current user is allowed to upload a file of the given size.
roles = current_app.config['ROLES_FOR_UNLIMITED_UPLOADS']
    if user_matches_roles(require_roles=roles):
        # Users with an unlimited-upload role may upload any size.
        return
filesize_limit = current_app.config['FILESIZE_LIMIT_BYTES_NONSUBS']
    if file_size < filesize_limit:
        return
    filesize_limit_mb = filesize_limit / 2.0 ** 20
    log.info('User %s tried to upload a %.3f MiB file, but is only allowed '
             '%.3f MiB.',
             authentication.current_user_id(), file_size / 2.0 ** 20,
             filesize_limit_mb)
    raise wz_exceptions.RequestEntityTooLarge(
        'To upload files larger than %i MiB, subscribe to Blender Cloud' %
        filesize_limit_mb)
@file_storage.route('/stream/<string:project_id>', methods=['POST', 'OPTIONS'])
def stream_to_gcs(project_id):
project_oid = utils.str2id(project_id)
    projects = current_app.data.driver.db['projects']
project = projects.find_one(project_oid, projection={'_id': 1})
if not project:
        raise wz_exceptions.NotFound('Project %s does not exist' % project_id)

    log.info('Streaming file to bucket for project=%s user_id=%s', project_id,
             authentication.current_user_id())
    log.info('request.headers[Origin] = %r', request.headers.get('Origin'))
uploaded_file = request.files['file']
# Not every upload has a Content-Length header. If it was passed, we might
# as well check for its value before we require the user to upload the
# entire file. (At least I hope that this part of the code is processed
# before the body is read in its entirety)
    if uploaded_file.content_length:
        assert_file_size_allowed(uploaded_file.content_length)

    override_content_type(uploaded_file)
    if not uploaded_file.content_type:
log.warning('File uploaded to project %s without content type.', project_oid)
raise wz_exceptions.BadRequest('Missing content type.')
if uploaded_file.content_type.startswith('image/'):
# We need to do local thumbnailing, so we have to write the stream
# both to Google Cloud Storage and to local storage.
        local_file = tempfile.NamedTemporaryFile(dir=current_app.config['STORAGE_DIR'])
        uploaded_file.save(local_file)
        local_file.seek(0)  # Make sure that a re-read starts from the beginning.
        stream_for_gcs = local_file
    else:
        local_file = None
        stream_for_gcs = uploaded_file.stream
# Figure out the file size, as we need to pass this in explicitly to GCloud.
# Otherwise it always uses os.fstat(file_obj.fileno()).st_size, which isn't
# supported by a BytesIO object (even though it does have a fileno attribute).
if isinstance(stream_for_gcs, io.BytesIO):
        file_size = len(stream_for_gcs.getvalue())
    else:
        file_size = os.fstat(stream_for_gcs.fileno()).st_size

    # Check the file size again, now that we know its size for sure.
    assert_file_size_allowed(file_size)
# Create file document in MongoDB.
file_id, internal_fname, status = create_file_doc_for_upload(project_oid, uploaded_file)
if current_app.config['TESTING']:
log.warning('NOT streaming to GCS because TESTING=%r', current_app.config['TESTING'])
# Fake a Blob object.
gcs = None
blob = type('Blob', (), {'size': file_size})
    else:
        # Upload the file to GCS.
        from gcloud.streaming import transfer

        # Files larger than this many bytes will be streamed directly from disk, smaller
        # ones will be read into memory and then uploaded.
        transfer.RESUMABLE_UPLOAD_THRESHOLD = 102400

        try:
            gcs = GoogleCloudStorageBucket(project_id)
            blob = gcs.bucket.blob('_/' + internal_fname, chunk_size=256 * 1024 * 2)
            blob.upload_from_file(stream_for_gcs, size=file_size,
                                  content_type=uploaded_file.mimetype)
except Exception:
log.exception('Error uploading file to Google Cloud Storage (GCS),'
' aborting handling of uploaded file (id=%s).', file_id)
update_file_doc(file_id, status='failed')
raise wz_exceptions.InternalServerError('Unable to stream file to Google Cloud Storage')
if stream_for_gcs.closed:
log.error('Eek, GCS closed its stream, Andy is not going to like this.')
        # Reload the blob to get the file size according to Google.
        blob.reload()

    process_file(gcs, file_id, local_file)
# Local processing is done, we can close the local file so it is removed.
    if local_file is not None:
        local_file.close()

    log.debug('Handled uploaded file id=%s, fname=%s, size=%i', file_id, internal_fname, blob.size)
# Status is 200 if the file already existed, and 201 if it was newly created.
# TODO: add a link to a thumbnail in the response.
resp = jsonify(status='ok', file_id=str(file_id))
    resp.status_code = status
    add_access_control_headers(resp)
    return resp
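# Illustrative request only (path prefix and host are examples; authentication omitted):
#     curl -X POST -F file=@photo.jpg https://example.test/storage/stream/<project_id>
# The upload must arrive in the 'file' form field (request.files['file'] above); the JSON
# response carries the new file document's _id.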
def add_access_control_headers(resp):
"""Allows cross-site requests from the configured domain."""
if 'Origin' not in request.headers:
return resp
resp.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
resp.headers['Access-Control-Allow-Credentials'] = 'true'
return resp
def update_file_doc(file_id, **updates):
    files = current_app.data.driver.db['files']
res = files.update_one({'_id': ObjectId(file_id)},
{'$set': updates})
log.debug('update_file_doc(%s, %s): %i matched, %i updated.',
file_id, updates, res.matched_count, res.modified_count)
return res
def create_file_doc_for_upload(project_id, uploaded_file):
"""Creates a secure filename and a document in MongoDB for the file.
The (project_id, filename) tuple should be unique. If such a document already
exists, it is updated with the new file.
:param uploaded_file: file from request.files['form-key']
:type uploaded_file: werkzeug.datastructures.FileStorage
:returns: a tuple (file_id, filename, status), where 'filename' is the internal
        filename used on GCS.
    """

project_id = ObjectId(project_id)
# Hash the filename with path info to get the internal name. This should
# be unique for the project.
# internal_filename = uploaded_file.filename
_, ext = os.path.splitext(uploaded_file.filename)
internal_filename = uuid.uuid4().hex + ext
# For now, we don't support overwriting files, and create a new one every time.
# # See if we can find a pre-existing file doc.
# files =['files']
# file_doc = files.find_one({'project': project_id,
# 'name': internal_filename})
file_doc = None
# TODO: at some point do name-based and content-based content-type sniffing.
new_props = {'filename': uploaded_file.filename,
'content_type': uploaded_file.mimetype,
'length': uploaded_file.content_length,
'project': project_id,
'status': 'uploading'}
if file_doc is None:
# Create a file document on MongoDB for this file.
file_doc = create_file_doc(name=internal_filename, **new_props)
        file_fields, _, _, status = current_app.post_internal('files', file_doc)
    else:
        # Update the pre-existing document with the new file's properties.
        file_doc.update(new_props)
        file_fields, _, _, status = current_app.put_internal('files', remove_private_keys(file_doc))
if status not in (200, 201):
log.error('Unable to create new file document in MongoDB, status=%i: %s',
status, file_fields)
raise wz_exceptions.InternalServerError()
return file_fields['_id'], internal_filename, status
def compute_aggregate_length(file_doc, original=None):
"""Computes the total length (in bytes) of the file and all variations.
    Stores the result in file_doc['length_aggregate_in_bytes'].
    """

# Compute total size of all variations.
variations = file_doc.get('variations', ())
var_length = sum(var.get('length', 0) for var in variations)
file_doc['length_aggregate_in_bytes'] = file_doc.get('length', 0) + var_length
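# Illustrative only (numbers are made up): a file_doc with 'length': 1000 and two variations
# of 'length' 200 and 300 ends up with file_doc['length_aggregate_in_bytes'] == 1500.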
def compute_aggregate_length_items(file_docs):
    for file_doc in file_docs:
        compute_aggregate_length(file_doc)
def setup_app(app, url_prefix):
app.on_pre_GET_files += on_pre_get_files
app.on_fetched_item_files += before_returning_file
app.on_fetched_resource_files += before_returning_files
app.on_delete_item_files += before_deleting_file
app.on_update_files += compute_aggregate_length
app.on_replace_files += compute_aggregate_length
app.on_insert_files += compute_aggregate_length_items
app.register_api_blueprint(file_storage, url_prefix=url_prefix)
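# Illustrative wiring only (the prefix is an example value, not mandated by this module):
#     setup_app(app, url_prefix='/storage')
# hooks the Eve event handlers above into the 'files' resource and mounts the blueprint.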
