forked from aaronn/django-rest-framework-passwordless
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserializers.py
303 lines (239 loc) · 10.5 KB
/
serializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import logging
from django.utils.translation import gettext_lazy as _
from django.contrib.auth import get_user_model
from django.core.exceptions import PermissionDenied
from django.core.validators import RegexValidator
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from drfpasswordless.models import CallbackToken
from drfpasswordless.settings import api_settings
from drfpasswordless.utils import verify_user_alias, validate_token_age
logger = logging.getLogger(__name__)
User = get_user_model()
class TokenField(serializers.CharField):
    """
    Character field for callback tokens with token-specific error messages.

    A missing, blank or otherwise invalid value is reported as
    "Invalid Token"; length violations state the expected number of digits.
    """

    default_error_messages = dict(
        required=_('Invalid Token'),
        invalid=_('Invalid Token'),
        blank=_('Invalid Token'),
        max_length=_('Tokens are {max_length} digits long.'),
        min_length=_('Tokens are {min_length} digits long.'),
    )
class AbstractBaseAliasAuthenticationSerializer(serializers.Serializer):
    """
    Base serializer that resolves a submitted alias to a user account.

    Subclasses declare the alias field and expose its name via ``alias_type``.
    On success the resolved user is stored under ``attrs['user']``.
    """

    @property
    def alias_type(self):
        # Name of the alias field on the serializer and on the user model;
        # concrete subclasses must override this.
        raise NotImplementedError

    def validate(self, attrs):
        """
        Find the user owning the submitted alias, optionally creating one.

        A new user (with an unusable password) is created for an unknown
        alias only when ``PASSWORDLESS_REGISTER_NEW_USERS`` is ``True``.

        Raises ``serializers.ValidationError`` when the alias is missing,
        when no account matches it, or when the account is inactive.
        """
        field_name = self.alias_type
        alias = attrs.get(field_name)

        if not alias:
            raise serializers.ValidationError(_('Missing %s.') % field_name)

        try:
            # Case-insensitive match on the alias column.
            account = User.objects.get(**{field_name + '__iexact': alias})
        except User.DoesNotExist:
            if api_settings.PASSWORDLESS_REGISTER_NEW_USERS is True:
                # Unknown aliases register a fresh, password-less user.
                account = User.objects.create(**{field_name: alias})
                account.set_unusable_password()
                account.save()
            else:
                account = None

        if not account:
            raise serializers.ValidationError(
                _('No account is associated with this alias.'))

        if not account.is_active:
            raise serializers.ValidationError(_('User account is disabled.'))

        attrs['user'] = account
        return attrs
class EmailAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``email`` field."""

    email = serializers.EmailField()

    @property
    def alias_type(self):
        # Field / user attribute used as the alias.
        return 'email'
class MobileAuthSerializer(AbstractBaseAliasAuthenticationSerializer):
    """Alias authentication serializer keyed on the ``mobile`` field."""

    # A leading '+', a non-zero digit, then 1 to 14 further digits.
    phone_regex = RegexValidator(
        regex=r'^\+[1-9]\d{1,14}$',
        message=(
            "Mobile number must be entered in the format: '+999999999'. "
            "Up to 15 digits allowed."
        ),
    )
    mobile = serializers.CharField(validators=[phone_regex], max_length=17)

    @property
    def alias_type(self):
        # Field / user attribute used as the alias.
        return 'mobile'
"""
Verification
"""
class AbstractBaseAliasVerificationSerializer(serializers.Serializer):
    """
    Base serializer that checks the requesting user may verify an alias.

    Subclasses name the alias through ``alias_type``. The user is taken from
    the request found in the serializer context and, when acceptable, stored
    under ``attrs['user']``.
    """

    @property
    def alias_type(self):
        # Name of the alias attribute on the user model; concrete
        # subclasses must override this.
        raise NotImplementedError

    def validate(self, attrs):
        """
        Ensure the request's user is active and has the alias attribute.

        Expects ``self.context["request"]`` to be present (a missing key
        raises ``KeyError``, as before).

        Returns ``attrs`` with ``attrs['user']`` set on success.

        Raises ``serializers.ValidationError`` when ``alias_type`` is empty,
        the request carries no usable user, the user is inactive, or the
        user object lacks the alias attribute.
        """
        # Generic fallback used whenever no more specific reason is found.
        msg = _('There was a problem with your request.')

        if self.alias_type:
            request = self.context["request"]
            if request and hasattr(request, "user"):
                user = request.user
                if user:
                    if not user.is_active:
                        msg = _('User account is disabled.')
                    elif hasattr(user, self.alias_type):
                        # Has the appropriate alias type.
                        attrs['user'] = user
                        return attrs
                    else:
                        # Translate the template first, then interpolate;
                        # formatting inside _() would produce a msgid that
                        # can never be found in a translation catalogue.
                        msg = _('This user doesn\'t have an %s.') % self.alias_type
            raise serializers.ValidationError(msg)
        else:
            msg = _('Missing %s.') % self.alias_type
            raise serializers.ValidationError(msg)
class EmailVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``email`` alias."""

    @property
    def alias_type(self):
        # User attribute that must exist for verification to proceed.
        return 'email'
class MobileVerificationSerializer(AbstractBaseAliasVerificationSerializer):
    """Alias verification serializer for the ``mobile`` alias."""

    @property
    def alias_type(self):
        # User attribute that must exist for verification to proceed.
        return 'mobile'
"""
Callback Token
"""
def token_age_validator(value):
    """
    Field validator that rejects tokens failing the age check.

    Delegates to ``validate_token_age``; a falsy result raises
    ``serializers.ValidationError``, otherwise ``value`` is returned as is.
    """
    if validate_token_age(value):
        return value
    raise serializers.ValidationError("The token you entered isn't valid.")
class AbstractBaseCallbackTokenSerializer(serializers.Serializer):
    """
    Abstract class inspired by DRF's own token serializer.

    Declares the fields shared by the callback-token serializers: one alias
    (``email`` or ``mobile``) plus the six-character ``token``.
    """

    # A leading '+', a non-zero digit, then 1 to 14 further digits.
    phone_regex = RegexValidator(regex=r'^\+[1-9]\d{1,14}$',
                                 message="Mobile number must be entered in the format:"
                                         " '+999999999'. Up to 15 digits allowed.")

    # Both alias fields are optional at field level; validate_alias()
    # enforces that exactly one of them is actually supplied.
    email = serializers.EmailField(required=False)
    mobile = serializers.CharField(required=False, validators=[phone_regex], max_length=17)
    token = TokenField(min_length=6, max_length=6, validators=[token_age_validator])

    def validate_alias(self, attrs):
        """
        Determine which single alias was submitted.

        Returns a ``(alias_type, alias)`` tuple, where ``alias_type`` is
        ``'email'`` or ``'mobile'``.

        Raises ``serializers.ValidationError`` when both aliases or neither
        alias is present.
        """
        email = attrs.get('email')
        mobile = attrs.get('mobile')

        # Exactly one alias must be truthy: reject "both" and "neither".
        if bool(email) == bool(mobile):
            raise serializers.ValidationError()

        if email:
            return 'email', email
        return 'mobile', mobile
class CallbackTokenAuthSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Authenticates a user from an alias and an active auth-type callback token.
    """

    def validate(self, attrs):
        """
        Match the submitted alias and token to a user.

        Returns ``attrs`` with ``attrs['user']`` set on success and raises
        ``serializers.ValidationError`` otherwise.
        """
        try:
            alias_type, alias = self.validate_alias(attrs)
            submitted_key = attrs.get('token', None)
            user = User.objects.get(**{alias_type + '__iexact': alias})
            token = CallbackToken.objects.get(
                user=user,
                key=submitted_key,
                type=CallbackToken.TOKEN_TYPE_AUTH,
                is_active=True,
            )

            if token.user != user:
                raise serializers.ValidationError(_('Invalid Token'))

            if not user.is_active:
                raise serializers.ValidationError(_('User account is disabled.'))

            should_mark_verified = (
                api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED
                or api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED
            )
            if should_mark_verified:
                # Re-fetch the token's user, then mark the alias as verified.
                user = User.objects.get(pk=token.user.pk)
                if verify_user_alias(user, token) is False:
                    raise serializers.ValidationError(
                        _('Error validating user alias.'))

            attrs['user'] = user
            return attrs
        except User.DoesNotExist:
            raise serializers.ValidationError(
                _('Invalid user alias parameters provided.'))
        except (CallbackToken.DoesNotExist, ValidationError):
            # NOTE(review): this handler also catches the ValidationErrors
            # raised inside the try block above, so callers only ever see
            # this generic message for those cases - confirm that is intended.
            raise serializers.ValidationError(
                _('Invalid alias parameters provided.'))
class CallbackTokenVerificationSerializer(AbstractBaseCallbackTokenSerializer):
    """
    Takes a user and a token, verifies the token belongs to the user and
    validates the alias that the token was sent from.
    """

    def validate(self, attrs):
        """
        Verify an alias for the user whose id is in ``context['user_id']``.

        Returns ``attrs`` with ``attrs['user']`` set when an active
        verify-type token matches; raises ``serializers.ValidationError``
        in every other case.
        """
        try:
            alias_type, alias = self.validate_alias(attrs)
            owner_id = self.context.get("user_id")
            user = User.objects.get(id=owner_id, **{alias_type + '__iexact': alias})
            submitted_key = attrs.get('token', None)
            token = CallbackToken.objects.get(
                user=user,
                key=submitted_key,
                type=CallbackToken.TOKEN_TYPE_VERIFY,
                is_active=True,
            )

            if token.user == user:
                # Mark this alias as verified.
                # NOTE(review): a False result is only logged; the request
                # still succeeds - confirm that is intended.
                if verify_user_alias(user, token) is False:
                    logger.debug("drfpasswordless: Error verifying alias.")

                attrs['user'] = user
                return attrs

            msg = _('This token is invalid. Try again later.')
            logger.debug("drfpasswordless: User token mismatch when verifying alias.")
        except CallbackToken.DoesNotExist:
            msg = _('We could not verify this alias.')
            logger.debug("drfpasswordless: Tried to validate alias with bad token.")
        except User.DoesNotExist:
            msg = _('We could not verify this alias.')
            logger.debug("drfpasswordless: Tried to validate alias with bad user.")
        except PermissionDenied:
            msg = _('Insufficient permissions.')
            logger.debug("drfpasswordless: Permission denied while validating alias.")

        # Every path that did not return above ends in a validation failure.
        raise serializers.ValidationError(msg)
"""
Responses
"""
class TokenResponseSerializer(serializers.Serializer):
    """
    Default response serializer for an issued token.
    """

    # Exposed as "token" in the output; the value is read from the
    # source object's ``key`` attribute.
    token = serializers.CharField(source='key')
    # Accepted on input only; never included in serialized output.
    key = serializers.CharField(write_only=True)