Skip to content

Commit

Permalink
as: Publish uplink normalized message
Browse files Browse the repository at this point in the history
  • Loading branch information
johanstokking committed Aug 26, 2022
1 parent 5804763 commit d91ed1b
Show file tree
Hide file tree
Showing 2 changed files with 337 additions and 37 deletions.
117 changes: 80 additions & 37 deletions pkg/applicationserver/applicationserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,9 @@ func (as *ApplicationServer) handleUp(ctx context.Context, up *ttnpb.Application
case *ttnpb.ApplicationUp_JoinAccept:
return true, as.handleJoinAccept(ctx, up.EndDeviceIds, p.JoinAccept, link)
case *ttnpb.ApplicationUp_UplinkMessage:
return true, as.handleUplink(ctx, up.EndDeviceIds, p.UplinkMessage, link, up.ReceivedAt)
return true, as.handleUplink(ctx, uplinkInfo{up.EndDeviceIds, up.ReceivedAt, p.UplinkMessage, false, link})
case *ttnpb.ApplicationUp_UplinkNormalized:
return true, nil
case *ttnpb.ApplicationUp_DownlinkQueueInvalidated:
return as.handleDownlinkQueueInvalidated(ctx, up.EndDeviceIds, p.DownlinkQueueInvalidated, link)
case *ttnpb.ApplicationUp_DownlinkSent:
Expand All @@ -821,7 +823,7 @@ func (as *ApplicationServer) handleUp(ctx context.Context, up *ttnpb.Application
func (as *ApplicationServer) handleSimulatedUp(ctx context.Context, up *ttnpb.ApplicationUp, link *ttnpb.ApplicationLink) error {
switch p := up.Up.(type) {
case *ttnpb.ApplicationUp_UplinkMessage:
return as.handleSimulatedUplink(ctx, up.EndDeviceIds, p.UplinkMessage, link)
return as.handleSimulatedUplink(ctx, uplinkInfo{up.EndDeviceIds, up.ReceivedAt, p.UplinkMessage, true, link})
default:
return nil
}
Expand Down Expand Up @@ -1051,17 +1053,51 @@ func (as *ApplicationServer) setActivated(ctx context.Context, ids *ttnpb.EndDev
}
}

func (as *ApplicationServer) handleUplink(
ctx context.Context,
ids *ttnpb.EndDeviceIdentifiers,
uplink *ttnpb.ApplicationUplink,
link *ttnpb.ApplicationLink,
receivedAt *pbtypes.Timestamp,
) error {
func (as *ApplicationServer) publishNormalizedUplink(ctx context.Context, info uplinkInfo) error {
for _, measurement := range info.uplink.NormalizedPayload {
if err := as.Publish(ctx, &ttnpb.ApplicationUp{
EndDeviceIds: info.ids,
CorrelationIds: events.CorrelationIDsFromContext(ctx),
ReceivedAt: info.receivedAt,
Up: &ttnpb.ApplicationUp_UplinkNormalized{
UplinkNormalized: &ttnpb.ApplicationUplinkNormalized{
SessionKeyId: info.uplink.SessionKeyId,
FPort: info.uplink.FPort,
FCnt: info.uplink.FCnt,
FrmPayload: info.uplink.FrmPayload,
NormalizedPayload: measurement,
NormalizedPayloadWarnings: info.uplink.NormalizedPayloadWarnings,
RxMetadata: info.uplink.RxMetadata,
Settings: info.uplink.Settings,
ReceivedAt: info.uplink.ReceivedAt,
Confirmed: info.uplink.Confirmed,
ConsumedAirtime: info.uplink.ConsumedAirtime,
Locations: info.uplink.Locations,
VersionIds: info.uplink.VersionIds,
NetworkIds: info.uplink.NetworkIds,
},
},
Simulated: info.simulated,
}); err != nil {
return err
}
}
return nil
}

type uplinkInfo struct {
ids *ttnpb.EndDeviceIdentifiers
receivedAt *pbtypes.Timestamp
uplink *ttnpb.ApplicationUplink
simulated bool
link *ttnpb.ApplicationLink
}

func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo) error {
defer trace.StartRegion(ctx, "handle uplink").End()

ctx = log.NewContextWithField(ctx, "session_key_id", uplink.SessionKeyId)
dev, err := as.deviceRegistry.Set(ctx, ids,
ctx = log.NewContextWithField(ctx, "session_key_id", info.uplink.SessionKeyId)
dev, err := as.deviceRegistry.Set(ctx, info.ids,
[]string{
"activated_at",
"formatters",
Expand All @@ -1072,9 +1108,9 @@ func (as *ApplicationServer) handleUplink(
},
func(dev *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) {
if dev == nil {
return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids))
return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, info.ids))
}
mask, err := as.matchSession(ctx, ids, dev, link, uplink.SessionKeyId)
mask, err := as.matchSession(ctx, info.ids, dev, info.link, info.uplink.SessionKeyId)
if err != nil {
return nil, nil, err
}
Expand All @@ -1087,40 +1123,44 @@ func (as *ApplicationServer) handleUplink(
if err != nil {
return err
}
if !as.skipPayloadCrypto(ctx, link, dev, dev.Session) {
if err := as.decryptAndDecodeUplink(ctx, dev, uplink, link.DefaultFormatters); err != nil {

if !as.skipPayloadCrypto(ctx, info.link, dev, dev.Session) {
if err := as.decryptAndDecodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil {
return err
}
if err := as.publishNormalizedUplink(ctx, info); err != nil {
return err
}
if err := as.storeUplink(ctx, ids, uplink); err != nil {
if err := as.storeUplink(ctx, info.ids, info.uplink); err != nil {
return err
}
} else if appSKey := dev.GetSession().GetKeys().GetAppSKey(); appSKey != nil {
uplink.AppSKey = appSKey
uplink.LastAFCntDown = dev.Session.LastAFCntDown
info.uplink.AppSKey = appSKey
info.uplink.LastAFCntDown = dev.Session.LastAFCntDown
}

registerUplinkLatency(ctx, uplink)
registerUplinkLatency(ctx, info.uplink)

if dev.VersionIds != nil {
uplink.VersionIds = dev.VersionIds
info.uplink.VersionIds = dev.VersionIds
}

// Set location in message and publish location solved if the payload contains location information.
if locations, err := as.locationRegistry.Get(ctx, ids); err != nil {
if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations")
} else {
uplink.Locations = locations
info.uplink.Locations = locations
}
loc := as.locationFromPayload(uplink)
loc := as.locationFromPayload(info.uplink)
if loc != nil {
if uplink.Locations == nil {
uplink.Locations = make(map[string]*ttnpb.Location, 1)
if info.uplink.Locations == nil {
info.uplink.Locations = make(map[string]*ttnpb.Location, 1)
}
uplink.Locations["frm-payload"] = loc
info.uplink.Locations["frm-payload"] = loc
if err := as.Publish(ctx, &ttnpb.ApplicationUp{
EndDeviceIds: ids,
EndDeviceIds: info.ids,
CorrelationIds: events.CorrelationIDsFromContext(ctx),
ReceivedAt: receivedAt,
ReceivedAt: info.receivedAt,
Up: &ttnpb.ApplicationUp_LocationSolved{
LocationSolved: &ttnpb.ApplicationLocation{
Service: "frm-payload",
Expand All @@ -1134,27 +1174,27 @@ func (as *ApplicationServer) handleUplink(

// If the device has not been activated before, publish the activation event.
if dev.ActivatedAt == nil {
if err := as.activationPool.Publish(ctx, ids); err != nil {
if err := as.activationPool.Publish(ctx, info.ids); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to publish activation event")
}
}

// Publish last seen event.
if err := as.deviceLastSeenPool.Publish(ctx, lastSeenAtInfo{
ids: ids,
lastSeenAt: uplink.ReceivedAt,
ids: info.ids,
lastSeenAt: info.uplink.ReceivedAt,
}); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to publish last seen event")
}

return nil
}

func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, uplink *ttnpb.ApplicationUplink, link *ttnpb.ApplicationLink) error {
func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, item uplinkInfo) error {
defer trace.StartRegion(ctx, "handle simulated uplink").End()

ctx = log.NewContextWithField(ctx, "session_key_id", uplink.SessionKeyId)
dev, err := as.deviceRegistry.Get(ctx, ids,
ctx = log.NewContextWithField(ctx, "session_key_id", item.uplink.SessionKeyId)
dev, err := as.deviceRegistry.Get(ctx, item.ids,
[]string{
"formatters",
"version_ids",
Expand All @@ -1164,13 +1204,16 @@ func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, ids *ttn
return err
}

if locations, err := as.locationRegistry.Get(ctx, ids); err != nil {
if locations, err := as.locationRegistry.Get(ctx, item.ids); err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations")
} else {
uplink.Locations = locations
item.uplink.Locations = locations
}

return as.decodeUplink(ctx, dev, uplink, link.DefaultFormatters)
if err := as.decodeUplink(ctx, dev, item.uplink, item.link.DefaultFormatters); err != nil {
return err
}
return as.publishNormalizedUplink(ctx, item)
}

func (as *ApplicationServer) handleDownlinkQueueInvalidated(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, invalid *ttnpb.ApplicationInvalidatedDownlinks, link *ttnpb.ApplicationLink) (pass bool, err error) {
Expand Down
Loading

0 comments on commit d91ed1b

Please sign in to comment.