Skip to content

Commit

Permalink
Add Collector Payload Thrift to/from JSON functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Jan 23, 2023
1 parent 7124e85 commit 767fd8a
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
configObject = &transform.SetPkConfig{}
case "spEnrichedToJson":
configObject = &transform.EnrichedToJSONConfig{}
case "spCollectorPayloadThriftToJSON":
configObject = &transform.CollectorPayloadThriftToJSONConfig{}
case "spJSONToCollectorPayloadThrift":
configObject = &transform.JSONToCollectorPayloadThriftConfig{}
case "js":
configObject = &engine.JSEngineConfig{}
case "lua":
Expand Down
83 changes: 83 additions & 0 deletions pkg/transform/snowplow_collector_payload_thrift_to_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
)

// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation
type CollectorPayloadThriftToJSONConfig struct {
}

type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &CollectorPayloadThriftToJSONConfig{}

return cfg, nil
}

// adapterGenerator returns a spCollectorPayloadThriftToJSON transformation adapter.
func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*CollectorPayloadThriftToJSONConfig)
if !ok {
return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig")
}

return f(cfg)
}
}

// collectorPayloadThriftToJSONConfigFunction returns an spCollectorPayloadThriftToJSON transformation function, from an collectorPayloadThriftToJSONConfig.
func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) {
return SpCollectorPayloadThriftToJSON, nil
}

// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation
var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{
Name: "spCollectorPayloadThriftToJSON",
Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction),
}

// SpCollectorPayloadThriftToJSON is a specific transformation implementation to transform a Thrift encoded Collector Payload
// to a JSON string representation.
func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
ctx := context.Background()

// Deserialize the Collector Payload to a struct
res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data)
if deserializeErr != nil {
message.SetError(deserializeErr)
return nil, nil, message, nil
}

// Re-encode as a JSON string to be able to leverage it downstream
resJSON, jsonErr := collectorpayload.ToJSON(res)
if jsonErr != nil {
message.SetError(jsonErr)
return nil, nil, message, nil
}

message.Data = resJSON
return message, nil, nil, intermediateState
}
85 changes: 85 additions & 0 deletions pkg/transform/snowplow_json_to_collector_payload_thrift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"encoding/json"
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1"
)

// JSONToCollectorPayloadThriftConfig is a configuration object for the spJSONToCollectorPayloadThrift transformation
type JSONToCollectorPayloadThriftConfig struct {
}

// JSONToCollectorPayloadThriftAdapter is a configuration object for the spJSONToCollectorPayloadThrift transformation
type JSONToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f JSONToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f JSONToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &JSONToCollectorPayloadThriftConfig{}

return cfg, nil
}

// JSONToCollectorPayloadThriftAdapterGenerator returns a spJSONToCollectorPayloadThrift transformation adapter.
func JSONToCollectorPayloadThriftAdapterGenerator(f func(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error)) JSONToCollectorPayloadThriftAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*JSONToCollectorPayloadThriftConfig)
if !ok {
return nil, errors.New("invalid input, expected JSONToCollectorPayloadThriftConfig")
}

return f(cfg)
}
}

// JSONToCollectorPayloadThriftConfigFunction returns an spJSONToCollectorPayloadThrift transformation function, from an JSONToCollectorPayloadThriftConfig.
func JSONToCollectorPayloadThriftConfigFunction(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error) {
return SpJSONToCollectorPayloadThrift, nil
}

// JSONToCollectorPayloadThriftConfigPair is a configuration pair for the spJSONToCollectorPayloadThrift transformation
var JSONToCollectorPayloadThriftConfigPair = config.ConfigurationPair{
Name: "spJSONToCollectorPayloadThrift",
Handle: JSONToCollectorPayloadThriftAdapterGenerator(JSONToCollectorPayloadThriftConfigFunction),
}

// SpJSONToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload
// so that it can be pushed directly into the egress stream of a Collector.
func SpJSONToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
var p *collectorpayloadmodel1.CollectorPayload
unmarshallErr := json.Unmarshal(message.Data, &p)
if unmarshallErr != nil {
message.SetError(unmarshallErr)
return nil, nil, message, nil
}

ctx := context.Background()

res, serializeErr := collectorpayload.BinarySerializer(ctx, p)
if serializeErr != nil {
message.SetError(serializeErr)
return nil, nil, message, nil
}

message.Data = res
return message, nil, nil, intermediateState
}
2 changes: 2 additions & 0 deletions pkg/transform/transformconfig/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{
filter.ContextFilterConfigPair,
transform.SetPkConfigPair,
transform.EnrichedToJSONConfigPair,
transform.CollectorPayloadThriftToJSONConfigPair,
transform.JSONToCollectorPayloadThriftConfigPair,
engine.LuaConfigPair,
engine.JSConfigPair,
}
Expand Down

0 comments on commit 767fd8a

Please sign in to comment.