From c7de5051c97d294b0733cbed432b6edefbc69852 Mon Sep 17 00:00:00 2001 From: Jordan Borean Date: Fri, 16 Feb 2024 12:12:56 +1000 Subject: [PATCH] Update CI and Project Structure Updates the CI and project structure to perform better tests and conform to more modern practices when it comes to Python projects. Also bumps the version to 1.3.0 in preparation for the next release. Signed-off-by: Jordan Borean --- .github/workflows/build.yml | 36 -- .github/workflows/ci.yml | 96 +++++ .gitignore | 1 + HISTORY.rst | 4 +- pyproject.toml | 104 ++++++ requirements.txt | 2 - setup.cfg | 2 - setup.py | 58 --- .../requests_gssapi}/__init__.py | 18 +- .../requests_gssapi}/compat.py | 30 +- .../requests_gssapi}/exceptions.py | 1 + .../requests_gssapi}/gssapi_.py | 85 +++-- .../test_requests_gssapi.py | 340 ++++++++---------- 13 files changed, 428 insertions(+), 349 deletions(-) delete mode 100644 .github/workflows/build.yml create mode 100644 .github/workflows/ci.yml create mode 100644 pyproject.toml delete mode 100644 requirements.txt delete mode 100644 setup.cfg delete mode 100755 setup.py rename {requests_gssapi => src/requests_gssapi}/__init__.py (65%) rename {requests_gssapi => src/requests_gssapi}/compat.py (78%) rename {requests_gssapi => src/requests_gssapi}/exceptions.py (99%) rename {requests_gssapi => src/requests_gssapi}/gssapi_.py (85%) rename test_requests_gssapi.py => tests/test_requests_gssapi.py (68%) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml deleted file mode 100644 index 574ea8e..0000000 --- a/.github/workflows/build.yml +++ /dev/null @@ -1,36 +0,0 @@ -{ - "name": "Build", - "on": { "pull_request": null }, - "jobs": { - "linux": { - "runs-on": "ubuntu-latest", - "strategy": { - "fail-fast": false, - "matrix": { - "python": [ - "3.6", - "3.7", - "3.8", - "3.9", - ], - }, - }, - "steps": [ - { "uses": "actions/checkout@v2" }, - { - "uses": "actions/setup-python@v2", - "with": { "python-version": "${{ matrix.python }}"}, - }, - { "run": "sudo apt update" }, - { "run": "sudo apt install -y libkrb5-dev" }, - { "run": "pip install flake8" }, - { "run": "pip install -r requirements.txt" }, - { "run": "python3 -m unittest" }, - { - "run": "flake8", - "if": "${{ matrix['python'] == 3.9 }}", - }, - ], - }, - }, -} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..b1be615 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,96 @@ +name: Test requests-gssapi +on: + push: + branches: + - main + + pull_request: + branches: + - main + + release: + types: + - published + +jobs: + build: + name: build sdist and wheel + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: build sdist and wheel + run: | + set -ex + + python -m pip install build + python -m build + + - uses: actions/upload-artifact@v4 + with: + name: artifact + path: ./dist/* + + test: + name: test + needs: + - build + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: + - "3.8" + - "3.9" + - "3.10" + - "3.11" + - "3.12" + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - uses: actions/download-artifact@v4 + with: + name: artifact + path: ./dist + + - name: Test + shell: bash + run: | + set -ex + + TOX_PYTHON=py$( echo '${{ matrix.python-version }}' | tr -d . ) + + sudo apt update + sudo apt install -y libkrb5-dev + + python -Im pip install tox + python -Im tox run \ + -f sanity \ + -f "${TOX_PYTHON}" \ + --installpkg dist/*.whl + env: + PYTEST_ADDOPTS: --color=yes --junitxml junit/test-results.xml + + publish: + name: publish + needs: + - test + runs-on: ubuntu-latest + permissions: + # IMPORTANT: this permission is mandatory for trusted publishing + id-token: write + + steps: + - uses: actions/download-artifact@v4 + with: + name: artifact + path: ./dist + + - name: Publish + if: startsWith(github.event.release.tag_name, 'v') + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.gitignore b/.gitignore index 7371324..4ca08ce 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,6 @@ env/ build/ dist/ +.tox/ requests_gssapi.egg-info/ diff --git a/HISTORY.rst b/HISTORY.rst index c6c392d..159841e 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,10 +1,12 @@ History ======= -FUTURE: TBD +1.3.0: 2024-02-16 ----------- - Drop flag for out of sequence detection - Use SPNEGO mechanism by default +- Fix ``SanitizedResponse.content`` to be ``bytes`` which reflects the base type +- Migrated project to a ``src`` layout setup and a ``PEP 621`` compliant build, this should have no impact on end users 1.2.3: 2021-02-08 ----------------- diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ff5bf7c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,104 @@ +[build-system] +requires = [ + "setuptools >= 61.0.0", # Support for setuptools config in pyproject.toml +] +build-backend = "setuptools.build_meta" + +[project] +name = "requests-gssapi" +description = "A GSSAPI authentication handler for python-requests" +readme = "README.rst" +requires-python = ">=3.8" +license = { file = "LICENSE" } +authors = [ + { name = "Robbie Harwood", email = "rharwood@redhat.com" }, + { name = "Ian Cordasco" }, + { name = "Cory Benfield" }, + { name = "Michael Komitee" }, +] +keywords = ["ansible", "debug", "lsp", "dap"] +classifiers = [ + "License :: OSI Approved :: ISC License (ISCL)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "requests >= 1.1.0", + "gssapi", +] +dynamic = ["version"] + +[project.urls] +homepage = "https://github.com/pythongssapi/requests-gssapi" + +[project.optional-dependencies] +dev = [ + "black == 24.2.0", + "isort == 5.13.2", + "pytest", + "tox >= 4.0.0", +] + +[tool.setuptools] +include-package-data = true + +[tool.setuptools.dynamic] +version = { attr = "requests_gssapi.__version__" } + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.black] +line-length = 120 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" + +[tool.pytest.ini_options] +addopts = "--import-mode=importlib" +testpaths = "tests" +junit_family = "xunit2" + +[tool.tox] +legacy_tox_ini = """ +[tox] +env_list = + sanity + py3{8,9,10,11,12}-tests +min_version = 4.0 + +[testenv] +package = wheel +wheel_build_env = .pkg + +extras = + dev +install_command = python -Im pip install --no-compile {opts} {packages} + +passenv = + PYTEST_ADDOPTS + +commands = + sanity: python -m black . --check + sanity: python -m isort . --check-only + + tests: python -m pytest -v +""" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 11c021a..0000000 --- a/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -requests>=1.1.0 -gssapi diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 5e40900..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[wheel] -universal = 1 diff --git a/setup.py b/setup.py deleted file mode 100755 index 3b3cd78..0000000 --- a/setup.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# coding: utf-8 -import os -import re -from setuptools import setup - -path = os.path.dirname(__file__) -desc_fd = os.path.join(path, 'README.rst') -hist_fd = os.path.join(path, 'HISTORY.rst') - -long_desc = '' -short_desc = 'A GSSAPI authentication handler for python-requests' - -if os.path.isfile(desc_fd): - with open(desc_fd) as fd: - long_desc = fd.read() - -if os.path.isfile(hist_fd): - with open(hist_fd) as fd: - long_desc = '\n\n'.join([long_desc, fd.read()]) - - -def get_version(): - """ - Simple function to extract the current version using regular expressions. - """ - reg = re.compile(r'__version__ = [\'"]([^\'"]*)[\'"]') - with open('requests_gssapi/__init__.py') as fd: - matches = list(filter(lambda x: x, map(reg.match, fd))) - - if not matches: - raise RuntimeError( - 'Could not find the version information for requests_gssapi' - ) - - return matches[0].group(1) - - -setup( - name='requests-gssapi', - description=short_desc, - long_description=long_desc, - author='Ian Cordasco, Cory Benfield, Michael Komitee, Robbie Harwood', - author_email='rharwood@redhat.com', - url='https://github.com/pythongssapi/requests-gssapi', - packages=['requests_gssapi'], - package_data={'': ['LICENSE', 'AUTHORS']}, - include_package_data=True, - version=get_version(), - install_requires=[ - 'requests>=1.1.0', - 'gssapi', - ], - test_suite='test_requests_gssapi', - classifiers=[ - "License :: OSI Approved :: ISC License (ISCL)" - ], -) diff --git a/requests_gssapi/__init__.py b/src/requests_gssapi/__init__.py similarity index 65% rename from requests_gssapi/__init__.py rename to src/requests_gssapi/__init__.py index 65e6d4e..eca9cea 100644 --- a/requests_gssapi/__init__.py +++ b/src/requests_gssapi/__init__.py @@ -12,14 +12,22 @@ The entire `requests.api` should be supported. """ + import logging -from .gssapi_ import HTTPSPNEGOAuth, SPNEGO, REQUIRED, OPTIONAL, DISABLED # noqa +from .compat import HTTPKerberosAuth, NullHandler from .exceptions import MutualAuthenticationError -from .compat import NullHandler, HTTPKerberosAuth +from .gssapi_ import DISABLED, OPTIONAL, REQUIRED, SPNEGO, HTTPSPNEGOAuth # noqa logging.getLogger(__name__).addHandler(NullHandler()) -__all__ = ('HTTPSPNEGOAuth', 'HTTPKerberosAuth', 'MutualAuthenticationError', - 'SPNEGO', 'REQUIRED', 'OPTIONAL', 'DISABLED') -__version__ = '1.2.3' +__all__ = ( + "HTTPSPNEGOAuth", + "HTTPKerberosAuth", + "MutualAuthenticationError", + "SPNEGO", + "REQUIRED", + "OPTIONAL", + "DISABLED", +) +__version__ = "1.3.0" diff --git a/requests_gssapi/compat.py b/src/requests_gssapi/compat.py similarity index 78% rename from requests_gssapi/compat.py rename to src/requests_gssapi/compat.py index f59f08d..0ae4ca8 100644 --- a/requests_gssapi/compat.py +++ b/src/requests_gssapi/compat.py @@ -1,6 +1,7 @@ """ Compatibility library for older versions of python and requests_kerberos """ + import sys import gssapi @@ -21,9 +22,17 @@ def emit(self, record): class HTTPKerberosAuth(HTTPSPNEGOAuth): """Deprecated compat shim; see HTTPSPNEGOAuth instead.""" - def __init__(self, mutual_authentication=DISABLED, service="HTTP", - delegate=False, force_preemptive=False, principal=None, - hostname_override=None, sanitize_mutual_error_response=True): + + def __init__( + self, + mutual_authentication=DISABLED, + service="HTTP", + delegate=False, + force_preemptive=False, + principal=None, + hostname_override=None, + sanitize_mutual_error_response=True, + ): # put these here for later self.principal = principal self.service = service @@ -36,7 +45,8 @@ def __init__(self, mutual_authentication=DISABLED, service="HTTP", delegate=delegate, opportunistic_auth=force_preemptive, creds=None, - sanitize_mutual_error_response=sanitize_mutual_error_response) + sanitize_mutual_error_response=sanitize_mutual_error_response, + ) def generate_request_header(self, response, host, is_preemptive=False): # This method needs to be shimmed because `host` isn't exposed to @@ -45,8 +55,7 @@ def generate_request_header(self, response, host, is_preemptive=False): try: if self.principal is not None: gss_stage = "acquiring credentials" - name = gssapi.Name( - self.principal, gssapi.NameType.user) + name = gssapi.Name(self.principal, gssapi.NameType.user) self.creds = gssapi.Credentials(name=name, usage="initiate") # contexts still need to be stored by host, but hostname_override @@ -60,14 +69,11 @@ def generate_request_header(self, response, host, is_preemptive=False): kerb_host = self.hostname_override kerb_spn = "{0}@{1}".format(self.service, kerb_host) - self.target_name = gssapi.Name( - kerb_spn, gssapi.NameType.hostbased_service) + self.target_name = gssapi.Name(kerb_spn, gssapi.NameType.hostbased_service) - return HTTPSPNEGOAuth.generate_request_header(self, response, - host, is_preemptive) + return HTTPSPNEGOAuth.generate_request_header(self, response, host, is_preemptive) except gssapi.exceptions.GSSError as error: msg = error.gen_message() - log.exception( - "generate_request_header(): {0} failed:".format(gss_stage)) + log.exception("generate_request_header(): {0} failed:".format(gss_stage)) log.exception(msg) raise SPNEGOExchangeError("%s failed: %s" % (gss_stage, msg)) diff --git a/requests_gssapi/exceptions.py b/src/requests_gssapi/exceptions.py similarity index 99% rename from requests_gssapi/exceptions.py rename to src/requests_gssapi/exceptions.py index a13c400..ee40dd6 100644 --- a/requests_gssapi/exceptions.py +++ b/src/requests_gssapi/exceptions.py @@ -5,6 +5,7 @@ This module contains the set of exceptions. """ + from requests.exceptions import RequestException diff --git a/requests_gssapi/gssapi_.py b/src/requests_gssapi/gssapi_.py similarity index 85% rename from requests_gssapi/gssapi_.py rename to src/requests_gssapi/gssapi_.py index db797b7..0d3c9f6 100644 --- a/requests_gssapi/gssapi_.py +++ b/src/requests_gssapi/gssapi_.py @@ -1,15 +1,13 @@ -import re import logging - -from base64 import b64encode, b64decode +import re +from base64 import b64decode, b64encode import gssapi - from requests.auth import AuthBase -from requests.models import Response from requests.compat import urlparse -from requests.structures import CaseInsensitiveDict from requests.cookies import cookiejar_from_dict +from requests.models import Response +from requests.structures import CaseInsensitiveDict from .exceptions import MutualAuthenticationError, SPNEGOExchangeError @@ -57,23 +55,23 @@ def __init__(self, response): self._content = b"" self.cookies = cookiejar_from_dict({}) self.headers = CaseInsensitiveDict() - self.headers['content-length'] = '0' - for header in ('date', 'server'): + self.headers["content-length"] = "0" + for header in ("date", "server"): if header in response.headers: self.headers[header] = response.headers[header] def _negotiate_value(response): """Extracts the gssapi authentication token from the appropriate header""" - if hasattr(_negotiate_value, 'regex'): + if hasattr(_negotiate_value, "regex"): regex = _negotiate_value.regex else: # There's no need to re-compile this EVERY time it is called. Compile # it once and you won't have the performance hit of the compilation. - regex = re.compile(r'Negotiate\s*([^,]*)', re.I) + regex = re.compile(r"Negotiate\s*([^,]*)", re.I) _negotiate_value.regex = regex - authreq = response.headers.get('www-authenticate', None) + authreq = response.headers.get("www-authenticate", None) if authreq: match_obj = regex.search(authreq) if match_obj: @@ -110,9 +108,17 @@ class HTTPSPNEGOAuth(AuthBase): server responses. See the `SanitizedResponse` class. """ - def __init__(self, mutual_authentication=DISABLED, target_name="HTTP", - delegate=False, opportunistic_auth=False, creds=None, - mech=SPNEGO, sanitize_mutual_error_response=True): + + def __init__( + self, + mutual_authentication=DISABLED, + target_name="HTTP", + delegate=False, + opportunistic_auth=False, + creds=None, + mech=SPNEGO, + sanitize_mutual_error_response=True, + ): self.context = {} self.pos = None self.mutual_authentication = mutual_authentication @@ -142,27 +148,25 @@ def generate_request_header(self, response, host, is_preemptive=False): gss_stage = "initiating context" name = self.target_name if type(name) != gssapi.Name: - if '@' not in name: + if "@" not in name: name = "%s@%s" % (name, host) name = gssapi.Name(name, gssapi.NameType.hostbased_service) self.context[host] = gssapi.SecurityContext( - usage="initiate", flags=gssflags, name=name, - creds=self.creds, mech=self.mech) + usage="initiate", flags=gssflags, name=name, creds=self.creds, mech=self.mech + ) gss_stage = "stepping context" if is_preemptive: gss_response = self.context[host].step() else: - gss_response = self.context[host].step( - _negotiate_value(response)) + gss_response = self.context[host].step(_negotiate_value(response)) return "Negotiate {0}".format(b64encode(gss_response).decode()) except gssapi.exceptions.GSSError as error: msg = error.gen_message() - log.exception( - "generate_request_header(): {0} failed:".format(gss_stage)) + log.exception("generate_request_header(): {0} failed:".format(gss_stage)) log.exception(msg) raise SPNEGOExchangeError("%s failed: %s" % (gss_stage, msg)) @@ -177,9 +181,8 @@ def authenticate_user(self, response, **kwargs): # GSS Failure, return existing response return response - log.debug("authenticate_user(): Authorization header: {0}".format( - auth_header)) - response.request.headers['Authorization'] = auth_header + log.debug("authenticate_user(): Authorization header: {0}".format(auth_header)) + response.request.headers["Authorization"] = auth_header # Consume the content so we can reuse the connection for the next # request. @@ -225,8 +228,7 @@ def handle_other(self, response): # raise an exception so the user doesn't use an untrusted # response. log.error("handle_other(): Mutual authentication failed") - raise MutualAuthenticationError( - "Unable to authenticate {0}".format(response)) + raise MutualAuthenticationError("Unable to authenticate {0}".format(response)) # Authentication successful log.debug("handle_other(): returning {0}".format(response)) @@ -234,11 +236,10 @@ def handle_other(self, response): elif is_http_error or self.mutual_authentication == OPTIONAL: if not response.ok: log.error( - "handle_other(): Mutual authentication unavailable on" - " {0} response".format(response.status_code)) + "handle_other(): Mutual authentication unavailable on" " {0} response".format(response.status_code) + ) - if self.mutual_authentication == REQUIRED and \ - self.sanitize_mutual_error_response: + if self.mutual_authentication == REQUIRED and self.sanitize_mutual_error_response: return SanitizedResponse(response) return response else: @@ -246,8 +247,7 @@ def handle_other(self, response): # required, raise an exception so the user doesn't use an # untrusted response. log.error("handle_other(): Mutual authentication failed") - raise MutualAuthenticationError( - "Unable to authenticate {0}".format(response)) + raise MutualAuthenticationError("Unable to authenticate {0}".format(response)) def authenticate_server(self, response): """ @@ -256,8 +256,7 @@ def authenticate_server(self, response): Returns True on success, False on failure. """ - log.debug("authenticate_server(): Authenticate header: {0}".format( - _negotiate_value(response))) + log.debug("authenticate_server(): Authenticate header: {0}".format(_negotiate_value(response))) host = urlparse(response.url).hostname @@ -274,7 +273,7 @@ def authenticate_server(self, response): def handle_response(self, response, **kwargs): """Takes the given response and tries GSSAPI auth, as needed.""" - num_401s = kwargs.pop('num_401s', 0) + num_401s = kwargs.pop("num_401s", 0) if self.pos is not None: # Rewind the file position indicator of the body to where @@ -301,7 +300,7 @@ def handle_response(self, response, **kwargs): def deregister(self, response): """Deregisters the response handler""" - response.request.deregister_hook('response', self.handle_response) + response.request.deregister_hook("response", self.handle_response) def __call__(self, request): if self.opportunistic_auth: @@ -310,22 +309,20 @@ def __call__(self, request): host = urlparse(request.url).hostname try: - auth_header = self.generate_request_header(None, host, - is_preemptive=True) - log.debug( - "HTTPSPNEGOAuth: Preemptive Authorization header: {0}" - .format(auth_header)) + auth_header = self.generate_request_header(None, host, is_preemptive=True) + log.debug("HTTPSPNEGOAuth: Preemptive Authorization header: {0}".format(auth_header)) except SPNEGOExchangeError as exc: log.warning( "HTTPSPNEGOAuth: Opportunistic auth failed with %s ->" " sending request without adding Authorization header." " Will try again if it results in a 401.", - exc) + exc, + ) else: log.debug("HTTPSPNEGOAuth: Added opportunistic auth header") - request.headers['Authorization'] = auth_header + request.headers["Authorization"] = auth_header - request.register_hook('response', self.handle_response) + request.register_hook("response", self.handle_response) try: self.pos = request.body.tell() except AttributeError: diff --git a/test_requests_gssapi.py b/tests/test_requests_gssapi.py similarity index 68% rename from test_requests_gssapi.py rename to tests/test_requests_gssapi.py index a755732..dabfa0d 100644 --- a/test_requests_gssapi.py +++ b/tests/test_requests_gssapi.py @@ -3,18 +3,16 @@ """Tests for requests_gssapi.""" +import unittest from base64 import b64encode from unittest.mock import Mock, patch -from requests.compat import urlparse -import requests import gssapi +import requests +from requests.compat import urlparse import requests_gssapi -import unittest - -from requests_gssapi import REQUIRED -from requests_gssapi import SPNEGO +from requests_gssapi import REQUIRED, SPNEGO # Note: we're not using the @mock.patch decorator: # > My only word of warning is that in the past, the patch decorator hides @@ -62,94 +60,83 @@ def tearDown(self): def test_negotate_value_extraction(self): response = requests.Response() - response.headers = {'www-authenticate': b64_negotiate_token} - self.assertEqual( - requests_gssapi.gssapi_._negotiate_value(response), - b'token' - ) + response.headers = {"www-authenticate": b64_negotiate_token} + self.assertEqual(requests_gssapi.gssapi_._negotiate_value(response), b"token") def test_negotate_value_extraction_none(self): response = requests.Response() response.headers = {} - self.assertTrue( - requests_gssapi.gssapi_._negotiate_value(response) is None) + self.assertTrue(requests_gssapi.gssapi_._negotiate_value(response) is None) def test_force_preemptive(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPKerberosAuth(force_preemptive=True) request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' in request.headers) - self.assertEqual(request.headers.get('Authorization'), - b64_negotiate_response) + self.assertTrue("Authorization" in request.headers) + self.assertEqual(request.headers.get("Authorization"), b64_negotiate_response) def test_no_force_preemptive(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPKerberosAuth() request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' not in request.headers) + self.assertTrue("Authorization" not in request.headers) def test_generate_request_header(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertEqual( - auth.generate_request_header(response, host), - b64_negotiate_response) + self.assertEqual(auth.generate_request_header(response, host), b64_negotiate_response) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - creds=None, mech=SPNEGO, flags=gssflags, usage="initiate") + name=gssapi_sname("HTTP@www.example.org"), creds=None, mech=SPNEGO, flags=gssflags, usage="initiate" + ) fake_resp.assert_called_with(b"token") def test_generate_request_header_init_error(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertRaises(requests_gssapi.exceptions.SPNEGOExchangeError, - auth.generate_request_header, response, host) + self.assertRaises( + requests_gssapi.exceptions.SPNEGOExchangeError, auth.generate_request_header, response, host + ) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) def test_generate_request_header_step_error(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth() - self.assertRaises(requests_gssapi.exceptions.SPNEGOExchangeError, - auth.generate_request_header, response, host) + self.assertRaises( + requests_gssapi.exceptions.SPNEGOExchangeError, auth.generate_request_header, response, host + ) fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fail_resp.assert_called_with(b"token") def test_authenticate_user(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -161,7 +148,7 @@ def test_authenticate_user(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -171,22 +158,20 @@ def test_authenticate_user(self): self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - flags=gssflags, usage="initiate", creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), flags=gssflags, usage="initiate", creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_handle_401(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -198,7 +183,7 @@ def test_handle_401(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -208,24 +193,20 @@ def test_handle_401(self): self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - creds=None, mech=SPNEGO, flags=gssflags, usage="initiate") + name=gssapi_sname("HTTP@www.example.org"), creds=None, mech=SPNEGO, flags=gssflags, usage="initiate" + ) fake_resp.assert_called_with(b"token") def test_authenticate_server(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} auth = requests_gssapi.HTTPKerberosAuth() auth.context = {"www.example.org": gssapi.SecurityContext} @@ -235,17 +216,13 @@ def test_authenticate_server(self): fake_resp.assert_called_with(b"servertoken") def test_handle_other(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_other(response_ok) @@ -254,17 +231,13 @@ def test_handle_other(self): fake_resp.assert_called_with(b"servertoken") def test_handle_response_200(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_response(response_ok) @@ -273,70 +246,55 @@ def test_handle_response_200(self): fake_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_required_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 response_ok.headers = {} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": "CTX"} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) self.assertFalse(fake_resp.called) def test_handle_response_200_mutual_auth_required_failure_2(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": gssapi.SecurityContext} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) fail_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_optional_hard_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = { - 'www-authenticate': b64_negotiate_server, - 'authorization': b64_negotiate_response} + response_ok.headers = {"www-authenticate": b64_negotiate_server, "authorization": b64_negotiate_response} - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": gssapi.SecurityContext} - self.assertRaises(requests_gssapi.MutualAuthenticationError, - auth.handle_response, response_ok) + self.assertRaises(requests_gssapi.MutualAuthenticationError, auth.handle_response, response_ok) fail_resp.assert_called_with(b"servertoken") def test_handle_response_200_mutual_auth_optional_soft_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": gssapi.SecurityContext} r = auth.handle_response(response_ok) @@ -346,8 +304,7 @@ def test_handle_response_200_mutual_auth_optional_soft_failure(self): self.assertFalse(fake_resp.called) def test_handle_response_500_mutual_auth_required_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_500 = requests.Response() response_500.url = "http://www.example.org/" response_500.status_code = 500 @@ -359,14 +316,12 @@ def test_handle_response_500_mutual_auth_required_failure(self): response_500.raw = "RAW" response_500.cookies = "COOKIES" - auth = requests_gssapi.HTTPKerberosAuth( - mutual_authentication=REQUIRED) + auth = requests_gssapi.HTTPKerberosAuth(mutual_authentication=REQUIRED) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) - self.assertTrue( - isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) + self.assertTrue(isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) self.assertNotEqual(r, response_500) self.assertNotEqual(r.headers, response_500.headers) self.assertEqual(r.status_code, response_500.status_code) @@ -382,18 +337,15 @@ def test_handle_response_500_mutual_auth_required_failure(self): self.assertFalse(fail_resp.called) # re-test with error response sanitizing disabled - auth = requests_gssapi.HTTPKerberosAuth( - sanitize_mutual_error_response=False) + auth = requests_gssapi.HTTPKerberosAuth(sanitize_mutual_error_response=False) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) - self.assertFalse( - isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) + self.assertFalse(isinstance(r, requests_gssapi.gssapi_.SanitizedResponse)) def test_handle_response_500_mutual_auth_optional_failure(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fail_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fail_resp): response_500 = requests.Response() response_500.url = "http://www.example.org/" response_500.status_code = 500 @@ -405,8 +357,7 @@ def test_handle_response_500_mutual_auth_optional_failure(self): response_500.raw = "RAW" response_500.cookies = "COOKIES" - auth = requests_gssapi.HTTPKerberosAuth( - requests_gssapi.OPTIONAL) + auth = requests_gssapi.HTTPKerberosAuth(requests_gssapi.OPTIONAL) auth.context = {"www.example.org": "CTX"} r = auth.handle_response(response_500) @@ -417,12 +368,11 @@ def test_handle_response_500_mutual_auth_optional_failure(self): def test_handle_response_401(self): # Get a 401 from server, authenticate, and get a 200 back. - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -434,7 +384,7 @@ def test_handle_response_401(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -448,20 +398,18 @@ def test_handle_response_401(self): self.assertTrue(response in r.history) auth.handle_other.assert_called_once_with(response_ok) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_handle_response_401_rejected(self): # Get a 401 from server, authenticate, and get another 401 back. # Ensure there is no infinite recursion. - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): connection = Mock() def connection_send(self, *args, **kwargs): @@ -480,7 +428,7 @@ def connection_send(self, *args, **kwargs): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" @@ -491,36 +439,33 @@ def connection_send(self, *args, **kwargs): r = auth.handle_response(response) self.assertEqual(r.status_code, 401) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( - name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("HTTP@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_generate_request_header_custom_service(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth(service="barfoo") auth.generate_request_header(response, host), fake_init.assert_called_with( - name=gssapi_sname("barfoo@www.example.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + name=gssapi_sname("barfoo@www.example.org"), usage="initiate", flags=gssflags, creds=None, mech=SPNEGO + ) fake_resp.assert_called_with(b"token") def test_delegation(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response_ok = requests.Response() response_ok.url = "http://www.example.org/" response_ok.status_code = 200 - response_ok.headers = {'www-authenticate': b64_negotiate_server} + response_ok.headers = {"www-authenticate": b64_negotiate_server} connection = Mock() connection.send = Mock(return_value=response_ok) @@ -532,121 +477,138 @@ def test_delegation(self): response = requests.Response() response.request = request response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} response.status_code = 401 response.connection = connection response._content = b"" response.raw = raw - auth = requests_gssapi.HTTPKerberosAuth(service="HTTP", - delegate=True) + auth = requests_gssapi.HTTPKerberosAuth(service="HTTP", delegate=True) r = auth.authenticate_user(response) self.assertTrue(response in r.history) self.assertEqual(r, response_ok) - self.assertEqual(request.headers['Authorization'], - b64_negotiate_response) + self.assertEqual(request.headers["Authorization"], b64_negotiate_response) connection.send.assert_called_with(request) raw.release_conn.assert_called_with() fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssdelegflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssdelegflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_principal_override(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname auth = requests_gssapi.HTTPKerberosAuth(principal="user@REALM") auth.generate_request_header(response, host) - fake_creds.assert_called_with(gssapi.creds.Credentials, - usage="initiate", - name=gssapi_uname("user@REALM", )) + fake_creds.assert_called_with( + gssapi.creds.Credentials, + usage="initiate", + name=gssapi_uname( + "user@REALM", + ), + ) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=b"fake creds", mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=b"fake creds", + mech=SPNEGO, + ) def test_realm_override(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - auth = requests_gssapi.HTTPKerberosAuth( - hostname_override="otherhost.otherdomain.org") + auth = requests_gssapi.HTTPKerberosAuth(hostname_override="otherhost.otherdomain.org") auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@otherhost.otherdomain.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_opportunistic_auth(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): auth = requests_gssapi.HTTPSPNEGOAuth(opportunistic_auth=True) request = requests.Request(url="http://www.example.org") auth.__call__(request) - self.assertTrue('Authorization' in request.headers) - self.assertEqual(request.headers.get('Authorization'), - b64_negotiate_response) + self.assertTrue("Authorization" in request.headers) + self.assertEqual(request.headers.get("Authorization"), b64_negotiate_response) def test_explicit_creds(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname creds = gssapi.Credentials() auth = requests_gssapi.HTTPSPNEGOAuth(creds=creds) auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=b"fake creds", mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=b"fake creds", + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") def test_explicit_mech(self): - with patch.multiple("gssapi.Credentials", __new__=fake_creds), \ - patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.Credentials", __new__=fake_creds), patch.multiple( + "gssapi.SecurityContext", __init__=fake_init, step=fake_resp + ): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - fake_mech = b'fake mech' + fake_mech = b"fake mech" auth = requests_gssapi.HTTPSPNEGOAuth(mech=fake_mech) auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@www.example.org"), - usage="initiate", flags=gssflags, - creds=None, mech=b'fake mech') + usage="initiate", + flags=gssflags, + creds=None, + mech=b"fake mech", + ) fake_resp.assert_called_with(b"token") def test_target_name(self): - with patch.multiple("gssapi.SecurityContext", __init__=fake_init, - step=fake_resp): + with patch.multiple("gssapi.SecurityContext", __init__=fake_init, step=fake_resp): response = requests.Response() response.url = "http://www.example.org/" - response.headers = {'www-authenticate': b64_negotiate_token} + response.headers = {"www-authenticate": b64_negotiate_token} host = urlparse(response.url).hostname - auth = requests_gssapi.HTTPSPNEGOAuth( - target_name="HTTP@otherhost.otherdomain.org") + auth = requests_gssapi.HTTPSPNEGOAuth(target_name="HTTP@otherhost.otherdomain.org") auth.generate_request_header(response, host) fake_init.assert_called_with( name=gssapi_sname("HTTP@otherhost.otherdomain.org"), - usage="initiate", flags=gssflags, creds=None, mech=SPNEGO) + usage="initiate", + flags=gssflags, + creds=None, + mech=SPNEGO, + ) fake_resp.assert_called_with(b"token") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()