Skip to content

Commit

Permalink
refactor(plc4go/opcua): use keyed logging
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Aug 1, 2023
1 parent da34d61 commit a5a5e94
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 58 deletions.
2 changes: 1 addition & 1 deletion plc4go/internal/opcua/Configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func getFromOptions(localLog zerolog.Logger, options map[string][]string, key st
return ""
}
if len(optionValues) > 1 {
localLog.Warn().Msgf("Options %s must be unique", key)
localLog.Warn().Str("key", key).Msg("Options must be unique")
}
return optionValues[0]
}
Expand Down
8 changes: 4 additions & 4 deletions plc4go/internal/opcua/Connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRe
func (c *Connection) addSubscriber(subscriber *Subscriber) {
for _, sub := range c.subscribers {
if sub == subscriber {
c.log.Debug().Msgf("Subscriber %v already added", subscriber)
c.log.Debug().Msgf("Subscriber already added\n%s", subscriber)
return
}
}
Expand Down Expand Up @@ -214,16 +214,16 @@ func (c *Connection) startSubscriptionHandler() {
// TODO: dispatch subs
/*
for monitoredSal := range c.messageCodec.monitoredSALs {
salLogger.Trace().Msgf("got a SAL\n%v", monitoredSal)
salLogger.Trace().Msg("got a SAL\n%v", monitoredSal)
handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
salLogger.Debug().Msgf("\n%v handled\n%s", subscriber, monitoredSal)
salLogger.Debug().Msg("\n%v handled\n%s", subscriber, monitoredSal)
handled = true
}
}
if !handled {
salLogger.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
salLogger.Debug().Msg("SAL was not handled:\n%s", monitoredSal)
}
}*/
}
Expand Down
10 changes: 7 additions & 3 deletions plc4go/internal/opcua/Driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@ func NewDriver(_options ...options.WithOption) plc4go.PlcDriver {
}

func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.URL, transports map[string]transports.Transport, driverOptions map[string][]string) <-chan plc4go.PlcConnectionConnectResult {
m.log.Debug().Stringer("transportUrl", &transportUrl).Msgf("Get connection for transport url with %d transport(s) and %d option(s)", len(transports), len(driverOptions))
m.log.Debug().
Stringer("transportUrl", &transportUrl).
Int("numberTransports", len(transports)).
Int("numberDriverOptions", len(driverOptions)).
Msg("Get connection for transport url")

// Get the transport specified in the url
transport, ok := transports[transportUrl.Scheme]
if !ok {
m.log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't find a transport for scheme %s", transportUrl.Scheme)
m.log.Error().Stringer("transportUrl", &transportUrl).Str("scheme", transportUrl.Scheme).Msg("We couldn't find a transport for scheme")
return m.reportError(errors.Errorf("couldn't find transport for given transport url %v", transportUrl))
}

Expand All @@ -80,7 +84,7 @@ func (m *Driver) GetConnectionWithContext(ctx context.Context, transportUrl url.
append(m._options, options.WithCustomLogger(m.log))...,
)
if err != nil {
m.log.Error().Err(err).Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", driverOptions["defaultTcpPort"])
m.log.Error().Err(err).Stringer("transportUrl", &transportUrl).Strs("defaultTcpPort", driverOptions["defaultTcpPort"]).Msg("We couldn't create a transport instance for port")
return m.reportError(errors.Wrapf(err, "couldn't initialize transport configuration for given transport url %s", transportUrl.String()))
}

Expand Down
2 changes: 1 addition & 1 deletion plc4go/internal/opcua/EncryptionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (h *EncryptionHandler) encodeMessage(ctx context.Context, pdu readWriteMode
}

func (h *EncryptionHandler) decodeMessage(ctx context.Context, pdu readWriteModel.OpcuaAPU) (readWriteModel.OpcuaAPU, error) {
h.log.Info().Msgf("Decoding Message with Security policy %s", h.securityPolicy)
h.log.Info().Str("securityPolicy", h.securityPolicy).Msg("Decoding Message with Security policy")
switch h.securityPolicy {
case "None":
return pdu, nil
Expand Down
100 changes: 63 additions & 37 deletions plc4go/internal/opcua/SecureChannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDi
} else {
messageBuffer = opcuaResponse.GetMessage()
if !(s.senderSequenceNumber.Add(1) == (opcuaResponse.GetSequenceNumber())) {
s.log.Error().Msgf("Sequence number isn't as expected, we might have missed a packet. - %d != %d", s.senderSequenceNumber.Add(1), opcuaResponse.GetSequenceNumber())
s.log.Error().
Int32("senderSequenceNumber", s.senderSequenceNumber.Load()).
Int32("responseSequenceNumber", opcuaResponse.GetSequenceNumber()).
Msg("Sequence number isn't as expected, we might have missed a packet. - senderSequenceNumber != responseSequenceNumber")
codec.fireDisconnected()
}
}
Expand Down Expand Up @@ -263,7 +266,7 @@ func (s *SecureChannel) submit(ctx context.Context, codec *MessageCodec, errorDi
}
}

s.log.Debug().Msgf("Submitting Transaction to TransactionManager %v", transactionId)
s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting Transaction to TransactionManager")
if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
s.log.Debug().Err(err).Msg("error submitting")
}
Expand Down Expand Up @@ -439,9 +442,10 @@ func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, codec *M
if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
Expand All @@ -460,7 +464,7 @@ func (s *SecureChannel) onConnectOpenSecureChannel(ctx context.Context, codec *M
s.log.Debug().Err(err).Msg("a error")
}
}
s.log.Debug().Msgf("Submitting OpenSecureChannel with id of %d", transactionId)
s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
s.log.Debug().Err(err).Msg("error submitting")
}
Expand Down Expand Up @@ -540,9 +544,10 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, codec
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Create Session Response Connection Response")

Expand All @@ -559,7 +564,9 @@ func (s *SecureChannel) onConnectCreateSessionRequest(ctx context.Context, codec
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().Msgf("Subscription ServiceFault returned from server with error code, '%s'", header.GetServiceResult())
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code, '%s'")
}
}
}
Expand Down Expand Up @@ -655,9 +662,10 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, cod
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Activate Session Response Connection Response")

Expand All @@ -670,7 +678,10 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, cod
if responseMessage, ok := unknownExtensionObject.(readWriteModel.ActivateSessionResponseExactly); ok {
returnedRequestHandle := responseMessage.GetResponseHeader().(readWriteModel.ResponseHeader).GetRequestHandle()
if !(requestHandle == returnedRequestHandle) {
s.log.Error().Msgf("Request handle isn't as expected, we might have missed a packet. %d != %d", requestHandle, returnedRequestHandle)
s.log.Error().
Uint32("requestHandle", requestHandle).
Uint32("returnedRequestHandle", returnedRequestHandle).
Msg("Request handle isn't as expected, we might have missed a packet. requestHandle != returnedRequestHandle")
}

// Send an event that connection setup is complete.
Expand All @@ -679,7 +690,9 @@ func (s *SecureChannel) onConnectActivateSessionRequest(ctx context.Context, cod
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().Msgf("Subscription ServiceFault returned from server with error code, '%s'", header.GetServiceResult())
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
}
}
}
Expand Down Expand Up @@ -738,9 +751,10 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, codec *MessageCodec) {
if fault, ok := message.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Close Session Response Connection Response")

Expand All @@ -755,7 +769,9 @@ func (s *SecureChannel) onDisconnect(ctx context.Context, codec *MessageCodec) {
} else {
serviceFault := unknownExtensionObject.(readWriteModel.ServiceFault)
header := serviceFault.GetResponseHeader().(readWriteModel.ResponseHeader)
s.log.Error().Msgf("Subscription ServiceFault returned from server with error code, '%s'", header.GetServiceResult())
s.log.Error().
Stringer("serviceResult", header.GetServiceResult()).
Msg("Subscription ServiceFault returned from server with error code")
}
}
}
Expand Down Expand Up @@ -842,7 +858,7 @@ func (s *SecureChannel) onDisconnectCloseSecureChannel(ctx context.Context, code
s.log.Debug().Err(err).Msg("a error")
}
}
s.log.Debug().Msgf("Submitting CloseSecureChannel with id of %d", transactionId)
s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
s.log.Debug().Err(err).Msg("error submitting")
}
Expand Down Expand Up @@ -993,9 +1009,10 @@ func (s *SecureChannel) onDiscoverOpenSecureChannel(ctx context.Context, codec *
if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
Expand Down Expand Up @@ -1027,7 +1044,10 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec
nextRequestId := opcuaOpenResponse.GetRequestId() + 1

if !(transactionId == nextSequenceNumber) {
s.log.Error().Msgf("Sequence number isn't as expected, we might have missed a packet. - %d != %d", transactionId, nextSequenceNumber)
s.log.Error().
Int32("transactionId", transactionId).
Int32("nextSequenceNumber", nextSequenceNumber).
Msg("Sequence number isn't as expected, we might have missed a packet. - transactionId != nextSequenceNumber")
return
}

Expand Down Expand Up @@ -1115,9 +1135,10 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec
if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
response := extensionObject.GetBody().(readWriteModel.GetEndpointsResponse)
Expand All @@ -1126,7 +1147,7 @@ func (s *SecureChannel) onDiscoverGetEndpointsRequest(ctx context.Context, codec
for _, endpoint := range endpoints {
endpointDescription := endpoint.(readWriteModel.EndpointDescription)
if endpointDescription.GetEndpointUrl().GetStringValue() == (s.endpoint.GetStringValue()) && endpointDescription.GetSecurityPolicyUri().GetStringValue() == (s.securityPolicy) {
s.log.Info().Msgf("Found OPC UA endpoint %s", s.endpoint.GetStringValue())
s.log.Info().Str("stringValue", s.endpoint.GetStringValue()).Msg("Found OPC UA endpoint")
s.configuration.senderCertificate = endpointDescription.GetServerCertificate().GetStringValue()
}
}
Expand Down Expand Up @@ -1227,7 +1248,7 @@ func (s *SecureChannel) onDiscoverCloseSecureChannel(ctx context.Context, codec
s.log.Debug().Err(err).Msg("a error")
}
}
s.log.Debug().Msgf("Submitting CloseSecureChannel with id of %d", transactionId)
s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting CloseSecureChannel with id")
if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
s.log.Debug().Err(err).Msg("error submitting")
}
Expand Down Expand Up @@ -1364,9 +1385,10 @@ func (s *SecureChannel) keepAlive() {
if fault, ok := extensionObject.GetBody().(readWriteModel.ServiceFaultExactly); ok {
statusCode := fault.GetResponseHeader().(readWriteModel.ResponseHeader).GetServiceResult().GetStatusCode()
statusCodeByValue, _ := readWriteModel.OpcuaStatusCodeByValue(statusCode)
s.log.Error().Msgf("Failed to connect to opc ua server for the following reason:- %v, %v",
statusCode,
statusCodeByValue)
s.log.Error().
Uint32("statusCode", statusCode).
Stringer("statusCodeByValue", statusCodeByValue).
Msg("Failed to connect to opc ua server for the following reason")
} else {
s.log.Debug().Msg("Got Secure Response Connection Response")
openSecureChannelResponse := extensionObject.GetBody().(readWriteModel.OpenSecureChannelResponse)
Expand All @@ -1386,7 +1408,7 @@ func (s *SecureChannel) keepAlive() {
s.log.Debug().Err(err).Msg("a error")
}
}
s.log.Debug().Msgf("Submitting OpenSecureChannel with id of %d", transactionId)
s.log.Debug().Int32("transactionId", transactionId).Msg("Submitting OpenSecureChannel with id")
if err := s.channelTransactionManager.submit(requestConsumer, transactionId); err != nil {
s.log.Debug().Err(err).Msg("error submitting")
}
Expand Down Expand Up @@ -1439,12 +1461,12 @@ func (s *SecureChannel) selectEndpoint(sessionResponse readWriteModel.CreateSess
}

if s.policyId == nil {
s.log.Error().Msgf("Unable to find endpoint - %s", s.endpoints[0])
s.log.Error().Str("endpoint", s.endpoints[0]).Msg("Unable to find endpoint")
return
}

if s.tokenType == 0xffffffff { // TODO: what did we use as undefined
s.log.Error().Msgf("Unable to find Security Policy for endpoint - %s", s.endpoints[0])
s.log.Error().Str("endpoint", s.endpoints[0]).Msg("Unable to find Security Policy for endpoint")
return
}
}
Expand All @@ -1458,10 +1480,14 @@ func (s *SecureChannel) isEndpoint(endpoint readWriteModel.EndpointDescription)
// Split up the connection string into its individual segments.
matches := utils.GetSubgroupMatches(URI_PATTERN, endpoint.GetEndpointUrl().GetStringValue())
if len(matches) == 0 {
s.log.Error().Msgf("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
s.log.Error().Msg("Endpoint returned from the server doesn't match the format '{protocol-code}:({transport-code})?//{transport-host}(:{transport-port})(/{transport-endpoint})'")
return false
}
s.log.Trace().Msgf("Using Endpoint %s %s %s", matches["transportHost"], matches["transportPort"], matches["transportEndpoint"])
s.log.Trace().
Str("transportHost", matches["transportHost"]).
Str("transportPort", matches["transportPort"]).
Str("transportEndpoint", matches["transportEndpoint"]).
Msg("Using Endpoint")

if s.configuration.discovery && !slices.Contains(s.endpoints, matches["transportHost"]) {
return false
Expand Down
10 changes: 5 additions & 5 deletions plc4go/internal/opcua/SecureChannelTransactionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,16 @@ func NewSecureChannelTransactionManager(log zerolog.Logger) *SecureChannelTransa
func (m *SecureChannelTransactionManager) submit(onSend func(transactionId int32), transactionId int32) error {
m.lock.Lock()
defer m.lock.Unlock()
m.log.Info().Msgf("Active transaction Number %d", m.activeTransactionId.Load())
m.log.Info().Int32("activeTransactionId", m.activeTransactionId.Load()).Msg("Active transaction Number")
if m.activeTransactionId.Load() == transactionId {
onSend(transactionId)
newTransactionId := m.getActiveTransactionIdentifier()
if len(m.queue) > 0 {
t, ok := m.queue[newTransactionId]
if !ok {
m.log.Info().Msgf("Length of Queue is %d", len(m.queue))
m.log.Info().Msgf("Transaction ID is %d", newTransactionId)
m.log.Info().Msgf("Map is %v", m.queue)
m.log.Info().Int("queueLength", len(m.queue)).Msg("Length of Queue")
m.log.Info().Int32("newTransactionId", newTransactionId).Msg("Transaction ID")
m.log.Info().Interface("map", m.queue).Msg("Map is")
return errors.Errorf("Transaction Id not found in queued messages %v", m.queue)
}
delete(m.queue, newTransactionId)
Expand All @@ -67,7 +67,7 @@ func (m *SecureChannelTransactionManager) submit(onSend func(transactionId int32
}
}
} else {
m.log.Info().Msgf("Storing out of order transaction %d", transactionId)
m.log.Info().Int32("transactionId", transactionId).Msg("Storing out of order transaction")
m.queue[transactionId] = SecureChannelTransactionManagerTransaction{consumer: onSend, transactionId: transactionId}
}
return nil
Expand Down
Loading

0 comments on commit a5a5e94

Please sign in to comment.