Page MenuHome

oauth.py
No OneTemporary

oauth.py

import abc
import attr
import json
import logging
from rauth import OAuth2Service
from flask import current_app, url_for, request, redirect, session, Response
@attr.s
class OAuthUserResponse:
    """User details obtained from an OAuth provider once sign-in succeeded.

    Both attributes are validated on construction and must be ``str``
    instances.
    """

    # Identifier of the user, as reported by the provider.
    id = attr.ib(validator=attr.validators.instance_of(str))

    # Email address of the user, as reported by the provider.
    email = attr.ib(validator=attr.validators.instance_of(str))
class OAuthError(Exception):
    """Common base class for every exception this module raises.

    Catch this to handle any OAuth-related failure from this module.
    """
class ProviderConfigurationMissing(OAuthError):
    """An OAuth provider was requested, but it has no configuration."""
class ProviderNotImplemented(OAuthError):
    """The requested OAuth provider does not exist."""
class OAuthCodeNotProvided(OAuthError):
    """The OAuth callback request did not carry a 'code' argument."""
class ProviderNotConfigured:
    """Placeholder marking a provider that exists but is not configured.

    The class object itself is used as the marker; it is not instantiated.
    """
class OAuthSignIn(metaclass=abc.ABCMeta):
    """Abstract base class of the OAuth sign-in providers.

    A concrete provider subclasses this class, sets ``provider_name`` and
    assigns its ``OAuth2Service`` to ``self.service`` in ``__init__``.
    Use :meth:`get_provider` to obtain the provider for a given name; the
    provider objects are created once, on first use, and then reused.
    """

    provider_name: str = None  # set in each subclass.
    _providers = None  # initialized in get_provider()
    _log = logging.getLogger(f'{__name__}.OAuthSignIn')

    def __init__(self):
        """Read this provider's credentials from the application config.

        Looks up ``provider_name`` in the ``OAUTH_CREDENTIALS`` setting of
        the current Flask app and stores its ``id`` and ``secret``.

        :raises ProviderConfigurationMissing: when there is no (or an
            empty) credentials entry for this provider.
        """
        credentials = current_app.config['OAUTH_CREDENTIALS'].get(self.provider_name)
        if not credentials:
            raise ProviderConfigurationMissing(
                f'Missing OAuth credentials for {self.provider_name}')

        self.consumer_id = credentials['id']
        self.consumer_secret = credentials['secret']

        # Set in a subclass
        self.service: 'OAuth2Service' = None

    @abc.abstractmethod
    def authorize(self) -> 'Response':
        """Redirect to the correct authorization endpoint for the current provider.

        Depending on the provider, we sometimes have to specify a different
        'scope'.
        """
        pass

    @abc.abstractmethod
    def callback(self) -> 'OAuthUserResponse':
        """Callback performed after authorizing the user.

        This is usually a request to a protected /me endpoint to query for
        user information, such as user id and email address.
        """
        pass

    def get_callback_url(self):
        """Return the absolute URL of the OAuth callback for this provider.

        The URL scheme is taken from the ``SCHEME`` application setting.
        """
        return url_for('users.oauth_callback', provider=self.provider_name,
                       _external=True, _scheme=current_app.config['SCHEME'])

    @staticmethod
    def auth_code_from_request() -> str:
        """Return the 'code' query argument of the current request.

        :raises OAuthCodeNotProvided: when the request has no 'code' argument.
        """
        try:
            return request.args['code']
        except KeyError:
            raise OAuthCodeNotProvided('A code argument was not provided in the request')

    @staticmethod
    def decode_json(payload):
        """Decode a UTF-8 encoded bytes payload and parse it as JSON."""
        return json.loads(payload.decode('utf-8'))

    def make_oauth_session(self):
        """Exchange the authorization code of the current request for a session.

        Calls ``get_auth_session`` on ``self.service`` with the
        'authorization_code' grant type, and uses :meth:`decode_json` as
        the decoder for the token response.

        :raises OAuthCodeNotProvided: when the request has no 'code' argument.
        """
        return self.service.get_auth_session(
            data={'code': self.auth_code_from_request(),
                  'grant_type': 'authorization_code',
                  'redirect_uri': self.get_callback_url()},
            decoder=self.decode_json
        )

    @classmethod
    def get_provider(cls, provider_name) -> 'OAuthSignIn':
        """Return the provider object registered under ``provider_name``.

        The provider registry is built on the first call.

        :raises ProviderNotImplemented: when no provider has that name.
        :raises ProviderConfigurationMissing: when the provider exists but
            is not configured.
        """
        if cls._providers is None:
            cls._init_providers()

        try:
            provider = cls._providers[provider_name]
        except KeyError:
            raise ProviderNotImplemented(f'No such OAuth provider {provider_name}')

        if provider is ProviderNotConfigured:
            raise ProviderConfigurationMissing(f'OAuth provider {provider_name} not configured')

        return provider

    @classmethod
    def _init_providers(cls):
        """Instantiate every direct subclass and register it by provider name.

        Providers whose constructor raises ``ProviderConfigurationMissing``
        are registered as ``ProviderNotConfigured``. Any other exception
        propagates to the caller and leaves ``_providers`` untouched.
        """
        providers = {}

        for provider_class in cls.__subclasses__():
            try:
                provider = provider_class()
            except ProviderConfigurationMissing:
                cls._log.info('OAuth provider %s not configured',
                              provider_class.provider_name)
                provider = ProviderNotConfigured
            providers[provider_class.provider_name] = provider

        # Publish the registry only once it is complete. If a constructor
        # raised an unexpected error above, a half-filled registry must not
        # be cached: it would make later lookups report existing providers
        # as non-existent instead of retrying and surfacing the real error.
        cls._providers = providers
class BlenderIdSignIn(OAuthSignIn):
    """Sign-in provider registered under the name 'blender-id'."""

    provider_name = 'blender-id'

    def __init__(self):
        """Create the OAuth service for Blender ID.

        The base URL can be overridden with the ``base_url`` key of the
        'blender-id' credentials entry in the application config.
        """
        super().__init__()

        provider_config = current_app.config['OAUTH_CREDENTIALS']['blender-id']
        base_url = provider_config.get('base_url', 'https://www.blender.org/id/')

        # All endpoints are formed by appending to the base URL as-is.
        endpoints = {
            'authorize_url': '%soauth/authorize' % base_url,
            'access_token_url': '%soauth/token' % base_url,
            'base_url': '%sapi/' % base_url,
        }
        self.service = OAuth2Service(
            name='blender-id',
            client_id=self.consumer_id,
            client_secret=self.consumer_secret,
            **endpoints
        )

    def authorize(self):
        """Redirect to the authorization URL, requesting the 'email' scope."""
        authorize_url = self.service.get_authorize_url(
            scope='email',
            response_type='code',
            redirect_uri=self.get_callback_url(),
        )
        return redirect(authorize_url)

    def callback(self):
        """Store the access token in the Flask session and fetch the user.

        Returns an ``OAuthUserResponse`` built from the 'user' endpoint.
        """
        auth_session = self.make_oauth_session()

        # TODO handle exception for failed oauth or not authorized
        session['blender_id_oauth_token'] = auth_session.access_token

        user_info = auth_session.get('user').json()
        user_id = str(user_info['id'])
        return OAuthUserResponse(user_id, user_info['email'])
class FacebookSignIn(OAuthSignIn):
    """Sign-in provider registered under the name 'facebook'."""

    provider_name = 'facebook'

    def __init__(self):
        """Create the OAuth service for the Facebook Graph API endpoints."""
        super().__init__()

        endpoints = {
            'authorize_url': 'https://graph.facebook.com/oauth/authorize',
            'access_token_url': 'https://graph.facebook.com/oauth/access_token',
            'base_url': 'https://graph.facebook.com/',
        }
        self.service = OAuth2Service(
            name='facebook',
            client_id=self.consumer_id,
            client_secret=self.consumer_secret,
            **endpoints
        )

    def authorize(self):
        """Redirect to the authorization URL, requesting the 'email' scope."""
        authorize_url = self.service.get_authorize_url(
            scope='email',
            response_type='code',
            redirect_uri=self.get_callback_url(),
        )
        return redirect(authorize_url)

    def callback(self):
        """Fetch the id and email of the signed-in user.

        Returns an ``OAuthUserResponse``; the email is passed on as ``None``
        when the response does not contain one.
        """
        auth_session = self.make_oauth_session()
        user_info = auth_session.get('me?fields=id,email').json()

        # TODO handle case when user chooses not to disclose an email
        # see https://developers.facebook.com/docs/graph-api/reference/user/
        email = user_info.get('email')
        return OAuthUserResponse(user_info['id'], email)
class GoogleSignIn(OAuthSignIn):
    """Sign-in provider registered under the name 'google'."""

    provider_name = 'google'

    def __init__(self):
        """Create the OAuth service for the Google OAuth 2 endpoints."""
        super().__init__()

        endpoints = {
            'authorize_url': 'https://accounts.google.com/o/oauth2/auth',
            'access_token_url': 'https://accounts.google.com/o/oauth2/token',
            'base_url': 'https://www.googleapis.com/oauth2/v1/',
        }
        self.service = OAuth2Service(
            name='google',
            client_id=self.consumer_id,
            client_secret=self.consumer_secret,
            **endpoints
        )

    def authorize(self):
        """Redirect to the authorization URL, requesting the userinfo.email scope."""
        authorize_url = self.service.get_authorize_url(
            scope='https://www.googleapis.com/auth/userinfo.email',
            response_type='code',
            redirect_uri=self.get_callback_url(),
        )
        return redirect(authorize_url)

    def callback(self):
        """Fetch the signed-in user from the 'userinfo' endpoint.

        Returns an ``OAuthUserResponse`` with the id converted to ``str``.
        """
        auth_session = self.make_oauth_session()
        user_info = auth_session.get('userinfo').json()
        user_id = str(user_info['id'])
        return OAuthUserResponse(user_id, user_info['email'])

File Metadata

Mime Type
text/x-python
Expires
Thu, Oct 22, 9:20 AM (1 d, 23 h)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
ab/09/37a31dc56b521f14dd9f35b25b6c

Event Timeline