# -*- coding: utf-8 -
#
# This file is part of dj-webmachine released under the MIT license.
# See the NOTICE for more information.
from django.core.exceptions import ImproperlyConfigured
from django.conf import settings
from django.utils.importlib import import_module
try:
from restkit.util import oauth2
except ImportError:
raise ImportError("restkit packages is needed for auth.")
from webmachine.auth.base import Auth
from webmachine.util.const import TOKEN_REQUEST, TOKEN_ACCESS
def load_oauth_datastore():
datastore = getattr(settings, 'OAUTH_DATASTORE',
'webmachine.auth.oauth_store.DataStore')
i = datastore.rfind('.')
module, clsname = datastore[:i], datastore[i+1:]
try:
mod = import_module(module)
except ImportError:
raise ImproperlyConfigured("oauth datastore module '%s' isn't valid" % module)
try:
cls = getattr(mod, clsname)
except AttributeError:
raise ImproperlyConfigured("oauth datastore '%s' doesn't exist in '%s' module" % (clsname, module))
return cls
class OAuthServer(oauth2.Server):
def __init__(self, datastore):
self.datastore = datastore
super(OAuthServer, self).__init__()
def fetch_request_token(self, oauth_request):
"""Processes a request_token request and returns the
request token on success.
"""
try:
# Get the request token for authorization.
token = self._get_token(oauth_request, TOKEN_REQUEST)
except oauth2.Error:
# No token required for the initial token request.
timestamp = self._get_timestamp(oauth_request)
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
try:
callback = self.get_callback(oauth_request)
except oauth2.Error:
callback = None # 1.0, no callback specified.
#hack
self._check_signature(oauth_request, consumer, None)
# Fetch a new token.
token = self.datastore.fetch_request_token(consumer,
callback, timestamp)
return token
def fetch_access_token(self, oauth_request):
"""Processes an access_token request and returns the
access token on success.
"""
timestamp = self._get_timestamp(oauth_request)
version = self._get_version(oauth_request)
consumer = self._get_consumer(oauth_request)
try:
verifier = self._get_verifier(oauth_request)
except oauth2.Error:
verifier = None
# Get the request token.
token = self._get_token(oauth_request, TOKEN_REQUEST)
self._check_signature(oauth_request, consumer, token)
new_token = self.datastore.fetch_access_token(consumer, token,
verifier, timestamp)
return new_token
def verify_request(self, oauth_request):
consumer = self._get_consumer(oauth_request)
token = self._get_token(oauth_request, TOKEN_ACCESS)
parameters = super(OAuthServer, self).verify_request(oauth_request,
consumer, token)
return consumer, token, parameters
def authorize_token(self, token, user):
"""Authorize a request token."""
return self.datastore.authorize_request_token(token, user)
def get_callback(self, oauth_request):
"""Get the callback URL."""
return oauth_request.get_parameter('oauth_callback')
def _get_consumer(self, oauth_request):
consumer_key = oauth_request.get_parameter('oauth_consumer_key')
consumer = self.datastore.lookup_consumer(consumer_key)
if not consumer:
raise oauth2.Error('Invalid consumer.')
return consumer
def _get_token(self, oauth_request, token_type=TOKEN_ACCESS):
"""Try to find the token for the provided request token key."""
token_field = oauth_request.get_parameter('oauth_token')
token = self.datastore.lookup_token(token_type, token_field)
if not token:
raise oauth2.Error('Invalid %s token: %s' % (token_type, token_field))
return token
def _check_nonce(self, consumer, token, nonce):
"""Verify that the nonce is uniqueish."""
nonce = self.datastore.lookup_nonce(consumer, token, nonce)
if nonce:
raise oauth2.Error('Nonce already used: %s' % str(nonce))
def _get_timestamp(self, oauth_request):
return int(oauth_request.get_parameter('oauth_timestamp'))
[docs]class Oauth(Auth):
def __init__(self, realm="OAuth"):
oauth_datastore = load_oauth_datastore()
self.realm = realm
self.oauth_server = OAuthServer(oauth_datastore())
self.oauth_server.add_signature_method(oauth2.SignatureMethod_PLAINTEXT())
self.oauth_server.add_signature_method(oauth2.SignatureMethod_HMAC_SHA1())
[docs] def authorized(self, req, resp):
params = {}
headers = {}
if req.method == "POST":
params = req.REQUEST.items()
if 'HTTP_AUTHORIZATION' in req.META:
headers['Authorization'] = req.META.get('HTTP_AUTHORIZATION')
oauth_request = oauth2.Request.from_request(req.method,
req.build_absolute_uri(), headers=headers,
parameters=params,
query_string=req.META.get('QUERY_STRING'))
if not oauth_request:
return 'OAuth realm="%s"' % self.realm
try:
consumer, token, params = self.oauth_server.verify_request(oauth_request)
except oauth2.Error, err:
resp.content = str(err)
return 'OAuth realm="%s"' % self.realm
req.user = consumer.user
return True