Page Menu
Home
Search
Configure Global Search
Log In
Files
F14228921
test_file_storage.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Size
10 KB
Subscribers
None
test_file_storage.py
View Options
import
io
import
json
import
os
import
tempfile
import
pillar.tests.common_test_data
as
ctd
import
rsa.randnum
from
pillar.tests
import
AbstractPillarTest
,
TEST_EMAIL_ADDRESS
from
werkzeug.datastructures
import
FileStorage
class
FileStorageTest
(
AbstractPillarTest
):
def
fake_file
(
self
,
filename
,
content_type
):
return
FileStorage
(
filename
=
filename
,
name
=
'file'
,
# form field name
content_type
=
content_type
)
def
test_override_content_type
(
self
):
from
pillar.api.file_storage
import
override_content_type
fake
=
self
.
fake_file
(
'compressed.blend'
,
'jemoeder'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'application/x-blender'
,
fake
.
content_type
)
self
.
assertEqual
(
'application/x-blender'
,
fake
.
mimetype
)
fake
=
self
.
fake_file
(
'blend.mp3'
,
'application/octet-stream'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'audio/mpeg'
,
fake
.
content_type
)
self
.
assertEqual
(
'audio/mpeg'
,
fake
.
mimetype
)
# Official one is audio/mpeg, but if the browser gives audio/XXX, it should
# just be used.
fake
=
self
.
fake_file
(
'blend.mp3'
,
'audio/mp3'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'audio/mp3'
,
fake
.
content_type
)
self
.
assertEqual
(
'audio/mp3'
,
fake
.
mimetype
)
fake
=
self
.
fake_file
(
'mp3.mkv'
,
'application/octet-stream'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'video/x-matroska'
,
fake
.
content_type
)
self
.
assertEqual
(
'video/x-matroska'
,
fake
.
mimetype
)
fake
=
self
.
fake_file
(
'mkv.mp3.avi.mp4'
,
'application/octet-stream'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'video/mp4'
,
fake
.
content_type
)
self
.
assertEqual
(
'video/mp4'
,
fake
.
mimetype
)
fake
=
self
.
fake_file
(
'mkv.mp3.avi.mp4.unknown'
,
'application/awesome-type'
)
override_content_type
(
fake
)
self
.
assertEqual
(
'application/awesome-type'
,
fake
.
content_type
)
self
.
assertEqual
(
'application/awesome-type'
,
fake
.
mimetype
)
class
TempDirTest
(
AbstractPillarTest
):
def
test_tempfiles_location
(
self
):
# After importing the application, tempfiles should be created in the STORAGE_DIR
storage
=
self
.
app
.
config
[
'STORAGE_DIR'
]
self
.
assertEqual
(
os
.
environ
[
'TMP'
],
storage
)
self
.
assertNotIn
(
'TEMP'
,
os
.
environ
)
self
.
assertNotIn
(
'TMPDIR'
,
os
.
environ
)
handle
,
filename
=
tempfile
.
mkstemp
()
os
.
close
(
handle
)
dirname
=
os
.
path
.
dirname
(
filename
)
self
.
assertEqual
(
dirname
,
storage
)
tmpfile
=
tempfile
.
NamedTemporaryFile
()
dirname
=
os
.
path
.
dirname
(
tmpfile
.
name
)
self
.
assertEqual
(
dirname
,
storage
)
class
FileAccessTest
(
AbstractPillarTest
):
def
__test_link_stripping
(
self
):
"""Subscribers should get all links, but non-subscribers only a subset."""
img_file_id
,
_
=
self
.
ensure_file_exists
()
video_file_id
,
_
=
self
.
ensure_file_exists
({
u'_id'
:
None
,
u'content_type'
:
u'video/matroska'
,
'variations'
:
[
{
'format'
:
'mp4'
,
'height'
:
446
,
'width'
:
1064
,
'length'
:
219399183
,
'link'
:
'https://hosting/filename.mp4'
,
'content_type'
:
'video/mp4'
,
'duration'
:
44
,
'size'
:
'446p'
,
'file_path'
:
'c1/c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0.mp4'
,
'md5'
:
'c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0'
},
{
'format'
:
'webm'
,
'height'
:
446
,
'width'
:
1064
,
'length'
:
31219520
,
'link'
:
'https://hosting/filename.webm'
,
'content_type'
:
'video/webm'
,
'duration'
:
44
,
'md5'
:
'c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0'
,
'file_path'
:
'c1/c1f7b71c248c03468b2bb3e7c9f0c4e5cdb9d6d0.webm'
,
'size'
:
'446p'
}
]
})
blend_file_id
,
_
=
self
.
ensure_file_exists
({
u'_id'
:
None
,
u'content_type'
:
u'application/x-blender'
,
u'variations'
:
None
})
nonsub_user_id
=
self
.
create_user
(
user_id
=
'cafef00dcafef00d00000000'
,
roles
=
())
sub_user_id
=
self
.
create_user
(
user_id
=
'cafef00dcafef00dcafef00d'
,
roles
=
(
u'subscriber'
,))
demo_user_id
=
self
.
create_user
(
user_id
=
'cafef00dcafef00ddeadbeef'
,
roles
=
(
u'demo'
,))
admin_user_id
=
self
.
create_user
(
user_id
=
'aaaaaaaaaaaaaaaaaaaaaaaa'
,
roles
=
(
u'admin'
,))
self
.
create_valid_auth_token
(
nonsub_user_id
,
'nonsub-token'
)
self
.
create_valid_auth_token
(
sub_user_id
,
'sub-token'
)
self
.
create_valid_auth_token
(
demo_user_id
,
'demo-token'
)
self
.
create_valid_auth_token
(
admin_user_id
,
'admin-token'
)
def
assert_variations
(
file_id
,
has_access
,
token
=
None
):
if
token
:
headers
=
{
'Authorization'
:
self
.
make_header
(
token
)}
else
:
headers
=
None
resp
=
self
.
client
.
get
(
'/api/files/
%s
'
%
file_id
,
headers
=
headers
)
self
.
assertEqual
(
200
,
resp
.
status_code
)
file_info
=
json
.
loads
(
resp
.
data
)
self
.
assertEqual
(
has_access
,
'link'
in
file_info
)
self
.
assertEqual
(
has_access
,
'link_expires'
in
file_info
)
return
file_info
# Unauthenticated user and non-subscriber should still get the file, but limited.
file_info
=
assert_variations
(
img_file_id
,
False
)
self
.
assertEqual
({
't'
,
'h'
,
'b'
},
{
var
[
'size'
]
for
var
in
file_info
[
'variations'
]})
file_info
=
assert_variations
(
img_file_id
,
False
,
'nonsub-token'
)
self
.
assertEqual
({
't'
,
'h'
,
'b'
},
{
var
[
'size'
]
for
var
in
file_info
[
'variations'
]})
# Authenticated subscribers, demos and admins should get the full file.
file_info
=
assert_variations
(
img_file_id
,
True
,
'sub-token'
)
self
.
assertEqual
({
't'
,
'h'
,
'b'
},
{
var
[
'size'
]
for
var
in
file_info
[
'variations'
]})
file_info
=
assert_variations
(
img_file_id
,
True
,
'demo-token'
)
self
.
assertEqual
({
't'
,
'h'
,
'b'
},
{
var
[
'size'
]
for
var
in
file_info
[
'variations'
]})
file_info
=
assert_variations
(
img_file_id
,
True
,
'admin-token'
)
self
.
assertEqual
({
't'
,
'h'
,
'b'
},
{
var
[
'size'
]
for
var
in
file_info
[
'variations'
]})
# Unauthenticated user and non-subscriber should get no links what so ever.
file_info
=
assert_variations
(
video_file_id
,
False
)
self
.
assertEqual
([],
file_info
[
'variations'
])
file_info
=
assert_variations
(
video_file_id
,
False
,
'nonsub-token'
)
self
.
assertEqual
([],
file_info
[
'variations'
])
# Authenticated subscribers, demos and admins should get the full file.
file_info
=
assert_variations
(
video_file_id
,
True
,
'sub-token'
)
self
.
assertEqual
({
'mp4'
,
'webm'
},
{
var
[
'format'
]
for
var
in
file_info
[
'variations'
]})
file_info
=
assert_variations
(
video_file_id
,
True
,
'demo-token'
)
self
.
assertEqual
({
'mp4'
,
'webm'
},
{
var
[
'format'
]
for
var
in
file_info
[
'variations'
]})
file_info
=
assert_variations
(
video_file_id
,
True
,
'admin-token'
)
self
.
assertEqual
({
'mp4'
,
'webm'
},
{
var
[
'format'
]
for
var
in
file_info
[
'variations'
]})
# Unauthenticated user and non-subscriber should get no links what so ever.
file_info
=
assert_variations
(
blend_file_id
,
False
)
self
.
assertIsNone
(
file_info
[
'variations'
])
file_info
=
assert_variations
(
blend_file_id
,
False
,
'nonsub-token'
)
self
.
assertIsNone
(
file_info
[
'variations'
])
# Authenticated subscribers, demos and admins should get the full file.
file_info
=
assert_variations
(
blend_file_id
,
True
,
'sub-token'
)
self
.
assertIsNone
(
file_info
[
'variations'
])
file_info
=
assert_variations
(
blend_file_id
,
True
,
'demo-token'
)
self
.
assertIsNone
(
file_info
[
'variations'
])
file_info
=
assert_variations
(
blend_file_id
,
True
,
'admin-token'
)
self
.
assertIsNone
(
file_info
[
'variations'
])
class
FileMaxSizeTest
(
AbstractPillarTest
):
def
setUp
(
self
,
**
kwargs
):
AbstractPillarTest
.
setUp
(
self
,
**
kwargs
)
self
.
project_id
,
_
=
self
.
ensure_project_exists
()
self
.
user_id
=
self
.
create_user
(
groups
=
[
ctd
.
EXAMPLE_ADMIN_GROUP_ID
],
roles
=
set
())
self
.
create_valid_auth_token
(
self
.
user_id
,
'token'
)
def
test_upload_small_file
(
self
):
file_size
=
10
*
2
**
10
test_file
=
self
.
create_test_file
(
file_size
)
resp
=
self
.
post
(
'/api/storage/stream/
%s
'
%
self
.
project_id
,
expected_status
=
201
,
auth_token
=
'token'
,
files
=
{
'file'
:
(
test_file
,
'test_file.bin'
)})
stream_info
=
resp
.
json
()
file_id
=
stream_info
[
'file_id'
]
self
.
assert_file_doc_ok
(
file_id
,
file_size
)
def
test_upload_too_large_file
(
self
):
file_size
=
30
*
2
**
10
test_file
=
self
.
create_test_file
(
file_size
)
self
.
post
(
'/api/storage/stream/
%s
'
%
self
.
project_id
,
expected_status
=
413
,
auth_token
=
'token'
,
files
=
{
'file'
:
(
test_file
,
'test_file.bin'
)})
def
test_upload_large_file_subscriber
(
self
):
self
.
badger
(
TEST_EMAIL_ADDRESS
,
'subscriber'
,
'grant'
)
file_size
=
30
*
2
**
10
test_file
=
self
.
create_test_file
(
file_size
)
resp
=
self
.
post
(
'/api/storage/stream/
%s
'
%
self
.
project_id
,
expected_status
=
201
,
auth_token
=
'token'
,
files
=
{
'file'
:
(
test_file
,
'test_file.bin'
)})
stream_info
=
resp
.
json
()
file_id
=
stream_info
[
'file_id'
]
self
.
assert_file_doc_ok
(
file_id
,
file_size
)
def
assert_file_doc_ok
(
self
,
file_id
,
file_size
):
with
self
.
app
.
test_request_context
():
from
pillar.api.utils
import
str2id
# Check that the file exists in MongoDB
files_coll
=
self
.
app
.
data
.
driver
.
db
[
'files'
]
db_file
=
files_coll
.
find_one
({
'_id'
:
str2id
(
file_id
)})
self
.
assertEqual
(
file_size
,
db_file
[
'length'
])
def
create_test_file
(
self
,
file_size_bytes
):
fileob
=
io
.
BytesIO
(
rsa
.
randnum
.
read_random_bits
(
file_size_bytes
*
8
))
return
fileob
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Feb 3, 11:21 PM (2 d)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
9b/81/e16ac6203dc36cd74f347ff62ef6
Attached To
rPS Pillar
Event Timeline
Log In to Comment