I am using KEPServerEX as an OPC UA server and gopcua to connect to it. While testing the reconnection behavior after a network outage or interruption, my program got stuck, so I set debug.Enable to true to see where the code was hanging.
What I found is that after reconnection it executes restoreSubscriptions, which in turn calls republishSubscription and sendRepublishRequests in client_sub.go.
I've been reviewing the republishSubscription and sendRepublishRequests functions, and I noticed that the sequence number (sub.nextSeq) used in the RepublishRequest never changes within the loop: the same sequence number is requested over and over, without incrementing sub.nextSeq after each successful republish.
```go
func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, availableSeq []uint32) error {
	// todo(fs): check if sub.nextSeq is in the available sequence numbers
	// todo(fs): if not then we need to decide whether we fail b/c of data loss
	// todo(fs): or whether we log it and continue.
	if len(availableSeq) > 0 && !slices.Contains(availableSeq, sub.nextSeq) {
		log.Printf("sub %d: next sequence number %d not in retransmission buffer %v", sub.SubscriptionID, sub.nextSeq, availableSeq)
	}

	for {
		req := &ua.RepublishRequest{
			SubscriptionID:           sub.SubscriptionID,
			RetransmitSequenceNumber: sub.nextSeq,
		}

		debug.Printf("Republishing subscription %d and sequence number %d",
			req.SubscriptionID,
			req.RetransmitSequenceNumber,
		)

		s := c.Session()
		if s == nil {
			debug.Printf("Republishing subscription %d aborted", req.SubscriptionID)
			return ua.StatusBadSessionClosed
		}

		sc := c.SecureChannel()
		if sc == nil {
			debug.Printf("Republishing subscription %d aborted", req.SubscriptionID)
			return ua.StatusBadNotConnected
		}

		debug.Printf("RepublishRequest: req=%s", debug.ToJSON(req))
		var res *ua.RepublishResponse
		err := c.SecureChannel().SendRequest(ctx, req, c.Session().resp.AuthenticationToken, func(v interface{}) error {
			return safeAssign(v, &res)
		})
		debug.Printf("RepublishResponse: res=%s err=%v", debug.ToJSON(res), err)

		switch {
		case err == ua.StatusBadMessageNotAvailable:
			// No more messages to restore
			debug.Printf("Republishing subscription %d OK", req.SubscriptionID)
			return nil

		case err != nil:
			debug.Printf("Republishing subscription %d failed: %v", req.SubscriptionID, err)
			return err

		default:
			status := ua.StatusBad
			if res != nil {
				status = res.ResponseHeader.ServiceResult
			}
			if status != ua.StatusOK {
				debug.Printf("Republishing subscription %d failed: %v", req.SubscriptionID, status)
				return status
			}
		}
		time.Sleep(time.Second)
	}
}
```
Should the sequence number be incremented after each successful republish to retrieve the next message, or is there a reason the same sequence number is requested repeatedly?
If this is an incomplete implementation, could you please provide guidance on how to handle the sequence number progression correctly?
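For illustration, here is a minimal, self-contained sketch of the progression I would expect: advance the retransmit sequence number past the one just received, so the loop eventually hits the "message not available" case and terminates. This is not the gopcua API; the retransmission queue here is a mock map, and the function names (mockRepublish, drainRepublish) are hypothetical, invented for this example only.

```go
package main

import "fmt"

// mockRepublish stands in for the server side of the Republish service:
// it returns the notification stored for the requested sequence number,
// or ok=false when that number is no longer in the retransmission queue
// (the BadMessageNotAvailable case in the real client).
func mockRepublish(queue map[uint32]string, seq uint32) (msg string, ok bool) {
	msg, ok = queue[seq]
	return msg, ok
}

// drainRepublish requests messages starting at nextSeq and, crucially,
// advances nextSeq after every successful reply, so the loop terminates
// once the retransmission queue is exhausted instead of re-requesting
// the same sequence number forever.
func drainRepublish(queue map[uint32]string, nextSeq uint32) ([]string, uint32) {
	var got []string
	for {
		msg, ok := mockRepublish(queue, nextSeq)
		if !ok {
			// no more messages to restore
			return got, nextSeq
		}
		got = append(got, msg)
		// advance past the sequence number we just received
		nextSeq++
	}
}

func main() {
	queue := map[uint32]string{41: "a", 42: "b", 43: "c"}
	msgs, next := drainRepublish(queue, 41)
	fmt.Println(msgs, next) // [a b c] 44
}
```

In the real client this would presumably mean updating sub.nextSeq (e.g. from the SequenceNumber in the returned NotificationMessage) before the next iteration, but I may be missing where that is supposed to happen.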
Logs of the execution are given below; it loops forever on sequence number 41:
res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:10.4754351Z","RequestHandle":26,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
debug: Republishing subscription 12 and sequence number 41
debug: RepublishRequest: req={"RequestHeader":null,"SubscriptionID":12,"RetransmitSequenceNumber":41}
debug: uasc 2/27: send *ua.RepublishRequest with 70 bytes
debug: uacp 2: recv MSGF with 117 bytes
debug: uasc 2/27: recv MSGF with 117 bytes
debug: uasc 2/27: recv *ua.RepublishResponse
debug: uasc 2/27: sending *ua.RepublishResponse to handler
debug: RepublishResponse: res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:11.5314205Z","RequestHandle":27,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
debug: Republishing subscription 12 and sequence number 41
debug: RepublishRequest: req={"RequestHeader":null,"SubscriptionID":12,"RetransmitSequenceNumber":41}
debug: uasc 2/28: send *ua.RepublishRequest with 70 bytes
debug: uacp 2: recv MSGF with 117 bytes
debug: uasc 2/28: recv MSGF with 117 bytes
debug: uasc 2/28: recv *ua.RepublishResponse
debug: uasc 2/28: sending *ua.RepublishResponse to handler
debug: RepublishResponse: res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:12.584875Z","RequestHandle":28,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
debug: Republishing subscription 12 and sequence number 41
debug: RepublishRequest: req={"RequestHeader":null,"SubscriptionID":12,"RetransmitSequenceNumber":41}
debug: uasc 2/29: send *ua.RepublishRequest with 70 bytes
debug: uacp 2: recv MSGF with 117 bytes
debug: uasc 2/29: recv MSGF with 117 bytes
debug: uasc 2/29: recv *ua.RepublishResponse
debug: uasc 2/29: sending *ua.RepublishResponse to handler
debug: RepublishResponse: res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:13.7120039Z","RequestHandle":29,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
debug: Republishing subscription 12 and sequence number 41
debug: RepublishRequest: req={"RequestHeader":null,"SubscriptionID":12,"RetransmitSequenceNumber":41}
debug: uasc 2/30: send *ua.RepublishRequest with 70 bytes
debug: uacp 2: recv MSGF with 117 bytes
debug: uasc 2/30: recv MSGF with 117 bytes
debug: uasc 2/30: recv *ua.RepublishResponse
debug: uasc 2/30: sending *ua.RepublishResponse to handler
debug: RepublishResponse: res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:14.7408042Z","RequestHandle":30,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
debug: Republishing subscription 12 and sequence number 41
debug: RepublishRequest: req={"RequestHeader":null,"SubscriptionID":12,"RetransmitSequenceNumber":41}
debug: uasc 2/31: send *ua.RepublishRequest with 70 bytes
debug: uacp 2: recv MSGF with 117 bytes
debug: uasc 2/31: recv MSGF with 117 bytes
debug: uasc 2/31: recv *ua.RepublishResponse
debug: uasc 2/31: sending *ua.RepublishResponse to handler
debug: RepublishResponse: res={"ResponseHeader":{"Timestamp":"2024-08-13T10:24:15.7662299Z","RequestHandle":31,"ServiceResult":0,"ServiceDiagnostics":{"EncodingMask":0,"SymbolicID":0,"NamespaceURI":0,"Locale":0,"LocalizedText":0,"AdditionalInfo":"","InnerStatusCode":0,"InnerDiagnosticInfo":null},"StringTable":[],"AdditionalHeader":{"EncodingMask":0,"TypeID":{"NodeID":"i=0","NamespaceURI":"","ServerIndex":0},"Value":null}},"NotificationMessage":{"SequenceNumber":41,"PublishTime":"2024-08-13T09:40:40.7015645Z","NotificationData":[{"EncodingMask":1,"TypeID":{"NodeID":"i=811","NamespaceURI":"","ServerIndex":0},"Value":{"MonitoredItems":[{"ClientHandle":101,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:40.123835Z","ServerPicoseconds":0}},{"ClientHandle":102,"Value":{"EncodingMask":9,"Value":{},"Status":0,"SourceTimestamp":"0001-01-01T00:00:00Z","SourcePicoseconds":0,"ServerTimestamp":"2024-08-13T09:40:39.9962928Z","ServerPicoseconds":0}}],"DiagnosticInfos":[]}}]}} err=<nil>
Thank you!