Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing SecurityLevel of the EndpointDescription #1360

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion asyncua/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,63 @@ async def _setup_server_nodes(self):
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key,
permission_ruleset=self._permission_ruleset))


@staticmethod
def lookup_security_level_for_policy_type( security_policy_type: ua.SecurityPolicyType ) -> ua.Byte:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if server.py is the right location for the two helper methods. The helpers are only relevant for the Server and server tests.
Maybe the there is better location in an other file with more helpers?

Considered the following locations:

  • uatypes: ua.SecurityPolicyType is implemented in there, but only contains type related stuff
  • uacrypto: looking at the other members is doesn't belong here also

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure either. we just leave it here..

"""Returns the security level for an ua.SecurityPolicyType.

This is endpoint & server implementation specific!

Returns:
ua.Byte: the found security level
"""

return ua.Byte({
ua.SecurityPolicyType.NoSecurity : 0,
ua.SecurityPolicyType.Basic128Rsa15_Sign : 1,
ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt : 2,
ua.SecurityPolicyType.Basic256_Sign : 11,
ua.SecurityPolicyType.Basic256_SignAndEncrypt : 21,
ua.SecurityPolicyType.Basic256Sha256_Sign : 50,
ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt : 70,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign : 55,
ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt : 75
}[security_policy_type])


@staticmethod
def determine_security_level(security_policy_uri:str, security_mode: ua.MessageSecurityMode) -> ua.Byte:
"""Determine the security level of an EndPoint.
The security level indicates how secure an EndPoint is, compared to other EndPoints of the same server.
Value 0 is a special value; EndPoint isn't recommended, typical for ua.MessageSecurityMode.None_.

See Part 4 7.10

To the determine the level the value of ua.SecurityPolicyType is used.
The enum values already correspond to and
The Enum ua.SecurityPolicyType already contains a value per Enum entry that already correspond to


Args:
security_policy (ua.SecurityPolicy): the used policy
security_mode (ua.MessageSecurityMode): the used security mode for the messages.

Returns:
ua.Byte: the returned security level
"""
security_level: ua.Byte = ua.Byte(0)

if security_mode != ua.MessageSecurityMode.None_:
security_policy_name = f'{security_policy_uri.split("#")[1].replace("_","")}_{security_mode.name}'

try:
security_policy_type: ua.SecurityPolicyType = ua.SecurityPolicyType[security_policy_name]
security_level = Server.lookup_security_level_for_policy_type(security_policy_type)
except KeyError:
_logger.error('"%s" isn\'t a valid security policy', security_policy_name)

return security_level

def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []
supported_token_classes = []
Expand Down Expand Up @@ -442,7 +499,7 @@ def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.N
edp.SecurityPolicyUri = policy.URI
edp.UserIdentityTokens = idtokens
edp.TransportProfileUri = "http://opcfoundation.org/UA-Profile/Transport/uatcp-uasc-uabinary"
edp.SecurityLevel = 0
edp.SecurityLevel = Server.determine_security_level(policy.URI, mode)
self.iserver.add_endpoint(edp)
self.iserver.supported_tokens = tuple(supported_token_classes)

Expand Down
24 changes: 24 additions & 0 deletions tests/test_crypto_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,27 @@ async def test_anonymous_rejection():
with pytest.raises(ua.uaerrors.BadIdentityTokenRejected):
await clt.connect()
await srv.stop()

async def test_security_level_all():
assert Server.determine_security_level(ua.SecurityPolicy.URI, ua.MessageSecurityMode.None_) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.NoSecurity)

assert Server.determine_security_level(security_policies.SecurityPolicyBasic256Sha256.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256Sha256_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256Sha256.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt)

assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyAes128Sha256RsaOaep.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt)

# For the sake of completeness also the old, not recommended, protocols Basic128Rsa15 and Basic256
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic128Rsa15.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt)

assert Server.determine_security_level(security_policies.SecurityPolicyBasic256.URI, ua.MessageSecurityMode.Sign) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256_Sign)
assert Server.determine_security_level(security_policies.SecurityPolicyBasic256.URI, ua.MessageSecurityMode.SignAndEncrypt) == Server.lookup_security_level_for_policy_type(ua.SecurityPolicyType.Basic256_SignAndEncrypt)

async def test_security_level_endpoints(srv_crypto_all_certs):
srv = srv_crypto_all_certs[0]

end_points: list[ua.EndpointDescription] = await srv.get_endpoints()

for end_point in end_points:
assert end_point.SecurityLevel == Server.determine_security_level(end_point.SecurityPolicyUri, end_point.SecurityMode)