Source code for webmachine.auth.oauth_store

# -*- coding: utf-8 -
#
# This file is part of dj-webmachine released under the MIT license. 
# See the NOTICE for more information.

from django.contrib.auth.models import AnonymousUser

from webmachine.models import Nonce, Consumer, Token
from webmachine.util import generate_random
from webmachine.util.const import VERIFIER_SIZE, TOKEN_REQUEST, TOKEN_ACCESS

[docs]class OAuthDataStore(object): """A database abstraction used to lookup consumers and tokens."""
[docs] def lookup_consumer(self, key): """-> OAuthConsumer.""" raise NotImplementedError
[docs] def lookup_token(self, token_type, key): """-> OAuthToken.""" raise NotImplementedError
[docs] def lookup_nonce(self, oauth_consumer, oauth_token, nonce): """-> OAuthToken.""" raise NotImplementedError
[docs] def fetch_request_token(self, oauth_consumer, oauth_callback, oauth_timestamp): """-> OAuthToken.""" raise NotImplementedError
[docs] def fetch_access_token(self, oauth_consumer, oauth_token, oauth_verifier, oauth_timestamp): """-> OAuthToken.""" raise NotImplementedError
[docs] def authorize_request_token(self, oauth_token, user): """-> OAuthToken.""" raise NotImplementedError
[docs]class DataStore(OAuthDataStore):
[docs] def lookup_consumer(self, key): try: self.consumer = Consumer.objects.get(key=key) except Consumer.DoesNotExist: return None return self.consumer
[docs] def lookup_token(self, token_type, key): try: self.request_token = Token.objects.get( token_type=token_type, key=key ) except Consumer.DoesNotExist: return None return self.request_token
[docs] def lookup_nonce(self, consumer, token, nonce): if not token: return nonce, created = Nonce.objects.get_or_create( consumer_key=consumer.key, token_key=token.key, nonce=nonce ) if created: return None return nonce
[docs] def fetch_request_token(self, consumer, callback, timestamp): if consumer.key == self.consumer.key: request_token = Token.objects.create_token( consumer=self.consumer, token_type=TOKEN_REQUEST, timestamp=timestamp ) if callback: self.request_token.set_callback(callback) self.request_token = request_token return request_token return None
[docs] def fetch_access_token(self, consumer, token, verifier, timestamp): if consumer.key == self.consumer.key \ and token.key == self.request_token.key \ and self.request_token.is_approved: if (self.request_token.callback_confirmed \ and verifier == self.request_token.verifier) \ or not self.request_token.callback_confirmed: self.access_token = Token.objects.create_token( consumer=self.consumer, token_type=TOKEN_ACCESS, timestamp=timestamp, user=self.request_token.user) return self.access_token return None
[docs] def authorize_request_token(self, oauth_token, user): if oauth_token.key == self.request_token.key: # authorize the request token in the store self.request_token.is_approved = True if not isinstance(user, AnonymousUser): self.request_token.user = user self.request_token.verifier = generate_random(VERIFIER_SIZE) self.request_token.save() return self.request_token return None