diff --git a/qpdnd/api/decorators/voucher_checker.py b/qpdnd/api/decorators/voucher_checker.py index 00424ba..afbd17f 100644 --- a/qpdnd/api/decorators/voucher_checker.py +++ b/qpdnd/api/decorators/voucher_checker.py @@ -16,6 +16,57 @@ import json import jwt import requests +import datetime +import uuid + + +def _get_server_client_assertion(audience, client_id, issuer, subject, private_key_path): + """ + Create a client assertion (where G3WSuite is the client) to get PDND a voucher + """ + + issued = datetime.datetime.utcnow() + delta = datetime.timedelta(minutes=120) + expire_in = issued + delta + jti = uuid.uuid4() + + headers_rsa = { + "kid": client_id, + "alg": "RS256", + "typ": "JWT" + } + + payload = { + "iss": issuer, + "sub": subject, + "aud": audience, + #"purposeId": PURPOSEID, # Not needed for server assertion + "jti": str(jti), + "iat": issued, + "exp": expire_in + } + + key_path = private_key_path + with open(key_path, "rb") as private_key: + rsaKey = private_key.read() + + return jwt.encode(payload, rsaKey, algorithm="RS256", headers=headers_rsa) + + +def _get_voucher(url, client_id, client_assertion): + """ + Make a POST request to PDND + """ + + # Note: client_id is the ISSUER, not KID!!! + data = { + 'client_id': client_id, + 'client_assertion': client_assertion, + 'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + 'grant_type': 'client_credentials' + } + + return requests.post(url, data=data, headers={'Content-Type': 'application/x-www-form-urlencoded'}) def pdnd_voucher_required(func): @@ -123,6 +174,68 @@ def _wrapped_view(request, *args, **kwargs): **{'content_type': 'application/problem+json'}) + # Verify that the purposeId in the token is authorized by calling PDND API + purpose_id = None + try: + purpose_id = payload.get('purposeId') + except Exception as e: + return JsonResponse({ + 'status': 'Error', + 'msg': 'Invalid token (missing purposeId)' + }, + status=401, + **{'content_type': 'application/problem+json'}) + + # Get the voucher from the PDND API + server_assertion = _get_server_client_assertion( + settings.QPDN_AUDIENCE[qpdndp.pdnd_env], + settings.QPDND_SERVER_KID[qpdndp.pdnd_env], + settings.QPDND_SERVER_ISSUER[qpdndp.pdnd_env], + settings.QPDND_SERVER_SUBJECT[qpdndp.pdnd_env], + settings.QPDND_SERVER_PRIVKEY_PATH[qpdndp.pdnd_env]) + + server_result = _get_voucher(settings.QPDND_API_TOKEN_URL[qpdndp.pdnd_env], settings.QPDND_SERVER_ISSUER[qpdndp.pdnd_env], server_assertion) + + if server_result.status_code != 200: + return JsonResponse({ + 'status': 'Error', + 'msg': 'PDND voucher request failed' + }, + status=401, + **{'content_type': 'application/problem+json'}) + + server_access_token = server_result.json()['access_token'] + purpose_verification_url = settings.QPDND_API_PURPOSE_VERIFICATION_URL[qpdndp.pdnd_env].format(purposeId=purpose_id) + purpose_verification_response = requests.get(purpose_verification_url, headers={settings.QPDND_AUTH_HEADER: 'Bearer ' + server_access_token}) + + if purpose_verification_response.status_code != 200: + return JsonResponse({ + 'status': 'Error', + 'msg': 'PDND purpose request verification failed' + }, + status=401, + **{'content_type': 'application/problem+json'}) + + purpose_verification_response_json = purpose_verification_response.json() + state = purpose_verification_response_json.get('state', False) + if state != 'ACTIVE': + return JsonResponse({ + 'status': 'Error', + 'msg': 'PDND purpose state verification failed' + }, + status=401, + **{'content_type': 'application/problem+json'}) + + eserviceId = purpose_verification_response_json.get('eserviceId', False) + if not eserviceId or not eserviceId == settings.QPDND_ESERVICE_ID[qpdndp.pdnd_env]: + return JsonResponse({ + 'status': 'Error', + 'msg': 'PDND purpose eserviceId verification failed' + }, + status=401, + **{'content_type': 'application/problem+json'}) + + # All checks passed, call the view return func(request, *args, **kwargs) return _wrapped_view diff --git a/qpdnd/settings.py b/qpdnd/settings.py index 73b053a..e82630b 100644 --- a/qpdnd/settings.py +++ b/qpdnd/settings.py @@ -12,17 +12,69 @@ __license__ = 'MPL 2.0' +############################################################# # For PDND request authentication -# ------------------------------- +############################################################# + + +############################################################# +# These may be ported to the model settings + +QPDND_SERVER_KID = { + 'test': "J_z5sjzZ-7yRxGz0Cz_EtIPSbpLE0d5BJoBNGcsTzz4", + 'prod': "J_z5sjzZ-7yRxGz0Cz_EtIPSbpLE0d5BJoBNGcsTzz4" +} + +QPDND_ESERVICE_ID = { + 'test': "929ce5a1-2e82-4e37-bdce-c76bfd66407d", + 'prod': "929ce5a1-2e82-4e37-bdce-c76bfd66407d" +} QPDND_ISSUER = { 'test': "uat.interop.pagopa.it", 'prod': "interop.pagopa.it" } + +QPDND_SERVER_ISSUER = { + 'test': "c2fc3ed2-a096-4a23-bb2e-47c767fa19d6", + 'prod': "c2fc3ed2-a096-4a23-bb2e-47c767fa19d6" +} + +QPDND_SERVER_SUBJECT = { + 'test': "c2fc3ed2-a096-4a23-bb2e-47c767fa19d6", + 'prod': "c2fc3ed2-a096-4a23-bb2e-47c767fa19d6" +} + +# Path to a RSA256 private key file used by G3WSuite to authenticate itself to PDND +QPDND_SERVER_PRIVKEY_PATH = { + 'test': '/path/to/privkey.rsa.priv', + 'prod': '/path/to/privkey.rsa.priv' +} + +############################################################# +# Generic: should be the same for all services + + QPDND_WELL_KNOWN_URL = { 'test': "https://uat.interop.pagopa.it/.well-known/jwks.json", 'prod': "https://interop.pagopa.it/.well-known/jwks.json" } -QPDND_AUTH_HEADER = 'Authorization' \ No newline at end of file +QPDND_AUTH_HEADER = 'Authorization' + +QPDND_API_PURPOSE_VERIFICATION_URL = { + 'test': "https://api.uat.interop.pagopa.it/1.0/purposes/{purposeId}/agreement", + 'prod': "https://api.interop.pagopa.it/1.0/purposes/{purposeId}/agreement" +} + +QPDND_API_TOKEN_URL = { + 'test': "https://auth.uat.interop.pagopa.it/token.oauth2", + 'prod': "https://auth.interop.pagopa.it/token.oauth2" +} + +QPDN_AUDIENCE = { + 'test': "auth.uat.interop.pagopa.it/client-assertion", + 'prod': "auth.interop.pagopa.it/client-assertion" +} + diff --git a/qpdnd/tests/test_pdnd_auth.py b/qpdnd/tests/test_pdnd_auth.py index 462953b..e7856b4 100644 --- a/qpdnd/tests/test_pdnd_auth.py +++ b/qpdnd/tests/test_pdnd_auth.py @@ -104,7 +104,7 @@ def test_auth(self): headers = {'HTTP_AUTHORIZATION': 'Bearer ' + voucher} response = self.client.get(url, **headers) - self.assertEqual(response.status_code, 200) + self.assertEqual(response.status_code, 200, response.content) # Admin01 can pass # ----------------