forked from aaronn/django-rest-framework-passwordless
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsignals.py
148 lines (120 loc) · 7.23 KB
/
signals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import logging
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.dispatch import receiver
from django.db.models import signals
from drfpasswordless.models import CallbackToken
from drfpasswordless.models import generate_numeric_token
from drfpasswordless.settings import api_settings
from drfpasswordless.services import TokenService
# Module-level logger named after this module; the signal handlers below use it
# to report whether a verification message could be sent.
logger = logging.getLogger(__name__)
@receiver(signals.post_save, sender=CallbackToken)
def invalidate_previous_tokens(sender, instance, created, **kwargs):
    """
    Deactivates every other active token of the same user and type once a token has been saved.

    Runs after each save of a CallbackToken, whether it was created or updated.
    Tokens of users listed in PASSWORDLESS_DEMO_USERS are never invalidated here.
    """
    owner = instance.user
    if owner.pk in api_settings.PASSWORDLESS_DEMO_USERS.keys():
        return

    if not isinstance(instance, CallbackToken):
        return

    # Everything `objects.active()` still reports for this user/type, minus the
    # token that was just saved.
    superseded = (
        CallbackToken.objects.active()
        .filter(user=owner, type=instance.type)
        .exclude(id=instance.id)
    )
    superseded.update(is_active=False)
@receiver(signals.pre_save, sender=CallbackToken)
def check_unique_tokens(sender, instance, **kwargs):
    """
    Makes sure a token that is about to be created does not share its key with an active token.

    When the key collides, a new one is generated, up to
    PASSWORDLESS_TOKEN_GENERATION_ATTEMPTS times. Keys are compared across all
    token types (auth and verification alike); filtering on instance.type would
    be a way to relax this in the future.

    Raises ValidationError if every regenerated key collides as well.
    """
    # Saving an already existing token (e.g. to invalidate it) skips the check:
    # that avoids an unnecessary db hit and never changes the key of a used token.
    if not instance._state.adding:
        return
    if not isinstance(instance, CallbackToken):
        return

    def key_taken():
        # True while some active token already carries the instance's current key.
        return CallbackToken.objects.filter(key=instance.key, is_active=True).exists()

    if not key_taken():
        # The key the token came with is unique right away.
        return

    attempt = 0
    while attempt < api_settings.PASSWORDLESS_TOKEN_GENERATION_ATTEMPTS:
        attempt += 1
        instance.key = generate_numeric_token()
        if not key_taken():
            # Stop as soon as a key is found that no active token uses yet.
            return

    # None of the regenerated keys was unique.
    raise ValidationError("Couldn't create a unique token even after retrying.")
# Resolve the project's user model once at import time. It is the pre_save
# sender for update_alias_verification and is used there to load the stored user.
User = get_user_model()
def _changed_alias(instance, user_old, field_name):
    """
    Returns the incoming value of `field_name` if it differs from the stored one, else None.

    An incoming alias that is an empty string or None is never reported as a change.
    """
    incoming = getattr(instance, field_name)  # Value about to be saved
    stored = getattr(user_old, field_name)  # Value currently in the db
    if incoming != stored and incoming != "" and incoming is not None:
        return incoming
    return None


@receiver(signals.pre_save, sender=User)
def update_alias_verification(sender, instance, **kwargs):
    """
    Flags a user's email and/or mobile as unverified if they change it.
    Optionally sends a verification token to the new endpoint.

    Email handling is enabled by PASSWORDLESS_USER_MARK_EMAIL_VERIFIED and mobile
    handling by PASSWORDLESS_USER_MARK_MOBILE_VERIFIED. A token is only sent when
    PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True. Users without an id, or
    whose row cannot be found, are left untouched.
    """
    if not isinstance(instance, User) or not instance.id:
        # Without an id there is no stored alias to compare against.
        return

    mark_email = api_settings.PASSWORDLESS_USER_MARK_EMAIL_VERIFIED is True
    mark_mobile = api_settings.PASSWORDLESS_USER_MARK_MOBILE_VERIFIED is True
    if not (mark_email or mark_mobile):
        # Nothing to flag, so skip the db lookup altogether.
        return

    # Load the pre-save state once for both aliases. Only the lookup is guarded,
    # so errors raised while sending a token are not silently swallowed.
    try:
        user_old = User.objects.get(id=instance.id)
    except User.DoesNotExist:
        # User probably is just initially being created
        return

    if mark_email:
        new_email = _changed_alias(instance, user_old, api_settings.PASSWORDLESS_USER_EMAIL_FIELD_NAME)
        if new_email is not None:
            # Email changed, verification should be flagged
            setattr(instance, api_settings.PASSWORDLESS_USER_EMAIL_VERIFIED_FIELD_NAME, False)
            if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True:
                message_payload = {
                    'email_subject': api_settings.PASSWORDLESS_EMAIL_VERIFICATION_SUBJECT,
                    'email_plaintext': api_settings.PASSWORDLESS_EMAIL_VERIFICATION_PLAINTEXT_MESSAGE,
                    'email_html': api_settings.PASSWORDLESS_EMAIL_VERIFICATION_TOKEN_HTML_TEMPLATE_NAME,
                }
                success = TokenService.send_token(instance, 'email', CallbackToken.TOKEN_TYPE_VERIFY, **message_payload)
                if success:
                    logger.info('drfpasswordless: Successfully sent email on updated address: %s', new_email)
                else:
                    logger.info('drfpasswordless: Failed to send email to updated address: %s', new_email)

    if mark_mobile:
        new_mobile = _changed_alias(instance, user_old, api_settings.PASSWORDLESS_USER_MOBILE_FIELD_NAME)
        if new_mobile is not None:
            # Mobile changed, verification should be flagged
            setattr(instance, api_settings.PASSWORDLESS_USER_MOBILE_VERIFIED_FIELD_NAME, False)
            if api_settings.PASSWORDLESS_AUTO_SEND_VERIFICATION_TOKEN is True:
                message_payload = {'mobile_message': api_settings.PASSWORDLESS_MOBILE_MESSAGE}
                success = TokenService.send_token(instance, 'mobile', CallbackToken.TOKEN_TYPE_VERIFY, **message_payload)
                if success:
                    logger.info('drfpasswordless: Successfully sent SMS on updated mobile: %s', new_mobile)
                else:
                    logger.info('drfpasswordless: Failed to send SMS to updated mobile: %s', new_mobile)