From 5a5ed862a73433a5a09baa0e8a5cbcee73960e6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20R=C3=BChl?= Date: Mon, 7 Aug 2023 14:34:18 +0200 Subject: [PATCH] fix(plc4go/opcua): fixed several small issues in SecureChannel implementation --- plc4go/internal/opcua/MessageCodec.go | 2 +- plc4go/internal/opcua/SecureChannel.go | 160 ++++++++++++------------- 2 files changed, 80 insertions(+), 82 deletions(-) diff --git a/plc4go/internal/opcua/MessageCodec.go b/plc4go/internal/opcua/MessageCodec.go index 962fb4e66eb..29a5c689143 100644 --- a/plc4go/internal/opcua/MessageCodec.go +++ b/plc4go/internal/opcua/MessageCodec.go @@ -127,6 +127,6 @@ func (m *MessageCodec) Receive() (spi.Message, error) { if err != nil { return nil, errors.New("Could not parse pdu") } - + m.log.Debug().Stringer("opcuaAPU", opcuaAPU).Msg("got message") return opcuaAPU, nil } diff --git a/plc4go/internal/opcua/SecureChannel.go b/plc4go/internal/opcua/SecureChannel.go index 03f842003f3..639ff9b859a 100644 --- a/plc4go/internal/opcua/SecureChannel.go +++ b/plc4go/internal/opcua/SecureChannel.go @@ -75,14 +75,9 @@ var ( readWriteModel.NewNullExtension(), false) // Body - INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?Ptcp))?://` + - `(?P[\\w.-]+)(:` + - `(?P\\d*))?`) + INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?Ptcp))?://(?P[\w.-]+)(:(?P\d*))?`) - URI_PATTERN = regexp.MustCompile(`^(?Popc)` + - INET_ADDRESS_PATTERN.String() + - `(?P[\\w/=]*)[\\?]?`, - ) + URI_PATTERN = regexp.MustCompile(`^(?Popc)` + INET_ADDRESS_PATTERN.String() + `(?P[\w/=]*)[?]?`) APPLICATION_URI = readWriteModel.NewPascalString("urn:apache:plc4x:client") PRODUCT_URI = readWriteModel.NewPascalString("urn:apache:plc4x:client") APPLICATION_TEXT = readWriteModel.NewPascalString("OPCUA client for the Apache PLC4X:PLC4J project") @@ -320,7 +315,7 @@ func (s *SecureChannel) onConnect(ctx context.Context, connection *Connection, c opcuaAPU := message.(readWriteModel.OpcuaAPU) messagePDU := opcuaAPU.GetMessage() opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse) - s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse) + go s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse) return nil }, func(err error) error { @@ -463,13 +458,14 @@ func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, connecti Uint32("statusCode", statusCode). Stringer("statusCodeByValue", statusCodeByValue). Msg("Failed to connect to opc ua server for the following reason") - } else { - s.log.Debug().Msg("Got Secure Response Connection Response") - openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse) - s.tokenId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())) // TODO: strange that int32 and uint32 missmatch - s.channelId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId())) - s.onConnectCreateSessionRequest(ctx, connection, ch) + connection.fireConnectionError(errors.New("service fault received"), ch) + return nil } + s.log.Debug().Msg("Got Secure Response Connection Response") + openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse) + s.tokenId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())) // TODO: strange that int32 and uint32 missmatch + s.channelId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId())) + go s.onConnectCreateSessionRequest(ctx, connection, ch) return nil }, func(err error) error { @@ -571,26 +567,27 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, conne Uint32("statusCode", statusCode). Stringer("statusCodeByValue", statusCodeByValue). Msg("Failed to connect to opc ua server for the following reason") - } else { - s.log.Debug().Msg("Got Create Session Response Connection Response") + connection.fireConnectionError(errors.New("service fault received"), ch) + return + } + s.log.Debug().Msg("Got Create Session Response Connection Response") - extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) - if err != nil { - s.log.Error().Err(err).Msg("error parsing") - return - } - unknownExtensionObject := extensionObject.GetBody() - if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponseExactly); ok { - s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId() + extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) + if err != nil { + s.log.Error().Err(err).Msg("error parsing") + return + } + unknownExtensionObject := extensionObject.GetBody() + if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponseExactly); ok { + s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId() - s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, message.GetBody().(readWriteModel.CreateSessionResponse)) - } else { - serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) - header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) - s.log.Error(). - Stringer("serviceResult", header.GetServiceResult()). - Msg("Subscription ServiceFault returned from server with error code, '%s'") - } + go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, message.GetBody().(readWriteModel.CreateSessionResponse)) + } else { + serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) + header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) + s.log.Error(). + Stringer("serviceResult", header.GetServiceResult()). + Msg("Subscription ServiceFault returned from server with error code, '%s'") } } @@ -694,34 +691,35 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con Uint32("statusCode", statusCode). Stringer("statusCodeByValue", statusCodeByValue). Msg("Failed to connect to opc ua server for the following reason") - } else { - s.log.Debug().Msg("Got Activate Session Response Connection Response") - - extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) - if err != nil { - s.log.Error().Err(err).Msg("error parsing") - return - } - unknownExtensionObject := extensionObject.GetBody() - if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponseExactly); ok { - returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle() - if !(requestHandle == returnedRequestHandle) { - s.log.Error(). - Uint32("requestHandle", requestHandle). - Uint32("returnedRequestHandle", returnedRequestHandle). - Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle") - } + connection.fireConnectionError(errors.New("service fault received"), ch) + return + } + s.log.Debug().Msg("Got Activate Session Response Connection Response") - // Send an event that connection setup is complete. - s.keepAlive() - connection.fireConnected(ch) - } else { - serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) - header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) + extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) + if err != nil { + s.log.Error().Err(err).Msg("error parsing") + return + } + unknownExtensionObject := extensionObject.GetBody() + if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponseExactly); ok { + returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle() + if !(requestHandle == returnedRequestHandle) { s.log.Error(). - Stringer("serviceResult", header.GetServiceResult()). - Msg("Subscription ServiceFault returned from server with error code") + Uint32("requestHandle", requestHandle). + Uint32("returnedRequestHandle", returnedRequestHandle). + Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle") } + + // Send an event that connection setup is complete. + s.keepAlive() + connection.fireConnected(ch) + } else { + serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) + header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) + s.log.Error(). + Stringer("serviceResult", header.GetServiceResult()). + Msg("Subscription ServiceFault returned from server with error code") } } @@ -789,24 +787,24 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, connection *Connection Uint32("statusCode", statusCode). Stringer("statusCodeByValue", statusCodeByValue). Msg("Failed to connect to opc ua server for the following reason") - } else { - s.log.Debug().Msg("Got Close Session Response Connection Response") + return + } + s.log.Debug().Msg("Got Close Session Response Connection Response") - extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) - if err != nil { - s.log.Error().Err(err).Msg("error parsing") - return - } - unknownExtensionObject := extensionObject.GetBody() - if responseMessage, ok := unknownExtensionObject.(readWriteModel.CloseSessionResponseExactly); ok { - s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, message.GetBody().(readWriteModel.CloseSessionResponse)) - } else { - serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) - header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) - s.log.Error(). - Stringer("serviceResult", header.GetServiceResult()). - Msg("Subscription ServiceFault returned from server with error code") - } + extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false) + if err != nil { + s.log.Error().Err(err).Msg("error parsing") + return + } + unknownExtensionObject := extensionObject.GetBody() + if responseMessage, ok := unknownExtensionObject.(readWriteModel.CloseSessionResponseExactly); ok { + go s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, message.GetBody().(readWriteModel.CloseSessionResponse)) + } else { + serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault) + header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader) + s.log.Error(). + Stringer("serviceResult", header.GetServiceResult()). + Msg("Subscription ServiceFault returned from server with error code") } } @@ -937,7 +935,7 @@ func (s *SecureChannel) onDiscover(ctx context.Context, codec *MessageCodec) { messagePDU := opcuaAPU.GetMessage() opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse) s.log.Trace().Stringer("opcuaAcknowledgeResponse", opcuaAcknowledgeResponse).Msg("Got Hello Response Connection Response") - s.onDiscoverOpenSecureChannel(ctx, codec, opcuaAcknowledgeResponse) + go s.onDiscoverOpenSecureChannel(ctx, codec, opcuaAcknowledgeResponse) return nil }, func(err error) error { @@ -1049,11 +1047,11 @@ func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec * Uint32("statusCode", statusCode). Stringer("statusCodeByValue", statusCodeByValue). Msg("Failed to connect to opc ua server for the following reason") - } else { - s.log.Debug().Msg("Got Secure Response Connection Response") - openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse) - s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, openSecureChannelResponse) + return nil } + s.log.Debug().Msg("Got Secure Response Connection Response") + openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse) + go s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, openSecureChannelResponse) return nil }, func(err error) error { @@ -1192,7 +1190,7 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec digest := sha1.Sum(s.configuration.senderCertificate) s.thumbprint = readWriteModel.NewPascalByteString(int32(len(digest)), digest[:]) - s.onDiscoverCloseSecureChannel(ctx, codec, response) + go s.onDiscoverCloseSecureChannel(ctx, codec, response) } return nil }, @@ -1523,7 +1521,7 @@ func (s *SecureChannel) isEndpoint(endpoint readWriteModel.EndpointDescription) // Split up the connection string into its individual segments. matches := utils.GetSubgroupMatches(URI_PATTERN, endpoint.GetEndpointUrl().GetStringValue()) if len(matches) == 0 { - s.log.Error().Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'") + s.log.Error().Stringer("endpoint", endpoint).Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'") return false } s.log.Trace().