Skip to content

Commit c1cd5d7

Browse files
symptogjleclanche
authored andcommitted
Implement IntrospectTokenView (RFC 7662)
1 parent 8ef945e commit c1cd5d7

File tree

7 files changed

+361
-1
lines changed

7 files changed

+361
-1
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# -*- coding: utf-8 -*-
2+
# Generated by Django 1.11.1 on 2017-05-14 11:41
3+
from __future__ import unicode_literals
4+
5+
from oauth2_provider.settings import oauth2_settings
6+
from django.db import migrations, models
7+
import django.db.models.deletion
8+
9+
10+
class Migration(migrations.Migration):
11+
12+
dependencies = [
13+
('oauth2_provider', '0004_auto_20160525_1623'),
14+
]
15+
16+
operations = [
17+
migrations.AlterField(
18+
model_name='accesstoken',
19+
name='application',
20+
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=oauth2_settings.APPLICATION_MODEL),
21+
),
22+
]

oauth2_provider/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class AbstractAccessToken(models.Model):
211211
on_delete=models.CASCADE,
212212
related_name="%(app_label)s_%(class)s")
213213
token = models.CharField(max_length=255, unique=True, )
214-
application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL,
214+
application = models.ForeignKey(oauth2_settings.APPLICATION_MODEL, blank=True, null=True,
215215
on_delete=models.CASCADE)
216216
expires = models.DateTimeField()
217217
scope = models.TextField(blank=True)

oauth2_provider/urls.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
url(r'^authorize/$', views.AuthorizationView.as_view(), name="authorize"),
1313
url(r'^token/$', views.TokenView.as_view(), name="token"),
1414
url(r'^revoke_token/$', views.RevokeTokenView.as_view(), name="revoke-token"),
15+
url(r'^introspect/$', views.IntrospectTokenView.as_view(), name="introspect"),
1516
]
1617

1718

oauth2_provider/views/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
ApplicationDelete, ApplicationUpdate
55
from .generic import ProtectedResourceView, ScopedProtectedResourceView, ReadWriteScopedResourceView
66
from .token import AuthorizedTokensListView, AuthorizedTokenDeleteView
7+
from .introspect import IntrospectTokenView
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from __future__ import unicode_literals
2+
3+
import calendar
4+
import json
5+
6+
from django.core.exceptions import ObjectDoesNotExist
7+
from django.http import HttpResponse
8+
from django.utils.decorators import method_decorator
9+
from django.views.decorators.csrf import csrf_exempt
10+
11+
from oauth2_provider.models import get_access_token_model
12+
from oauth2_provider.views import ReadWriteScopedResourceView
13+
14+
15+
@method_decorator(csrf_exempt, name="dispatch")
16+
class IntrospectTokenView(ReadWriteScopedResourceView):
17+
"""
18+
Implements an endpoint for token introspection based
19+
on RFC 7662 https://tools.ietf.org/html/rfc7662
20+
21+
To access this view the request must pass a OAuth2 Bearer Token
22+
which is allowed to access the scope `introspection`.
23+
"""
24+
required_scopes = ["introspection"]
25+
26+
@staticmethod
27+
def get_token_response(token_value=None):
28+
try:
29+
token = get_access_token_model().objects.get(token=token_value)
30+
except ObjectDoesNotExist:
31+
return HttpResponse(
32+
content=json.dumps({"active": False}),
33+
status=401,
34+
content_type="application/json"
35+
)
36+
else:
37+
if token.is_valid():
38+
data = {
39+
"active": True,
40+
"scope": token.scope,
41+
"exp": int(calendar.timegm(token.expires.timetuple())),
42+
}
43+
if token.application:
44+
data["client_id"] = token.application.client_id
45+
if token.user:
46+
data["username"] = token.user.get_username()
47+
return HttpResponse(content=json.dumps(data), status=200, content_type="application/json")
48+
else:
49+
return HttpResponse(content=json.dumps({
50+
"active": False,
51+
}), status=200, content_type="application/json")
52+
53+
def get(self, request, *args, **kwargs):
54+
"""
55+
Get the token from the URL parameters.
56+
URL: https://example.com/introspect?token=mF_9.B5f-4.1JqM
57+
58+
:param request:
59+
:param args:
60+
:param kwargs:
61+
:return:
62+
"""
63+
return self.get_token_response(request.GET.get("token", None))
64+
65+
def post(self, request, *args, **kwargs):
66+
"""
67+
Get the token from the body form parameters.
68+
Body: token=mF_9.B5f-4.1JqM
69+
70+
:param request:
71+
:param args:
72+
:param kwargs:
73+
:return:
74+
"""
75+
return self.get_token_response(request.POST.get("token", None))

setup.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ zip_safe = False
3030
install_requires =
3131
django >= 1.8
3232
oauthlib >= 2.0.1
33+
requests >= 2.13.0
3334

3435
[options.packages.find]
3536
exclude = tests

tests/test_introspection_view.py

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
from __future__ import unicode_literals
2+
3+
import calendar
4+
import datetime
5+
6+
from django.contrib.auth import get_user_model
7+
from django.test import TestCase
8+
from django.urls import reverse
9+
from django.utils import timezone
10+
11+
from oauth2_provider.models import get_access_token_model, get_application_model
12+
from oauth2_provider.settings import oauth2_settings
13+
14+
Application = get_application_model()
15+
AccessToken = get_access_token_model()
16+
UserModel = get_user_model()
17+
18+
19+
class TestTokenIntrospectionViews(TestCase):
20+
"""
21+
Tests for Authorized Token Introspection Views
22+
"""
23+
def setUp(self):
24+
self.resource_server_user = UserModel.objects.create_user("resource_server", "test@example.com")
25+
self.test_user = UserModel.objects.create_user("bar_user", "dev@example.com")
26+
27+
self.application = Application(
28+
name="Test Application",
29+
redirect_uris="http://localhost http://example.com http://example.it",
30+
user=self.test_user,
31+
client_type=Application.CLIENT_CONFIDENTIAL,
32+
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
33+
)
34+
self.application.save()
35+
36+
self.resource_server_token = AccessToken.objects.create(
37+
user=self.resource_server_user, token="12345678900",
38+
application=self.application,
39+
expires=timezone.now() + datetime.timedelta(days=1),
40+
scope="read write introspection"
41+
)
42+
43+
self.valid_token = AccessToken.objects.create(
44+
user=self.test_user, token="12345678901",
45+
application=self.application,
46+
expires=timezone.now() + datetime.timedelta(days=1),
47+
scope="read write dolphin"
48+
)
49+
50+
self.invalid_token = AccessToken.objects.create(
51+
user=self.test_user, token="12345678902",
52+
application=self.application,
53+
expires=timezone.now() + datetime.timedelta(days=-1),
54+
scope="read write dolphin"
55+
)
56+
57+
self.token_without_user = AccessToken.objects.create(
58+
user=None, token="12345678903",
59+
application=self.application,
60+
expires=timezone.now() + datetime.timedelta(days=1),
61+
scope="read write dolphin"
62+
)
63+
64+
self.token_without_app = AccessToken.objects.create(
65+
user=self.test_user, token="12345678904",
66+
application=None,
67+
expires=timezone.now() + datetime.timedelta(days=1),
68+
scope="read write dolphin"
69+
)
70+
71+
oauth2_settings._SCOPES = ["read", "write", "introspection", "dolphin"]
72+
oauth2_settings.READ_SCOPE = "read"
73+
oauth2_settings.WRITE_SCOPE = "write"
74+
75+
def tearDown(self):
76+
oauth2_settings._SCOPES = ["read", "write"]
77+
AccessToken.objects.all().delete()
78+
Application.objects.all().delete()
79+
UserModel.objects.all().delete()
80+
81+
def test_view_forbidden(self):
82+
"""
83+
Test that the view is restricted for logged-in users.
84+
"""
85+
response = self.client.get(reverse("oauth2_provider:introspect"))
86+
self.assertEqual(response.status_code, 403)
87+
88+
def test_view_get_valid_token(self):
89+
"""
90+
Test that when you pass a valid token as URL parameter,
91+
a json with an active token state is provided
92+
"""
93+
auth_headers = {
94+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
95+
}
96+
response = self.client.get(
97+
reverse("oauth2_provider:introspect"),
98+
{"token": self.valid_token.token},
99+
**auth_headers)
100+
101+
self.assertEqual(response.status_code, 200)
102+
content = response.json()
103+
self.assertIsInstance(content, dict)
104+
self.assertDictEqual(content, {
105+
"active": True,
106+
"scope": self.valid_token.scope,
107+
"client_id": self.valid_token.application.client_id,
108+
"username": self.valid_token.user.get_username(),
109+
"exp": int(calendar.timegm(self.valid_token.expires.timetuple())),
110+
})
111+
112+
def test_view_get_valid_token_without_user(self):
113+
"""
114+
Test that when you pass a valid token as URL parameter,
115+
a json with an active token state is provided
116+
"""
117+
auth_headers = {
118+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
119+
}
120+
response = self.client.get(
121+
reverse("oauth2_provider:introspect"),
122+
{"token": self.token_without_user.token},
123+
**auth_headers)
124+
125+
self.assertEqual(response.status_code, 200)
126+
content = response.json()
127+
self.assertIsInstance(content, dict)
128+
self.assertDictEqual(content, {
129+
"active": True,
130+
"scope": self.token_without_user.scope,
131+
"client_id": self.token_without_user.application.client_id,
132+
"exp": int(calendar.timegm(self.token_without_user.expires.timetuple())),
133+
})
134+
135+
def test_view_get_valid_token_without_app(self):
136+
"""
137+
Test that when you pass a valid token as URL parameter,
138+
a json with an active token state is provided
139+
"""
140+
auth_headers = {
141+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
142+
}
143+
response = self.client.get(
144+
reverse("oauth2_provider:introspect"),
145+
{"token": self.token_without_app.token},
146+
**auth_headers)
147+
148+
self.assertEqual(response.status_code, 200)
149+
content = response.json()
150+
self.assertIsInstance(content, dict)
151+
self.assertDictEqual(content, {
152+
"active": True,
153+
"scope": self.token_without_app.scope,
154+
"username": self.token_without_app.user.get_username(),
155+
"exp": int(calendar.timegm(self.token_without_app.expires.timetuple())),
156+
})
157+
158+
def test_view_get_invalid_token(self):
159+
"""
160+
Test that when you pass an invalid token as URL parameter,
161+
a json with an inactive token state is provided
162+
"""
163+
auth_headers = {
164+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
165+
}
166+
response = self.client.get(
167+
reverse("oauth2_provider:introspect"),
168+
{"token": self.invalid_token.token},
169+
**auth_headers)
170+
171+
self.assertEqual(response.status_code, 200)
172+
content = response.json()
173+
self.assertIsInstance(content, dict)
174+
self.assertDictEqual(content, {
175+
"active": False,
176+
})
177+
178+
def test_view_get_notexisting_token(self):
179+
"""
180+
Test that when you pass an non existing token as URL parameter,
181+
a json with an inactive token state is provided
182+
"""
183+
auth_headers = {
184+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
185+
}
186+
response = self.client.get(
187+
reverse("oauth2_provider:introspect"),
188+
{"token": "kaudawelsch"},
189+
**auth_headers)
190+
191+
self.assertEqual(response.status_code, 401)
192+
content = response.json()
193+
self.assertIsInstance(content, dict)
194+
self.assertDictEqual(content, {
195+
"active": False,
196+
})
197+
198+
def test_view_post_valid_token(self):
199+
"""
200+
Test that when you pass a valid token as form parameter,
201+
a json with an active token state is provided
202+
"""
203+
auth_headers = {
204+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
205+
}
206+
response = self.client.post(
207+
reverse("oauth2_provider:introspect"),
208+
{"token": self.valid_token.token},
209+
**auth_headers)
210+
211+
self.assertEqual(response.status_code, 200)
212+
content = response.json()
213+
self.assertIsInstance(content, dict)
214+
self.assertDictEqual(content, {
215+
"active": True,
216+
"scope": self.valid_token.scope,
217+
"client_id": self.valid_token.application.client_id,
218+
"username": self.valid_token.user.get_username(),
219+
"exp": int(calendar.timegm(self.valid_token.expires.timetuple())),
220+
})
221+
222+
def test_view_post_invalid_token(self):
223+
"""
224+
Test that when you pass an invalid token as form parameter,
225+
a json with an inactive token state is provided
226+
"""
227+
auth_headers = {
228+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
229+
}
230+
response = self.client.post(
231+
reverse("oauth2_provider:introspect"),
232+
{"token": self.invalid_token.token},
233+
**auth_headers)
234+
235+
self.assertEqual(response.status_code, 200)
236+
content = response.json()
237+
self.assertIsInstance(content, dict)
238+
self.assertDictEqual(content, {
239+
"active": False,
240+
})
241+
242+
def test_view_post_notexisting_token(self):
243+
"""
244+
Test that when you pass an non existing token as form parameter,
245+
a json with an inactive token state is provided
246+
"""
247+
auth_headers = {
248+
"HTTP_AUTHORIZATION": "Bearer " + self.resource_server_token.token,
249+
}
250+
response = self.client.post(
251+
reverse("oauth2_provider:introspect"),
252+
{"token": "kaudawelsch"},
253+
**auth_headers)
254+
255+
self.assertEqual(response.status_code, 401)
256+
content = response.json()
257+
self.assertIsInstance(content, dict)
258+
self.assertDictEqual(content, {
259+
"active": False,
260+
})

0 commit comments

Comments
 (0)