Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Jan 22, 2023
1 parent ffbba78 commit 8a836de
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
configObject = &transform.SetPkConfig{}
case "spEnrichedToJson":
configObject = &transform.EnrichedToJSONConfig{}
case "spCollectorPayloadThriftToRaw":
configObject = &transform.CollectorPayloadThriftToRawConfig{}
case "spRawToCollectorPayloadThrift":
configObject = &transform.RawToCollectorPayloadThriftConfig{}
case "js":
configObject = &engine.JSEngineConfig{}
case "lua":
Expand Down
76 changes: 76 additions & 0 deletions pkg/transform/snowplow_collector_payload_thrift_to_raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"errors"
"fmt"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
)

// CollectorPayloadThriftToRawConfig is a configuration object for the spCollectorPayloadThriftToRaw transformation
type CollectorPayloadThriftToRawConfig struct {
}

type collectorPayloadThriftToRawAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f collectorPayloadThriftToRawAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f collectorPayloadThriftToRawAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &CollectorPayloadThriftToRawConfig{}

return cfg, nil
}

// adapterGenerator returns a spCollectorPayloadThriftToRaw transformation adapter.
func collectorPayloadThriftToRawAdapterGenerator(f func(c *CollectorPayloadThriftToRawConfig) (TransformationFunction, error)) collectorPayloadThriftToRawAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*CollectorPayloadThriftToRawConfig)
if !ok {
return nil, errors.New("invalid input, expected collectorPayloadThriftToRawConfig")
}

return f(cfg)
}
}

// collectorPayloadThriftToRawConfigFunction returns an spCollectorPayloadThriftToRaw transformation function, from an collectorPayloadThriftToRawConfig.
func collectorPayloadThriftToRawConfigFunction(c *CollectorPayloadThriftToRawConfig) (TransformationFunction, error) {
return SpCollectorPayloadThriftToRaw, nil
}

// CollectorPayloadThriftToRawConfigPair is a configuration pair for the spCollectorPayloadThriftToRaw transformation
var CollectorPayloadThriftToRawConfigPair = config.ConfigurationPair{
Name: "spCollectorPayloadThriftToRaw",
Handle: collectorPayloadThriftToRawAdapterGenerator(collectorPayloadThriftToRawConfigFunction),
}

// SpCollectorPayloadThriftToRaw is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload
// so that it can be pushed directly into the egress stream of a Collector.
func SpCollectorPayloadThriftToRaw(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
ctx := context.Background()

res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data)
if deserializeErr != nil {
message.SetError(deserializeErr)
return nil, nil, message, nil
}

message.Data = []byte(fmt.Sprintf("%#v", res))
return message, nil, nil, intermediateState
}
2 changes: 2 additions & 0 deletions pkg/transform/transformconfig/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{
filter.ContextFilterConfigPair,
transform.SetPkConfigPair,
transform.EnrichedToJSONConfigPair,
transform.CollectorPayloadThriftToRawConfigPair,
transform.RawToCollectorPayloadThriftConfigPair,
engine.LuaConfigPair,
engine.JSConfigPair,
}
Expand Down

0 comments on commit 8a836de

Please sign in to comment.