1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import base64
import pickle
from collections import UserDict
from openid.association import Association as OIDAssociation
from openid.extensions.ax import FetchResponse
from openid.extensions.sreg import SRegResponse
from openid.store.interface import OpenIDStore as OIDStore
from allauth.utils import valid_email_or_none
from .models import OpenIDNonce, OpenIDStore
class JSONSafeSession(UserDict):
    """
    Dict-like wrapper around a session object that stores every value as
    base64-encoded pickle text.

    The openid library puts non-JSON-serializable objects (e.g. class
    OpenIDServiceEndpoint) in the session; encoding them to plain ASCII
    strings lets them live in a session that is not pickled as a whole.

    NOTE(review): values are restored with ``pickle.loads``, which is only
    safe while the session contents cannot be tampered with -- confirm
    against the session backend in use.
    """

    def __init__(self, session):
        super().__init__()
        # Operate directly on the wrapped session instead of on a copy, so
        # every write lands in the real session.
        self.data = session

    def __setitem__(self, key, value):
        # pickle -> base64 -> ASCII text
        pickled = pickle.dumps(value)
        encoded = base64.b64encode(pickled).decode("ascii")
        return super().__setitem__(key, encoded)

    def __getitem__(self, key):
        # ASCII text -> base64 decode -> unpickle
        encoded = super().__getitem__(key)
        pickled = base64.b64decode(encoded.encode("ascii"))
        return pickle.loads(pickled)
class OldAXAttribute:
    """Attribute Exchange type URIs under the openid.net/schema namespace."""

    # Full name, then its first/last components.
    PERSON_NAME = "http://openid.net/schema/namePerson"
    PERSON_FIRST_NAME = "http://openid.net/schema/namePerson/first"
    PERSON_LAST_NAME = "http://openid.net/schema/namePerson/last"
class AXAttribute:
    """Attribute Exchange type URIs under the axschema.org namespace."""

    CONTACT_EMAIL = "http://axschema.org/contact/email"

    # Full name, then its first/last components.
    PERSON_NAME = "http://axschema.org/namePerson"
    PERSON_FIRST_NAME = "http://axschema.org/namePerson/first"
    PERSON_LAST_NAME = "http://axschema.org/namePerson/last"
# Every AX type URI declared in this module: the axschema.org ones first,
# followed by the openid.net/schema ones.
AXAttributes = [
    AXAttribute.CONTACT_EMAIL,
    AXAttribute.PERSON_NAME,
    AXAttribute.PERSON_FIRST_NAME,
    AXAttribute.PERSON_LAST_NAME,
] + [
    OldAXAttribute.PERSON_NAME,
    OldAXAttribute.PERSON_FIRST_NAME,
    OldAXAttribute.PERSON_LAST_NAME,
]
class SRegField:
    """Simple Registration (SReg) field names referenced by this module."""

    EMAIL = "email"
    NAME = "fullname"
# Every SReg field name declared on SRegField, in declaration order.
SRegFields = [SRegField.EMAIL, SRegField.NAME]
class DBOpenIDStore(OIDStore):
    """
    OpenID store implementation backed by the ``OpenIDStore`` (associations)
    and ``OpenIDNonce`` (nonces) database models.
    """

    # 6 * 60 * 60 -- presumably seconds (six hours); not referenced by the
    # methods of this class.
    max_nonce_age = 6 * 60 * 60

    def storeAssociation(self, server_url, assoc=None):
        """
        Persist ``assoc`` for ``server_url``.

        The binary association secret is base64-encoded and stored as text;
        handle, issued, lifetime and assoc_type are copied as-is.
        """
        try:
            secret = base64.encodebytes(assoc.secret)
        except AttributeError:
            # Python 2.x compat
            secret = base64.encodestring(assoc.secret)
        else:
            secret = secret.decode()
        OpenIDStore.objects.create(
            server_url=server_url,
            handle=assoc.handle,
            secret=secret,
            issued=assoc.issued,
            lifetime=assoc.lifetime,
            assoc_type=assoc.assoc_type,
        )

    def getAssociation(self, server_url, handle=None):
        """
        Return the most recently issued, unexpired association stored for
        ``server_url`` (restricted to ``handle`` when given), or ``None``
        when there is none.

        Stored associations found to be expired along the way are deleted.
        """
        stored_assocs = OpenIDStore.objects.filter(server_url=server_url)
        if handle:
            stored_assocs = stored_assocs.filter(handle=handle)
        # order_by() returns a new queryset rather than sorting in place, so
        # the result must be rebound for "newest first" to take effect.
        stored_assocs = stored_assocs.order_by("-issued")

        return_val = None
        # Walk every row (not just up to the first hit) so that all expired
        # associations get purged. An empty queryset simply yields None.
        for stored_assoc in stored_assocs:
            assoc = OIDAssociation(
                stored_assoc.handle,
                base64.decodebytes(stored_assoc.secret.encode("utf-8")),
                stored_assoc.issued,
                stored_assoc.lifetime,
                stored_assoc.assoc_type,
            )
            # See:
            # necaris/python3-openid@1abb155c8fc7b508241cbe9d2cae24f18e4a379b
            if hasattr(assoc, "getExpiresIn"):
                expires_in = assoc.getExpiresIn()
            else:
                expires_in = assoc.expiresIn
            if expires_in == 0:
                stored_assoc.delete()
            elif return_val is None:
                # Rows are newest-first, so the first live one wins.
                return_val = assoc
        return return_val

    def removeAssociation(self, server_url, handle):
        """
        Delete the stored associations for ``server_url``; when ``handle`` is
        truthy only those with that handle are deleted.
        """
        stored_assocs = OpenIDStore.objects.filter(server_url=server_url)
        if handle:
            stored_assocs = stored_assocs.filter(handle=handle)
        stored_assocs.delete()

    def useNonce(self, server_url, timestamp, salt):
        """
        Record the nonce ``(server_url, timestamp, salt)``.

        Returns ``True`` when the nonce had not been stored before (it is
        stored now), ``False`` when it already exists.
        """
        try:
            OpenIDNonce.objects.get(
                server_url=server_url, timestamp=timestamp, salt=salt
            )
        except OpenIDNonce.DoesNotExist:
            OpenIDNonce.objects.create(
                server_url=server_url, timestamp=timestamp, salt=salt
            )
            return True
        return False
def get_email_from_response(response):
    """
    Extract an e-mail address from a successful OpenID response.

    The SReg ``email`` field is tried first; if that yields nothing, the
    first value of the AX ``contact/email`` attribute is tried. Each
    candidate is passed through ``valid_email_or_none``.
    """
    sreg = SRegResponse.fromSuccessResponse(response)
    email = valid_email_or_none(sreg.get(SRegField.EMAIL)) if sreg else None
    if email:
        return email

    ax = FetchResponse.fromSuccessResponse(response)
    if not ax:
        return email

    try:
        candidates = ax.get(AXAttribute.CONTACT_EMAIL)
        if candidates:
            email = valid_email_or_none(candidates[0])
    except KeyError:
        # Treated as "attribute not available": keep what we have.
        pass
    return email
def get_value_from_response(response, sreg_names=None, ax_names=None):
    """
    Return the first truthy value found in a successful OpenID response.

    SReg fields listed in ``sreg_names`` are tried in order first; if none
    yields a truthy value, the AX attributes listed in ``ax_names`` are
    tried in order, taking the first element of each attribute's values.
    When nothing truthy is found, the last value looked at (possibly
    ``None``) is returned.
    """
    result = None

    sreg = SRegResponse.fromSuccessResponse(response) if sreg_names else None
    if sreg:
        for field in sreg_names:
            result = sreg.get(field)
            if result:
                return result

    # From here on ``result`` is falsy; fall back to AX if requested.
    if not ax_names:
        return result
    ax = FetchResponse.fromSuccessResponse(response)
    if not ax:
        return result

    for type_uri in ax_names:
        try:
            found = ax.get(type_uri)
            if found:
                result = found[0]
        except KeyError:
            # Treated as "attribute not available": try the next one.
            pass
        if result:
            break
    return result
|