Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat normalized payload validation errors as warnings #5752

Merged
merged 4 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions pkg/messageprocessors/javascript/javascript.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ func (h *host) DecodeUplink(
return h.decodeUplink(ctx, msg, run)
}

func appendValidationErrors(dst []string, measurements []normalizedpayload.ParsedMeasurement) []string {
for i, m := range measurements {
for _, err := range m.ValidationErrors {
var (
errString string
ttnErr *errors.Error
)
if errors.As(err, &ttnErr) {
errString = ttnErr.FormatMessage(ttnErr.PublicAttributes())
} else {
errString = err.Error()
}
dst = append(dst, fmt.Sprintf("measurement %d: %s", i+1, errString))
}
}
return dst
}

func (*host) decodeUplink(
ctx context.Context,
msg *ttnpb.ApplicationUplink,
Expand Down Expand Up @@ -327,21 +345,31 @@ func (*host) decodeUplink(
normalizedPayload[i] = pb
}
// Validate the normalized payload.
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err != nil {
return errOutput.WithCause(err)
}
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, normalized.Warnings
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = make([]string, 0, len(normalized.Warnings))
msg.NormalizedPayloadWarnings = append(msg.NormalizedPayloadWarnings, normalized.Warnings...)
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
} else {
// If the normalizer is not set, the decoder may return already normalized payload.
// This is a best effort attempt to parse the decoded payload as normalized payload.
// If that does not return an error, the decoded payload is assumed to be normalized.
normalizedPayload := []*pbtypes.Struct{
decodedPayload,
}
_, err := normalizedpayload.Parse(normalizedPayload)
normalizedMeasurements, err := normalizedpayload.Parse(normalizedPayload)
if err == nil {
msg.NormalizedPayload, msg.NormalizedPayloadWarnings = normalizedPayload, nil
msg.NormalizedPayload = make([]*pbtypes.Struct, len(normalizedMeasurements))
for i, measurement := range normalizedMeasurements {
msg.NormalizedPayload[i] = measurement.Valid
}
msg.NormalizedPayloadWarnings = appendValidationErrors(msg.NormalizedPayloadWarnings, normalizedMeasurements)
}
}
return nil
Expand Down
29 changes: 24 additions & 5 deletions pkg/messageprocessors/javascript/javascript_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func TestDecodeUplink(t *testing.T) {
a.So(message.NormalizedPayload, should.HaveLength, 1)
measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestDecodeUplink(t *testing.T) {

measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
a.So(measurements[0], should.Resemble, normalizedpayload.Measurement{
a.So(measurements[0].Measurement, should.Resemble, normalizedpayload.Measurement{
Air: &normalizedpayload.Air{
Temperature: float64Ptr(-21.3),
},
Expand Down Expand Up @@ -450,8 +450,12 @@ func TestDecodeUplink(t *testing.T) {
})

a.So(message.NormalizedPayload, should.HaveLength, 2)
measurements, err := normalizedpayload.Parse(message.NormalizedPayload)
parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{
Air: &normalizedpayload.Air{
Expand Down Expand Up @@ -539,8 +543,23 @@ func TestDecodeUplink(t *testing.T) {
}
`
err := host.DecodeUplink(ctx, ids, nil, message, script)
a.So(err, should.NotBeNil)
a.So(errors.IsInvalidArgument(err), should.BeTrue)
a.So(err, should.BeNil)

a.So(message.NormalizedPayload, should.HaveLength, 1)
parsedMeasurements, err := normalizedpayload.Parse(message.NormalizedPayload)
a.So(err, should.BeNil)
measurements := make([]normalizedpayload.Measurement, len(parsedMeasurements))
for i, m := range parsedMeasurements {
measurements[i] = m.Measurement
}
a.So(measurements, should.Resemble, []normalizedpayload.Measurement{
{
Air: &normalizedpayload.Air{},
johanstokking marked this conversation as resolved.
Show resolved Hide resolved
},
})
a.So(message.NormalizedPayloadWarnings, should.Resemble, []string{
"measurement 1: `air.temperature` should be equal or greater than `-273.15`",
})
}

// The Things Node example.
Expand Down
82 changes: 57 additions & 25 deletions pkg/messageprocessors/normalizedpayload/uplink.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,34 @@ type Measurement struct {

var (
errFieldType = errors.DefineInvalidArgument("field_type", "invalid field type of `{path}`")
errFieldMinimum = errors.DefineInvalidArgument(
errFieldMinimum = errors.DefineDataLoss(
"field_minimum",
"`{path}` should be equal or greater than `{minimum}`",
)
//nolint:unused
errFieldExclusiveMinimum = errors.DefineInvalidArgument(
errFieldExclusiveMinimum = errors.DefineDataLoss(
"field_exclusive_minimum",
"`{path}` should be greater than `{minimum}`",
)
errFieldMaximum = errors.DefineInvalidArgument(
errFieldMaximum = errors.DefineDataLoss(
"field_maximum",
"`{path}` should be equal or less than `{maximum}`",
)
errFieldExclusiveMaximum = errors.DefineInvalidArgument(
errFieldExclusiveMaximum = errors.DefineDataLoss(
"field_exclusive_maximum",
"`{path}` should be less than `{maximum}`",
)
errUnknownField = errors.DefineInvalidArgument("unknown_field", "unknown field `{path}`")
)

type fieldParser func(dst *Measurement, src *pbtypes.Value, path string) error
type fieldParser func(dst *Measurement, src *pbtypes.Value, path string) []error

// object validates that the path is a structure and sets the target to an empty value.
func object[T any](selector func(*Measurement) **T) fieldParser {
return func(dst *Measurement, src *pbtypes.Value, path string) error {
return func(dst *Measurement, src *pbtypes.Value, path string) []error {
_, ok := src.Kind.(*pbtypes.Value_StructValue)
if !ok {
return errFieldType.WithAttributes("path", path)
return []error{errFieldType.WithAttributes("path", path)}
}
*selector(dst) = new(T)
return nil
Expand All @@ -82,28 +82,28 @@ func object[T any](selector func(*Measurement) **T) fieldParser {

type fieldValidator[T any] func(v T, path string) error

func validate[T any](val T, validators []fieldValidator[T], path string) error {
func validate[T any](val T, validators []fieldValidator[T], path string) (errs []error) {
for _, v := range validators {
if err := v(val, path); err != nil {
return err
errs = append(errs, err)
}
}
return nil
return errs
}

// parseTime parses and validates the time. The input value must be RFC3339.
func parseTime(selector func(dst *Measurement) **time.Time, vals ...fieldValidator[time.Time]) fieldParser {
return func(dst *Measurement, src *pbtypes.Value, path string) error {
return func(dst *Measurement, src *pbtypes.Value, path string) []error {
val, ok := src.Kind.(*pbtypes.Value_StringValue)
if !ok {
return errFieldType.WithAttributes("path", path)
return []error{errFieldType.WithAttributes("path", path)}
}
t, err := time.Parse(time.RFC3339Nano, val.StringValue)
if err != nil {
return err
return []error{err}
}
if err := validate(t, vals, path); err != nil {
return err
if validateErrs := validate(t, vals, path); len(validateErrs) > 0 {
return validateErrs
}
*selector(dst) = &t
return nil
Expand All @@ -112,14 +112,14 @@ func parseTime(selector func(dst *Measurement) **time.Time, vals ...fieldValidat

// parseNumber parses and validates a number.
func parseNumber(selector func(dst *Measurement) **float64, vals ...fieldValidator[float64]) fieldParser {
return func(dst *Measurement, src *pbtypes.Value, path string) error {
return func(dst *Measurement, src *pbtypes.Value, path string) []error {
val, ok := src.Kind.(*pbtypes.Value_NumberValue)
if !ok {
return errFieldType.WithAttributes("path", path)
return []error{errFieldType.WithAttributes("path", path)}
}
n := val.NumberValue
if err := validate(n, vals, path); err != nil {
return err
if validateErrs := validate(n, vals, path); len(validateErrs) > 0 {
return validateErrs
}
*selector(dst) = &n
return nil
Expand Down Expand Up @@ -231,10 +231,22 @@ var fieldParsers = map[string]fieldParser{
),
}

// ParsedMeasurement is the result of parsing measurements with Parse.
type ParsedMeasurement struct {
Measurement
// ValidationErrors contains any errors that occurred during field validation.
ValidationErrors []error
// Valid only contains the valid fields, for which there were no validation errors.
Valid *pbtypes.Struct
}

// Parse parses and validates the measurements.
func Parse(measurements []*pbtypes.Struct) ([]Measurement, error) {
res := make([]Measurement, len(measurements))
func Parse(measurements []*pbtypes.Struct) ([]ParsedMeasurement, error) {
res := make([]ParsedMeasurement, len(measurements))
for i, src := range measurements {
res[i].Valid = &pbtypes.Struct{
Fields: make(map[string]*pbtypes.Value),
}
err := parse(&res[i], src, "")
if err != nil {
return nil, err
Expand All @@ -243,21 +255,41 @@ func Parse(measurements []*pbtypes.Struct) ([]Measurement, error) {
return res, nil
}

func parse(dst *Measurement, src *pbtypes.Struct, prefix string) error {
func parse(dst *ParsedMeasurement, src *pbtypes.Struct, prefix string) error {
for k, v := range src.GetFields() {
path := fmt.Sprintf("%s%s", prefix, k)
parser, ok := fieldParsers[path]
if !ok {
return errUnknownField.WithAttributes("path", path)
}
if err := parser(dst, v, path); err != nil {
return err
if errs := parser(&dst.Measurement, v, path); errs != nil {
for _, err := range errs {
if !errors.IsDataLoss(err) {
return err
}
dst.ValidationErrors = append(dst.ValidationErrors, err)
}
continue
}
validFieldValue := v
if s, ok := v.Kind.(*pbtypes.Value_StructValue); ok {
if err := parse(dst, s.StructValue, path+"."); err != nil {
nested := &ParsedMeasurement{
Measurement: dst.Measurement,
Valid: &pbtypes.Struct{
Fields: make(map[string]*pbtypes.Value),
},
}
if err := parse(nested, s.StructValue, path+"."); err != nil {
return err
}
dst.ValidationErrors = append(dst.ValidationErrors, nested.ValidationErrors...)
validFieldValue = &pbtypes.Value{
Kind: &pbtypes.Value_StructValue{
StructValue: nested.Valid,
},
}
}
dst.Valid.Fields[k] = validFieldValue
}
return nil
}
22 changes: 22 additions & 0 deletions pkg/messageprocessors/normalizedpayload/uplink_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright © 2022 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package normalizedpayload

var (
ErrFieldMinimum = errFieldMinimum
ErrFieldExclusiveMinimum = errFieldExclusiveMinimum
ErrFieldMaximum = errFieldMaximum
ErrFieldExclusiveMaximum = errFieldExclusiveMaximum
)
51 changes: 44 additions & 7 deletions pkg/messageprocessors/normalizedpayload/uplink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ func TestUplink(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
normalizedPayload []*pbtypes.Struct
expected []normalizedpayload.Measurement
errorAssertion func(error) bool
name string
normalizedPayload []*pbtypes.Struct
expected []normalizedpayload.Measurement
expectedValidationErrors [][]error
errorAssertion func(error) bool
}{
{
name: "single timestamp",
Expand Down Expand Up @@ -133,7 +134,19 @@ func TestUplink(t *testing.T) {
},
},
},
errorAssertion: errors.IsInvalidArgument,
expected: []normalizedpayload.Measurement{
{
Air: &normalizedpayload.Air{},
},
},
expectedValidationErrors: [][]error{
{
normalizedpayload.ErrFieldMinimum.WithAttributes(
"path", "air.temperature",
"minimum", -273.15,
),
},
},
},
{
name: "invalid direction",
Expand All @@ -156,7 +169,19 @@ func TestUplink(t *testing.T) {
},
},
},
errorAssertion: errors.IsInvalidArgument,
expected: []normalizedpayload.Measurement{
{
Wind: &normalizedpayload.Wind{},
},
},
expectedValidationErrors: [][]error{
{
normalizedpayload.ErrFieldExclusiveMaximum.WithAttributes(
"path", "wind.direction",
"maximum", 360.0,
),
},
},
},
{
name: "invalid type",
Expand Down Expand Up @@ -216,7 +241,19 @@ func TestUplink(t *testing.T) {
a.So(tc.errorAssertion(err), should.BeTrue)
} else {
a.So(err, should.BeNil)
a.So(measurements, should.Resemble, tc.expected)
if !a.So(measurements, should.HaveLength, len(tc.expected)) {
t.FailNow()
}
for i, parsed := range measurements {
if len(parsed.ValidationErrors) > 0 {
a.So(len(tc.expectedValidationErrors), should.BeGreaterThanOrEqualTo, i+1)
a.So(parsed.ValidationErrors, should.HaveLength, len(tc.expectedValidationErrors[i]))
for j, err := range parsed.ValidationErrors {
a.So(err, should.EqualErrorOrDefinition, tc.expectedValidationErrors[i][j])
}
}
a.So(parsed.Measurement, should.Resemble, tc.expected[i])
}
}
})
}
Expand Down
Loading