Skip to content

Commit

Permalink
fix(plc4go/opcua): fixed several small issues in SecureChannel implem…
Browse files Browse the repository at this point in the history
…entation
  • Loading branch information
sruehl committed Aug 7, 2023
1 parent f1f08fb commit 5a5ed86
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 82 deletions.
2 changes: 1 addition & 1 deletion plc4go/internal/opcua/MessageCodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
if err != nil {
return nil, errors.New("Could not parse pdu")
}

m.log.Debug().Stringer("opcuaAPU", opcuaAPU).Msg("got message")
return opcuaAPU, nil
}
160 changes: 79 additions & 81 deletions plc4go/internal/opcua/SecureChannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,9 @@ var (
readWriteModel.NewNullExtension(),
false) // Body

INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?P<transportCode>tcp))?://` +
`(?P<transportHost>[\\w.-]+)(:` +
`(?P<transportPort>\\d*))?`)
INET_ADDRESS_PATTERN = regexp.MustCompile(`(.(?P<transportCode>tcp))?://(?P<transportHost>[\w.-]+)(:(?P<transportPort>\d*))?`)

URI_PATTERN = regexp.MustCompile(`^(?P<protocolCode>opc)` +
INET_ADDRESS_PATTERN.String() +
`(?P<transportEndpoint>[\\w/=]*)[\\?]?`,
)
URI_PATTERN = regexp.MustCompile(`^(?P<protocolCode>opc)` + INET_ADDRESS_PATTERN.String() + `(?P<transportEndpoint>[\w/=]*)[?]?`)
APPLICATION_URI = readWriteModel.NewPascalString("urn:apache:plc4x:client")
PRODUCT_URI = readWriteModel.NewPascalString("urn:apache:plc4x:client")
APPLICATION_TEXT = readWriteModel.NewPascalString("OPCUA client for the Apache PLC4X:PLC4J project")
Expand Down Expand Up @@ -320,7 +315,7 @@ func (s *SecureChannel) onConnect(ctx context.Context, connection *Connection, c
opcuaAPU := message.(readWriteModel.OpcuaAPU)
messagePDU := opcuaAPU.GetMessage()
opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse)
go s.onConnectOpenSecureChannel(ctx, connection, ch, opcuaAcknowledgeResponse)
return nil
},
func(err error) error {
Expand Down Expand Up @@ -463,13 +458,14 @@ func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, connecti
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
s.tokenId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())) // TODO: strange that int32 and uint32 missmatch
s.channelId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId()))
s.onConnectCreateSessionRequest(ctx, connection, ch)
connection.fireConnectionError(errors.New("service fault received"), ch)
return nil
}
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
s.tokenId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetTokenId())) // TODO: strange that int32 and uint32 missmatch
s.channelId.Store(int32(openSecureChannelResponse.GetSecurityToken().(readWriteModel.ChannelSecurityToken).GetChannelId()))
go s.onConnectCreateSessionRequest(ctx, connection, ch)
return nil
},
func(err error) error {
Expand Down Expand Up @@ -571,26 +567,27 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, conne
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Create Session Response Connection Response")
connection.fireConnectionError(errors.New("service fault received"), ch)
return
}
s.log.Debug().Msg("Got Create Session Response Connection Response")

extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponseExactly); ok {
s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId()
extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.CreateSessionResponseExactly); ok {
s.authenticationToken = responseMessage.GetAuthenticationToken().GetNodeId()

s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, message.GetBody().(readWriteModel.CreateSessionResponse))
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code, '%s'")
}
go s.onConnectActivateSessionRequest(ctx, connection, ch, responseMessage, message.GetBody().(readWriteModel.CreateSessionResponse))
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code, '%s'")
}
}

Expand Down Expand Up @@ -694,34 +691,35 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, con
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Activate Session Response Connection Response")

extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponseExactly); ok {
returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle()
if !(requestHandle == returnedRequestHandle) {
s.log.Error().
Uint32("requestHandle", requestHandle).
Uint32("returnedRequestHandle", returnedRequestHandle).
Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle")
}
connection.fireConnectionError(errors.New("service fault received"), ch)
return
}
s.log.Debug().Msg("Got Activate Session Response Connection Response")

// Send an event that connection setup is complete.
s.keepAlive()
connection.fireConnected(ch)
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponseExactly); ok {
returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle()
if !(requestHandle == returnedRequestHandle) {
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
Uint32("requestHandle", requestHandle).
Uint32("returnedRequestHandle", returnedRequestHandle).
Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle")
}

// Send an event that connection setup is complete.
s.keepAlive()
connection.fireConnected(ch)
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
}
}

Expand Down Expand Up @@ -789,24 +787,24 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, connection *Connection
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Close Session Response Connection Response")
return
}
s.log.Debug().Msg("Got Close Session Response Connection Response")

extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.CloseSessionResponseExactly); ok {
s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, message.GetBody().(readWriteModel.CloseSessionResponse))
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
}
extensionObject, err := readWriteModel.ExtensionObjectParseWithBuffer(ctx, utils.NewReadBufferByteBased(opcuaResponse, utils.WithByteOrderForReadBufferByteBased(binary.LittleEndian)), false)
if err != nil {
s.log.Error().Err(err).Msg("error parsing")
return
}
unknownExtensionObject := extensionObject.GetBody()
if responseMessage, ok := unknownExtensionObject.(readWriteModel.CloseSessionResponseExactly); ok {
go s.onDisconnectCloseSecureChannel(ctx, connection, responseMessage, message.GetBody().(readWriteModel.CloseSessionResponse))
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
}
}

Expand Down Expand Up @@ -937,7 +935,7 @@ func (s *SecureChannel) onDiscover(ctx context.Context, codec *MessageCodec) {
messagePDU := opcuaAPU.GetMessage()
opcuaAcknowledgeResponse := messagePDU.(readWriteModel.OpcuaAcknowledgeResponse)
s.log.Trace().Stringer("opcuaAcknowledgeResponse", opcuaAcknowledgeResponse).Msg("Got Hello Response Connection Response")
s.onDiscoverOpenSecureChannel(ctx, codec, opcuaAcknowledgeResponse)
go s.onDiscoverOpenSecureChannel(ctx, codec, opcuaAcknowledgeResponse)
return nil
},
func(err error) error {
Expand Down Expand Up @@ -1049,11 +1047,11 @@ func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec *
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, openSecureChannelResponse)
return nil
}
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
go s.onDiscoverGetEndpointsRequest(ctx, codec, opcuaOpenResponse, openSecureChannelResponse)
return nil
},
func(err error) error {
Expand Down Expand Up @@ -1192,7 +1190,7 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec
digest := sha1.Sum(s.configuration.senderCertificate)
s.thumbprint = readWriteModel.NewPascalByteString(int32(len(digest)), digest[:])

s.onDiscoverCloseSecureChannel(ctx, codec, response)
go s.onDiscoverCloseSecureChannel(ctx, codec, response)
}
return nil
},
Expand Down Expand Up @@ -1523,7 +1521,7 @@ func (s *SecureChannel) isEndpoint(endpoint readWriteModel.EndpointDescription)
// Split up the connection string into its individual segments.
matches := utils.GetSubgroupMatches(URI_PATTERN, endpoint.GetEndpointUrl().GetStringValue())
if len(matches) == 0 {
s.log.Error().Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
s.log.Error().Stringer("endpoint", endpoint).Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
return false
}
s.log.Trace().
Expand Down

0 comments on commit 5a5ed86

Please sign in to comment.