-
Notifications
You must be signed in to change notification settings - Fork 1
/
client_base_spid.py
286 lines (251 loc) · 11.8 KB
/
client_base_spid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import saml2
from saml2 import BINDING_PAOS, saml, attributemaps, samlp, SamlBase, NAMESPACE
from saml2.client_base import Base
from saml2.extension import sp_type, requested_attributes
from saml2.samlp import AuthnRequest
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.samlp import RequestedAuthnContext, Extensions
class SpidBase(Base):
    """Client base adding ``create_authn_request_spid``.

    The request built here differs from a stock AuthnRequest in that its
    NameIDPolicy is a ``NameIDPolicySpid`` (``Format`` / ``SPNameQualifier``
    only) and in the optional ``SPType`` / ``RequestedAttributes`` extensions
    driven by the SP configuration.
    """

    def create_authn_request_spid(self, destination, vorg="", scoping=None,
                                  binding=saml2.BINDING_HTTP_POST,
                                  nameid_format=None,
                                  service_url_binding=None, message_id=0,
                                  consent=None, extensions=None, sign=None,
                                  allow_create=None, sign_prepare=False,
                                  sign_alg=None, digest_alg=None, **kwargs):
        """ Creates an authentication request.

        :param destination: Where the request should be sent.
        :param vorg: The virtual organization the service belongs to. When
            set (and a NameIDPolicy is generated here) it becomes the
            policy's SPNameQualifier and forces the persistent format.
        :param scoping: The scope of the request
        :param binding: The protocol to use for the Response !! Reset to
            None when 'hide_assertion_consumer_service' is configured.
        :param nameid_format: Format of the NameID. An empty string means
            "emit no NameIDPolicy"; None means "take it from configuration".
        :param service_url_binding: Where the reply should be sent dependent
            on reply binding.
        :param message_id: The identifier for this request
        :param consent: Whether the principal have given her consent
        :param extensions: Possible extensions. A passed-in instance is
            extended in place with the configured extension elements.
        :param sign: Whether the request should be signed or not. None means
            "use self.authn_requests_signed".
        :param sign_prepare: Whether the signature should be prepared or not.
        :param allow_create: Accepted for signature compatibility only; the
            NameIDPolicy built here has no AllowCreate attribute, so the
            value is not used.
        :param sign_alg: Forwarded unchanged to ``self._message``.
        :param digest_alg: Forwarded unchanged to ``self._message``.
        :param kwargs: Extra key word arguments. Recognised keys:
            'client_crt', 'assertion_consumer_service_urls',
            'assertion_consumer_service_url',
            'assertion_consumer_service_index', 'provider_name',
            'requested_authn_context', 'conditions', 'subject',
            'name_id_policy', 'nsprefix' and 'force_authn'. Whatever is left
            is handed to ``self._filter_args``.
        :raises ValueError: if an element keyword argument is neither an
            instance of the expected class nor a dict, or if a configured
            requested attribute has neither 'name' nor 'friendly_name'.
        :return: tuple of request ID and <samlp:AuthnRequest> instance
        """
        client_crt = kwargs.get("client_crt")

        args = {}

        # Where should the response go? Explicit keyword arguments win over
        # the URLs registered for the binding.
        if self.config.getattr('hide_assertion_consumer_service', 'sp'):
            args["assertion_consumer_service_url"] = None
            binding = None
        elif "assertion_consumer_service_urls" in kwargs:
            args["assertion_consumer_service_url"] = kwargs.pop(
                "assertion_consumer_service_urls")[0]
        elif "assertion_consumer_service_url" in kwargs:
            args["assertion_consumer_service_url"] = kwargs.pop(
                "assertion_consumer_service_url")
        elif "assertion_consumer_service_index" in kwargs:
            args["assertion_consumer_service_index"] = str(
                kwargs.pop("assertion_consumer_service_index"))
        else:
            if service_url_binding is None:
                service_urls = self.service_urls(binding)
            else:
                service_urls = self.service_urls(service_url_binding)
            args["assertion_consumer_service_url"] = service_urls[0]

        # 'provider_name' is deliberately left in kwargs; it is picked up
        # again (same value) by _filter_args below.
        if "provider_name" in kwargs:
            args["provider_name"] = kwargs["provider_name"]
        elif binding != BINDING_PAOS:
            args["provider_name"] = self._my_name()

        # Allow argument values either as class instances or as dictionaries
        # all of these have cardinality 0..1
        _msg = AuthnRequest()
        for param in ("scoping", "requested_authn_context", "conditions",
                      "subject"):
            if param not in kwargs:
                continue
            _item = kwargs.pop(param)
            expected_class = _msg.child_class(param)
            # either class instance or argument dictionary
            if isinstance(_item, expected_class):
                args[param] = _item
            elif isinstance(_item, dict):
                # Build the element that belongs to *this* parameter; a
                # hard-coded RequestedAuthnContext would be wrong for
                # 'conditions' and 'subject'.
                args[param] = expected_class(**_item)
            else:
                raise ValueError("%s or wrong type expected %s" % (_item,
                                                                   param))

        if "name_id_policy" in kwargs:
            args["name_id_policy"] = kwargs.pop("name_id_policy")
        else:
            if nameid_format == "":
                name_id_policy = None
            else:
                if nameid_format is None:
                    nameid_format = self.config.getattr("name_id_format", "sp")

                    # NOTE(review): the normalisation below is applied only
                    # to a value read from the configuration (nesting as in
                    # upstream pysaml2) -- confirm against the original file.

                    # If no nameid_format has been set in the configuration
                    # or passed in then transient is the default.
                    if nameid_format is None:
                        nameid_format = NAMEID_FORMAT_TRANSIENT

                    # If a list has been configured choose the first since
                    # NameIDPolicy can only have one format specified.
                    elif isinstance(nameid_format, list):
                        nameid_format = nameid_format[0]

                    # Allow a deployer to signal that no format should be
                    # specified in the NameIDPolicy by configuring the
                    # string 'None'.
                    elif nameid_format == 'None':
                        nameid_format = None

                name_id_policy = NameIDPolicySpid(format=nameid_format)

            if name_id_policy and vorg:
                try:
                    name_id_policy.sp_name_qualifier = vorg
                    name_id_policy.format = saml.NAMEID_FORMAT_PERSISTENT
                except KeyError:
                    # Best-effort, kept from the original implementation.
                    pass
            args["name_id_policy"] = name_id_policy

        nsprefix = kwargs.get("nsprefix")

        # An explicit keyword argument (even a falsy one) overrides the
        # configured default.
        if 'force_authn' in kwargs:
            force_authn = kwargs['force_authn']
        else:
            force_authn = self.config.getattr('force_authn', 'sp')
        if force_authn:
            args['force_authn'] = 'true'

        # Advertise the SP type in the request only when it is configured
        # and explicitly flagged as not being published in the metadata.
        conf_sp_type = self.config.getattr('sp_type', 'sp')
        conf_sp_type_in_md = self.config.getattr('sp_type_in_metadata', 'sp')
        if conf_sp_type and conf_sp_type_in_md is False:
            if not extensions:
                extensions = Extensions()
            item = sp_type.SPType(text=conf_sp_type)
            extensions.add_extension_element(item)

        requested_attrs = self.config.getattr('requested_attributes', 'sp')
        if requested_attrs:
            if not extensions:
                extensions = Extensions()

            attributemapsmods = [getattr(attributemaps, modname)
                                 for modname in attributemaps.__all__]

            items = []
            for attr in requested_attrs:
                friendly_name = attr.get('friendly_name')
                name = attr.get('name')
                name_format = attr.get('name_format')
                is_required = str(attr.get('required', False)).lower()

                if not name and not friendly_name:
                    raise ValueError(
                        "Missing required attribute: '{}' or '{}'".format(
                            'name', 'friendly_name'))

                # Fill in whichever of name / friendly_name is missing from
                # the first attribute map that knows the other one; that map
                # also supplies the name format when none was configured.
                if not name:
                    for mod in attributemapsmods:
                        try:
                            name = mod.MAP['to'][friendly_name]
                        except KeyError:
                            continue
                        else:
                            if not name_format:
                                name_format = mod.MAP['identifier']
                            break

                if not friendly_name:
                    for mod in attributemapsmods:
                        try:
                            friendly_name = mod.MAP['fro'][name]
                        except KeyError:
                            continue
                        else:
                            if not name_format:
                                name_format = mod.MAP['identifier']
                            break

                items.append(requested_attributes.RequestedAttribute(
                    is_required=is_required,
                    name_format=name_format,
                    friendly_name=friendly_name,
                    name=name))

            item = requested_attributes.RequestedAttributes(
                extension_elements=items)
            extensions.add_extension_element(item)

        if kwargs:
            _args, extensions = self._filter_args(AuthnRequest(), extensions,
                                                  **kwargs)
            args.update(_args)

        # The message id is passed positionally below, never through args.
        args.pop("id", None)

        if sign is None:
            sign = self.authn_requests_signed

        def _build(prepare):
            # Single place for the (otherwise duplicated) _message call.
            return self._message(AuthnRequest, destination, message_id,
                                 consent, extensions, sign, prepare,
                                 protocol_binding=binding,
                                 scoping=scoping, nsprefix=nsprefix,
                                 sign_alg=sign_alg, digest_alg=digest_alg,
                                 **args)

        if (sign and self.sec.cert_handler.generate_cert()) or \
                client_crt is not None:
            # The certificate update and the message construction both
            # happen while the lock is held.
            with self.lock:
                self.sec.cert_handler.update_cert(True, client_crt)
                if client_crt is not None:
                    sign_prepare = True
                return _build(sign_prepare)

        return _build(sign_prepare)
class Saml2ClientSpid(SpidBase):
    """Client class that adds no behaviour of its own on top of SpidBase."""

    def __init__(self, *init_args, **init_kwargs):
        # Pure pass-through: every argument goes to the parent constructor.
        super(Saml2ClientSpid, self).__init__(*init_args, **init_kwargs)
class NameIDPolicyTypeSpid_(SamlBase):
    """The urn:oasis:names:tc:SAML:2.0:protocol:NameIDPolicyType element

    Only the ``Format`` and ``SPNameQualifier`` XML attributes are declared
    on this type.
    """

    c_tag = 'NameIDPolicyType'
    # Use the protocol namespace, as stated in the docstring and as done by
    # NameIDPolicySpid; the bare NAMESPACE imported from saml2 is not the
    # protocol one.
    c_namespace = samlp.NAMESPACE
    # Copies, so the additions below do not leak into SamlBase itself.
    c_children = SamlBase.c_children.copy()
    c_attributes = SamlBase.c_attributes.copy()
    c_child_order = SamlBase.c_child_order[:]
    c_cardinality = SamlBase.c_cardinality.copy()
    # XML attribute name -> (python attribute, type, required)
    c_attributes['Format'] = ('format', 'anyURI', False)
    c_attributes['SPNameQualifier'] = ('sp_name_qualifier', 'string', False)

    def __init__(self,
                 format=None,
                 sp_name_qualifier=None,
                 text=None,
                 extension_elements=None,
                 extension_attributes=None,
                 ):
        """Store the two policy attributes on top of the SamlBase state.

        :param format: Value for the ``Format`` attribute.
        :param sp_name_qualifier: Value for the ``SPNameQualifier`` attribute.
        :param text: Forwarded to ``SamlBase.__init__``.
        :param extension_elements: Forwarded to ``SamlBase.__init__``.
        :param extension_attributes: Forwarded to ``SamlBase.__init__``.
        """
        SamlBase.__init__(self,
                          text=text,
                          extension_elements=extension_elements,
                          extension_attributes=extension_attributes,
                          )
        self.format = format
        self.sp_name_qualifier = sp_name_qualifier
class NameIDPolicySpid(NameIDPolicyTypeSpid_):
    """The urn:oasis:names:tc:SAML:2.0:protocol:NameIDPolicy element

    Concrete element for the type above: same attributes, its own tag, and
    the samlp namespace.
    """

    c_tag = 'NameIDPolicy'
    c_namespace = samlp.NAMESPACE
    # Independent shallow copies of the parent's schema tables.
    c_children = dict(NameIDPolicyTypeSpid_.c_children)
    c_attributes = dict(NameIDPolicyTypeSpid_.c_attributes)
    c_child_order = list(NameIDPolicyTypeSpid_.c_child_order)
    c_cardinality = dict(NameIDPolicyTypeSpid_.c_cardinality)