From d91ed1bd39d03c26507dc369ee2b750f0690dc62 Mon Sep 17 00:00:00 2001 From: Johan Stokking Date: Fri, 26 Aug 2022 22:00:44 +0200 Subject: [PATCH] as: Publish uplink normalized message --- pkg/applicationserver/applicationserver.go | 117 +++++--- .../applicationserver_test.go | 257 ++++++++++++++++++ 2 files changed, 337 insertions(+), 37 deletions(-) diff --git a/pkg/applicationserver/applicationserver.go b/pkg/applicationserver/applicationserver.go index 055658d0f68..fbcf668cb3c 100644 --- a/pkg/applicationserver/applicationserver.go +++ b/pkg/applicationserver/applicationserver.go @@ -798,7 +798,9 @@ func (as *ApplicationServer) handleUp(ctx context.Context, up *ttnpb.Application case *ttnpb.ApplicationUp_JoinAccept: return true, as.handleJoinAccept(ctx, up.EndDeviceIds, p.JoinAccept, link) case *ttnpb.ApplicationUp_UplinkMessage: - return true, as.handleUplink(ctx, up.EndDeviceIds, p.UplinkMessage, link, up.ReceivedAt) + return true, as.handleUplink(ctx, uplinkInfo{up.EndDeviceIds, up.ReceivedAt, p.UplinkMessage, false, link}) + case *ttnpb.ApplicationUp_UplinkNormalized: + return true, nil case *ttnpb.ApplicationUp_DownlinkQueueInvalidated: return as.handleDownlinkQueueInvalidated(ctx, up.EndDeviceIds, p.DownlinkQueueInvalidated, link) case *ttnpb.ApplicationUp_DownlinkSent: @@ -821,7 +823,7 @@ func (as *ApplicationServer) handleUp(ctx context.Context, up *ttnpb.Application func (as *ApplicationServer) handleSimulatedUp(ctx context.Context, up *ttnpb.ApplicationUp, link *ttnpb.ApplicationLink) error { switch p := up.Up.(type) { case *ttnpb.ApplicationUp_UplinkMessage: - return as.handleSimulatedUplink(ctx, up.EndDeviceIds, p.UplinkMessage, link) + return as.handleSimulatedUplink(ctx, uplinkInfo{up.EndDeviceIds, up.ReceivedAt, p.UplinkMessage, true, link}) default: return nil } @@ -1051,17 +1053,51 @@ func (as *ApplicationServer) setActivated(ctx context.Context, ids *ttnpb.EndDev } } -func (as *ApplicationServer) handleUplink( - ctx context.Context, - ids *ttnpb.EndDeviceIdentifiers, - uplink *ttnpb.ApplicationUplink, - link *ttnpb.ApplicationLink, - receivedAt *pbtypes.Timestamp, -) error { +func (as *ApplicationServer) publishNormalizedUplink(ctx context.Context, info uplinkInfo) error { + for _, measurement := range info.uplink.NormalizedPayload { + if err := as.Publish(ctx, &ttnpb.ApplicationUp{ + EndDeviceIds: info.ids, + CorrelationIds: events.CorrelationIDsFromContext(ctx), + ReceivedAt: info.receivedAt, + Up: &ttnpb.ApplicationUp_UplinkNormalized{ + UplinkNormalized: &ttnpb.ApplicationUplinkNormalized{ + SessionKeyId: info.uplink.SessionKeyId, + FPort: info.uplink.FPort, + FCnt: info.uplink.FCnt, + FrmPayload: info.uplink.FrmPayload, + NormalizedPayload: measurement, + NormalizedPayloadWarnings: info.uplink.NormalizedPayloadWarnings, + RxMetadata: info.uplink.RxMetadata, + Settings: info.uplink.Settings, + ReceivedAt: info.uplink.ReceivedAt, + Confirmed: info.uplink.Confirmed, + ConsumedAirtime: info.uplink.ConsumedAirtime, + Locations: info.uplink.Locations, + VersionIds: info.uplink.VersionIds, + NetworkIds: info.uplink.NetworkIds, + }, + }, + Simulated: info.simulated, + }); err != nil { + return err + } + } + return nil +} + +type uplinkInfo struct { + ids *ttnpb.EndDeviceIdentifiers + receivedAt *pbtypes.Timestamp + uplink *ttnpb.ApplicationUplink + simulated bool + link *ttnpb.ApplicationLink +} + +func (as *ApplicationServer) handleUplink(ctx context.Context, info uplinkInfo) error { defer trace.StartRegion(ctx, "handle uplink").End() - ctx = log.NewContextWithField(ctx, "session_key_id", uplink.SessionKeyId) - dev, err := as.deviceRegistry.Set(ctx, ids, + ctx = log.NewContextWithField(ctx, "session_key_id", info.uplink.SessionKeyId) + dev, err := as.deviceRegistry.Set(ctx, info.ids, []string{ "activated_at", "formatters", @@ -1072,9 +1108,9 @@ func (as *ApplicationServer) handleUplink( }, func(dev *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { if dev == nil { - return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, ids)) + return nil, nil, errDeviceNotFound.WithAttributes("device_uid", unique.ID(ctx, info.ids)) } - mask, err := as.matchSession(ctx, ids, dev, link, uplink.SessionKeyId) + mask, err := as.matchSession(ctx, info.ids, dev, info.link, info.uplink.SessionKeyId) if err != nil { return nil, nil, err } @@ -1087,40 +1123,44 @@ func (as *ApplicationServer) handleUplink( if err != nil { return err } - if !as.skipPayloadCrypto(ctx, link, dev, dev.Session) { - if err := as.decryptAndDecodeUplink(ctx, dev, uplink, link.DefaultFormatters); err != nil { + + if !as.skipPayloadCrypto(ctx, info.link, dev, dev.Session) { + if err := as.decryptAndDecodeUplink(ctx, dev, info.uplink, info.link.DefaultFormatters); err != nil { + return err + } + if err := as.publishNormalizedUplink(ctx, info); err != nil { return err } - if err := as.storeUplink(ctx, ids, uplink); err != nil { + if err := as.storeUplink(ctx, info.ids, info.uplink); err != nil { return err } } else if appSKey := dev.GetSession().GetKeys().GetAppSKey(); appSKey != nil { - uplink.AppSKey = appSKey - uplink.LastAFCntDown = dev.Session.LastAFCntDown + info.uplink.AppSKey = appSKey + info.uplink.LastAFCntDown = dev.Session.LastAFCntDown } - registerUplinkLatency(ctx, uplink) + registerUplinkLatency(ctx, info.uplink) if dev.VersionIds != nil { - uplink.VersionIds = dev.VersionIds + info.uplink.VersionIds = dev.VersionIds } // Set location in message and publish location solved if the payload contains location information. - if locations, err := as.locationRegistry.Get(ctx, ids); err != nil { + if locations, err := as.locationRegistry.Get(ctx, info.ids); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations") } else { - uplink.Locations = locations + info.uplink.Locations = locations } - loc := as.locationFromPayload(uplink) + loc := as.locationFromPayload(info.uplink) if loc != nil { - if uplink.Locations == nil { - uplink.Locations = make(map[string]*ttnpb.Location, 1) + if info.uplink.Locations == nil { + info.uplink.Locations = make(map[string]*ttnpb.Location, 1) } - uplink.Locations["frm-payload"] = loc + info.uplink.Locations["frm-payload"] = loc if err := as.Publish(ctx, &ttnpb.ApplicationUp{ - EndDeviceIds: ids, + EndDeviceIds: info.ids, CorrelationIds: events.CorrelationIDsFromContext(ctx), - ReceivedAt: receivedAt, + ReceivedAt: info.receivedAt, Up: &ttnpb.ApplicationUp_LocationSolved{ LocationSolved: &ttnpb.ApplicationLocation{ Service: "frm-payload", @@ -1134,15 +1174,15 @@ func (as *ApplicationServer) handleUplink( // If the device has not been activated before, publish the activation event. if dev.ActivatedAt == nil { - if err := as.activationPool.Publish(ctx, ids); err != nil { + if err := as.activationPool.Publish(ctx, info.ids); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to publish activation event") } } // Publish last seen event. if err := as.deviceLastSeenPool.Publish(ctx, lastSeenAtInfo{ - ids: ids, - lastSeenAt: uplink.ReceivedAt, + ids: info.ids, + lastSeenAt: info.uplink.ReceivedAt, }); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to publish last seen event") } @@ -1150,11 +1190,11 @@ func (as *ApplicationServer) handleUplink( return nil } -func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, uplink *ttnpb.ApplicationUplink, link *ttnpb.ApplicationLink) error { +func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, item uplinkInfo) error { defer trace.StartRegion(ctx, "handle simulated uplink").End() - ctx = log.NewContextWithField(ctx, "session_key_id", uplink.SessionKeyId) - dev, err := as.deviceRegistry.Get(ctx, ids, + ctx = log.NewContextWithField(ctx, "session_key_id", item.uplink.SessionKeyId) + dev, err := as.deviceRegistry.Get(ctx, item.ids, []string{ "formatters", "version_ids", @@ -1164,13 +1204,16 @@ func (as *ApplicationServer) handleSimulatedUplink(ctx context.Context, ids *ttn return err } - if locations, err := as.locationRegistry.Get(ctx, ids); err != nil { + if locations, err := as.locationRegistry.Get(ctx, item.ids); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to retrieve end device locations") } else { - uplink.Locations = locations + item.uplink.Locations = locations } - return as.decodeUplink(ctx, dev, uplink, link.DefaultFormatters) + if err := as.decodeUplink(ctx, dev, item.uplink, item.link.DefaultFormatters); err != nil { + return err + } + return as.publishNormalizedUplink(ctx, item) } func (as *ApplicationServer) handleDownlinkQueueInvalidated(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, invalid *ttnpb.ApplicationInvalidatedDownlinks, link *ttnpb.ApplicationLink) (pass bool, err error) { diff --git a/pkg/applicationserver/applicationserver_test.go b/pkg/applicationserver/applicationserver_test.go index 2421a921861..89bf4759019 100644 --- a/pkg/applicationserver/applicationserver_test.go +++ b/pkg/applicationserver/applicationserver_test.go @@ -949,6 +949,7 @@ func TestApplicationServer(t *testing.T) { Up: &ttnpb.ApplicationUp_JoinAccept{ JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x11}, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -959,6 +960,7 @@ func TestApplicationServer(t *testing.T) { Up: &ttnpb.ApplicationUp_JoinAccept{ JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x11}, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -995,6 +997,7 @@ func TestApplicationServer(t *testing.T) { EncryptedKey: []byte{0x39, 0x11, 0x40, 0x98, 0xa1, 0x5d, 0x6f, 0x92, 0xd7, 0xf0, 0x13, 0x21, 0x5b, 0x5b, 0x41, 0xa8, 0x98, 0x2d, 0xac, 0x59, 0x34, 0x76, 0x36, 0x18}, KekLabel: "test", }, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -1005,6 +1008,7 @@ func TestApplicationServer(t *testing.T) { Up: &ttnpb.ApplicationUp_JoinAccept{ JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x22}, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -1116,6 +1120,7 @@ func TestApplicationServer(t *testing.T) { FrmPayload: []byte{0xb, 0x8f, 0x94, 0xe6}, }, }, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -1126,6 +1131,7 @@ func TestApplicationServer(t *testing.T) { Up: &ttnpb.ApplicationUp_JoinAccept{ JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x33}, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -1509,6 +1515,7 @@ func TestApplicationServer(t *testing.T) { FrmPayload: []byte{0x2f, 0x3f, 0x31, 0x2c}, }, }, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -1520,6 +1527,7 @@ func TestApplicationServer(t *testing.T) { JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x44}, PendingSession: true, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -1894,6 +1902,69 @@ func TestApplicationServer(t *testing.T) { a.So(queue, should.Resemble, []*ttnpb.ApplicationDownlink{}) }, }, + { + Name: "RegisteredDevice/UplinkMessage/KnownSession/", + IDs: registeredDevice.Ids, + Message: &ttnpb.ApplicationUp{ + EndDeviceIds: withDevAddr(registeredDevice.Ids, types.DevAddr{0x55, 0x55, 0x55, 0x55}), + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{ + RxMetadata: []*ttnpb.RxMetadata{{GatewayIds: &ttnpb.GatewayIdentifiers{GatewayId: "gtw"}}}, + Settings: &ttnpb.TxSettings{DataRate: &ttnpb.DataRate{Modulation: &ttnpb.DataRate_Lora{Lora: &ttnpb.LoRaDataRate{}}}}, + SessionKeyId: []byte{0x55}, + FPort: 42, + FCnt: 42, + FrmPayload: []byte{0xd1, 0x43, 0x6a}, + ReceivedAt: ttnpb.ProtoTimePtr(now), + }, + }, + }, + AssertUp: func(t *testing.T, up *ttnpb.ApplicationUp) { + a := assertions.New(t) + a.So(up, should.Resemble, &ttnpb.ApplicationUp{ + EndDeviceIds: withDevAddr(registeredDevice.Ids, types.DevAddr{0x55, 0x55, 0x55, 0x55}), + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{ + RxMetadata: []*ttnpb.RxMetadata{{GatewayIds: &ttnpb.GatewayIdentifiers{GatewayId: "gtw"}}}, + Settings: &ttnpb.TxSettings{DataRate: &ttnpb.DataRate{Modulation: &ttnpb.DataRate_Lora{Lora: &ttnpb.LoRaDataRate{}}}}, + SessionKeyId: []byte{0x55}, + FPort: 42, + FCnt: 42, + FrmPayload: []byte{0x2a, 0x2a, 0x2a}, + DecodedPayload: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "sum": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: 126, // Payload formatter sums the bytes in FRMPayload. + }, + }, + }, + }, + VersionIds: registeredDevice.VersionIds, + ReceivedAt: up.GetUplinkMessage().ReceivedAt, + }, + }, + CorrelationIds: up.CorrelationIds, + ReceivedAt: up.ReceivedAt, + }) + }, + AssertDevice: func(t *testing.T, dev *ttnpb.EndDevice, queue []*ttnpb.ApplicationDownlink) { + a := assertions.New(t) + a.So(dev.Session, should.Resemble, &ttnpb.Session{ + DevAddr: types.DevAddr{0x55, 0x55, 0x55, 0x55}.Bytes(), + Keys: &ttnpb.SessionKeys{ + SessionKeyId: []byte{0x55}, + AppSKey: &ttnpb.KeyEnvelope{ + EncryptedKey: []byte{0x56, 0x15, 0xaa, 0x22, 0xb7, 0x5f, 0xc, 0x24, 0x79, 0x6, 0x84, 0x68, 0x89, 0x0, 0xa6, 0x16, 0x4a, 0x9c, 0xef, 0xdb, 0xbf, 0x61, 0x6f, 0x0}, + KekLabel: "test", + }, + }, + LastAFCntDown: 0, + }) + a.So(dev.PendingSession, should.BeNil) + a.So(queue, should.Resemble, []*ttnpb.ApplicationDownlink{}) + }, + }, { Name: "UnregisteredDevice/JoinAccept", IDs: unregisteredDeviceID, @@ -1908,6 +1979,7 @@ func TestApplicationServer(t *testing.T) { EncryptedKey: []byte{0x56, 0x15, 0xaa, 0x22, 0xb7, 0x5f, 0xc, 0x24, 0x79, 0x6, 0x84, 0x68, 0x89, 0x0, 0xa6, 0x16, 0x4a, 0x9c, 0xef, 0xdb, 0xbf, 0x61, 0x6f, 0x0}, KekLabel: "test", }, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -2440,6 +2512,7 @@ func TestSkipPayloadCrypto(t *testing.T) { EncryptedKey: []byte{0x39, 0x11, 0x40, 0x98, 0xa1, 0x5d, 0x6f, 0x92, 0xd7, 0xf0, 0x13, 0x21, 0x5b, 0x5b, 0x41, 0xa8, 0x98, 0x2d, 0xac, 0x59, 0x34, 0x76, 0x36, 0x18}, KekLabel: kekLabel, }, + ReceivedAt: ttnpb.ProtoTimePtr(now), }, }, }, @@ -2457,6 +2530,7 @@ func TestSkipPayloadCrypto(t *testing.T) { EncryptedKey: []byte{0x39, 0x11, 0x40, 0x98, 0xa1, 0x5d, 0x6f, 0x92, 0xd7, 0xf0, 0x13, 0x21, 0x5b, 0x5b, 0x41, 0xa8, 0x98, 0x2d, 0xac, 0x59, 0x34, 0x76, 0x36, 0x18}, KekLabel: kekLabel, }, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -2468,6 +2542,7 @@ func TestSkipPayloadCrypto(t *testing.T) { Up: &ttnpb.ApplicationUp_JoinAccept{ JoinAccept: &ttnpb.ApplicationJoinAccept{ SessionKeyId: []byte{0x22}, + ReceivedAt: up.GetJoinAccept().ReceivedAt, }, }, CorrelationIds: up.CorrelationIds, @@ -2904,6 +2979,188 @@ func TestLocationFromPayload(t *testing.T) { } } +func TestUplinkNormalized(t *testing.T) { + a, ctx := test.New(t) + + registeredApplicationID := ttnpb.ApplicationIdentifiers{ApplicationId: "foo-app"} + + // This device gets registered in the device registry of the Application Server. + registeredDevice := &ttnpb.EndDevice{ + Ids: &ttnpb.EndDeviceIdentifiers{ + ApplicationIds: ®isteredApplicationID, + DeviceId: "foo-device", + JoinEui: types.EUI64{0x42, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}.Bytes(), + DevEui: types.EUI64{0x42, 0x42, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}.Bytes(), + }, + Session: &ttnpb.Session{ + DevAddr: types.DevAddr{0x11, 0x11, 0x11, 0x11}.Bytes(), + Keys: &ttnpb.SessionKeys{ + SessionKeyId: []byte{0x11}, + AppSKey: &ttnpb.KeyEnvelope{ + Key: types.AES128Key{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11}.Bytes(), //nolint:lll + }, + }, + }, + Formatters: &ttnpb.MessagePayloadFormatters{ + UpFormatter: ttnpb.PayloadFormatter_FORMATTER_JAVASCRIPT, + UpFormatterParameter: `function decodeUplink(input) { + return { + data: { + air: { + temperature: 21.5, + } + } + }; + }`, + }, + } + + is, isAddr, closeIS := mockis.New(ctx) + defer closeIS() + is.EndDeviceRegistry().Add(ctx, registeredDevice) + + devsRedisClient, devsFlush := test.NewRedis(ctx, "applicationserver_test", "devices") + defer devsFlush() + defer devsRedisClient.Close() + deviceRegistry := &redis.DeviceRegistry{Redis: devsRedisClient, LockTTL: test.Delay << 10} + if err := deviceRegistry.Init(ctx); !a.So(err, should.BeNil) { + t.FailNow() + } + _, err := deviceRegistry.Set(ctx, registeredDevice.Ids, nil, func(ed *ttnpb.EndDevice) (*ttnpb.EndDevice, []string, error) { + return registeredDevice, []string{"ids", "session", "formatters"}, nil + }) + if err != nil { + t.Fatalf("Failed to set device in registry: %s", err) + } + + linksRedisClient, linksFlush := test.NewRedis(ctx, "applicationserver_test", "links") + defer linksFlush() + defer linksRedisClient.Close() + linkRegistry := &redis.LinkRegistry{Redis: linksRedisClient, LockTTL: test.Delay << 10} + if err := linkRegistry.Init(ctx); !a.So(err, should.BeNil) { + t.FailNow() + } + _, err = linkRegistry.Set(ctx, ®isteredApplicationID, nil, func(_ *ttnpb.ApplicationLink) (*ttnpb.ApplicationLink, []string, error) { + return &ttnpb.ApplicationLink{}, nil, nil + }) + if err != nil { + t.Fatalf("Failed to set link in registry: %s", err) + } + + distribRedisClient, distribFlush := test.NewRedis(ctx, "applicationserver_test", "traffic") + defer distribFlush() + defer distribRedisClient.Close() + distribPubSub := distribredis.PubSub{Redis: distribRedisClient} + + applicationUpsRedisClient, applicationUpsFlush := test.NewRedis(ctx, "applicationserver_test", "applicationups") + defer applicationUpsFlush() + defer applicationUpsRedisClient.Close() + applicationUpsRegistry := &redis.ApplicationUplinkRegistry{ + Redis: applicationUpsRedisClient, + Limit: 16, + } + + c := componenttest.NewComponent(t, &component.Config{ + ServiceBase: config.ServiceBase{ + GRPC: config.GRPC{ + Listen: ":9189", + AllowInsecureForCredentials: true, + }, + Cluster: cluster.Config{ + IdentityServer: isAddr, + }, + HTTP: config.HTTP{ + Listen: ":8100", + }, + }, + }) + config := &applicationserver.Config{ + Devices: deviceRegistry, + Links: linkRegistry, + UplinkStorage: applicationserver.UplinkStorageConfig{ + Registry: applicationUpsRegistry, + Limit: 16, + }, + Distribution: applicationserver.DistributionConfig{ + Global: applicationserver.GlobalDistributorConfig{ + PubSub: distribPubSub, + }, + }, + EndDeviceMetadataStorage: applicationserver.EndDeviceMetadataStorageConfig{ + Location: applicationserver.EndDeviceLocationStorageConfig{ + Registry: metadata.NewClusterEndDeviceLocationRegistry(c, (1<<4)*Timeout), + }, + }, + } + as, err := applicationserver.New(c, config) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + roles := as.Roles() + a.So(len(roles), should.Equal, 1) + a.So(roles[0], should.Equal, ttnpb.ClusterRole_APPLICATION_SERVER) + + componenttest.StartComponent(t, c) + defer c.Close() + + mustHavePeer(ctx, c, ttnpb.ClusterRole_ENTITY_REGISTRY) + + sub, err := as.Subscribe(ctx, "test", nil, false) + a.So(err, should.BeNil) + + now := time.Now().UTC() + err = as.Publish(ctx, &ttnpb.ApplicationUp{ + EndDeviceIds: registeredDevice.Ids, + Up: &ttnpb.ApplicationUp_UplinkMessage{ + UplinkMessage: &ttnpb.ApplicationUplink{ + RxMetadata: []*ttnpb.RxMetadata{{GatewayIds: &ttnpb.GatewayIdentifiers{GatewayId: "gtw"}}}, + Settings: &ttnpb.TxSettings{DataRate: &ttnpb.DataRate{Modulation: &ttnpb.DataRate_Lora{Lora: &ttnpb.LoRaDataRate{}}}}, + SessionKeyId: []byte{0x11}, + FPort: 11, + FCnt: 11, + FrmPayload: []byte{0x11}, + ReceivedAt: ttnpb.ProtoTimePtr(now), + }, + }, + }) + a.So(err, should.BeNil) + + // The uplink message and the normalized payload message may come out of order. + // Expect exactly two messages. + var normalized *ttnpb.ApplicationUplinkNormalized + for i := 0; i < 2; i++ { + select { + case msg := <-sub.Up(): + if n := msg.GetUplinkNormalized(); n != nil { + normalized = n + } + case <-time.After(Timeout): + t.Fatalf("Expected upstream message %d timed out", i) + } + } + if normalized == nil { + t.Fatalf("Expected uplink normalized message") + } + a.So(normalized.NormalizedPayload, should.Resemble, &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "air": { + Kind: &pbtypes.Value_StructValue{ + StructValue: &pbtypes.Struct{ + Fields: map[string]*pbtypes.Value{ + "temperature": { + Kind: &pbtypes.Value_NumberValue{ + NumberValue: 21.5, + }, + }, + }, + }, + }, + }, + }, + }) +} + func TestApplicationServerCleanup(t *testing.T) { a, ctx := test.New(t)