diff --git a/src/idp_test/__init__.py b/src/idp_test/__init__.py deleted file mode 100644 index a06151101..000000000 --- a/src/idp_test/__init__.py +++ /dev/null @@ -1,426 +0,0 @@ -from __future__ import print_function -from importlib import import_module -import json -import os -import pprint -import types -import argparse -import sys -import six - -import logging -import imp -from saml2 import xmldsig -from saml2 import xmlenc - -from saml2.client import Saml2Client -from saml2.config import SPConfig -from saml2.mdstore import MetadataStore -from saml2.mdstore import MetaData -from saml2.mdstore import ToOld - -from saml2test import CheckError -from saml2test import FatalError -from saml2test import exception_trace -from saml2test import ContextFilter - -from idp_test.base import Conversation -from idp_test.check import CheckSaml2IntMetaData - -# Schemas supported -from saml2 import md -from saml2 import saml -from saml2 import root_logger -from saml2.extension import mdui -from saml2.extension import idpdisc -from saml2.extension import dri -from saml2.extension import mdattr -from saml2.extension import ui -from saml2.metadata import entity_descriptor -from saml2.saml import NAME_FORMAT_UNSPECIFIED - -SCHEMA = [dri, idpdisc, md, mdattr, mdui, saml, ui, xmldsig, xmlenc] - -__author__ = 'rolandh' - -logger = logging.getLogger("") -logger.addHandler(logging.StreamHandler()) -logger.setLevel(logging.DEBUG) -#formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s") -formatter_2 = logging.Formatter("%(delta).6f - %(levelname)s - [%(name)s] %(message)s") - -cf = ContextFilter() -cf.start() - -streamhandler = logging.StreamHandler(sys.stderr) -streamhandler.setFormatter(formatter_2) - -memoryhandler = logging.handlers.MemoryHandler(1024*10, logging.DEBUG) -memoryhandler.addFilter(cf) - -saml2testlog = logging.getLogger("saml2test") -saml2testlog.addHandler(memoryhandler) -saml2testlog.setLevel(logging.DEBUG) - - -def recursive_find_module(name, path=None): - parts = name.split(".") - - mod_a = None - for part in parts: - try: - (fil, pathname, desc) = imp.find_module(part, path) - except ImportError: - raise - - mod_a = imp.load_module(name, fil, pathname, desc) - sys.modules[name] = mod_a - path = mod_a.__path__ - - return mod_a - - -def get_mod(name, path=None): - try: - mod_a = sys.modules[name] - if not isinstance(mod_a, types.ModuleType): - raise KeyError - except KeyError: - try: - (fil, pathname, desc) = imp.find_module(name, path) - mod_a = imp.load_module(name, fil, pathname, desc) - except ImportError: - if "." in name: - mod_a = recursive_find_module(name, path) - else: - raise - sys.modules[name] = mod_a - return mod_a - - -class SAML2client(object): - - def __init__(self, check_factory): - self.tests = None - self.check_factory = check_factory - - self._parser = argparse.ArgumentParser() - self._parser.add_argument('-d', dest='debug', action='store_true', - help="Print debug information") - self._parser.add_argument('-L', dest='log', action='store_true', - help="Print log information") - self._parser.add_argument( - '-C', dest="cacerts", - help=("CA certs to use to verify HTTPS server certificates, ", - "if HTTPS is used and no server CA certs are defined then ", - "no cert verification will be done")) - self._parser.add_argument('-J', dest="json_config_file", - help="Test target configuration") - self._parser.add_argument('-m', dest="metadata", action='store_true', - help="Return the SP metadata") - self._parser.add_argument( - "-l", dest="list", action="store_true", - help="List all the test flows as a JSON object") - self._parser.add_argument( - "-c", dest="spconfig", default="config", - help=("Configuration module for the SP Test Driver at the current" - "directory or the path specified with the -P option. Do not" - "use relative paths or filename extension.")) - self._parser.add_argument( - "-P", dest="path", default=".", - help="Path to the configuration stuff") - self._parser.add_argument("-t", dest="testpackage", - help="Module describing tests") - self._parser.add_argument("-O", dest="operations", - help="Tests") - self._parser.add_argument("-Y", dest="pysamllog", action='store_true', - help="Print PySAML2 logs") - self._parser.add_argument("-H", dest="pretty", action='store_true', - help="Output summary on stdout as pretty " - "printed python dict instead of JSON") - self._parser.add_argument("-i", dest="insecure", action='store_true', - help="Do not verify SSL certificate") - self._parser.add_argument("oper", nargs="?", help="Which test to run") - - self.interactions = None - self.entity_id = None - self.sp_config = None - self.constraints = {} - self.operations = None - self.args = None - - def json_config_file(self): - if self.args.json_config_file == "-": - return json.loads(sys.stdin.read()) - else: - return json.loads(open(self.args.json_config_file).read()) - - def sp_configure(self, metadata_construction=False): - """ - Need to know where 4 different things are. The config, key_file and - cert_file files and the attributemaps directory - """ - # Always first look in the present working directory - sys.path.insert(0, self.args.path) - if self.args.path != ".": - sys.path.insert(0, ".") - mod = import_module(self.args.spconfig) - - if self.args.path != ".": - for param in ["attribute_map_dir", "key_file", "cert_file"]: - if mod.CONFIG[param].startswith("/"): # Absolute path - continue - - for _path in [".", self.args.path]: - _obj = os.path.join(_path, mod.CONFIG[param]) - _obj = os.path.normpath(_obj) - if os.path.exists(_obj): - mod.CONFIG[param] = _obj - break - - self.sp_config = SPConfig().load(mod.CONFIG, metadata_construction) - - if not self.args.insecure: - self.sp_config.verify_ssl_cert = False - else: - if self.args.ca_certs: - self.sp_config.ca_certs = self.args.ca_certs - else: - self.sp_config.ca_certs = "../keys/cacert.pem" - - def setup(self): - self.json_config = self.json_config_file() - - _jc = self.json_config - - try: - self.interactions = _jc["interaction"] - except KeyError: - self.interactions = [] - - self.sp_configure() - - metadata = MetadataStore(SCHEMA, self.sp_config.attribute_converters, - self.sp_config) - info = _jc["metadata"].encode("utf-8") - md = MetaData(SCHEMA, self.sp_config.attribute_converters, info) - md.load() - metadata[0] = md - self.sp_config.metadata = metadata - - if self.args.testpackage: - self.tests = import_module("idp_test.package.%s" % - self.args.testpackage) - - try: - self.entity_id = _jc["entity_id"] - # Verify its the correct metadata - assert self.entity_id in md.entity.keys(), "metadata does not contain entityId %s" % self.entity_id - except KeyError: - if len(md.entity.keys()) == 1: - self.entity_id = md.entity.keys()[0] - else: - raise Exception("Don't know which entity to talk to") - - if "constraints" in _jc: - self.constraints = _jc["constraints"] - if "name_format" not in self.constraints: - self.constraints["name_format"] = NAME_FORMAT_UNSPECIFIED - - def test_summation(self, sid): - status = 0 - for item in self.test_log: - if item["status"] > status: - status = item["status"] - - if status == 0: - status = 1 - - info = { - "id": sid, - "status": status, - "tests": self.test_log - } - - if status == 5: - info["url"] = self.test_log[-1]["url"] - info["htmlbody"] = self.test_log[-1]["message"] - - return info - - def output_log(self, memhndlr, hndlr2): - """ - """ - - print(80 * ":", file=sys.stderr) - hndlr2.setFormatter(formatter_2) - memhndlr.setTarget(hndlr2) - memhndlr.flush() - memhndlr.close() - # streamhandler.setFormatter(formatter_2) - # pys_memoryhandler.setTarget(streamhandler) - # pys_memoryhandler.flush() - # pys_memoryhandler.close() - - def run(self): - self.args = self._parser.parse_args() - - if self.args.pysamllog: - root_logger.addHandler(memoryhandler) - root_logger.setLevel(logging.DEBUG) - - if self.args.operations: - path, name = os.path.split(self.args.operations) - self.operations = get_mod(name, [path]) - else: - self.operations = __import__("idp_test.saml2base", - fromlist=["idp_test"]) - - if self.args.metadata: - return self.make_meta() - elif self.args.list: - return self.list_operations() - elif self.args.oper == "check": - return self.verify_metadata() - else: - if not self.args.oper: - raise Exception("Missing test case specification") - self.args.oper = self.args.oper.strip("'") - self.args.oper = self.args.oper.strip('"') - - try: - self.setup() - except (AttributeError, ToOld) as err: - print("Configuration Error: %s" % err, file=sys.stderr) - - self.client = Saml2Client(self.sp_config) - conv = None - - if self.args.pretty: - pp = pprint.PrettyPrinter(indent=4) - else: - pp = None - - try: - try: - oper = self.operations.OPERATIONS[self.args.oper] - except KeyError: - if self.tests: - try: - oper = self.tests.OPERATIONS[self.args.oper] - except ValueError: - logger.error("Undefined testcase") - return - else: - logger.error("Undefined testcase") - return - - logger.info("Starting conversation") - conv = Conversation(self.client, self.sp_config, self.interactions, - check_factory=self.check_factory, - entity_id=self.entity_id, - constraints=self.constraints) - conv.do_sequence(oper) - #testres, trace = do_sequence(oper, - self.test_log = conv.test_output - tsum = self.test_summation(self.args.oper) - err = None - except CheckError as err: - self.test_log = conv.test_output - tsum = self.test_summation(self.args.oper) - except FatalError as err: - if conv: - self.test_log = conv.test_output - self.test_log.append(exception_trace("RUN", err)) - else: - self.test_log = exception_trace("RUN", err) - tsum = self.test_summation(self.args.oper) - except Exception as err: - if conv: - self.test_log = conv.test_output - self.test_log.append(exception_trace("RUN", err)) - else: - self.test_log = exception_trace("RUN", err) - tsum = self.test_summation(self.args.oper) - - if pp: - pp.pprint(tsum) - else: - print(json.dumps(tsum), file=sys.stdout) - - if tsum["status"] > 1 or self.args.debug or err: - self.output_log(memoryhandler, streamhandler) - - def list_operations(self): - lista = [] - for key, val in self.operations.OPERATIONS.items(): - item = {"id": key, "name": val["name"]} - try: - _desc = val["descr"] - if isinstance(_desc, six.string_types): - item["descr"] = _desc - else: - item["descr"] = "\n".join(_desc) - except KeyError: - pass - - for key in ["depend", "endpoints"]: - try: - item[key] = val[key] - except KeyError: - pass - - lista.append(item) - - if self.args.testpackage: - mod = import_module(self.args.testpackage, "idp_test") - for key, val in mod.OPERATIONS.items(): - item = {"id": key, "name": val["name"]} - try: - _desc = val["descr"] - if isinstance(_desc, six.string_types): - item["descr"] = _desc - else: - item["descr"] = "\n".join(_desc) - except KeyError: - pass - - for key in ["depends", "endpoints"]: - try: - item[key] = val[key] - except KeyError: - pass - - lista.append(item) - - print(json.dumps(lista)) - - def _get_operation(self, operation): - return self.operations.OPERATIONS[operation] - - def make_meta(self): - self.sp_configure(True) - print(entity_descriptor(self.sp_config)) - - def list_conf_id(self): - sys.path.insert(0, ".") - mod = import_module("config") - _res = dict([(key, cnf["description"]) for key, cnf in - mod.CONFIG.items()]) - print(json.dumps(_res)) - - def verify_metadata(self): - self.json_config = self.json_config_file() - self.sp_configure() - - metadata = MetadataStore(SCHEMA, self.sp_config.attribute_converters, - self.sp_config.xmlsec_binary) - info = self.json_config["metadata"].encode("utf-8") - md = MetaData(SCHEMA, self.sp_config.attribute_converters, info) - md.load() - metadata[0] = md - env = {"metadata": metadata} - chk = CheckSaml2IntMetaData() - output = [] - res = chk(env, output) - print(res, file=sys.stdout) diff --git a/src/idp_test/base.py b/src/idp_test/base.py deleted file mode 100644 index 4707147df..000000000 --- a/src/idp_test/base.py +++ /dev/null @@ -1,259 +0,0 @@ -#!/usr/bin/env python -import inspect -import logging -import urllib -import cookielib - -from saml2 import BINDING_HTTP_REDIRECT, BINDING_URI -from saml2 import BINDING_HTTP_POST -from saml2 import BINDING_SOAP - -from saml2.mdstore import REQ2SRV -from saml2.pack import http_redirect_message, http_form_post_message -from saml2.s_utils import rndstr - -from saml2test import tool -from saml2test import CheckError, FatalError -from saml2test.interaction import InteractionNeeded - -try: - from xml.etree import cElementTree as ElementTree - if ElementTree.VERSION < '1.3.0': - # cElementTree has no support for register_namespace - # neither _namespace_map, thus we sacrify performance - # for correctness - from xml.etree import ElementTree -except ImportError: - import cElementTree as ElementTree - - -__author__ = 'rohe0002' - -logger = logging.getLogger(__name__) - - -class HTTPError(Exception): - pass - - -def unpack_form(_str, ver="SAMLRequest"): - SR_STR = "name=\"%s\" value=\"" % ver - RS_STR = 'name="RelayState" value="' - - i = _str.find(SR_STR) - i += len(SR_STR) - j = _str.find('"', i) - - sr = _str[i:j] - - k = _str.find(RS_STR, j) - k += len(RS_STR) - l = _str.find('"', k) - - rs = _str[k:l] - - return {ver: sr, "RelayState": rs} - - -def form_post(_dict): - return urllib.urlencode(_dict) - - -def tuple_list2dict(tl): - return dict(tl) - - -class Conversation(tool.Conversation): - def __init__(self, client, config, interaction, - check_factory, entity_id, msg_factory=None, - features=None, verbose=False, constraints=None): - tool.Conversation.__init__(self, client, config, - interaction, check_factory, msg_factory, - features, verbose) - self.entity_id = entity_id - self.cjar = {"browser": cookielib.CookieJar(), - "rp": cookielib.CookieJar(), - "service": cookielib.CookieJar()} - - self.args = {} - self.qargs = {} - self.response_args = {} - self.saml_response = [] - self.destination = "" - self.request = None - self.position = "" - self.response = None - self.oper = None - self.msg_constraints = constraints - - def send(self): - srvs = getattr(self.client.metadata, REQ2SRV[self.oper.request])( - self.args["entity_id"], self.args["request_binding"], "idpsso") - - response = None - for srv in srvs: - response = self._send(srv) - if response is not None: - break - - return response - - def _send(self, srv): - _client = self.client - loc = srv["location"] - self.qargs["destination"] = loc - self.response_args = {} - use_artifact = getattr(self.oper, "use_artifact", False) - - try: - req = self.oper.args["message"] - except KeyError: - req = self.qfunc(**self.qargs) - - req_id, self.request = self.oper.pre_processing(req, self.args) - str_req = "%s" % self.request - - if use_artifact: - saml_art = _client.use_artifact(str_req, self.args["entity_id"]) - logger.info("SAML Artifact: %s", saml_art) - info_typ = "SAMLart" - else: - logger.info("SAML Request: %s", str_req) - info_typ = "SAMLRequest" - # depending on binding send the query - - if self.args["request_binding"] is BINDING_SOAP: - res = _client.send_using_soap(str_req, loc) - if res.status_code >= 400: - logger.info("Received a HTTP error (%d) '%s'", - res.status_code, res.text) - raise HTTPError(res.text) - else: - self.response_args["binding"] = BINDING_SOAP - else: - self.response_args["binding"] = self.args["response_binding"] - if self.args["request_binding"] is BINDING_HTTP_REDIRECT: - htargs = http_redirect_message(str_req, loc, self.relay_state, - info_typ) - self.response_args["outstanding"] = {self.request.id: "/"} - # - res = _client.send(htargs["headers"][0][1], "GET") - elif self.args["request_binding"] is BINDING_HTTP_POST: - htargs = http_form_post_message(str_req, loc, self.relay_state, - info_typ) - info = unpack_form(htargs["data"][3]) - data = form_post(info) - self.response_args["outstanding"] = {self.request.id: "/"} - htargs["data"] = data - htargs["headers"] = [("Content-type", - 'application/x-www-form-urlencoded')] - res = _client.send(loc, "POST", **htargs) - elif self.args["request_binding"] == BINDING_URI: - self.response_args["binding"] = BINDING_URI - htargs = _client.use_http_uri(str_req, "SAMLRequest", loc) - res = _client.send(htargs["url"], "GET") - else: - res = None - - if res is not None and res.status_code >= 400: - logger.info("Received a HTTP error (%d) '%s'", - res.status_code, res.text) - raise HTTPError(res.text) - - self.last_response = res - try: - self.last_content = res.text - except AttributeError: - self.last_content = None - - return res - - def init(self, phase): - self.phase = phase - _oper = phase(self) - _oper.setup() - self.args = _oper.args - #self.oper.args = _oper.args.copy() - self.args["entity_id"] = self.entity_id - self.oper = _oper - self.client.cookiejar = self.cjar["browser"] - try: - self.test_sequence(self.oper.tests["pre"]) - except KeyError: - pass - - def setup_request(self): - query = self.oper.request - _client = self.client - _oper = self.oper - - self.response_func = getattr(_client, "parse_%s_response" % query) - qargs = self.args.copy() - self.relay_state = rndstr() - - if "message" not in _oper.args: - self.qfunc = getattr(_client, "create_%s" % query) - # remove args the create function can't handle - fargs = inspect.getargspec(self.qfunc).args - if _oper._class: - fargs.extend([p for p, c, r in - _oper._class.c_attributes.values()]) - fargs.extend([p for p, c in _oper._class.c_children.values()]) - for arg in qargs.keys(): - if arg not in fargs: - del qargs[arg] - - self.qargs = qargs - - def my_endpoints(self): - return [e for e, b in self.client.config.getattr("endpoints", "sp")[ - "assertion_consumer_service"]] - - def handle_result(self): - try: - if self.last_response.status_code in [302, 303]: - return False - except AttributeError: - pass - - _resp = None - try: - response = self.oper.post_processing(self.last_content) - if isinstance(response, dict): - try: - assert self.relay_state == response["RelayState"] - except AssertionError: - assert self.relay_state == response["RelayState"][0] - except KeyError: - pass - if "SAMLResponse" in response: - response = response["SAMLResponse"] - elif "SAMLart" in response: - response = self.client.artifact2message( - response["SAMLart"][0], "idpsso") - - _resp = self.response_func(response, **self.response_args) - if not _resp: - return False - self.saml_response.append(_resp) - try: - self.test_sequence(self.oper.tests["post"]) - except KeyError: - pass - logger.info("SAML Response: %s", _resp) - except FatalError as ferr: - if _resp: - logger.info("Faulty response: %s", _resp) - logger.error("Exception %s", ferr) - raise - except ElementTree.ParseError: - return False - except CheckError: - raise - except Exception as err: - if _resp: - logger.info("Faulty response: %s", _resp) - logger.error("Exception %s", err) - self.err_check("exception", err) - - return True diff --git a/src/idp_test/check.py b/src/idp_test/check.py deleted file mode 100644 index ab115ea94..000000000 --- a/src/idp_test/check.py +++ /dev/null @@ -1,707 +0,0 @@ -import inspect -import logging -import sys -from time import mktime -from saml2.response import AttributeResponse - -from saml2test import check -from saml2test.check import Check - -from saml2.mdstore import REQ2SRV -from saml2.s_utils import UnknownPrincipal -from saml2.s_utils import UnsupportedBinding -from saml2.saml import NAMEID_FORMAT_PERSISTENT -from saml2.saml import NAMEID_FORMAT_UNSPECIFIED -from saml2.saml import NAMEID_FORMAT_TRANSIENT -from saml2.saml import NAME_FORMAT_UNSPECIFIED -from saml2.saml import NAME_FORMAT_URI -from saml2.samlp import STATUS_SUCCESS -from saml2.samlp import Response -from saml2.sigver import cert_from_key_info_dict -from saml2.sigver import key_from_key_value_dict - -from saml2.time_util import str_to_time - -__author__ = 'rolandh' - -INFORMATION = 0 -OK = 1 -WARNING = 2 -ERROR = 3 -CRITICAL = 4 -INTERACTION = 5 - -STATUSCODE = ["INFORMATION", "OK", "WARNING", "ERROR", "CRITICAL", - "INTERACTION"] - -PREFIX = "-----BEGIN CERTIFICATE-----" -POSTFIX = "-----END CERTIFICATE-----" - -M2_TIME_FORMAT = "%b %d %H:%M:%S %Y" - -logger = logging.getLogger(__name__) - -def to_time(_time): - assert _time.endswith(" GMT") - _time = _time[:-4] - return mktime(str_to_time(_time, M2_TIME_FORMAT)) - - -class CheckSaml2IntMetaData(Check): - """ - Checks that the Metadata follows the Saml2Int profile - """ - cid = "check-saml2int-metadata" - msg = "Metadata error" - - def _func(self, conv): - mds = conv.client.metadata.metadata[0] - # Should only be one - ed = mds.entity.values()[0] - res = {} - - assert len(ed["idpsso_descriptor"]) - idpsso = ed["idpsso_descriptor"][0] - - # contact person - if "contact_person" not in idpsso and "contact_person" not in ed: - self._message = "Metadata should contain contact person information" - self._status = WARNING - return res - else: - item = {"support": False, "technical": False} - if "contact_person" in idpsso: - for contact in idpsso["contact_person"]: - try: - item[contact["contact_type"]] = True - except KeyError: - pass - if "contact_person" in ed: - for contact in ed["contact_person"]: - try: - item[contact["contact_type"]] = True - except KeyError: - pass - - if "support" in item and "technical" in item: - pass - elif "support" not in item and "technical" not in item: - self._message = \ - "Missing technical and support contact information" - self._status = WARNING - elif "technical" not in item: - self._message = "Missing technical contact information" - self._status = WARNING - elif "support" not in item: - self._message = "Missing support contact information" - self._status = WARNING - - if self._message: - return res - - # NameID format - if "name_id_format" not in idpsso: - self._message = "Metadata should specify NameID format support" - self._status = WARNING - return res - else: - # should support Transient - item = {NAMEID_FORMAT_TRANSIENT: False} - for nformat in idpsso["name_id_format"]: - try: - item[nformat["text"]] = True - except KeyError: - pass - - if not item[NAMEID_FORMAT_TRANSIENT]: - self._message = "IdP should support Transient NameID Format" - self._status = WARNING - return res - - return res - - -class CheckATMetaData(CheckSaml2IntMetaData): - cid = "check-at-metadata" - - def verify_key_info(self, ki): - # key_info - # one or more key_value and/or x509_data.X509Certificate - - verified_x509_keys = [] - - xkeys = cert_from_key_info_dict(ki) - vkeys = key_from_key_value_dict(ki) - - if xkeys or vkeys: - if xkeys: - # time validity is checked in cert_from_key_info_dict - verified_x509_keys.append(xkeys) - if vkeys: # don't expect this to happen - pass - - if not verified_x509_keys: - if cert_from_key_info_dict(ki, ignore_age=True): - self._message = "Keys too old" - else: - self._message = "Missing KeyValue or X509Data.X509Certificate" - self._status = CRITICAL - return False - - if xkeys and vkeys: - # verify that it's the same keys TODO - pass - - return True - - def verify_key_descriptor(self, kd): - # key_info - if not self.verify_key_info(kd["key_info"]): - return False - - # use - if "use" in kd: - try: - assert kd["use"] in ["encryption", "signing"] - except AssertionError: - self._message = "Unknown use specification: '%s'" % kd.use.text - self._status = CRITICAL - return False - - return True - - def _func(self, conv): - mds = conv.client.metadata.metadata[0] - # Should only be one - ed = mds.entity.values()[0] - res = {} - - assert len(ed["idpsso_descriptor"]) - idpsso = ed["idpsso_descriptor"][0] - for kd in idpsso["key_descriptor"]: - if not self.verify_key_descriptor(kd): - return res - - return CheckSaml2IntMetaData._func(self, conv) - - -class CheckSaml2IntAttributes(Check): - """ - Any elements exchanged via any SAML 2.0 messages, - assertions, or metadata MUST contain a NameFormat of - urn:oasis:names:tc:SAML:2.0:attrname-format:uri. - """ - cid = "check-saml2int-attributes" - msg = "Attribute error" - - def _func(self, conv): - response = conv.saml_response[-1] - try: - opaque_identifier = conv.opaque_identifier - except AttributeError: - opaque_identifier = False - try: - name_format_not_specified = conv.name_format_not_specified - except AttributeError: - name_format_not_specified = False - - res = {} - - # should be a list but isn't - #assert len(response.assertion) == 1 - assertion = response.assertion - if isinstance(response, AttributeResponse): - pass - else: - assert len(assertion.authn_statement) == 1 - assert len(assertion.attribute_statement) < 2 - - if assertion.attribute_statement: - atrstat = assertion.attribute_statement[0] - for attr in atrstat.attribute: - try: - assert attr.name_format == NAME_FORMAT_URI - except AssertionError: - self._message = "Attribute name format error" - self._status = CRITICAL - return res - try: - assert attr.name.startswith("urn:oid") - except AssertionError: - self._message = "Attribute name should be an OID" - self._status = CRITICAL - return res - - assert not assertion.subject.encrypted_id - assert not assertion.subject.base_id - - if opaque_identifier: - try: - assert assertion.subject.name_id.format == \ - NAMEID_FORMAT_PERSISTENT - - except AssertionError: - self._message = "NameID format should be PERSISTENT" - self._status = WARNING - - if name_format_not_specified: - try: - assert assertion.subject.name_id.format == \ - NAMEID_FORMAT_TRANSIENT - except AssertionError: - self._message = "NameID format should be TRANSIENT" - self._status = WARNING - - return res - - -class CheckSubjectNameIDFormat(Check): - """ - The element tailors the name identifier in the subjects of - assertions resulting from an . - When this element is used, if the content is not understood by or acceptable - to the identity provider, then a message element MUST be - returned with an error , and MAY contain a second-level - of urn:oasis:names:tc:SAML:2.0:status:InvalidNameIDPolicy. - If the Format value is omitted or set to urn:oasis:names:tc:SAML:2.0:nameid- - format:unspecified, then the identity provider is free to return any kind - of identifier, subject to any additional constraints due to the content of - this element or the policies of the identity provider or principal. - """ - cid = "check-saml2int-nameid-format" - msg = "Attribute error" - - def _func(self, conv): - response = conv.saml_response[-1].response - request = conv.request - - res = {} - if request.name_id_policy: - nformat = request.name_id_policy.format - sp_name_qualifier = request.name_id_policy.sp_name_qualifier - - subj = response.assertion.subject - try: - assert subj.name_id.format == nformat - if sp_name_qualifier: - assert subj.name_id.sp_name_qualifier == sp_name_qualifier - except AssertionError: - self._message = "The IdP returns wrong NameID format" - self._status = CRITICAL - - return res - - -class CheckLogoutSupport(Check): - """ - Verifies that the tested entity supports single log out - """ - cid = "check-logout-support" - msg = "Does not support logout" - - def _func(self, conv): - mds = conv.client.metadata.metadata[0] - # Should only be one - ed = mds.entity.values()[0] - - assert len(ed["idpsso_descriptor"]) - - idpsso = ed["idpsso_descriptor"][0] - try: - assert idpsso["single_logout_service"] - except AssertionError: - self._message = self.msg - self._status = CRITICAL - - return {} - - -class VerifyLogout(Check): - cid = "verify-logout" - msg = "Logout failed" - - def _func(self, conv): - # Check that the logout response says it was a success - resp = conv.saml_response[-1] - status = resp.response.status - try: - assert status.status_code.value == STATUS_SUCCESS - except AssertionError: - self._message = self.msg - self._status = CRITICAL - - # Check that there are no valid cookies - # should only result in a warning - httpc = conv.client - try: - assert httpc.cookies(conv.destination) == {} - except AssertionError: - self._message = "Remaining cookie ?" - self._status = WARNING - - return {} - - -class VerifyContent(Check): - """ Basic content verification class, does required and max/min checks - """ - cid = "verify-content" - - def _func(self, conv): - try: - conv.saml_response[-1].response.verify() - except ValueError: - self._status = CRITICAL - - return {} - - -class VerifySuccessStatus(Check): - """ Verifies that the response was a success response """ - cid = "verify-success-status" - - def _func(self, conv): - response = conv.saml_response[-1].response - - try: - assert response.status.status_code.value == STATUS_SUCCESS - except AssertionError: - self._message = self.msg - self._status = CRITICAL - - return {} - - -class VerifyNameIDPolicyUsage(Check): - """ - Verify the nameID in the response is according to the provided - NameIDPolicy - """ - cid = "verify-name-id-policy-usage" - - def _func(self, conv): - response = conv.saml_response[-1].response - nip = conv.oper.args["name_id_policy"] - for assertion in response.assertion: - nid = assertion.subject.name_id - if nip.format: - try: - assert nid.format == nip.format - except AssertionError: - self._message = "Wrong NameID Format" - self._status = WARNING - if nip.sp_name_qualifier: - try: - assert nid.sp_name_qualifier == nip.sp_name_qualifier - except AssertionError: - self._message = "Wrong SPNameQualifier" - self._status = WARNING - return {} - - -class VerifyNameIDMapping(Check): - """ - Verify that a new NameID is issued and that it follows the - given policy. - """ - cid = "verify-name-id-mapping" - - def _func(self, conv): - response = conv.saml_response[-1].response - nip = conv.oper.args["name_id_policy"] - nid = response.name_id - if nip.format: - try: - assert nid.format == nip.format - except AssertionError: - self._message = "Wrong NameID Format" - self._status = WARNING - if nip.sp_name_qualifier: - try: - assert nid.sp_name_qualifier == nip.sp_name_qualifier - except AssertionError: - self._message = "Wrong SPNameQualifier" - self._status = WARNING - - return {} - - -class VerifySPProvidedID(Check): - """ - Verify that the IdP allows the SP so set a SP provided ID - """ - cid = "verify-sp-provided-id" - - def _func(self, conv): - response = conv.saml_response[-1].response - nip = conv.oper.args["new_id"] - nid = response.name_id - try: - assert nid.sp_provided_id == nip.new_id - except AssertionError: - self._message = "SP provided id not properly set" - self._status = WARNING - - return {} - - -class VerifyFunctionality(Check): - """ - Verifies that the IdP supports the needed functionality - """ - - def _nameid_format_support(self, conv, nameid_format): - md = conv.client.metadata - entity = md[conv.entity_id] - for idp in entity["idpsso_descriptor"]: - for nformat in idp["name_id_format"]: - if nameid_format == nformat["text"]: - return {} - - self._message = "No support for NameIDFormat '%s'" % nameid_format - self._status = CRITICAL - - return {} - - def _srv_support(self, conv, service): - md = conv.client.metadata - entity = md[conv.entity_id] - for desc in ["idpsso_descriptor", "attribute_authority_descriptor", - "auth_authority_descriptor"]: - try: - srvgrps = entity[desc] - except KeyError: - pass - else: - for srvgrp in srvgrps: - if service in srvgrp: - return {} - - self._message = "No support for '%s'" % service - self._status = CRITICAL - return {} - - def _binding_support(self, conv, request, binding, typ): - service = REQ2SRV[request] - md = conv.client.metadata - entity_id = conv.entity_id - func = getattr(md, service, None) - try: - func(entity_id, binding, typ) - except UnknownPrincipal: - self._message = "Unknown principal: %s" % entity_id - self._status = CRITICAL - except UnsupportedBinding: - self._message = "Unsupported binding at the IdP: %s" % binding - self._status = CRITICAL - - return {} - - def _func(self, conv): - oper = conv.oper - args = conv.oper.args - res = self._srv_support(conv, REQ2SRV[oper.request]) - if self._status != OK: - return res - - res = self._binding_support(conv, oper.request, args["request_binding"], - "idpsso") - if self._status != OK: - return res - - if "nameid_format" in args and args["nameid_format"]: - if args["nameid_format"] == NAMEID_FORMAT_UNSPECIFIED: - pass - else: - res = self._nameid_format_support(conv, args["nameid_format"]) - - if "name_id_policy" in args and args["name_id_policy"]: - if args["name_id_policy"].format == NAMEID_FORMAT_UNSPECIFIED: - pass - else: - res = self._nameid_format_support(conv, - args["name_id_policy"].format) - - return res - - -class VerifyAttributeNameFormat(Check): - """ - Verify that the correct attribute name format is used. - """ - cid = "verify-attribute-name-format" - - def _func(self, conv): - if "name_format" not in conv.msg_constraints: - return {} - - # Should be a AuthnResponse or Response instance - response = conv.saml_response[-1] - assert isinstance(response.response, Response) - - assertion = response.assertion - - if assertion: - if assertion.attribute_statement: - atrstat = assertion.attribute_statement[0] - for attr in atrstat.attribute: - try: - assert attr.name_format == conv.msg_constraints[ - "name_format"] - logger.debug("Attribute name format valid: %s", - attr.name_format) - except AssertionError: - if NAME_FORMAT_UNSPECIFIED != conv.msg_constraints[ - "name_format"]: - self._message = \ - "Wrong name format: '%s', should be %s" % \ - (attr.name_format, \ - conv.msg_constraints["name_format"]) - self._status = CRITICAL - break - else: - logger.debug("Accepting any attribute name format") - - return {} - - -class VerifyDigestAlgorithm(Check): - """ - verify that the used digest algorithm was one from the approved set. - """ - - def _digest_algo(self, signature, allowed): - try: - assert signature.signed_info.reference[0].digest_method.algorithm in allowed - except AssertionError: - self._message = "signature digest algorithm not allowed: '%s'" % \ - signature.signed_info.reference[0].digest_method.algorithm - self._status = CRITICAL - return False - return True - - def _func(self, conv): - if "digest_algorithm" not in conv.msg_constraints: - logger.info("Not verifying digest_algorithm (not configured)") - return {} - else: - try: - assert len(conv.msg_constraints["digest_algorithm"]) > 0 - except AssertionError: - self._message = "List of allowed digest algorithm must not be empty" - self._status = CRITICAL - return {} - _algs = conv.msg_constraints["digest_algorithm"] - - response = conv.saml_response[-1].response - - if response.signature: - if not self._digest_algo(response.signature, _algs): - return {} - - for assertion in response.assertion: - if not self._digest_algo(assertion.signature, _algs): - return {} - - return {} - - -class VerifySignatureAlgorithm(Check): - """ - verify that the used signature algorithm was one from an approved set. - """ - - def _sig_algo(self, signature, allowed): - try: - assert signature.signed_info.signature_method.algorithm in allowed - except AssertionError: - self._message = "Wrong algorithm used for signing: '%s'" % \ - signature.signed_info.signature_method.algorithm - self._status = CRITICAL - return False - - return True - - def _func(self, conv): - if "signature_algorithm" not in conv.msg_constraints: - logger.info("Not verifying signature_algorithm (not configured)") - return {} - else: - try: - assert len(conv.msg_constraints["signature_algorithm"]) > 0 - except AssertionError: - self._message = "List of allowed signature algorithm must not be empty" - self._status = CRITICAL - return {} - _algs = conv.msg_constraints["signature_algorithm"] - - response = conv.saml_response[-1].response - - if response.signature: - if not self._sig_algo(response.signature, _algs): - return {} - - for assertion in response.assertion: - if not self._sig_algo(assertion.signature, _algs): - return {} - - return {} - - -class VerifySignedPart(Check): - """ - verify that the correct part was signed. - """ - - def _func(self, conv): - - if "signed_part" not in conv.msg_constraints: - return {} - - response = conv.saml_response[-1].response - if "response" in conv.msg_constraints["signed_part"]: - if response.signature: - pass - else: - self._message = "Response not signed" - self._status = CRITICAL - - if self._status == OK: - if "assertion" in conv.msg_constraints["signed_part"]: - for assertion in response.assertion: - if assertion.signature: - pass - else: - self._message = "Assertion not signed" - self._status = CRITICAL - break - - return {} - - -class VerifyEndpoint(Check): - def _func(self, conv): - _ = conv.last_response - return {} - - -# ============================================================================= - - -CLASS_CACHE = {} - - -def factory(cid, classes=CLASS_CACHE): - if len(classes) == 0: - check.factory(cid, classes) - for name, obj in inspect.getmembers(sys.modules[__name__]): - if inspect.isclass(obj): - try: - classes[obj.cid] = obj - except AttributeError: - pass - - if cid in classes: - return classes[cid] - else: - return None diff --git a/src/idp_test/interaction.py b/src/idp_test/interaction.py deleted file mode 100644 index 2eddc70c3..000000000 --- a/src/idp_test/interaction.py +++ /dev/null @@ -1,388 +0,0 @@ -__author__ = 'rohe0002' - -import json -import logging -import six - -from urlparse import urlparse -from bs4 import BeautifulSoup - -from mechanize import ParseResponseEx -from mechanize._form import ControlNotFoundError, AmbiguityError -from mechanize._form import ListControl - -logger = logging.getLogger(__name__) - -def pick_interaction(interactions, _base="", content="", req=None): - unic = content - if content: - _bs = BeautifulSoup(content) - else: - _bs = None - - for interaction in interactions: - _match = 0 - for attr, val in interaction["matches"].items(): - if attr == "url": - if val == _base: - _match += 1 - elif attr == "title": - if _bs is None: - break - if _bs.title is None: - break - if val in _bs.title.contents: - _match += 1 - else: - _c = _bs.title.contents - if isinstance(_c, list) and not isinstance( - _c, six.string_types): - for _line in _c: - if val in _line: - _match += 1 - continue - elif attr == "content": - if unic and val in unic: - _match += 1 - elif attr == "class": - if req and val == req: - _match += 1 - - if _match == len(interaction["matches"]): - return interaction - - raise KeyError("No interaction matched") - - -class FlowException(Exception): - def __init__(self, function="", content="", url=""): - Exception.__init__(self) - self.function = function - self.content = content - self.url = url - - def __str__(self): - return json.dumps(self.__dict__) - - -class RResponse(): - """ - A Response class that behaves in the way that mechanize expects it. - Links to a requests.Response - """ - def __init__(self, resp): - self._resp = resp - self.index = 0 - self.text = resp.text - if isinstance(self.text, unicode): - if resp.encoding == "UTF-8": - self.text = self.text.encode("utf-8") - else: - self.text = self.text.encode("latin-1") - self._len = len(self.text) - self.url = str(resp.url) - self.statuscode = resp.status_code - - def geturl(self): - return self._resp.url - - def __getitem__(self, item): - try: - return getattr(self._resp, item) - except AttributeError: - return getattr(self._resp.headers, item) - - def __getattribute__(self, item): - try: - return getattr(self._resp, item) - except AttributeError: - return getattr(self._resp.headers, item) - - def read(self, size=0): - """ - Read from the content of the response. The class remembers what has - been read so it's possible to read small consecutive parts of the - content. - - :param size: The number of bytes to read - :return: Somewhere between zero and 'size' number of bytes depending - on how much it left in the content buffer to read. - """ - if size: - if self._len < size: - return self.text - else: - if self._len == self.index: - part = None - elif self._len - self.index < size: - part = self.text[self.index:] - self.index = self._len - else: - part = self.text[self.index: self.index + size] - self.index += size - return part - else: - return self.text - - -def pick_form(response, url=None, **kwargs): - """ - Picks which form in a web-page that should be used - - :param response: A HTTP request response. A DResponse instance - :param content: The HTTP response content - :param url: The url the request was sent to - :return: The picked form or None of no form matched the criteria. - """ - #_txt = response.text - - forms = ParseResponseEx(response) - if not forms: - raise FlowException(content=response.text, url=url) - - #if len(forms) == 1: - # return forms[0] - #else: - - _form = None - # ignore the first form, because I use ParseResponseEx which adds - # one form at the top of the list - forms = forms[1:] - if len(forms) == 1: - _form = forms[0] - else: - if "pick" in kwargs: - _dict = kwargs["pick"] - for form in forms: - if _form: - break - for key, _ava in _dict.items(): - if key == "form": - _keys = form.attrs.keys() - for attr, val in _ava.items(): - if attr in _keys and val == form.attrs[attr]: - _form = form - elif key == "control": - prop = _ava["id"] - _default = _ava["value"] - try: - orig_val = form[prop] - if isinstance(orig_val, six.string_types): - if orig_val == _default: - _form = form - elif _default in orig_val: - _form = form - except KeyError: - pass - except ControlNotFoundError: - pass - elif key == "method": - if form.method == _ava: - _form = form - else: - _form = None - - if not _form: - break - elif "index" in kwargs: - _form = forms[int(kwargs["index"])] - - return _form - - -def do_click(httpc, form, **kwargs): - """ - Emulates the user clicking submit on a form. - - :param httpc: The Client instance - :param form: The form that should be submitted - :return: What do_request() returns - """ - - if "click" in kwargs: - request = None - _name = kwargs["click"] - try: - _ = form.find_control(name=_name) - request = form.click(name=_name) - except AmbiguityError: - # more than one control with that name - _val = kwargs["set"][_name] - _nr = 0 - while True: - try: - cntrl = form.find_control(name=_name, nr=_nr) - if cntrl.value == _val: - request = form.click(name=_name, nr=_nr) - break - else: - _nr += 1 - except ControlNotFoundError: - raise Exception("No submit control with the name='%s' and " - "value='%s' could be found" % (_name, - _val)) - else: - request = form.click() - - headers = {} - for key, val in request.unredirected_hdrs.items(): - headers[key] = val - - url = request._Request__original - - if form.method == "POST": - return httpc.send(url, "POST", data=request.data, headers=headers) - else: - return httpc.send(url, "GET", headers=headers) - - -def select_form(httpc, orig_response, **kwargs): - """ - Pick a form on a web page, possibly enter some information and submit - the form. - - :param httpc: A HTTP client instance - :param orig_response: The original response (as returned by requests) - :return: The response do_click() returns - """ - response = RResponse(orig_response) - try: - _url = response.url - except KeyError: - _url = kwargs["location"] - - form = pick_form(response, _url, **kwargs) - #form.backwards_compatible = False - if not form: - raise Exception("Can't pick a form !!") - - if "set" in kwargs: - for key, val in kwargs["set"].items(): - if key.startswith("_"): - continue - if "click" in kwargs and kwargs["click"] == key: - continue - - try: - form[key] = val - except ControlNotFoundError: - pass - except TypeError: - cntrl = form.find_control(key) - if isinstance(cntrl, ListControl): - form[key] = [val] - else: - raise - - return do_click(httpc, form, **kwargs) - - -#noinspection PyUnusedLocal -def chose(httpc, orig_response, path, **kwargs): - """ - Sends a HTTP GET to a url given by the present url and the given - relative path. - - :param orig_response: The original response - :param content: The content of the response - :param path: The relative path to add to the base URL - :return: The response do_click() returns - """ - - - if not path.startswith("http"): - try: - _url = orig_response.url - except KeyError: - _url = kwargs["location"] - - part = urlparse(_url) - url = "%s://%s%s" % (part[0], part[1], path) - else: - url = path - - return httpc.send(url, "GET") - #return resp, "" - - -def post_form(httpc, orig_response, **kwargs): - """ - The same as select_form but with no possibility of change the content - of the form. - - :param httpc: A HTTP Client instance - :param orig_response: The original response (as returned by requests) - :param content: The content of the response - :return: The response do_click() returns - """ - response = RResponse(orig_response) - - form = pick_form(response, **kwargs) - - return do_click(httpc, form, **kwargs) - - -def NoneFunc(): - return None - - -#noinspection PyUnusedLocal -def parse(httpc, orig_response, **kwargs): - # content is a form from which I get the SAMLResponse - response = RResponse(orig_response) - - form = pick_form(response, **kwargs) - #form.backwards_compatible = False - if not form: - raise Exception("Can't pick a form !!") - - return {"SAMLResponse": form["SAMLResponse"], - "RelayState": form["RelayState"]} - - -#noinspection PyUnusedLocal -def interaction(args): - _type = args["type"] - if _type == "form": - return select_form - elif _type == "link": - return chose - elif _type == "response": - return parse - else: - return NoneFunc - -# ======================================================================== - - -class Operation(object): - def __init__(self, args=None): - if args: - self.function = interaction(args) - - self.args = args or {} - self.request = None - - def update(self, dic): - self.args.update(dic) - - #noinspection PyUnusedLocal - def post_op(self, result, conv, args): - pass - - def __call__(self, httpc, conv, location, response, content, - features): - try: - _args = self.args.copy() - except (KeyError, AttributeError): - _args = {} - - _args["location"] = location - _args["features"] = features - - logger.info("--> FUNCTION: %s", self.function.__name__) - logger.info("--> ARGS: %s", _args) - - result = self.function(httpc, response, **_args) - self.post_op(result, conv, _args) - return result diff --git a/src/idp_test/package/__init__.py b/src/idp_test/package/__init__.py deleted file mode 100644 index 3b031d2bf..000000000 --- a/src/idp_test/package/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'rolandh' diff --git a/src/idp_test/package/authn_request.py b/src/idp_test/package/authn_request.py deleted file mode 100644 index ccebf59e9..000000000 --- a/src/idp_test/package/authn_request.py +++ /dev/null @@ -1,49 +0,0 @@ -from idp_test import CheckSaml2IntMetaData -from idp_test.check import CheckSaml2IntAttributes -from saml2 import SamlBase, ExtensionContainer - -__author__ = 'rolandh' - -from idp_test.saml2base import AuthnRequest - -class DummyExtension(SamlBase): - """The urn:mace:umu.se:SAML:2.0:extension:foo element """ - - c_tag = 'DummyExtension' - c_namespace = "urn:mace:umu.se:SAML:2.0:extension:foo" - c_value_type = {'base': 'NCName'} - c_children = SamlBase.c_children.copy() - c_attributes = SamlBase.c_attributes.copy() - c_child_order = SamlBase.c_child_order[:] - c_cardinality = SamlBase.c_cardinality.copy() - -class AuthnRequest_UnknownIssuer(AuthnRequest): - def pre_processing(self, message, args): - _issuer = message.issuer - _issuer.text = "https://www.example.com/foobar.xml" - return message - -class AuthnRequest_UnknownExtension(AuthnRequest): - def pre_processing(self, message, args): - message.extension = ExtensionContainer() - message.extension.add_extension_element(DummyExtension(text="foo")) - return message - -OPERATIONS = { - 'authn_unknown-issuer': { - "name": 'AuthnRequest with unknown issuer', - "descr": 'AuthnRequest with unknown issuer', - "sequence": [AuthnRequest_UnknownIssuer], - "depends": ['authn'], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": [CheckSaml2IntAttributes]} - }, - 'authn_unknown-extension': { - "name": 'AuthnRequest with unknown extension', - "descr": 'AuthnRequest with unknown extension', - "sequence": [AuthnRequest_UnknownExtension], - "depends": ['authn'], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": [CheckSaml2IntAttributes]} - }, -} diff --git a/src/idp_test/saml2base.py b/src/idp_test/saml2base.py deleted file mode 100644 index 9f6112fef..000000000 --- a/src/idp_test/saml2base.py +++ /dev/null @@ -1,554 +0,0 @@ -from saml2 import samlp -from saml2 import BINDING_HTTP_ARTIFACT -from saml2 import BINDING_HTTP_POST -from saml2 import BINDING_HTTP_REDIRECT -from saml2 import BINDING_PAOS -from saml2 import BINDING_SOAP -from saml2 import BINDING_URI -from saml2.saml import NAMEID_FORMAT_PERSISTENT -from saml2.saml import NAMEID_FORMAT_UNSPECIFIED -from saml2.saml import NAMEID_FORMAT_TRANSIENT -from saml2.saml import NAMEID_FORMAT_EMAILADDRESS - -from idp_test.check import CheckLogoutSupport -from idp_test.check import CheckSaml2IntAttributes -from idp_test.check import CheckSaml2IntMetaData -from idp_test.check import VerifyAttributeNameFormat -from idp_test.check import VerifyFunctionality -from idp_test.check import VerifyContent -from idp_test.check import VerifyNameIDMapping -from idp_test.check import VerifyNameIDPolicyUsage -from idp_test.check import VerifySuccessStatus -from idp_test.check import VerifyDigestAlgorithm -from idp_test.check import VerifySignatureAlgorithm -from idp_test.check import VerifySignedPart -from idp_test.check import VerifyEndpoint - -from saml2.samlp import NameIDPolicy - -__author__ = 'rolandh' - - -class Request(object): - _args = {} - _class = None - tests = {"post": [VerifyContent], "pre": []} - - def __init__(self, conv): - self.args = self._args.copy() - self.conv = conv - - def setup(self): - pass - - def pre_processing(self, message, args): - return message - - def post_processing(self, message): - return message - -#class Saml2IntRequest(Request): -# tests = {"pre": [], -# "post": [CheckSaml2IntAttributes, VerifyContent -# # CheckSubjectNameIDFormat, -# ]} - - -class AuthnRequest(Request): - _class = samlp.AuthnRequest - request = "authn_request" - _args = {"response_binding": BINDING_HTTP_POST, - "request_binding": BINDING_HTTP_REDIRECT, - "nameid_format": NAMEID_FORMAT_PERSISTENT, - "allow_create": True} - tests = {"pre": [VerifyFunctionality], - "post": [CheckSaml2IntAttributes, - VerifyAttributeNameFormat, - VerifySignedPart, - VerifyDigestAlgorithm, - VerifySignatureAlgorithm]} - - -class AuthnRequestNID_Transient(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - url = "" - for url, binding in endps["assertion_consumer_service"]: - if binding == BINDING_HTTP_POST: - self.args["assertion_consumer_service_url"] = url - break - - self.tests["post"].append((VerifyEndpoint, {"endpoint": url})) - - -class AuthnRequestNID_Email(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = NAMEID_FORMAT_EMAILADDRESS - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - url = "" - for url, binding in endps["assertion_consumer_service"]: - if binding == BINDING_HTTP_POST: - self.args["assertion_consumer_service_url"] = url - break - - self.tests["post"].append((VerifyEndpoint, {"endpoint": url})) - - -class AuthnRequestNID_Unspecified(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = NAMEID_FORMAT_UNSPECIFIED - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - url = "" - for url, binding in endps["assertion_consumer_service"]: - if binding == BINDING_HTTP_POST: - self.args["assertion_consumer_service_url"] = url - break - - self.tests["post"].append((VerifyEndpoint, {"endpoint": url})) - - -class AuthnRequestNID_no(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = "" - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - url = "" - for url, binding in endps["assertion_consumer_service"]: - if binding == BINDING_HTTP_POST: - self.args["assertion_consumer_service_url"] = url - break - - self.tests["post"].append((VerifyEndpoint, {"endpoint": url})) - - -class AuthnRequestEndpointIndex(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["attribute_consuming_service_index"] = 3 - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - acs3 = endps["assertion_consumer_service"][3] - self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]})) - - -class AuthnRequestEndpointIndexNIDTransient(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["attribute_consuming_service_index"] = 3 - self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT - - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - acs3 = endps["assertion_consumer_service"][3] - self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]})) - - -class AuthnRequestSpecEndpoint(AuthnRequest): - def setup(self): - cnf = self.conv.client.config - endps = cnf.getattr("endpoints", "sp") - acs3 = endps["assertion_consumer_service"][3] - self.args["assertion_consumer_service_url"] = acs3[0] - self.tests["post"].append((VerifyEndpoint, {"endpoint": acs3[0]})) - - -class DynAuthnRequest(Request): - _class = samlp.AuthnRequest - request = "authn_request" - _args = {"response_binding": BINDING_HTTP_POST} - tests = {} - name_id_formats = [NAMEID_FORMAT_TRANSIENT, NAMEID_FORMAT_PERSISTENT] - bindings = [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST] - - def setup(self): - metadata = self.conv.client.metadata - entity = metadata[self.conv.entity_id] - self.args.update({"nameid_format": "", "request_binding": ""}) - for idp in entity["idpsso_descriptor"]: - for nformat in self.name_id_formats: - if self.args["nameid_format"]: - break - for nif in idp["name_id_format"]: - if nif["text"] == nformat: - self.args["nameid_format"] = nformat - break - for bind in self.bindings: - if self.args["request_binding"]: - break - for sso in idp["single_sign_on_service"]: - if sso["binding"] == bind: - self.args["request_binding"] = bind - break - - -class AuthnRequestPost(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["request_binding"] = BINDING_HTTP_POST - - -class AuthnRequest_using_Artifact(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["response_binding"] = BINDING_HTTP_ARTIFACT - self.args["binding"] = BINDING_HTTP_ARTIFACT - - -class AuthnRequest_using_ArtifactNID_Transient(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT - self.args["response_binding"] = BINDING_HTTP_ARTIFACT - self.args["binding"] = BINDING_HTTP_ARTIFACT - - -class AuthnRequestPostNID_Transient(AuthnRequestPost): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT - - -class LogOutRequest(Request): - request = "logout_request" - _args = {"request_binding": BINDING_SOAP} - tests = {"pre": [VerifyFunctionality], "post": []} - - def __init__(self, conv): - Request.__init__(self, conv) - self.tests["pre"].append(CheckLogoutSupport) - #self.tests["post"].append(VerifyLogout) - - def setup(self): - resp = self.conv.saml_response[-1].response - assertion = resp.assertion[0] - subj = assertion.subject - self.args["name_id"] = subj.name_id - self.args["issuer_entity_id"] = assertion.issuer.text - - -class AssertionIDRequest(Request): - request = "assertion_id_request" - _args = {"request_binding": BINDING_URI, - "response_binding": None} - tests = {"pre": [VerifyFunctionality]} - - def setup(self): - assertion = self.conv.saml_response[-1].assertion - self.args["assertion_id_refs"] = [assertion.id] - - -class AuthnQuery(Request): - request = "authn_query" - _args = {"request_binding": BINDING_SOAP} - tests = {"pre": [VerifyFunctionality], "post": []} - - def __init__(self, conv): - Request.__init__(self, conv) - self.tests["post"].append(VerifySuccessStatus) - - def setup(self): - assertion = self.conv.saml_response[-1].assertion - self.args["subject"] = assertion.subject - - -class NameIDMappingRequest(Request): - request = "name_id_mapping_request" - _args = {"request_binding": BINDING_SOAP, - "name_id_policy": NameIDPolicy(format=NAMEID_FORMAT_PERSISTENT, - sp_name_qualifier="GroupOn", - allow_create="true")} - - def __init__(self, conv): - Request.__init__(self, conv) - self.tests["post"].append(VerifyNameIDMapping) - - def setup(self): - assertion = self.conv.saml_response[-1].assertion - self.args["name_id"] = assertion.subject.name_id - - -class AuthnRequest_NameIDPolicy1(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["name_id_policy"] = NameIDPolicy( - format=NAMEID_FORMAT_PERSISTENT, sp_name_qualifier="Group1", - allow_create="true") - self.tests["post"].append(VerifyNameIDPolicyUsage) - - -class AuthnRequest_NameIDPolicy1Transient(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["name_id_policy"] = NameIDPolicy( - format=NAMEID_FORMAT_TRANSIENT, sp_name_qualifier="Group1", - allow_create="true") - self.args["nameid_format"] = NAMEID_FORMAT_TRANSIENT - self.tests["post"].append(VerifyNameIDPolicyUsage) - - -class AuthnRequest_TransientNameID(AuthnRequest): - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["name_id_policy"] = NameIDPolicy( - format=NAMEID_FORMAT_TRANSIENT, sp_name_qualifier="Group", - allow_create="true") - self.tests["post"].append(VerifyNameIDPolicyUsage) - - -class ECP_AuthnRequest(AuthnRequest): - - def __init__(self, conv): - AuthnRequest.__init__(self, conv) - self.args["request_binding"] = BINDING_SOAP - self.args["service_url_binding"] = BINDING_PAOS - - def setup(self): - _client = self.conv.client - _client.user = "babs" - _client.passwd = "howes" - - # def post_processing(self, message): - # # Unpacking SOAP message - # return parse_soap_enveloped_saml_response(message) - - -class ManageNameIDRequest(Request): - request = "manage_name_id_request" - _args = {"request_binding": BINDING_SOAP, - "new_id": samlp.NewID("New identifier")} - - def __init__(self, conv): - Request.__init__(self, conv) - self.tests["post"].append(VerifySuccessStatus) - - def setup(self): - assertion = self.conv.saml_response[-1].assertion - self.args["name_id"] = assertion.subject.name_id - - -class AttributeQuery(Request): - request = "attribute_query" - _args = {"request_binding": BINDING_SOAP} - tests = {"pre": [VerifyFunctionality], - "post": [CheckSaml2IntAttributes, VerifyAttributeNameFormat]} - - def setup(self): - assertion = self.conv.saml_response[-1].assertion - self.args["name_id"] = assertion.subject.name_id - -# ----------------------------------------------------------------------------- - -OPERATIONS = { - 'verify': { - 'tc_id': "S2c-16", - "name": 'Verify SAML connectivity', - "descr": 'Uses AuthnRequest to check connectivity', - "sequence": [DynAuthnRequest], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []} - }, - 'authn': { - "tc_id": "S2c-02", - "name": 'Absolute basic AuthnRequest', - "descr": 'AuthnRequest using HTTP-Redirect', - "sequence": [AuthnRequest], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["verify"] - }, - 'authn-nid_transient': { - "tc_id": "S2c-10", - "name": 'AuthnRequest, NameID-trans', - "descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, ' - 'transient name ID', - "sequence": [AuthnRequestNID_Transient], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn"] - }, - 'authn-nid_email': { - "tc_id": "S2c-20", - "name": 'AuthnRequest email nameID', - "descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, NameID-email' - 'specified', - "sequence": [AuthnRequestNID_Email], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn"] - }, - 'authn-nid_no': { - "tc_id": "S2c-21", - "name": 'AuthnRequest no NameID format', - "descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, no NameID format ' - 'specified', - "sequence": [AuthnRequestNID_no], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn"] - }, - 'authn-nid_unspecified': { - "tc_id": "S2c-21", - "name": 'AuthnRequest using unspecified NameID format', - "descr": 'Basic SAML2 AuthnRequest, HTTP-Redirect, NameID-unspec', - "sequence": [AuthnRequestNID_Unspecified], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn"] - }, - 'authn-post': { - "tc_id": "S2c-08", - "name": 'Basic SAML2 AuthnRequest using HTTP POST', - "descr": 'AuthnRequest using HTTP-POST', - "sequence": [AuthnRequestPost], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn"] - }, - 'authn-post-transient': { - "tc_id": "S2c-09", - "name": 'AuthnRequest HTTP-POST, transient NameID fmt', - "descr": 'AuthnRequest using HTTP-POST expecting transient NameID', - "sequence": [AuthnRequestPostNID_Transient], - "tests": {"pre": [CheckSaml2IntMetaData], - "post": []}, - "depend":["authn-post"] - }, - 'attribute-query':{ - "tc_id": "S2c-01", - "name": "Attribute query", - "sequence":[AuthnRequest, AttributeQuery], - "depend":["authn"] - }, - 'attribute-query-transient':{ - "tc_id": "S2c-20", - "name": "Attribute query, NameID transient", - "sequence":[AuthnRequestNID_Transient, AttributeQuery], - "depend":["authn"] - }, - 'authn_endpoint_index': { - "tc_id": "S2c-03", - "name": 'AuthnRequest, endpoint index', - "descr": '', - "sequence": [AuthnRequestEndpointIndex], - "depend":["authn"] - }, - 'authn_endpoint_index-transient': { - "tc_id": "S2c-03", - "name": 'AuthnRequest, endpoint index, NameID-trans', - "descr": '', - "sequence": [AuthnRequestEndpointIndexNIDTransient], - "depend":["authn"] - }, - 'authn_specified_endpoint': { - "tc_id": "S2c-04", - "name": 'AuthnRequest, specified endpoint', - "descr": '', - "sequence": [AuthnRequestSpecEndpoint], - "depend":["authn"] - }, - 'authn-artifact':{ - 'tc_id': "S2c-05", - "name": "SAML2 AuthnRequest using an artifact", - "descr": ('AuthnRequest using HTTP-Redirect and artifact'), - "sequence": [AuthnRequest_using_Artifact] - }, - 'authn-artifact_nid-transient':{ - 'tc_id': "S2c-05", - "name": "SAML2 AuthnRequest expecting artifact response", - "descr": ('AuthnRequest using HTTP-Redirect and artifact'), - "sequence": [AuthnRequest_using_ArtifactNID_Transient] - }, - 'authn-assertion_id_request': { - "tc_id": "S2c-06", - "name": 'AuthnRequest then AssertionIDRequest', - "descr": 'AuthnRequest followed by an AssertionIDRequest', - "sequence": [AuthnRequest, AssertionIDRequest], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, - 'authn-nid_transient-assertion_id_request': { - "tc_id": "S2c-26", - "name": 'AuthnRequest then AssertionIDRequest, NameID-trans', - "descr": 'AuthnRequest followed by an AssertionIDRequest', - "sequence": [AuthnRequestNID_Transient, AssertionIDRequest], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, - 'authn-with-name_id_policy': { - "tc_id": "S2c-11", - "name": 'SAML2 AuthnRequest with specific NameIDPolicy', - "descr": 'AuthnRequest with specific NameIDPolicy', - "sequence": [AuthnRequest_NameIDPolicy1], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, - 'authn-with-name_id_policy_nid-transient': { - "tc_id": "S2c-31", - "name": 'AuthnRequest NameIDPolicy transient', - "descr": 'AuthnRequest with specific NameIDPolicy', - "sequence": [AuthnRequest_NameIDPolicy1Transient], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, - 'ecp_authn': { - 'tc_id': "S2c-12", - "name": "AuthnRequest using ECP and PAOS", - "descr": "SAML2 AuthnRequest using ECP and PAOS", - "sequence":[ECP_AuthnRequest] - }, - 'log-in-out': { - "tc_id": "S2c-13", - "name": 'Basic SAML2 log in and out', - "descr": 'AuthnRequest using HTTP-Redirect followed by a logout', - "sequence": [AuthnRequest, LogOutRequest], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, - 'manage_nameid':{ - "tc_id": "S2c-14", - "name": "ManageNameID; set NameID", - "descr": "Setting the SP provided ID by using ManageNameID", - "sequence":[AuthnRequest, ManageNameIDRequest], - "depend":["authn"] - }, - 'nameid-mapping':{ - "tc_id": "S2c-15", - "name": "Simple NameIDMapping request", - "sequence":[AuthnRequest, NameIDMappingRequest], - "depend":["authn"] - }, - 'manage_nameid_nid-transient':{ - "tc_id": "S2c-16", - "name": "ManageNameID; set NameID; AuthRequ/NameID-trans", - "descr": "Setting the SP provided ID by using ManageNameID", - "sequence":[AuthnRequestNID_Transient, ManageNameIDRequest], - "depend":["authn"] - }, - 'authn-authn_query': { - "tc_id": "S2c-17", - "name": 'AuthnRequest then AuthnQuery', - "descr": 'AuthnRequest followed by an AuthnQuery', - "sequence": [AuthnRequest, AuthnQuery], - "tests": {"pre": [CheckSaml2IntMetaData], "post": []}, - "depend":["authn"] - }, -} \ No newline at end of file diff --git a/src/idp_test/tmp.py b/src/idp_test/tmp.py deleted file mode 100644 index 60bd9ac44..000000000 --- a/src/idp_test/tmp.py +++ /dev/null @@ -1,226 +0,0 @@ -from saml2 import config, NAMEID_FORMAT_EMAILADDRESS -from saml2 import samlp -from saml2 import BINDING_HTTP_POST -from saml2 import VERSION - -from saml2.client import Saml2Client -from saml2.s_utils import rndstr -from saml2.time_util import instant - -__author__ = 'rolandh' - -try: - from xmlsec_location import xmlsec_path -except ImportError: - xmlsec_path = '/opt/local/bin/xmlsec1' - -cnf_dict = { - "entityid" : "urn:mace:example.com:saml:roland:sp", - "name" : "urn:mace:example.com:saml:roland:sp", - "description": "Test SP", - "service": { - "sp": { - "endpoints":{ - "assertion_consumer_service": ["http://lingon.catalogix.se:8087/"], - }, - "required_attributes": ["surName", "givenName", "mail"], - "optional_attributes": ["title"], - "idp": ["urn:mace:example.com:saml:roland:idp"], - } - }, - "key_file" : "test.key", - "cert_file" : "test.pem", - "ca_certs": "cacerts.txt", - "xmlsec_binary" : xmlsec_path, - "metadata": { - "local": ["idp.xml"], - }, - "subject_data": "subject_data.db", - "accepted_time_diff": 60, - "attribute_map_dir" : "attributemaps", -} - - -conf = config.SPConfig() -conf.load(cnf_dict) -client = Saml2Client(conf) - -binding= BINDING_HTTP_POST -query_id = rndstr() -service_url = "https://example.com" - -authn_request = { - #===== AuthRequest ===== - "subject":{ - "base_id":{ - "name_qualifier":None, - "sp_name_qualifier":None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "name_id":{ - "name_qualifier":None, - "sp_name_qualifier":None, - "format":None, - "sp_provided_id": None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "encrypted_id":{ - "encrypted_data":None, - "encrypted_key":None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "subject_confirmation":[{ - "base_id":{ - "name_qualifier":None, - "sp_name_qualifier":None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "name_id":{ - "name_qualifier":None, - "sp_name_qualifier":None, - "format":None, - "sp_provided_id": None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "encrypted_id":{ - "encrypted_data":None, - "encrypted_key":None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "subject_confirmation_data":{ - "not_before":None, - "not_on_or_after":None, - "recipient":None, - "in_response_to":None, - "address":None, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - "text":None, - "extension_elements":None, - "extension_attributes":None, - }], - "text":None, - "extension_elements":None, - "extension_attributes":None, - }, - #NameIDPolicy - "name_id_policy":{ - "format":NAMEID_FORMAT_EMAILADDRESS, - # NAMEID_FORMAT_EMAILADDRESS = ( - # "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress") - # NAMEID_FORMAT_UNSPECIFIED = ( - # "urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified") - # NAMEID_FORMAT_ENCRYPTED = ( - # "urn:oasis:names:tc:SAML:2.0:nameid-format:encrypted") - # NAMEID_FORMAT_PERSISTENT = ( - # "urn:oasis:names:tc:SAML:2.0:nameid-format:persistent") - # NAMEID_FORMAT_TRANSIENT = ( - # "urn:oasis:names:tc:SAML:2.0:nameid-format:transient") - # NAMEID_FORMAT_ENTITY = ( - # "urn:oasis:names:tc:SAML:2.0:nameid-format:entity") - - "sp_name_qualifier":None, - "allow_create":None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - #saml.Conditions - "conditions":{ - #Condition - "condition":[{}], - #AudienceRestriction - "audience_restriction":[{}], - #OneTimeUse - "one_time_use":[{}], - #ProxyRestriction - "proxy_restriction":[{}], - #not_before=None, - #not_on_or_after=None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - #RequestedAuthnContext - "requested_authn_context":{ - #saml.AuthnContextClassRef - "authn_context_class_ref":None, - #saml.AuthnContextDeclRef - "authn_context_decl_ref":None, - #AuthnContextComparisonType_ - "comparison":None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - #Scoping - "scoping":{ - #IDPList - "idp_list":{ - #IDPEntry - "idp_entry":{ - "provider_id":None, - "name":None, - "loc":None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - #GetComplete - "get_complete":{}, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - #RequesterID - "requester_id":{}, - #proxy_count=None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - }, - "force_authn":None, - "is_passive":None, - "protocol_binding":None, - "assertion_consumer_service_index":None, - "assertion_consumer_service_url":None, - "attribute_consuming_service_index":None, - "provider_name":None, - #saml.Issuer - "issuer":{}, - #ds.Signature - "signature":{}, - #Extensions - "extensions":{}, - "id":None, - "version":None, - "issue_instant":None, - "destination":None, - "consent":None, - #text=None, - #extension_elements=None, - #extension_attributes=None, - -} - -request = samlp.AuthnRequest( - id= query_id, - version= VERSION, - issue_instant= instant(), - assertion_consumer_service_url= service_url, - protocol_binding= binding -) diff --git a/src/saml2/pack.py b/src/saml2/pack.py index ff7bd0ad1..e4c146256 100644 --- a/src/saml2/pack.py +++ b/src/saml2/pack.py @@ -123,7 +123,6 @@ def http_redirect_message(message, location, relay_state="", typ="SAMLRequest", :param sigalg: Which algorithm the signature function will use to sign the message :param signer: A signature function that can be used to sign the message - :param key: Key to use for signing :return: A tuple containing header information and a HTML message. """ diff --git a/src/sp_test/__init__.py b/src/sp_test/__init__.py deleted file mode 100644 index d275c87ee..000000000 --- a/src/sp_test/__init__.py +++ /dev/null @@ -1,298 +0,0 @@ -from __future__ import print_function -import json -import pprint -import argparse -import os.path -import sys -import traceback -from importlib import import_module - -from idp_test import SCHEMA -from saml2 import root_logger - -from saml2.mdstore import MetadataStore, MetaData -from saml2.saml import NAME_FORMAT_UNSPECIFIED -from saml2.server import Server -from saml2.config import IdPConfig -from saml2.config import logging - -from sp_test.base import Conversation - -from saml2test import FatalError -from saml2test import CheckError -from saml2test import ContextFilter -from saml2test import exception_trace -from saml2test.check import CRITICAL - -__author__ = 'rolandh' - -#formatter = logging.Formatter("%(asctime)s %(name)s:%(levelname)s %(message)s") -formatter_2 = logging.Formatter( - "%(delta).6f - %(levelname)s - [%(name)s] %(message)s") - -cf = ContextFilter() -cf.start() - -streamhandler = logging.StreamHandler(sys.stderr) -streamhandler.setFormatter(formatter_2) - -memoryhandler = logging.handlers.MemoryHandler(1024 * 10, logging.DEBUG) -memoryhandler.addFilter(cf) - -logger = logging.getLogger(__name__) -logger.setLevel(logging.DEBUG) -logger.addHandler(memoryhandler) -logger.setLevel(logging.DEBUG) - - -class Client(object): - def __init__(self, operations, check_factory): - self.operations = operations - self.tests = None - self.check_factory = check_factory - - self._parser = argparse.ArgumentParser() - self._parser.add_argument("-c", dest="config", default="config", - help="Configuration file for the IdP") - self._parser.add_argument( - '-C', dest="ca_certs", - help=("CA certs to use to verify HTTPS server certificates, ", - "if HTTPS is used and no server CA certs are defined then ", - "no cert verification will be done")) - self._parser.add_argument('-d', dest='debug', action='store_true', - help="Print debug information") - self._parser.add_argument("-H", dest="pretty", action='store_true', - help="Output summary on stdout as pretty " - "printed python dict instead of JSON") - self._parser.add_argument("-i", dest="insecure", action='store_true', - help="Do not verify SSL certificate") - self._parser.add_argument("-I", dest="keysdir", default="keys", - help="Directory for invalid IDP keys") - self._parser.add_argument('-J', dest="json_config_file", - help="Test target configuration") - self._parser.add_argument( - '-k', dest='content_log', action='store_true', - help="Log HTTP content in spearate files in directory " - "/, which defaults to the path in -L") - self._parser.add_argument( - "-l", dest="list", action="store_true", - help="List all the test flows as a JSON object") - self._parser.add_argument("-L", dest="logpath", default=".", - help="Path to the logfile directory") - self._parser.add_argument('-m', dest="metadata", action='store_true', - help="Return the IdP metadata") - self._parser.add_argument( - "-P", dest="configpath", default=".", - help="Path to the configuration file for the IdP") - self._parser.add_argument("-t", dest="testpackage", - help="Module describing tests") - #self._parser.add_argument('-v', dest='verbose', action='store_true', - # help="Print runtime information") # unsused - self._parser.add_argument("-Y", dest="pysamllog", action='store_true', - help="Print PySAML2 logs") - self._parser.add_argument("oper", nargs="?", help="Which test to run") - - self.interactions = None - self.entity_id = None - self.constraints = {} - self.args = None - self.idp = None - self.idp_config = None - - def json_config_file(self): - if self.args.json_config_file == "-": - return json.loads(sys.stdin.read()) - else: - return json.loads(open(self.args.json_config_file).read()) - - def idp_configure(self, metadata_construction=False): - sys.path.insert(0, self.args.configpath) - mod = import_module(self.args.config) - self.idp_config = IdPConfig().load(mod.CONFIG, metadata_construction) - - if not self.args.insecure: - self.idp_config.verify_ssl_cert = False - else: - if self.args.ca_certs: - self.idp_config.ca_certs = self.args.ca_certs - else: - self.idp_config.ca_certs = "../keys/cacert.pem" - # hack to change idp cert without config change. TODO: find interface to - # change IDP cert after __init__ - if self.args.oper == 'sp-04': - self.idp_config.cert_file = os.path.join(self.args.keysdir, "non_md_cert.pem") - self.idp_config.key_file = os.path.join(self.args.keysdir, "non_md_key.pem") - for f in [self.idp_config.cert_file, self.idp_config.key_file]: - if not os.path.isfile(f): - print("File not found: %s" % os.path.abspath(f)) - raise - - self.idp = Server(config=self.idp_config) - - def test_summation(self, sid): - status = 0 - for item in self.test_log: - if item["status"] > status: - status = item["status"] - - if status == 0: - status = 1 - - info = { - "id": sid, - "status": status, - "tests": self.test_log - } - - if status == 5: - info["url"] = self.test_log[-1]["url"] - info["htmlbody"] = self.test_log[-1]["message"] - - return info - - def output_log(self, memhndlr, hndlr2): - """ - """ - - print(80 * ":", file=sys.stderr) - hndlr2.setFormatter(formatter_2) - memhndlr.setTarget(hndlr2) - memhndlr.flush() - memhndlr.close() - - def run(self): - self.args = self._parser.parse_args() - - if self.args.pysamllog: - root_logger.addHandler(memoryhandler) - root_logger.setLevel(logging.DEBUG) - - if self.args.metadata: - return self.make_meta() - elif self.args.list: - return self.list_operations() - elif self.args.oper == "check": - return self.verify_metadata() - else: - if not self.args.oper: - raise Exception("Missing test case specification") - self.args.oper = self.args.oper.strip("'") - self.args.oper = self.args.oper.strip('"') - - self.setup() - - try: - oper = self.operations.OPERATIONS[self.args.oper] - except KeyError: - if self.tests: - try: - oper = self.tests.OPERATIONS[self.args.oper] - except ValueError: - print("Undefined testcase " + self.args.oper, - file=sys.stderr) - return - else: - print("Undefined testcase " + self.args.oper, file=sys.stderr) - return - - if self.args.pretty: - pp = pprint.PrettyPrinter(indent=4) - else: - pp = None - - logger.info("Starting conversation") - conv = Conversation(self.idp, self.idp_config, - self.interactions, self.json_config, - check_factory=self.check_factory, - entity_id=self.entity_id, - constraints=self.constraints, - commandlineargs = self.args) - try: - conv.do_sequence_and_tests(oper["sequence"], oper["tests"]) - self.test_log = conv.test_output - tsum = self.test_summation(self.args.oper) - err = None - except CheckError as err: - self.test_log = conv.test_output - tsum = self.test_summation(self.args.oper) - except FatalError as err: - if conv: - self.test_log = conv.test_output - self.test_log.append(exception_trace("RUN", err)) - else: - self.test_log = exception_trace("RUN", err) - tsum = self.test_summation(self.args.oper) - except Exception as err: - if conv: - conv.test_output.append({"status": CRITICAL, - "name": "test driver error", - "id": "critial exception"}) - self.test_log = conv.test_output - self.test_log.append(exception_trace("RUN", err)) - else: - self.test_log = exception_trace("RUN", err) - tsum = self.test_summation(self.args.oper) - logger.error("Unexpected exception in test driver %s", - traceback.format_exception(*sys.exc_info())) - - - if pp: - pp.pprint(tsum) - else: - print(json.dumps(tsum), file=sys.stdout) - - if tsum["status"] > 1 or self.args.debug or err: - self.output_log(memoryhandler, streamhandler) - - def setup(self): - self.json_config = self.json_config_file() - - _jc = self.json_config - - try: - self.interactions = _jc["interaction"] - except KeyError: - self.interactions = [] - - self.idp_configure() - - metadata = MetadataStore(SCHEMA, self.idp_config.attribute_converters, - self.idp_config) - info = _jc["metadata"].encode("utf-8") - md = MetaData(SCHEMA, self.idp_config.attribute_converters, info) - md.load() - metadata[0] = md - self.idp.metadata = metadata - #self.idp_config.metadata = metadata - - if self.args.testpackage: - self.tests = import_module("sp_test.package.%s" % - self.args.testpackage) - - try: - self.entity_id = _jc["entity_id"] - # Verify its the correct metadata - assert self.entity_id in md.entity.keys() - except KeyError: - if len(md.entity.keys()) == 1: - self.entity_id = md.entity.keys()[0] - else: - raise Exception("Don't know which entity to talk to") - - if "constraints" in _jc: - self.constraints = _jc["constraints"] - if "name_format" not in self.constraints: - self.constraints["name_format"] = NAME_FORMAT_UNSPECIFIED - - def make_meta(self): - pass - - def list_operations(self): - res = [] - for key, val in self.operations.OPERATIONS.items(): - res.append({"id": key, "name": val["name"]}) - - print(json.dumps(res)) - - def verify_metadata(self): - pass diff --git a/src/sp_test/base.py b/src/sp_test/base.py deleted file mode 100644 index c1e549213..000000000 --- a/src/sp_test/base.py +++ /dev/null @@ -1,538 +0,0 @@ -import base64 -import cookielib -import re -import os -import traceback -import urllib -import sys -import six - -from urlparse import parse_qs -from saml2 import BINDING_HTTP_REDIRECT, class_name -from saml2 import BINDING_HTTP_POST -from saml2.request import SERVICE2REQUEST -from saml2.sigver import signed_instance_factory, pre_signature_part -from saml2.httputil import HttpParameters - -from saml2test import CheckError, FatalError -from saml2test.check import Check -from saml2test.check import ExpectedError -from saml2test.check import INTERACTION -from saml2test.check import STATUSCODE -from saml2test.interaction import Action -from saml2test.interaction import Interaction -from saml2test.interaction import InteractionNeeded - -from sp_test.tests import ErrorResponse - -__author__ = 'rolandh' - -import logging - -logger = logging.getLogger(__name__) - -FILE_EXT = {"text/html": "html", "test/plain": "txt", "application/json": "json", - "text/xml": "xml", "application/xml": "xml", } - -camel2underscore = re.compile('((?<=[a-z0-9])[A-Z]|(?!^)[A-Z](?=[a-z]))') - - -class Conversation(): - def __init__(self, instance, config, interaction, json_config, - check_factory, entity_id, msg_factory=None, - features=None, constraints=None, # verbose=False, - expect_exception=None, commandlineargs=None): - self.instance = instance - self._config = config - self.test_output = [] - self.features = features - #self.verbose = verbose # removed (not used) - self.check_factory = check_factory - self.msg_factory = msg_factory - self.expect_exception = expect_exception - self.commandlineargs = commandlineargs - - self.cjar = {"browser": cookielib.CookieJar(), - "rp": cookielib.CookieJar(), - "service": cookielib.CookieJar()} - - self.protocol_response = [] - self.last_response = None - self.last_content = None - self.response = None - self.interaction = Interaction(self.instance, interaction) - self.exception = None - - self.entity_id = entity_id - self.cjar = {"rp": cookielib.CookieJar()} - self.args = {} - self.qargs = {} - self.response_args = {} - self.saml_response = [] - self.destination = "" - self.request = None - self.position = "" - self.response = None - self.oper = None - self.msg_constraints = constraints - self.json_config = json_config - self.start_page = json_config["start_page"] - - def check_severity(self, stat): - if stat["status"] >= 3: - logger.error("WHERE: %s", stat["id"]) - logger.error("STATUS:%s", STATUSCODE[stat["status"]]) - try: - logger.error("HTTP STATUS: %s", stat["http_status"]) - except KeyError: - pass - try: - logger.error("INFO: %s", stat["message"]) - except KeyError: - pass - - raise CheckError - - def do_check(self, test, **kwargs): - if isinstance(test, six.string_types): - chk = self.check_factory(test)(**kwargs) - else: - chk = test(**kwargs) - if not chk.call_on_redirect() and \ - 300 < self.last_response.status_code <= 303: - pass - else: - stat = chk(self, self.test_output) - self.check_severity(stat) - - def err_check(self, test, err=None, bryt=True): - if err: - self.exception = err - chk = self.check_factory(test)() - chk(self, self.test_output) - if bryt: - e = FatalError("%s" % err) - e.trace = "".join(traceback.format_exception(*sys.exc_info())) - raise e - - def test_sequence(self, sequence): - if sequence is None: - return True - - for test in sequence: - if isinstance(test, tuple): - test, kwargs = test - else: - kwargs = {} - self.do_check(test, **kwargs) - if test == ExpectedError: - return False # TODO: return value is unused - return True - - def my_endpoints(self): - for serv in ["aa", "aq", "idp"]: - endpoints = self._config.getattr("endpoints", serv) - if endpoints: - for typ, spec in endpoints.items(): - for url, binding in spec: - yield url - - def which_endpoint(self, url): - for serv in ["aa", "aq", "idp"]: - endpoints = self._config.getattr("endpoints", serv) - if endpoints: - for typ, spec in endpoints.items(): - for endp, binding in spec: - if url.startswith(endp): - return typ, binding - return None - - def _log_response(self, response): - """Depending on -k argument write content to either logger or extra file - - Create the directory; delete all possibly existing files - Write response content into response_x. (with x incrementing from 0) - """ - logger.info("<-- Status: %s", response.status_code) - if response.status_code in [302, 301, 303]: - logger.info("<-- location: %s", - response.headers._store['location'][1]) - else: - if self.commandlineargs.content_log: - self._content_log_fileno = getattr(self, '_content_log_fileno', 0) + 1 - if not getattr(self, 'logcontentpath', None): - try: - content_type_hdr = response.headers._store['content-type'][1] - l = content_type_hdr.split(';') + ['charset=ISO-8859-1',] - content_type = l[0] - encoding = l[1].split("=") - ext = "." + FILE_EXT[content_type] - except Exception as e: - ext = "" - self._logcontentpath = os.path.join( - self.commandlineargs.logpath, - self.commandlineargs.oper) - if not os.path.exists(self._logcontentpath): - os.makedirs(self._logcontentpath) - for fn in os.listdir(self._logcontentpath): - old_file = os.path.join(self._logcontentpath, fn) - if os.path.isfile(old_file): - os.unlink(old_file) - fn = os.path.join(self._logcontentpath, "response_%d%s" - % (self._content_log_fileno, ext )) - f = open(fn, "w") - f.write(response.content) - f.close() - logger.info("<-- Response content (encoding=%s) in file %s", - encoding, fn) - pass - else: - logger.info("<-- Content: %s", response.content) - - def wb_send_GET_startpage(self): - """ - The action that starts the whole sequence, a HTTP GET on a web page - """ - self.last_response = self.instance.send(self.start_page) - self._log_response(self.last_response) - - def handle_result(self, check_response=None): - if check_response: - if isinstance(check_response(), Check): - if 300 < self.last_response.status_code <= 303: - self._redirect(self.last_response) - self.do_check(check_response) - else: - # A HTTP redirect or HTTP Post - if 300 < self.last_response.status_code <= 303: - self._redirect(self.last_response) - - if self.last_response.status_code >= 400: - raise FatalError(self.last_response.reason) - - _txt = self.last_response.content - assert _txt.startswith("

") - else: - if 300 < self.last_response.status_code <= 303: - self._redirect(self.last_response) - - _txt = self.last_response.content - if self.last_response.status_code >= 400: - raise FatalError("Did not expected error") - - def parse_saml_message(self): - try: - url, query = self.last_response.headers["location"].split("?") - except KeyError: - return - - _dict = parse_qs(query) - try: - self.relay_state = _dict["RelayState"][0] - except KeyError: - self.relay_state = "" - _str = _dict["SAMLRequest"][0] - self.saml_request = self.instance._parse_request( - _str, SERVICE2REQUEST[self._endpoint], self._endpoint, - self._binding) - if self._binding == BINDING_HTTP_REDIRECT: - self.http_parameters = HttpParameters(_dict) - - def _redirect(self, _response): - rdseq = [] - url = None - while _response.status_code in [302, 301, 303]: - url = _response.headers["location"] - if url in rdseq: - raise FatalError("Loop detected in redirects") - else: - rdseq.append(url) - if len(rdseq) > 8: - raise FatalError( - "Too long sequence of redirects: %s" % rdseq) - - logger.info("--> REDIRECT TO: %s", url) - # If back to me - for_me = False - try: - self._endpoint, self._binding = self.which_endpoint(url) - for_me = True - except TypeError: - pass - - if for_me: - break - else: - try: - _response = self.instance.send(url, "GET") - except Exception as err: - raise FatalError("%s" % err) - - self._log_response(_response) - self.last_response = _response - if _response.status_code >= 400: - break - return url - - def send_idp_response(self, req_flow, resp_flow): - """ - :param req_flow: The flow to check the request - :param resp_flow: The flow to prepare the response - :return: The SP's HTTP response on receiving the SAML response - """ - - # Pick information from the request that should be in the response - args = self.instance.response_args(self.saml_request.message, - [resp_flow._binding]) - _mods = list(resp_flow.__mro__[:]) - _mods.reverse() - for m in _mods: - try: - args.update(self.json_config["args"][m.__name__]) - except KeyError: - pass - - args.update(resp_flow._response_args) - - for param in ["identity", "userid"]: - if param in self.json_config: - args[param] = self.json_config[param] - - if resp_flow == ErrorResponse: - func = getattr(self.instance, "create_error_response") - else: - _op = camel2underscore.sub(r'_\1', req_flow._class.c_tag).lower() - func = getattr(self.instance, "create_%s_response" % _op) - - # get from config which parts shall be signed - sign = [] - for styp in ["sign_assertion", "sign_response"]: - if styp in args: - try: - if args[styp].lower() == "always": - sign.append(styp) - del args[styp] - except (AttributeError, TypeError): - raise AssertionError('config parameters "sign_assertion", ' - '"sign_response" must be of type string') - - response = func(**args) - response = resp_flow(self).pre_processing(response) - # and now for signing - if sign: - to_sign = [] - try: - _digest_alg=args["sign_digest_alg"] - except KeyError: - _digest_alg=None - try: - _sign_alg=args["sign_signature_alg"] - except KeyError: - _sign_alg=None - # Order is important, first assertion and then response if both - if "sign_assertion" in sign: - to_sign = [(class_name(response.assertion), - response.assertion.id)] - response.assertion.signature = pre_signature_part( - response.assertion.id, self.instance.sec.my_cert, 1, - digest_alg=_digest_alg, sign_alg=_sign_alg) - if "sign_response" in sign: - to_sign = [(class_name(response), response.id)] - response.signature = pre_signature_part( - response.id, self.instance.sec.my_cert, 1, - digest_alg=_digest_alg, sign_alg=_sign_alg) - - response = signed_instance_factory(response, self.instance.sec, - to_sign) - - info = self.instance.apply_binding(resp_flow._binding, response, - args["destination"], - self.relay_state, - "SAMLResponse", resp_flow._sign) - - if resp_flow._binding == BINDING_HTTP_REDIRECT: - url = None - for param, value in info["headers"]: - if param == "Location": - url = value - break - self.last_response = self.instance.send(url) - elif resp_flow._binding == BINDING_HTTP_POST: - resp_flow = base64.b64encode("%s" % response) - info["data"] = urllib.urlencode({"SAMLResponse": resp_flow, - "RelayState": self.relay_state}) - info["method"] = "POST" - info["headers"] = { - 'Content-type': 'application/x-www-form-urlencoded'} - self.last_response = self.instance.send(**info) - try: - self.last_content = self.last_response.content - except AttributeError: - self.last_content = None - - self._log_response(self.last_response) - - def do_flow(self, flow, mid_tests): - """ - Solicited or 'un-solicited' flows. - - Solicited always starts with the Web client accessing a page. - Un-solicited starts with the IDP sending a SAMl Response. - """ - if len(flow) >= 3: - self.wb_send_GET_startpage() - self.intermit(flow[0]._interaction) - self.parse_saml_message() - # make sure I got the request I expected - assert isinstance(self.saml_request.message, flow[1]._class) - try: - self.test_sequence(mid_tests) - except KeyError: - pass - self.send_idp_response(flow[1], flow[2]) - if len(flow) == 4: - self.handle_result(flow[3]) - else: - self.handle_result() - - def do_sequence_and_tests(self, oper, tests=None): - self.current_oper = oper - try: - self.test_sequence(tests["pre"]) - except KeyError: - pass - - for flow in oper: - try: - self.do_flow(flow, tests["mid"]) - except InteractionNeeded: - self.test_output.append({"status": INTERACTION, - "message": "see detail log for response content", - "id": "exception", - "name": "interaction needed", - "url": self.position}) - break - except FatalError: - raise - except Exception as err: - #self.err_check("exception", err) - raise - - try: - self.test_sequence(tests["post"]) - except KeyError: - pass - - def intermit(self, page_types): - """ - Currently handles only SP-issued redirects - - :param page_types: not used (could be used to implement wayf, disco) - """ - _response = self.last_response - _last_action = None - _same_actions = 0 - if _response.status_code >= 400: - try: - self.last_content = _response.text - except AttributeError: - self.last_content = None - raise FatalError( - "HTTP response status code: %d" % _response.status_code) - - url = _response.url - content = _response.text - done = False - while not done: - rdseq = [] - while _response.status_code in [302, 301, 303]: - url = _response.headers["location"] - if url in rdseq: - raise FatalError("Loop detected in redirects") - else: - rdseq.append(url) - if len(rdseq) > 8: - raise FatalError( - "Too long sequence of redirects: %s" % rdseq) - - # If back to me - for_me = False - try: - self._endpoint, self._binding = self.which_endpoint(url) - for_me = True - except TypeError: - pass - - if for_me: - done = True - break - else: - try: - _response = self.instance.send(url, "GET") - except Exception as err: - raise FatalError("%s" % err) - - self._log_response(_response) - content = _response.text - self.position = url - self.last_content = content - self.response = _response - - if _response.status_code >= 400: - done = True - break - - if done or url is None: - break - - _base = url.split("?")[0] - - try: - _spec = self.interaction.pick_interaction(_base, content) - except InteractionNeeded: - self.position = url - logger.error("Page Content: %s", content) - raise - except KeyError: - self.position = url - logger.error("Page Content: %s", content) - self.err_check("interaction-needed") - - if _spec == _last_action: - _same_actions += 1 - if _same_actions >= 3: - raise InteractionNeeded("Interaction loop detection") - else: - _last_action = _spec - - if len(_spec) > 2: - logger.info(">> %s <<", _spec["page-type"]) - if _spec["page-type"] == "login": - self.login_page = content - - _op = Action(_spec["control"]) - - try: - _response = _op(self.instance, self, logger, url, - _response, content, self.features) - if isinstance(_response, dict): - self.last_response = _response - self.last_content = _response - return _response - content = _response.text - self.position = url - self.last_content = content - self.response = _response - - if _response.status_code >= 400: - break - except (FatalError, InteractionNeeded): - raise - except Exception as err: - self.err_check("exception", err, False) - - self.last_response = _response - try: - self.last_content = _response.text - except AttributeError: - self.last_content = None diff --git a/src/sp_test/check.py b/src/sp_test/check.py deleted file mode 100644 index 1ad930544..000000000 --- a/src/sp_test/check.py +++ /dev/null @@ -1,258 +0,0 @@ -import inspect -import logging -import re -import sys - -from saml2 import BINDING_HTTP_POST, BINDING_HTTP_REDIRECT -from saml2test.check import Check -from saml2test.check import ERROR, INFORMATION, WARNING -from saml2test import check -from saml2test.interaction import Interaction - -__author__ = 'rolandh' - -logger = logging.getLogger(__name__) - - -class VerifyAuthnRequest(Check): - """ Basic AuthnRequest verification as provided by pysaml2 - """ - cid = "verify-authnrequest" - - def _func(self, conv): - try: - conv.saml_request.message.verify() - except ValueError: - self._status = ERROR - - return {} - - -class MatchResult(Check): - cid = "match-result" - - def _func(self, conv): - interaction = Interaction(conv.instance, [conv.json_config["result"]]) - _int = interaction.pick_interaction(content=conv.last_response.content) - - return {} - - -class ErrorResponse(Check): - cid = "saml-error" - msg = "Expected error message, but test target returned OK" - - def _func(self, conv): - try: - assert conv.last_response.status_code >= 400 - except AssertionError: - self._message = self.msg - self._status = ERROR - return {} - - -class VerifyDigestAlgorithm(Check): - """ - verify that the used digest algorithm was one from the approved set. - """ - cid = "verify-digest-algorithm" - - - def _digest_algo(self, signature, allowed): - _alg = signature.signed_info.reference[0].digest_method.algorithm - try: - assert alg in allowed - except AssertionError: - self._message = "signature digest algorithm not allowed: " + alg - self._status = ERROR - return False - return True - - def _func(self, conv): - if "digest_algorithm" not in conv.msg_constraints: - logger.info("Not verifying digest_algorithm (not configured)") - return {} - else: - try: - assert len(conv.msg_constraints["digest_algorithm"]) > 0 - except AssertionError: - self._message = "List of allowed digest algorithm must not be empty" - self._status = ERROR - return {} - _algs = conv.msg_constraints["digest_algorithm"] - - request = conv.saml_request.message - - if request.signature: - if not self._digest_algo(request.signature, _algs): - return {} - elif conv._binding == BINDING_HTTP_REDIRECT: - self._message = "no digest with redirect binding" - self._status = INFORMATION - return {} - elif conv._binding == BINDING_HTTP_POST: - self._message = "cannot verify digest algorithm: request not signed" - self._status = WARNING - return {} - - - return {} - - -class VerifyIfRequestIsSigned(Check): - """ - verify that the request has been signed - """ - cid = "verify-if-request-is-signed" - - def _func(self, conv): - try: - check_sig = conv.msg_constraints["authnRequest_signature_required"] - except KeyError: - check_sig = False - if check_sig: - if conv._binding == BINDING_HTTP_REDIRECT: - try: - assert conv.http_parameters.signature is not None - except AssertionError: - self._message = "No AuthnRequest simple signature found" - self._status = ERROR - return {} - else: - try: - assert conv.saml_request.message.signature is not None - except AssertionError: - self._message = "No AuthnRequest XML signature found" - self._status = ERROR - return {} - else: - logger.debug("AuthnRequest signature is optional") - return {} - return {} - - -class VerifySignatureAlgorithm(Check): - """ - verify that the used signature algorithm was one from an approved set. - """ - cid = "verify-signature-algorithm" - - def _func(self, conv): - if "signature_algorithm" not in conv.msg_constraints: - logger.info("Not verifying signature_algorithm (not configured)") - return {} - else: - try: - assert len(conv.msg_constraints["signature_algorithm"]) > 0 - except AssertionError: - self._message = "List of allowed signature algorithm must " \ - "not be empty" - self._status = ERROR - return {} - - allowed_algs = [a[1] for a in conv.msg_constraints["signature_algorithm"]] - if conv._binding == BINDING_HTTP_REDIRECT: - if getattr(conv.http_parameters, "signature", None): - _alg = conv.http_parameters.sigalg - try: - assert _alg in allowed_algs - except AssertionError: - self._message = "Algorithm not in white list for " \ - "redirect signing: " + _alg - self._status = ERROR - else: - signature = getattr(conv.saml_request.message, "signature", None) - if signature: - try: - assert signature.signed_info.signature_method.algorithm in \ - allowed_algs - except AssertionError: - self._message = "Wrong algorithm used for signing: '%s'" % \ - signature.signed_info.signature_method.algorithm - self._status = ERROR - else: - self._message = "cannot verify signature algorithm: request not signed" - self._status = WARNING - return {} - return {} - - -class VerifyEchopageContents(Check): - """ Verify that the last success response (HTTP code 200) from the SP - contains static text and SAML response values - """ - cid = "verify-echopage-contents" - msg = "Cannot match expected contents on SP echo page" - - def _func(self, conv): - if conv.last_response.status_code < 300: - try: - pattern = conv.json_config["echopageIdPattern"] - m = re.search(pattern, conv.last_response.content) - try: - assert m is not None - except AssertionError: - self._message = "Cannot match expected static contents " \ - "in SP echo page" - self._status = ERROR - for pattern in conv.json_config["echopageContentPattern"]: - m = re.search(pattern, conv.last_response.content) - try: - assert m is not None - except AssertionError: - self._message = 'Cannot match expected response value' \ - ', pattern="' + pattern + '"' - self._status = ERROR - except KeyError: - self._message = 'Configuration error: missing key ' \ - '"echopageIdString" in test target config' - self._status = ERROR - return {} - - def call_on_redirect(self): - return False - - -class SetResponseAndAssertionSignaturesFalse(Check): - """ Prepare config to suppress signatures of both response and assertion""" - cid = "set-response-and-assertion-signature-false" - msg = "Prepare config to suppress signatures of both response and assertion" - - def _func(self, conv): - conv.json_config['args']['AuthnResponse']['sign_assertion'] = 'never' - conv.json_config['args']['AuthnResponse']['sign_response'] = 'never' - self._status = INFORMATION - return {} - - -#class SetInvalidIdpKey(Check): -# """ Prepare config to set IDP signing key to some useless key""" -# cid = "set-idp-key-invalid" -# msg = "Prepare config to set IDP signing key invalid" -# -# def _func(self, conv): -# conv.instance.sec.cert_file = conv.instance.config.invalid_idp_cert_file -# conv.instance.sec.key_file = conv.instance.config.invalid_idp_key_file -# return {} - - -# ============================================================================= - - -CLASS_CACHE = {} - - -def factory(cid, classes=CLASS_CACHE): - if len(classes) == 0: - check.factory(cid, classes) - for name, obj in inspect.getmembers(sys.modules[__name__]): - if inspect.isclass(obj): - try: - classes[obj.cid] = obj - except AttributeError: - pass - - if cid in classes: - return classes[cid] - else: - return None diff --git a/src/sp_test/tests.py b/src/sp_test/tests.py deleted file mode 100644 index d75a140b0..000000000 --- a/src/sp_test/tests.py +++ /dev/null @@ -1,774 +0,0 @@ -import copy -from saml2 import samlp, SamlBase -from saml2 import NAMEID_FORMAT_EMAILADDRESS -from saml2 import BINDING_HTTP_REDIRECT -from saml2 import BINDING_HTTP_POST -from saml2.s_utils import rndstr - -from saml2.saml import SCM_BEARER, Condition, XSI_TYPE, Audience -from saml2.saml import NAMEID_FORMAT_PERSISTENT -from saml2.saml import SCM_SENDER_VOUCHES -from saml2.saml import ConditionAbstractType_ -from saml2.samlp import STATUS_AUTHN_FAILED -from saml2.time_util import in_a_while, a_while_ago -from sp_test import check -from sp_test.check import VerifyAuthnRequest, VerifyDigestAlgorithm -from sp_test.check import VerifySignatureAlgorithm, VerifyIfRequestIsSigned -from sp_test.check import SetResponseAndAssertionSignaturesFalse -from saml2test.check import CheckSpHttpResponseOK, CheckSpHttpResponse500 -from saml2test import ip_addresses - -__author__ = 'rolandh' - -USER = { - "adam": { - "given_name": "Adam", - "sn": "Andersson" - }, - "eva": { - "given_name": "Eva", - "sn": "Svensson" - } -} - - -# Extension class - extra condition -class TimeRestriction(ConditionAbstractType_): - """ """ - - c_tag = 'TimeRestriction' - c_namespace = "urn:mace:umu.se:sso" - c_children = ConditionAbstractType_.c_children.copy() - c_attributes = ConditionAbstractType_.c_attributes.copy() - c_child_order = ConditionAbstractType_.c_child_order[:] - c_cardinality = ConditionAbstractType_.c_cardinality.copy() - c_attributes['StartTime'] = ('start_time', 'time', False) - c_attributes['EndTime'] = ('end_time', 'time', False) - - def __init__(self, - start_time=None, - end_time=None, - text=None, - extension_elements=None, - extension_attributes=None): - ConditionAbstractType_.__init__( - self, text=text, extension_elements=extension_elements, - extension_attributes=extension_attributes) - self.start_time = start_time - self.end_time = end_time - - -# ============================================================================= - - -class Response(object): - _args = {} - _class = samlp.Response - _sign = False - tests = {"pre": [], "post": []} - - def __init__(self, conv): - self.args = self._args.copy() - self.conv = conv - - def setup(self): - pass - - def pre_processing(self, message, *kwargs): - return message - - def post_processing(self, message, *kwargs): - return message - - -class Request(object): - response = "" - _class = None - tests = {"pre": [], - "mid": [VerifyAuthnRequest], - "post": []} - - def __init__(self): - pass - - def __call__(self, conv, response): - pass - - -class Operation(object): - pass - - -class AuthnResponse(Response): - _response_args = { - "identity": USER["adam"], - "userid": "adam", - } - _binding = BINDING_HTTP_POST - - -class AuthnResponse_redirect(AuthnResponse): - _binding = BINDING_HTTP_REDIRECT - - -class ErrorResponse(Response): - _response_args = { - "info": (STATUS_AUTHN_FAILED, "Unknown user") - } - _binding = BINDING_HTTP_POST - - -class LogoutResponse(Response): - _class = samlp.LogoutRequest - pass - - -class Login(Operation): - _interaction = ["wayf"] - - -class AuthnRequest(Request): - _class = samlp.AuthnRequest - - -class AuthnResponse_NameIDformat_persistent(AuthnResponse): - def pre_processing(self, message, **kwargs): - name_id = message.assertion.subject.name_id - name_id.name_format = NAMEID_FORMAT_PERSISTENT - return message - - -class AuthnResponse_NameIDformat_email(AuthnResponse): - def pre_processing(self, message, **kwargs): - name_id = message.assertion.subject.name_id - name_id.name_format = NAMEID_FORMAT_EMAILADDRESS - name_id.text = "adam@example.com" - return message - - -class AuthnResponse_NameIDformat_foo(AuthnResponse): - def pre_processing(self, message, **kwargs): - name_id = message.assertion.subject.name_id - name_id.name_format = "foo" - name_id.text = "fruit basket" - return message - - -class AuthnResponse_without_SubjectConfirmationData_1(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation.subject_confirmation_data = None - _confirmation.method = SCM_SENDER_VOUCHES - return message - - -class AuthnResponse_without_SubjectConfirmationData_2(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation.subject_confirmation_data = None - _confirmation.method = SCM_BEARER - return message - - -class AuthnResponse_rnd_Response_inresponseto(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.in_response_to = "invalid_rand_" + rndstr(6) - return message - - -class AuthnResponse_rnd_Response_assertion_inresponseto(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.assertion.in_response_to = "invalid_rand_" + rndstr(6) - return message - - -class AuthnResponse_Response_no_inresponse(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.in_response_to = None - return message - - -class AuthnResponse_SubjectConfirmationData_no_inresponse(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation[0].subject_confirmation_data.in_response_to = None - return message - - -class AuthnResponse_wrong_Recipient(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation[0].subject_confirmation_data.recipient = rndstr(16) - return message - - -class AuthnResponse_missing_Recipient(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation[0].subject_confirmation_data.recipient = None - return message - - -class AuthnResponse_missing_Recipient(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation[0].subject_confirmation_data.recipient = None - return message - - -class AuthnResponse_broken_destination(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.destination = "NotAUrl" - return message - - -class AuthnResponse_correct_recipient_address(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - if "localhost" in self.conv.entity_id: - addr = "127.0.0.1" - else: - addr = ip_addresses()[0] - _confirmation[0].subject_confirmation_data.address = addr - return message - - -class AuthnResponse_incorrect_recipient_address(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - _confirmation[0].subject_confirmation_data.address = "10.0.0.1" - return message - - -class AuthnResponse_2_recipients_me_last(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - sc = copy.copy(_confirmation[0]) - if "localhost" in self.conv.entity_id: - addr = "127.0.0.1" - else: - addr = ip_addresses()[0] - sc.subject_confirmation_data.address = addr - _confirmation.insert(0, sc) - return message - - -class AuthnResponse_2_recipients_me_first(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation - sc = copy.copy(_confirmation[0]) - if "localhost" in self.conv.entity_id: - addr = "127.0.0.1" - else: - addr = ip_addresses()[0] - sc.subject_confirmation_data.address = addr - _confirmation.append(sc) - return message - - -class AuthnResponse_unknown_condition(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - conditions.condition = [Condition( - extension_elements=[TimeRestriction(start_time="08:00:00", - end_time="17:00:00")], - extension_attributes={XSI_TYPE: "foo:bas"})] - return message - - -class AuthnResponse_future_NotBefore(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - # Valid starting five hours from now - conditions.not_before = in_a_while(hours=5) - return message - - -class AuthnResponse_past_NotOnOrAfter(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - # Valid up until five hours ago - conditions.not_on_or_after = a_while_ago(hours=5) - return message - - -class AuthnResponse_past_SubjectConfirmationData_NotOnOrAfter(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation[0] - _confirmation.subject_confirmation_data.not_on_or_after = a_while_ago( - hours=5) - return message - - -class AuthnResponse_future_SubjectConfirmationData_NotBefore(AuthnResponse): - def pre_processing(self, message, **kwargs): - _confirmation = message.assertion.subject.subject_confirmation[0] - _confirmation.subject_confirmation_data.not_before = in_a_while( - hours=5) - return message - - -class AuthnResponse_past_AuthnStatement_SessionNotOnOrAfter(AuthnResponse): - def pre_processing(self, message, **kwargs): - _statement = message.assertion.authn_statement[0] - _statement.session_not_on_or_after = a_while_ago(hours=5) - return message - - -class AuthnResponse_missing_AuthnStatement(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.assertion.authn_statement = [] - return message - - -class AuthnResponse_future_24h_IssueInstant(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.assertion.issue_instant = in_a_while(hours=24) - return message - - -class AuthnResponse_past_24h_IssueInstant(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.assertion.issue_instant = a_while_ago(hours=24) - return message - - -class AuthnResponse_datetime_millisecond(AuthnResponse): - def pre_processing(self, message, **kwargs): - message.assertion.issue_instant = in_a_while(milliseconds=123) - return message - - -class AuthnResponse_AudienceRestriction_no_audience(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - conditions.audience_restriction[0].audience = None - return message - - -class AuthnResponse_AudienceRestriction_wrong_audience(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - conditions.audience_restriction[0].audience = [ - Audience("http://saml.example.com")] - return message - - -class AuthnResponse_AudienceRestriction_prepended_audience(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - extra = Audience("http://saml.example.com") - conditions.audience_restriction[0].audience.insert(0, extra) - return message - - -class AuthnResponse_AudienceRestriction_appended_audience(AuthnResponse): - def pre_processing(self, message, **kwargs): - conditions = message.assertion.conditions - extra = Audience("http://saml.example.com") - conditions.audience_restriction[0].audience.append(extra) - return message - - -PHASES = { - "login_redirect": (Login, AuthnRequest, AuthnResponse_redirect), -} - -# Each operation defines 4 flows and 3 sets of tests, in chronological order: -# test "pre": executes before anything is sent to the SP -# flow 0: Start conversation flow -# flow 1: SAML request flow -# test "mid": executes after receiving the SAML request -# flow 2: SAML response flow -# flow 3: check SP response after authentication -# test "post": executes after finals response has been received from SP - -OPERATIONS = { - 'sp-00': { - "name": 'Basic Login test expect HTTP 200 result', - "descr": 'WebSSO verify authentication request, verify ' - 'HTTP-Response after sending the SAML response', - "sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponseOK)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'sp-01': { - "name": 'Login OK & echo page verification test', - "descr": 'Same as SP-00, then check if result page is displayed', - "sequence": [(Login, AuthnRequest, AuthnResponse, check.VerifyEchopageContents)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'sp-02': { - "name": 'Require AuthnRequest to be signed', - "descr": 'Same as SP-00, and check if a request signature can be found', - "sequence": [(Login, AuthnRequest, AuthnResponse, None)], - "tests": {"pre": [], "mid": [VerifyIfRequestIsSigned], "post": []} - }, - 'sp-03': { - "name": 'Reject unsigned reponse/assertion', - "descr": 'Check if SP flags missing signature with HTTP 500', - "sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponse500)], - "tests": {"pre": [SetResponseAndAssertionSignaturesFalse], "mid": [], "post": []} - }, - 'sp-04': { # test-case specific code in sp_test/__init__ - "name": 'Reject siganture with invalid IDP key', - "descr": 'IDP-key for otherwise valid signature not in metadata - expect HTTP 500 result', - "sequence": [(Login, AuthnRequest, AuthnResponse, CheckSpHttpResponse500)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'sp-05': { - "name": 'Verify digest algorithm', - "descr": 'Trigger WebSSO AuthnRequest and verify that the used ' - 'digest algorithm was one from the approved set.', - "sequence": [(Login, AuthnRequest, AuthnResponse, None)], - "tests": {"pre": [], "mid": [VerifyDigestAlgorithm], "post": []} - }, - 'sp-06': { - "name": 'Verify signature algorithm', - "descr": 'Trigger WebSSO AuthnRequest and verify that the used ' - 'signature algorithm was one from the approved set.', - "sequence": [(Login, AuthnRequest, AuthnResponse, None)], - "tests": {"pre": [], "mid": [VerifySignatureAlgorithm], "post": []} - }, - 'sp-08': { - "name": "SP should accept a Response without a " - "SubjectConfirmationData element. If confirmation method" - "is SCM_SENDER_VOUCHES", - "sequence": [(Login, AuthnRequest, - AuthnResponse_without_SubjectConfirmationData_2, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL02': { - "name": 'Verify various aspects of the generated AuthnRequest message', - "descr": 'Basic Login test', - "sequence": [], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL03': { - "name": "SP should not accept a Response as valid, when the StatusCode" - " is not success", - "sequence": [(Login, AuthnRequest, ErrorResponse, check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL04': { - "name": "SP should accept a NameID with Format: persistent", - "sequence": [(Login, AuthnRequest, - AuthnResponse_NameIDformat_persistent, None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL05': { - "name": "SP should accept a NameID with Format: e-mail", - "sequence": [(Login, AuthnRequest, AuthnResponse_NameIDformat_email, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL06': { - "name": "Do SP work with unknown NameID Format, such as : foo", - "sequence": [(Login, AuthnRequest, AuthnResponse_NameIDformat_foo, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL07': { - "name": "SP should accept a Response without a " - "SubjectConfirmationData element. If confirmation method " - "is SCM_SENDER_VOUCHES", - "sequence": [(Login, AuthnRequest, - AuthnResponse_without_SubjectConfirmationData_1, None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL09': { - "name": "SP should not accept a response InResponseTo " - "which is chosen randomly", - "sequence": [(Login, AuthnRequest, - AuthnResponse_rnd_Response_inresponseto, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL10': { - "name": "SP should not accept an assertion InResponseTo " - "which is chosen randomly", - "sequence": [(Login, AuthnRequest, - AuthnResponse_rnd_Response_assertion_inresponseto, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL11': { - "name": "Does the SP allow the InResponseTo attribute to be missing" - "from the Response element?", - "sequence": [(Login, AuthnRequest, - AuthnResponse_Response_no_inresponse, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL12': { - "name": "Does the SP allow the InResponseTo attribute to be missing" - "from the SubjectConfirmationData element?" - "(Test is questionable - review)", # TODO - "sequence": [(Login, AuthnRequest, - AuthnResponse_SubjectConfirmationData_no_inresponse, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL13': { - "name": "SP should not accept a broken DestinationURL attribute", - "sequence": [(Login, AuthnRequest, - AuthnResponse_broken_destination, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - # New untested - 'FL14a': { - "name": "SP should not accept wrong Recipient attribute", - "sequence": [(Login, AuthnRequest, - AuthnResponse_broken_destination, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL14b': { - "name": "SP should not accept missing Recipient attribute", - "sequence": [(Login, AuthnRequest, - AuthnResponse_missing_Recipient, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL20': { - "name": "Accept a Response with a SubjectConfirmationData elements " - "with a correct @Address attribute", - "sequence": [(Login, AuthnRequest, - AuthnResponse_correct_recipient_address, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL21': { - "name": "Accept a Response with a SubjectConfirmationData elements " - "with a incorrect @Address attribute", - "sequence": [(Login, AuthnRequest, - AuthnResponse_incorrect_recipient_address, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL22': { - "name": "Accept a Response with two SubjectConfirmationData elements" - "representing two recipients (test 1 of 2, correct one last)", - "sequence": [(Login, AuthnRequest, - AuthnResponse_2_recipients_me_last, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL23': { - "name": "Accept a Response with two SubjectConfirmationData elements" - "representing two recipients (test 1 of 2, correct one last)", - "sequence": [(Login, AuthnRequest, - AuthnResponse_2_recipients_me_first, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL26': { - "name": "Reject an assertion containing an unknown Condition.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_unknown_condition, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL27': { - "name": "Reject a Response with a Condition with a NotBefore in the " - "future.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_future_NotBefore, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL28': { - "name": "Reject a Response with a Condition with a NotOnOrAfter in " - "the past.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_future_NotBefore, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL29': { - "name": "Reject a Response with a SubjectConfirmationData@NotOnOrAfter " - "in the past", - "sequence": [(Login, AuthnRequest, - AuthnResponse_past_SubjectConfirmationData_NotOnOrAfter, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL24': { - "name": "Reject a Response with a SubjectConfirmationData@NotBefore " - "in the future", - "sequence": [(Login, AuthnRequest, - AuthnResponse_future_SubjectConfirmationData_NotBefore, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL30': { - "name": "Reject a Response with an AuthnStatement where " - "SessionNotOnOrAfter is set in the past.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_past_AuthnStatement_SessionNotOnOrAfter, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL31': { - "name": "Reject a Response with an AuthnStatement missing", - "sequence": [(Login, AuthnRequest, - AuthnResponse_missing_AuthnStatement, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL32': { - "name": "Reject an IssueInstant far (24 hours) into the future", - "sequence": [(Login, AuthnRequest, - AuthnResponse_future_24h_IssueInstant, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL33': { - "name": "Reject an IssueInstant far (24 hours) into the past", - "sequence": [(Login, AuthnRequest, - AuthnResponse_past_24h_IssueInstant, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL34': { - "name": "Accept xs:datetime with millisecond precision " - "http://www.w3.org/TR/xmlschema-2/#dateTime", - "sequence": [(Login, AuthnRequest, - AuthnResponse_datetime_millisecond, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL36': { - "name": "Reject a Response with a Condition with a empty set of " - "Audience.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_AudienceRestriction_no_audience, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL37': { - "name": "Reject a Response with a Condition with a wrong Audience.", - "sequence": [(Login, AuthnRequest, - AuthnResponse_AudienceRestriction_wrong_audience, - check.ErrorResponse)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL38': { - "name": "Accept a Response with a Condition with an additional " - "Audience prepended", - "sequence": [(Login, AuthnRequest, - AuthnResponse_AudienceRestriction_prepended_audience, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, - 'FL39': { - "name": "Accept a Response with a Condition with an additional " - "Audience appended", - "sequence": [(Login, AuthnRequest, - AuthnResponse_AudienceRestriction_appended_audience, - None)], - "tests": {"pre": [], "mid": [], "post": []} - }, -} - -# -# SP should not accept a broken Recipient attribute in assertion -# SubjectConfirmationData/@Recipient -# SP should not accept a broken DestinationURL attribute in response -# SP should accept a Response with two SubjectConfirmationData elements -# representing two recipients (test 1 of 2, correct one last) -# SP should accept a Response with two SubjectConfirmationData elements -# representing two recipients (test 1 of 2, correct one first) -# SP should accept a Response with two SubjectConfirmation elements -# representing two recipients (test 1 of 2, correct one last) -# SP should accept a Response with two SubjectConfirmation elements -# representing two recipients (test 1 of 2, correct one first) -# SP should accept a Response with a SubjectConfirmationData elements with a -# correct @Address attribute -# SP should nnot accept a Response with a SubjectConfirmationData elements -# with a incorrect @Address attribute -# SP should accept a Response with multiple SubjectConfirmation elements -# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of -# 2, correct o last) -# SP should accept a Response with multiple SubjectConfirmationData elements -# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of -# 2, corr one last) -# SP should accept a Response with multiple SubjectConfirmationData elements -# with /SubjectConfirmationData/@Address-es, where one is correct (test 1 of -# 2, corr one first) -# SP Should not accept an assertion containing an uknown Condition -# SP should not accept a Response with a Condition with a NotBefore in the -# future. -# SP should not accept a Response with a Condition with a NotOnOrAfter in -# the past. -# SP should not accept a Response with a -# SubjectConfirmationData@NotOnOrAfter in the past -# SP should not accept a Response with a AuthnStatement where -# SessionNotOnOrAfter is set in the past -# SP should not accept a Response with a AuthnStatement missing -# SP should not accept an IssueInstant far (24 hours) into the future -# SP should not accept an IssueInstant far (24 hours) into the past -# SP should accept xs:datetime with millisecond precision http://www.w3 -# .org/TR/xmlschema-2/#dateTime -# SP should accept xs:datetime with microsecond precision http://www.w3 -# .org/TR/xmlschema-2/#dateTime -# SP should not accept a Response with a Condition with a empty set of -# Audience. -# SP should not accept a Response with a Condition with a wrong Audience. -# SP should accept a Response with a Condition with an addition Audience -# prepended. -# SP should accept a Response with a Condition with an addition Audience -# appended. -# SP should not accept multiple AudienceRestrictions where the intersection -# is zero. (test 1 of 2) -# SP should not accept multiple AudienceRestrictions where the intersection -# is zero. (test 2 of 2) -# SP should accept multiple AudienceRestrictions where the intersection -# includes the correct audience. -# SP should accept that only the Assertion is signed instead of the Response. -# SP should accept that both the Response and the Assertion is signed. -# Do SP work when RelayState information is lost? -# Do SP accept an unknown Extensions element in the Response? -# SP MUST not accept response when the saml-namespace is invalid -# SP MUST NOT re-use the same ID in subsequent requests. -# SP MUST NOT accept a replayed Response. An identical Response/Assertion -# used a second time. [Profiles]: 4.1.4.5 POST-Specific Processing Rules ( -# test 1 of 2: s inresponseto) -# SP MUST NOT accept a replayed Response. An identical Response/Assertion -# used a second time. [Profiles]: 4.1.4.5 POST-Specific Processing Rules ( -# test 2 of 2: unsolicited response) -# SP SHOULD find attributes in a second AttributeStatement, not only in the -# first. -# SP SHOULD NOT accept an signed assertion embedded in an AttributeValue -# inside an unsigned assertion. -# SP SHOULD NOT accept an signed assertion embedded in an AttributeValue -# inside an unsigned assertion. (Signature moved out...) -# SP SHOULD NOT accept an signed assertion, where the signature is referring -# to another assertion. -# SP SHOULD find attributes in a second Assertion/AttributeStatement, -# not only in one of them (test 1 of 2 - attributes in first). -# SP SHOULD find attributes in a second Assertion/AttributeStatement, -# not only in one of them (test 2 of 2 - attributes in last). -# SP SHOULD NOT accept attributes in unsigned 2nd assertion. (test 1 of 2) -# SP SHOULD NOT accept attributes in unsigned 2nd assertion. (test 2 of 2) -# SP SHOULD NOT accept authnstatement in unsigned 2nd assertion. (test 1 of 2) -# SP SHOULD NOT accept authnstatement in unsigned 2nd assertion. (test 2 of 2) -# Basic SP-initated Logout Test -# Basic IdP-initated Logout Test -# SP MUST NOT accept LogoutRequest when NameID content is wrong -# SP MUST NOT accept LogoutRequest when NameID@Format is wrong -# SP MUST NOT accept LogoutRequest when NameID@SPNameQualifier is wrong -# SP MUST NOT logout user when invalid SessionIndex is sent -# SP MUST NOT accept LogoutRequest when Issuer is wrong -# SP MUST NOT accept LogoutRequest when Destination is wrong -# SP MUST NOT accept unsigned LogoutRequest -# SP MUST accept LogoutRequest with sessionindex in a separate session, -# ot relying on the session-cookie. -# SP MUST accept an LogoutRequest with no sessionindex (sent in separate -# session, no session-cookies) -# SP MUST accept an LogoutRequest with two sesionindexes (first valid) (sent -# in separate session, no session-cookies) -# SP MUST accept an LogoutRequest with two sesionindexes (second valid) ( -# sent in separate session, no session-cookies) -# Session fixtation check \ No newline at end of file