diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 574ea8e..0000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Build", - "on": { "pull_request": null }, - "jobs": { - "linux": { - "runs-on": "ubuntu-latest", - "strategy": { - "fail-fast": false, - "matrix": { - "python": [ - "3.6", - "3.7", - "3.8", - "3.9", - ], - }, - }, - "steps": [ - { "uses": "actions/checkout@v2" }, - { - "uses": "actions/setup-python@v2", - "with": { "python-version": "${{ matrix.python }}"}, - }, - { "run": "sudo apt update" }, - { "run": "sudo apt install -y libkrb5-dev" }, - { "run": "pip install flake8" }, - { "run": "pip install -r requirements.txt" }, - { "run": "python3 -m unittest" }, - { - "run": "flake8", - "if": "${{ matrix['python'] == 3.9 }}", - }, - ], - }, - }, -} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..b1be615 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,96 @@ +name: Test requests-gssapi +on: + push: + branches: + - main + + pull_request: + branches: + - main + + release: + types: + - published + +jobs: + build: + name: build sdist and wheel + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: build sdist and wheel + run: | + set -ex + + python -m pip install build + python -m build + + - uses: actions/upload-artifact@v4 + with: + name: artifact + path: ./dist/* + + test: + name: test + needs: + - build + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + - "3.12" + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - uses: actions/download-artifact@v4 + with: + name: artifact + path: ./dist + + - name: Test + shell: bash + run: | + set -ex + + TOX_PYTHON=py$( echo '${{ matrix.python-version }}' | tr -d . ) + + sudo apt update + sudo apt install -y libkrb5-dev + + python -Im pip install tox + python -Im tox run \ + -f sanity \ + -f "${TOX_PYTHON}" \ + --installpkg dist/*.whl + env: + PYTEST_ADDOPTS: --color=yes --junitxml junit/test-results.xml + + publish: + name: publish + needs: + - test + runs-on: ubuntu-latest + permissions: + # IMPORTANT: this permission is mandatory for trusted publishing + id-token: write + + steps: + - uses: actions/download-artifact@v4 + with: + name: artifact + path: ./dist + + - name: Publish + if: startsWith(github.event.release.tag_name, 'v') + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.gitignore b/.gitignore index 7371324..4ca08ce 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,6 @@ env/ build/ dist/ +.tox/ requests_gssapi.egg-info/ diff --git a/HISTORY.rst b/HISTORY.rst index c6c392d..159841e 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,10 +1,12 @@ History ======= -FUTURE: TBD +1.3.0: 2024-02-16 ----------- - Drop flag for out of sequence detection - Use SPNEGO mechanism by default +- Fix ``SanitizedResponse.content`` to be ``bytes`` which reflects the base type +- Migrated project to a ``src`` layout setup and a ``PEP 621`` compliant build, this should have no impact on end users 1.2.3: 2021-02-08 ----------------- diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ff5bf7c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,104 @@ +[build-system] +requires = [ + "setuptools >= 61.0.0", # Support for setuptools config in pyproject.toml +] +build-backend = "setuptools.build_meta" + +[project] +name = "requests-gssapi" +description = "A GSSAPI authentication handler for python-requests" +readme = "README.rst" +requires-python = ">=3.8" +license = { file = "LICENSE" } +authors = [ + { name = "Robbie Harwood", email = "rharwood@redhat.com" }, + { name = "Ian Cordasco" }, + { name = "Cory Benfield" }, + { name = "Michael Komitee" }, +] +keywords = ["ansible", "debug", "lsp", "dap"] +classifiers = [ + "License :: OSI Approved :: ISC License (ISCL)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "requests >= 1.1.0", + "gssapi", +] +dynamic = ["version"] + +[project.urls] +homepage = "https://github.com/pythongssapi/requests-gssapi" + +[project.optional-dependencies] +dev = [ + "black == 24.2.0", + "isort == 5.13.2", + "pytest", + "tox >= 4.0.0", +] + +[tool.setuptools] +include-package-data = true + +[tool.setuptools.dynamic] +version = { attr = "requests_gssapi.__version__" } + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.black] +line-length = 120 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" + +[tool.pytest.ini_options] +addopts = "--import-mode=importlib" +testpaths = "tests" +junit_family = "xunit2" + +[tool.tox] +legacy_tox_ini = """ +[tox] +env_list = + sanity + py3{8,9,10,11,12}-tests +min_version = 4.0 + +[testenv] +package = wheel +wheel_build_env = .pkg + +extras = + dev +install_command = python -Im pip install --no-compile {opts} {packages} + +passenv = + PYTEST_ADDOPTS + +commands = + sanity: python -m black . --check + sanity: python -m isort . --check-only + + tests: python -m pytest -v +""" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 11c021a..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -requests>=1.1.0 -gssapi diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 5e40900..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[wheel] -universal = 1 diff --git a/setup.py b/setup.py deleted file mode 100755 index 3b3cd78..0000000 --- a/setup.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 -import os -import re -from setuptools import setup - -path = os.path.dirname(__file__) -desc_fd = os.path.join(path, 'README.rst') -hist_fd = os.path.join(path, 'HISTORY.rst') - -long_desc = '' -short_desc = 'A GSSAPI authentication handler for python-requests' - -if os.path.isfile(desc_fd): - with open(desc_fd) as fd: - long_desc = fd.read() - -if os.path.isfile(hist_fd): - with open(hist_fd) as fd: - long_desc = '\n\n'.join([long_desc, fd.read()]) - - -def get_version(): - """ - Simple function to extract the current version using regular expressions. - """ - reg = re.compile(r'__version__ = [\'"]([^\'"]*)[\'"]') - with open('requests_gssapi/__init__.py') as fd: - matches = list(filter(lambda x: x, map(reg.match, fd))) - - if not matches: - raise RuntimeError( - 'Could not find the version information for requests_gssapi' - ) - - return matches[0].group(1) - - -setup( - name='requests-gssapi', - description=short_desc, - long_description=long_desc, - author='Ian Cordasco, Cory Benfield, Michael Komitee, Robbie Harwood', - author_email='rharwood@redhat.com', - url='https://github.com/pythongssapi/requests-gssapi', - packages=['requests_gssapi'], - package_data={'': ['LICENSE', 'AUTHORS']}, - include_package_data=True, - version=get_version(), - install_requires=[ - 'requests>=1.1.0', - 'gssapi', - ], - test_suite='test_requests_gssapi', - classifiers=[ - "License :: OSI Approved :: ISC License (ISCL)" - ], -) diff --git a/requests_gssapi/__init__.py b/src/requests_gssapi/__init__.py similarity index 65% rename from requests_gssapi/__init__.py rename to src/requests_gssapi/__init__.py index 65e6d4e..eca9cea 100644 --- a/requests_gssapi/__init__.py +++ b/src/requests_gssapi/__init__.py @@ -12,14 +12,22 @@ The entire `requests.api` should be supported. """ + import logging -from .gssapi_ import HTTPSPNEGOAuth, SPNEGO, REQUIRED, OPTIONAL, DISABLED # noqa +from .compat import HTTPKerberosAuth, NullHandler from .exceptions import MutualAuthenticationError -from .compat import NullHandler, HTTPKerberosAuth +from .gssapi_ import DISABLED, OPTIONAL, REQUIRED, SPNEGO, HTTPSPNEGOAuth # noqa logging.getLogger(__name__).addHandler(NullHandler()) -__all__ = ('HTTPSPNEGOAuth', 'HTTPKerberosAuth', 'MutualAuthenticationError', - 'SPNEGO', 'REQUIRED', 'OPTIONAL', 'DISABLED') -__version__ = '1.2.3' +__all__ = ( + "HTTPSPNEGOAuth", + "HTTPKerberosAuth", + "MutualAuthenticationError", + "SPNEGO", + "REQUIRED", + "OPTIONAL", + "DISABLED", +) +__version__ = "1.3.0" diff --git a/requests_gssapi/compat.py b/src/requests_gssapi/compat.py similarity index 78% rename from requests_gssapi/compat.py rename to src/requests_gssapi/compat.py index f59f08d..0ae4ca8 100644 --- a/requests_gssapi/compat.py +++ b/src/requests_gssapi/compat.py @@ -1,6 +1,7 @@ """ Compatibility library for older versions of python and requests_kerberos """ + import sys import gssapi @@ -21,9 +22,17 @@ def emit(self, record): class HTTPKerberosAuth(HTTPSPNEGOAuth): """Deprecated compat shim; see HTTPSPNEGOAuth instead.""" - def __init__(self, mutual_authentication=DISABLED, service="HTTP", - delegate=False, force_preemptive=False, principal=None, - hostname_override=None, sanitize_mutual_error_response=True): + + def __init__( + self, + mutual_authentication=DISABLED, + service="HTTP", + delegate=False, + force_preemptive=False, + principal=None, + hostname_override=None, + sanitize_mutual_error_response=True, + ): # put these here for later self.principal = principal self.service = service @@ -36,7 +45,8 @@ def __init__(self, mutual_authentication=DISABLED, service="HTTP", delegate=delegate, opportunistic_auth=force_preemptive, creds=None, - sanitize_mutual_error_response=sanitize_mutual_error_response) + sanitize_mutual_error_response=sanitize_mutual_error_response, + ) def generate_request_header(self, response, host, is_preemptive=False): # This method needs to be shimmed because `host` isn't exposed to @@ -45,8 +55,7 @@ def generate_request_header(self, response, host, is_preemptive=False): try: if self.principal is not None: gss_stage = "acquiring credentials" - name = gssapi.Name( - self.principal, gssapi.NameType.user) + name = gssapi.Name(self.principal, gssapi.NameType.user) self.creds = gssapi.Credentials(name=name, usage="initiate") # contexts still need to be stored by host, but hostname_override @@ -60,14 +69,11 @@ def generate_request_header(self, response, host, is_preemptive=False): kerb_host = self.hostname_override kerb_spn = "{0}@{1}".format(self.service, kerb_host) - self.target_name = gssapi.Name( - kerb_spn, gssapi.NameType.hostbased_service) + self.target_name = gssapi.Name(kerb_spn, gssapi.NameType.hostbased_service) - return HTTPSPNEGOAuth.generate_request_header(self, response, - host, is_preemptive) + return HTTPSPNEGOAuth.generate_request_header(self, response, host, is_preemptive) except gssapi.exceptions.GSSError as error: msg = error.gen_message() - log.exception( - "generate_request_header(): {0} failed:".format(gss_stage)) + log.exception("generate_request_header(): {0} failed:".format(gss_stage)) log.exception(msg) raise SPNEGOExchangeError("%s failed: %s" % (gss_stage, msg)) diff --git a/requests_gssapi/exceptions.py b/src/requests_gssapi/exceptions.py similarity index 99% rename from requests_gssapi/exceptions.py rename to src/requests_gssapi/exceptions.py index a13c400..ee40dd6 100644 --- a/requests_gssapi/exceptions.py +++ b/src/requests_gssapi/exceptions.py @@ -5,6 +5,7 @@ This module contains the set of exceptions. """ + from requests.exceptions import RequestException diff --git a/requests_gssapi/gssapi_.py b/src/requests_gssapi/gssapi_.py similarity index 85% rename from requests_gssapi/gssapi_.py rename to src/requests_gssapi/gssapi_.py index db797b7..0d3c9f6 100644 --- a/requests_gssapi/gssapi_.py +++ b/src/requests_gssapi/gssapi_.py @@ -1,15 +1,13 @@ -import re import logging - -from base64 import b64encode, b64decode +import re +from base64 import b64decode, b64encode import gssapi - from requests.auth import AuthBase -from requests.models import Response from requests.compat import urlparse -from requests.structures import CaseInsensitiveDict from requests.cookies import cookiejar_from_dict +from requests.models import Response +from requests.structures import CaseInsensitiveDict from .exceptions import MutualAuthenticationError, SPNEGOExchangeError @@ -57,23 +55,23 @@ def __init__(self, response): self._content = b"" self.cookies = cookiejar_from_dict({}) self.headers = CaseInsensitiveDict() - self.headers['content-length'] = '0' - for header in ('date', 'server'): + self.headers["content-length"] = "0" + for header in ("date", "server"): if header in response.headers: self.headers[header] = response.headers[header] def _negotiate_value(response): """Extracts the gssapi authentication token from the appropriate header""" - if hasattr(_negotiate_value, 'regex'): + if hasattr(_negotiate_value, "regex"): regex = _negotiate_value.regex else: # There's no need to re-compile this EVERY time it is called. Compile # it once and you won't have the performance hit of the compilation. - regex = re.compile(r'Negotiate\s*([^,]*)', re.I) + regex = re.compile(r"Negotiate\s*([^,]*)", re.I) _negotiate_value.regex = regex - authreq = response.headers.get('www-authenticate', None) + authreq = response.headers.get("www-authenticate", None) if authreq: match_obj = regex.search(authreq) if match_obj: @@ -110,9 +108,17 @@ class HTTPSPNEGOAuth(AuthBase): server responses. See the `SanitizedResponse` class. """ - def __init__(self, mutual_authentication=DISABLED, target_name="HTTP", - delegate=False, opportunistic_auth=False, creds=None, - mech=SPNEGO, sanitize_mutual_error_response=True): + + def __init__( + self, + mutual_authentication=DISABLED, + target_name="HTTP", + delegate=False, + opportunistic_auth=False, + creds=None, + mech=SPNEGO, + sanitize_mutual_error_response=True, + ): self.context = {} self.pos = None self.mutual_authentication = mutual_authentication @@ -142,27 +148,25 @@ def generate_request_header(self, response, host, is_preemptive=False): gss_stage = "initiating context" name = self.target_name if type(name) != gssapi.Name: - if '@' not in name: + if "@" not in name: name = "%s@%s" % (name, host) name = gssapi.Name(name, gssapi.NameType.hostbased_service) self.context[host] = gssapi.SecurityContext( - usage="initiate", flags=gssflags, name=name, - creds=self.creds, mech=self.mech) + usage="initiate", flags=gssflags, name=name, creds=self.creds, mech=self.mech + ) gss_stage = "stepping context" if is_preemptive: gss_response = self.context[host].step() else: - gss_response = self.context[host].step( - _negotiate_value(response)) + gss_response = self.context[host].step(_negotiate_value(response)) return "Negotiate {0}".format(b64encode(gss_response).decode()) except gssapi.exceptions.GSSError as error: msg = error.gen_message() - log.exception( - "generate_request_header(): {0} failed:".format(gss_stage)) + log.exception("generate_request_header(): {0} failed:".format(gss_stage)) log.exception(msg) raise SPNEGOExchangeError("%s failed: %s" % (gss_stage, msg)) @@ -177,9 +181,8 @@ def authenticate_user(self, response, **kwargs): # GSS Failure, return existing response return response - log.debug("authenticate_user(): Authorization header: {0}".format( - auth_header)) - response.request.headers['Authorization'] = auth_header + log.debug("authenticate_user(): Authorization header: {0}".format(auth_header)) + response.request.headers["Authorization"] = auth_header # Consume the content so we can reuse the connection for the next # request. @@ -225,8 +228,7 @@ def handle_other(self, response): # raise an exception so the user doesn't use an untrusted # response. log.error("handle_other(): Mutual authentication failed") - raise MutualAuthenticationError( - "Unable to authenticate {0}".format(response)) + raise MutualAuthenticationError("Unable to authenticate {0}".format(response)) # Authentication successful log.debug("handle_other(): returning {0}".format(response)) @@ -234,11 +236,10 @@ def handle_other(self, response): elif is_http_error or self.mutual_authentication == OPTIONAL: if not response.ok: log.error( - "handle_other(): Mutual authentication unavailable on" - " {0} response".format(response.status_code)) + "handle_other(): Mutual authentication unavailable on" " {0} response".format(response.status_code) + ) - if self.mutual_authentication == REQUIRED and \ - self.sanitize_mutual_error_response: + if self.mutual_authentication == REQUIRED and self.sanitize_mutual_error_response: return SanitizedResponse(response) return response else: @@ -246,8 +247,7 @@ def handle_other(self, response): # required, raise an exception so the user doesn't use an # untrusted response. log.error("handle_other(): Mutual authentication failed") - raise MutualAuthenticationError( - "Unable to authenticate {0}".format(response)) + raise MutualAuthenticationError("Unable to authenticate {0}".format(response)) def authenticate_server(self, response): """ @@ -256,8 +256,7 @@ def authenticate_server(self, response): Returns True on success, False on failure. """ - log.debug("authenticate_server(): Authenticate header: {0}".format( - _negotiate_value(response))) + log.debug("authenticate_server(): Authenticate header: {0}".format(_negotiate_value(response))) host = urlparse(response.url).hostname @@ -274,7 +273,7 @@ def authenticate_server(self, response): def handle_response(self, response, **kwargs): """Takes the given response and tries GSSAPI auth, as needed.""" - num_401s = kwargs.pop('num_401s', 0) + num_401s = kwargs.pop("num_401s", 0) if self.pos is not None: # Rewind the file position indicator of the body to where @@ -301,7 +300,7 @@ def handle_response(self, response, **kwargs): def deregister(self, response): """Deregisters the response handler""" - response.request.deregister_hook('response', self.handle_response) + response.request.deregister_hook("response", self.handle_response) def __call__(self, request): if self.opportunistic_auth: @@ -310,22 +309,20 @@ def __call__(self, request): host = urlparse(request.url).hostname try: - auth_header = self.generate_request_header(None, host, - is_preemptive=True) - log.debug( - "HTTPSPNEGOAuth: Preemptive Authorization header: {0}" - .format(auth_header)) + auth_header = self.generate_request_header(None, host, is_preemptive=True) + log.debug("HTTPSPNEGOAuth: Preemptive Authorization header: {0}".format(auth_header)) except SPNEGOExchangeError as exc: log.warning( "HTTPSPNEGOAuth: Opportunistic auth failed with %s ->" " sending request without adding Authorization header." " Will try again if it results in a 401.", - exc) + exc, + ) else: log.debug("HTTPSPNEGOAuth: Added opportunistic auth header") - request.headers['Authorization'] = auth_header + request.headers["Authorization"] = auth_header - request.register_hook('response', self.handle_response) + request.register_hook("response", self.handle_response) try: self.pos = request.body.tell() except AttributeError: diff --git a/test_requests_gssapi.py b/tests/test_requests_gssapi.py similarity index 68% rename from test_requests_gssapi.py rename to tests/test_requests_gssapi.py index a755732..dabfa0d 100644 --- a/test_requests_gssapi.py +++ b/tests/test_requests_gssapi.py @@ -3,18 +3,16 @@ """Tests for requests_gssapi.""" +import unittest from base64 import b64encode from unittest.mock import Mock, patch -from requests.compat import urlparse -import requests import gssapi +import requests +from requests.compat import urlparse import requests_gssapi -import unittest - -from requests_gssapi import REQUIRED -from requests_gssapi import SPNEGO +from requests_gssapi import REQUIRED, SPNEGO # Note: we're not using the @mock.patch decorator: # > My only word of warning is that in the past, the patch decorator hides @@ -62,94 +60,83 @@ def tearDown(self): def test_negotate_value_extraction(self): response = requests.Response() - response.headers = {'www-authenticate': b64_negotiate_token} - self.assertEqual( - requests_gssapi.gssapi_._negotiate_value(response), - b'token' - ) + response.headers = {"www-authenticate": b64_negotiate_token} + self.assertEqual(requests_gssapi.gssapi_._negotiate_value(response), b"token") def test_negotate_value_extraction_none(self): response = requests.Response() response.headers = {} - self.assertTrue( - requests_gssapi.gssapi_._negotiate_value(response) is None) + self.assertTrue(requests_gssapi.gssapi_._negotiate_value(response) is None) def test_force_preemptive(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPKerberosAuth(force_preemptive=True) request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' in request.headers) - self.assertEqual(request.headers.get('Authorization'), - b64_negotiate_response) + self.assertTrue("Authorization" in request.headers) + self.assertEqual(request.headers.get("Authorization"), b64_negotiate_response) def test_no_force_preemptive(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPKerberosAuth() request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' not in request.headers) + self.assertTrue("Authorization" not in request.headers) def test_generate_request_header(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertEqual( - auth.generate_request_header(response, host), - b64_negotiate_response) + self.assertEqual(auth.generate_request_header(response, host), b64_negotiate_response) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - creds=None, mech=SPNEGO, flags=gssflags, usage="initiate") + name=gssapi_sname("HTTP@www.example.org"), creds=None, mech=SPNEGO, flags=gssflags, usage="initiate" + ) fake_resp.assert_called_with(b"token") def test_generate_request_header_init_error(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertRaises(requests_gssapi.exceptions.SPNEGOExchangeError, - auth.generate_request_header, response, host) + self.assertRaises( + requests_gssapi.exceptions.SPNEGOExchangeError, auth.generate_request_header, response, host + ) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) def test_generate_request_header_step_error(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertRaises(requests_gssapi.exceptions.SPNEGOExchangeError, - auth.generate_request_header, response, host) + self.assertRaises( + requests_gssapi.exceptions.SPNEGOExchangeError, auth.generate_request_header, response, host + ) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fail_resp.assert_called_with(b"token") def test_authenticate_user(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -161,7 +148,7 @@ def test_authenticate_user(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -171,22 +158,20 @@ def test_authenticate_user(self): self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - flags=gssflags, usage="initiate", creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), flags=gssflags, usage="initiate", creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_handle_401(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -198,7 +183,7 @@ def test_handle_401(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -208,24 +193,20 @@ def test_handle_401(self): self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - creds=None, mech=SPNEGO, flags=gssflags, usage="initiate") + name=gssapi_sname("HTTP@www.example.org"), creds=None, mech=SPNEGO, flags=gssflags, usage="initiate" + ) fake_resp.assert_called_with(b"token") def test_authenticate_server(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} auth = requests_gssapi.HTTPKerberosAuth() auth.context = {"www.example.org": gssapi.SecurityContext} @@ -235,17 +216,13 @@ def test_authenticate_server(self): fake_resp.assert_called_with(b"servertoken") def test_handle_other(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_other(response_ok) @@ -254,17 +231,13 @@ def test_handle_other(self): fake_resp.assert_called_with(b"servertoken") def test_handle_response_200(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_response(response_ok) @@ -273,70 +246,55 @@ def test_handle_response_200(self): fake_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_required_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 response_ok.headers = {} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": "CTX"} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) self.assertFalse(fake_resp.called) def test_handle_response_200_mutual_auth_required_failure_2(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) fail_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_optional_hard_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": gssapi.SecurityContext} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) fail_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_optional_soft_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_response(response_ok) @@ -346,8 +304,7 @@ def test_handle_response_200_mutual_auth_optional_soft_failure(self): self.assertFalse(fake_resp.called) def test_handle_response_500_mutual_auth_required_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_500 = requests.Response() response_500.url = "http://www.example.org/" response_500.status_code = 500 @@ -359,14 +316,12 @@ def test_handle_response_500_mutual_auth_required_failure(self): response_500.raw = "RAW" response_500.cookies = "COOKIES" - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) - self.assertTrue( - isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) + self.assertTrue(isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) self.assertNotEqual(r, response_500) self.assertNotEqual(r.headers, response_500.headers) self.assertEqual(r.status_code, response_500.status_code) @@ -382,18 +337,15 @@ def test_handle_response_500_mutual_auth_required_failure(self): self.assertFalse(fail_resp.called) # re-test with error response sanitizing disabled - auth = requests_gssapi.HTTPKerberosAuth( - sanitize_mutual_error_response=False) + auth = requests_gssapi.HTTPKerberosAuth(sanitize_mutual_error_response=False) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) - self.assertFalse( - isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) + self.assertFalse(isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) def test_handle_response_500_mutual_auth_optional_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_500 = requests.Response() response_500.url = "http://www.example.org/" response_500.status_code = 500 @@ -405,8 +357,7 @@ def test_handle_response_500_mutual_auth_optional_failure(self): response_500.raw = "RAW" response_500.cookies = "COOKIES" - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) @@ -417,12 +368,11 @@ def test_handle_response_500_mutual_auth_optional_failure(self): def test_handle_response_401(self): # Get a 401 from server, authenticate, and get a 200 back. - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -434,7 +384,7 @@ def test_handle_response_401(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -448,20 +398,18 @@ def test_handle_response_401(self): self.assertTrue(response in r.history) auth.handle_other.assert_called_once_with(response_ok) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_handle_response_401_rejected(self): # Get a 401 from server, authenticate, and get another 401 back. # Ensure there is no infinite recursion. - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): connection = Mock() def connection_send(self, *args, **kwargs): @@ -480,7 +428,7 @@ def connection_send(self, *args, **kwargs): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -491,36 +439,33 @@ def connection_send(self, *args, **kwargs): r = auth.handle_response(response) self.assertEqual(r.status_code, 401) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_generate_request_header_custom_service(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth(service="barfoo") auth.generate_request_header(response, host), fake_init.assert_called_with( - name=gssapi_sname("barfoo@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("barfoo@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_delegation(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -532,121 +477,138 @@ def test_delegation(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" response.raw = raw - auth = requests_gssapi.HTTPKerberosAuth(service="HTTP", - delegate=True) + auth = requests_gssapi.HTTPKerberosAuth(service="HTTP", delegate=True) r = auth.authenticate_user(response) self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssdelegflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssdelegflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_principal_override(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth(principal="user@REALM") auth.generate_request_header(response, host) - fake_creds.assert_called_with(gssapi.creds.Credentials, - usage="initiate", - name=gssapi_uname("user@REALM", )) + fake_creds.assert_called_with( + gssapi.creds.Credentials, + usage="initiate", + name=gssapi_uname( + "user@REALM", + ), + ) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=b"fake creds", mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=b"fake creds", + mech=SPNEGO, + ) def test_realm_override(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - auth = requests_gssapi.HTTPKerberosAuth( - hostname_override="otherhost.otherdomain.org") + auth = requests_gssapi.HTTPKerberosAuth(hostname_override="otherhost.otherdomain.org") auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@otherhost.otherdomain.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_opportunistic_auth(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPSPNEGOAuth(opportunistic_auth=True) request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' in request.headers) - self.assertEqual(request.headers.get('Authorization'), - b64_negotiate_response) + self.assertTrue("Authorization" in request.headers) + self.assertEqual(request.headers.get("Authorization"), b64_negotiate_response) def test_explicit_creds(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname creds = gssapi.Credentials() auth = requests_gssapi.HTTPSPNEGOAuth(creds=creds) auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=b"fake creds", mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=b"fake creds", + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_explicit_mech(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - fake_mech = b'fake mech' + fake_mech = b"fake mech" auth = requests_gssapi.HTTPSPNEGOAuth(mech=fake_mech) auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=None, mech=b'fake mech') + usage="initiate", + flags=gssflags, + creds=None, + mech=b"fake mech", + ) fake_resp.assert_called_with(b"token") def test_target_name(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - auth = requests_gssapi.HTTPSPNEGOAuth( - target_name="HTTP@otherhost.otherdomain.org") + auth = requests_gssapi.HTTPSPNEGOAuth(target_name="HTTP@otherhost.otherdomain.org") auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@otherhost.otherdomain.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()