11
11
oauth2_scheme = APIKeyCookie (name = settings .auth_cookie_name )
12
12
13
13
14
+ def derive_key () -> bytes :
15
+ """Derive a key from the auth secret."""
16
+ hkdf = HKDF (
17
+ algorithm = hashes .SHA256 (),
18
+ length = settings .auth_length ,
19
+ salt = settings .auth_salt ,
20
+ info = settings .auth_info ,
21
+ )
22
+ key = hkdf .derive (settings .auth_secret )
23
+ return key
24
+
25
+
14
26
def create_access_token (data : dict ) -> str :
15
27
"""Create encoded JSON Web Token (JWT) using the given data."""
16
28
expires_delta = timedelta (minutes = settings .auth_access_token_expire_minutes )
@@ -19,32 +31,20 @@ def create_access_token(data: dict) -> str:
19
31
to_encode .update ({"exp" : expire })
20
32
21
33
# Generate a key from the auth secret
22
- hkdf = HKDF (
23
- algorithm = hashes .SHA256 (),
24
- length = settings .auth_length ,
25
- salt = settings .auth_salt ,
26
- info = settings .auth_info ,
27
- )
28
- key = hkdf .derive (settings .auth_secret )
34
+ key = derive_key ()
29
35
30
36
# Encrypt the payload using JWE
31
37
token : bytes = jwe .encrypt (to_encode , key )
32
38
return token .decode ()
33
39
34
40
35
41
def get_current_user_id (token : str = Security (oauth2_scheme )) -> str | None :
42
+ """Decode the current user JWT token and return the payload."""
36
43
if not settings .use_auth :
37
44
return None
38
45
39
- """Decode the current user JWT token and return the payload."""
40
46
# Generate a key from the auth secret
41
- hkdf = HKDF (
42
- algorithm = hashes .SHA256 (),
43
- length = settings .auth_length ,
44
- salt = settings .auth_salt ,
45
- info = settings .auth_info ,
46
- )
47
- key = hkdf .derive (settings .auth_secret )
47
+ key = derive_key ()
48
48
49
49
# Decrypt the JWE token
50
50
try :
0 commit comments