Page Menu
Home
Search
Configure Global Search
Log In
Files
F13243333
local_auth.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Size
4 KB
Subscribers
None
local_auth.py
View Options
import
base64
import
hashlib
import
logging
import
typing
import
bcrypt
import
datetime
from
bson
import
tz_util
from
flask
import
abort
,
Blueprint
,
current_app
,
jsonify
,
request
from
pillar.api.utils.authentication
import
create_new_user_document
from
pillar.api.utils.authentication
import
make_unique_username
from
pillar.api.utils.authentication
import
store_token
# Flask blueprint named 'authentication'; the routes in this module attach to
# it, and setup_app() registers it on the application.
blueprint = Blueprint('authentication', __name__)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def get_auth_credentials(user, provider):
    """Return the user's credentials for the given auth provider, or None.

    :param user: user document (a dict); its 'auth' entry, when present, is
        iterated as a sequence of credential dicts.
    :param provider: provider name compared against each entry's 'provider'.
    :return: the first credentials dict whose 'provider' equals ``provider``,
        or None when nothing matches or the user has no 'auth' entries.
    """
    # A user document without an 'auth' list (or with it set to None/empty)
    # simply has no credentials; report "not found" instead of raising.
    auth_entries = user.get('auth') or ()
    return next(
        (credentials for credentials in auth_entries
         if 'provider' in credentials and credentials['provider'] == provider),
        None)
def create_local_user(email, password):
    """Create a user with 'local' authentication. For internal use only.

    The password is hashed with a freshly generated bcrypt salt and passed
    as the 'local' provider token of the new user document. The username is
    derived from the email address via make_unique_username().

    Aborts with HTTP 500 (after logging the response) when the internal POST
    to 'users' does not return status 201.

    :param email: email address of the new user; also used as the basis for
        the username.
    :param password: plain-text password to hash.
    :return: the '_id' of the created user, taken from the POST response.
    """
    password_hash = hash_password(password, bcrypt.gensalt())

    user_doc = create_new_user_document(email, '', email,
                                        provider='local',
                                        token=password_hash)
    user_doc['username'] = make_unique_username(email)

    response, _, _, status = current_app.post_internal('users', user_doc)
    if status == 201:
        return response['_id']

    log.error('internal response: %r %r', status, response)
    return abort(500)
def get_local_user(username, password):
    """Find a user by username and verify the password against 'local' auth.

    Aborts the request with HTTP 403 when the username is unknown, when the
    user has no 'local' credentials, or when the password does not match.

    :param username: matched against the 'username' field in the 'users'
        collection.
    :param password: plain-text password to verify.
    :return: the user document as returned by the database lookup.
    """
    # Function-scope import so this block does not depend on file-level
    # import changes.
    import hmac

    # Look up user in db
    users_collection = current_app.data.driver.db['users']
    user = users_collection.find_one({'username': username})
    if not user:
        return abort(403)

    # Check if user has "local" auth type
    credentials = get_auth_credentials(user, 'local')
    if not credentials:
        return abort(403)

    # The stored hash is passed as the salt argument: hashing the candidate
    # password with it must reproduce the stored hash when the password is
    # correct (bcrypt hashes embed their own salt -- see the bcrypt docs).
    stored_hash = credentials['token']
    hashed_password = hash_password(password, stored_hash)

    # Compare in constant time so the comparison itself does not leak how
    # many leading characters matched. A non-str stored token can never
    # equal the str returned by hash_password(), so it is rejected outright.
    passwords_match = isinstance(stored_hash, str) and hmac.compare_digest(
        hashed_password.encode('ascii'), stored_hash.encode('utf-8'))
    if not passwords_match:
        return abort(403)

    return user
@blueprint.route('/make-token', methods=['POST'])
def make_token():
    """Log a user in directly against the local database, without OAuth.

    Reads 'username' and 'password' from the POSTed form, verifies them via
    get_local_user() and generates a new token for that user. The token is
    handed back to Pillar Web, which uses it in subsequent transactions.

    :return: JSON response of the form {"token": "<token string>"}
    """
    form = request.form
    user = get_local_user(form['username'], form['password'])

    token_doc = generate_and_store_token(user['_id'])
    return jsonify(token=token_doc['token'])
def generate_and_store_token(user_id, days=15, prefix=b'') -> dict:
    """Generate a random token, store it for the user, and return its document.

    NOTE: the returned document includes the plain-text token.
    DO NOT STORE OR LOG THIS unless there is a good reason to.

    :param user_id: ObjectId of the owning user.
    :param days: the token expires this many days from now (UTC).
    :param prefix: bytes put in front of the random part of the token, for
        easy identification.
    :return: the document returned by store_token(), with its 'token' key
        set to the plain-text token.
    :raise TypeError: when prefix is not a bytes object.
    """
    if not isinstance(prefix, bytes):
        raise TypeError('prefix must be bytes, not %s' % type(prefix))

    import secrets

    # 'xy' is passed as altchars so that '+' and '/' never appear in the
    # token. The token is never base64-decoded again, so the ambiguity this
    # introduces and the stripped '=' padding do not matter.
    encoded = base64.b64encode(secrets.token_bytes(32), altchars=b'xy')
    token = (prefix + encoded.strip(b'=')).decode('ascii')

    now = datetime.datetime.now(tz=tz_util.utc)
    expires_at = now + datetime.timedelta(days=days)

    token_doc = store_token(user_id, token, expires_at)

    # Put the plain-text token in the returned document so that the caller
    # can store it client-side, in configuration, etc.
    token_doc['token'] = token
    return token_doc
def hash_password(password: str, salt: typing.Union[str, bytes]) -> str:
    """Hash a password with bcrypt, after SHA-256 + base64 pre-hashing.

    The password is encoded to bytes, digested with SHA-256, and the digest
    is base64-encoded; that value is what gets passed to bcrypt.hashpw().
    NOTE(review): the pre-hash presumably works around bcrypt's limits on
    input length and NUL bytes -- confirm against the bcrypt documentation.

    :param password: the plain-text password.
    :param salt: bcrypt salt; a str is encoded as UTF-8, bytes are used
        as-is.
    :return: the bcrypt output, decoded as ASCII.
    """
    password_bytes = password.encode()
    salt_bytes = salt.encode('utf-8') if isinstance(salt, str) else salt

    digest = hashlib.sha256(password_bytes).digest()
    bcrypt_input = base64.b64encode(digest)

    return bcrypt.hashpw(bcrypt_input, salt_bytes).decode('ascii')
def setup_app(app, url_prefix):
    """Register this module's blueprint on the application.

    :param app: application object; must provide register_api_blueprint().
    :param url_prefix: passed through unchanged as the blueprint's URL prefix.
    """
    app.register_api_blueprint(blueprint, url_prefix=url_prefix)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Sat, Jul 2, 5:57 PM (1 d, 23 h)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
cd/e7/10119dd0043be096487b47fc5074
Attached To
rPS Pillar
Event Timeline
Log In to Comment