-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #36077 from openedx/rijuma/remove-edx-token-utils-dep
chore: Remove edx-token-utils dependency
- Loading branch information
Showing
12 changed files
with
271 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
""" | ||
JWT Token handling and signing functions. | ||
""" | ||
|
||
import json | ||
from time import time | ||
|
||
from django.conf import settings | ||
from jwkest import Expired, Invalid, MissingKey, jwk | ||
from jwkest.jws import JWS | ||
|
||
|
||
def create_jwt(lms_user_id, expires_in_seconds, additional_token_claims, now=None): | ||
""" | ||
Produce an encoded JWT (string) indicating some temporary permission for the indicated user. | ||
What permission that is must be encoded in additional_claims. | ||
Arguments: | ||
lms_user_id (int): LMS user ID this token is being generated for | ||
expires_in_seconds (int): Time to token expiry, specified in seconds. | ||
additional_token_claims (dict): Additional claims to include in the token. | ||
now(int): optional now value for testing | ||
""" | ||
now = now or int(time()) | ||
|
||
payload = { | ||
'lms_user_id': lms_user_id, | ||
'exp': now + expires_in_seconds, | ||
'iat': now, | ||
'iss': settings.TOKEN_SIGNING['JWT_ISSUER'], | ||
'version': settings.TOKEN_SIGNING['JWT_SUPPORTED_VERSION'], | ||
} | ||
payload.update(additional_token_claims) | ||
return _encode_and_sign(payload) | ||
|
||
|
||
def _encode_and_sign(payload): | ||
""" | ||
Encode and sign the provided payload. | ||
The signing key and algorithm are pulled from settings. | ||
""" | ||
keys = jwk.KEYS() | ||
|
||
serialized_keypair = json.loads(settings.TOKEN_SIGNING['JWT_PRIVATE_SIGNING_JWK']) | ||
keys.add(serialized_keypair) | ||
algorithm = settings.TOKEN_SIGNING['JWT_SIGNING_ALGORITHM'] | ||
|
||
data = json.dumps(payload) | ||
jws = JWS(data, alg=algorithm) | ||
return jws.sign_compact(keys=keys) | ||
|
||
|
||
def unpack_jwt(token, lms_user_id, now=None): | ||
""" | ||
Unpack and verify an encoded JWT. | ||
Validate the user and expiration. | ||
Arguments: | ||
token (string): The token to be unpacked and verified. | ||
lms_user_id (int): LMS user ID this token should match with. | ||
now (int): Optional now value for testing. | ||
Returns a valid, decoded json payload (string). | ||
""" | ||
now = now or int(time()) | ||
payload = _unpack_and_verify(token) | ||
|
||
if "lms_user_id" not in payload: | ||
raise MissingKey("LMS user id is missing") | ||
if "exp" not in payload: | ||
raise MissingKey("Expiration is missing") | ||
if payload["lms_user_id"] != lms_user_id: | ||
raise Invalid("User does not match") | ||
if payload["exp"] < now: | ||
raise Expired("Token is expired") | ||
|
||
return payload | ||
|
||
|
||
def _unpack_and_verify(token): | ||
""" | ||
Unpack and verify the provided token. | ||
The signing key and algorithm are pulled from settings. | ||
""" | ||
keys = jwk.KEYS() | ||
keys.load_jwks(settings.TOKEN_SIGNING['JWT_PUBLIC_SIGNING_JWK_SET']) | ||
decoded = JWS().verify_compact(token.encode('utf-8'), keys) | ||
return decoded |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
""" | ||
Tests for token handling | ||
""" | ||
import unittest | ||
|
||
from django.conf import settings | ||
from jwkest import BadSignature, Expired, Invalid, MissingKey, jwk | ||
from jwkest.jws import JWS | ||
|
||
from openedx.core.djangolib.testing.utils import skip_unless_lms | ||
from openedx.core.lib.jwt import _encode_and_sign, create_jwt, unpack_jwt | ||
|
||
|
||
test_user_id = 121 | ||
invalid_test_user_id = 120 | ||
test_timeout = 60 | ||
test_now = 1661432902 | ||
test_claims = {"foo": "bar", "baz": "quux", "meaning": 42} | ||
expected_full_token = { | ||
"lms_user_id": test_user_id, | ||
"iat": 1661432902, | ||
"exp": 1661432902 + 60, | ||
"iss": "token-test-issuer", # these lines from test_settings.py | ||
"version": "1.2.0", # these lines from test_settings.py | ||
} | ||
|
||
|
||
@skip_unless_lms | ||
class TestSign(unittest.TestCase): | ||
""" | ||
Tests for JWT creation and signing. | ||
""" | ||
|
||
def test_create_jwt(self): | ||
token = create_jwt(test_user_id, test_timeout, {}, test_now) | ||
|
||
decoded = _verify_jwt(token) | ||
self.assertEqual(expected_full_token, decoded) | ||
|
||
def test_create_jwt_with_claims(self): | ||
token = create_jwt(test_user_id, test_timeout, test_claims, test_now) | ||
|
||
expected_token_with_claims = expected_full_token.copy() | ||
expected_token_with_claims.update(test_claims) | ||
|
||
decoded = _verify_jwt(token) | ||
self.assertEqual(expected_token_with_claims, decoded) | ||
|
||
def test_malformed_token(self): | ||
token = create_jwt(test_user_id, test_timeout, test_claims, test_now) | ||
token = token + "a" | ||
|
||
expected_token_with_claims = expected_full_token.copy() | ||
expected_token_with_claims.update(test_claims) | ||
|
||
with self.assertRaises(BadSignature): | ||
_verify_jwt(token) | ||
|
||
|
||
def _verify_jwt(jwt_token): | ||
""" | ||
Helper function which verifies the signature and decodes the token | ||
from string back to claims form | ||
""" | ||
keys = jwk.KEYS() | ||
keys.load_jwks(settings.TOKEN_SIGNING['JWT_PUBLIC_SIGNING_JWK_SET']) | ||
decoded = JWS().verify_compact(jwt_token.encode('utf-8'), keys) | ||
return decoded | ||
|
||
|
||
@skip_unless_lms | ||
class TestUnpack(unittest.TestCase): | ||
""" | ||
Tests for JWT unpacking. | ||
""" | ||
|
||
def test_unpack_jwt(self): | ||
token = create_jwt(test_user_id, test_timeout, {}, test_now) | ||
decoded = unpack_jwt(token, test_user_id, test_now) | ||
|
||
self.assertEqual(expected_full_token, decoded) | ||
|
||
def test_unpack_jwt_with_claims(self): | ||
token = create_jwt(test_user_id, test_timeout, test_claims, test_now) | ||
|
||
expected_token_with_claims = expected_full_token.copy() | ||
expected_token_with_claims.update(test_claims) | ||
|
||
decoded = unpack_jwt(token, test_user_id, test_now) | ||
|
||
self.assertEqual(expected_token_with_claims, decoded) | ||
|
||
def test_malformed_token(self): | ||
token = create_jwt(test_user_id, test_timeout, test_claims, test_now) | ||
token = token + "a" | ||
|
||
expected_token_with_claims = expected_full_token.copy() | ||
expected_token_with_claims.update(test_claims) | ||
|
||
with self.assertRaises(BadSignature): | ||
unpack_jwt(token, test_user_id, test_now) | ||
|
||
def test_unpack_token_with_invalid_user(self): | ||
token = create_jwt(invalid_test_user_id, test_timeout, {}, test_now) | ||
|
||
with self.assertRaises(Invalid): | ||
unpack_jwt(token, test_user_id, test_now) | ||
|
||
def test_unpack_expired_token(self): | ||
token = create_jwt(test_user_id, test_timeout, {}, test_now) | ||
|
||
with self.assertRaises(Expired): | ||
unpack_jwt(token, test_user_id, test_now + test_timeout + 1) | ||
|
||
def test_missing_expired_lms_user_id(self): | ||
payload = expected_full_token.copy() | ||
del payload['lms_user_id'] | ||
token = _encode_and_sign(payload) | ||
|
||
with self.assertRaises(MissingKey): | ||
unpack_jwt(token, test_user_id, test_now) | ||
|
||
def test_missing_expired_key(self): | ||
payload = expected_full_token.copy() | ||
del payload['exp'] | ||
token = _encode_and_sign(payload) | ||
|
||
with self.assertRaises(MissingKey): | ||
unpack_jwt(token, test_user_id, test_now) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.