Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Jan 22, 2023
1 parent 8a836de commit f2d34c8
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 167 deletions.
8 changes: 4 additions & 4 deletions docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
configObject = &transform.SetPkConfig{}
case "spEnrichedToJson":
configObject = &transform.EnrichedToJSONConfig{}
case "spCollectorPayloadThriftToRaw":
configObject = &transform.CollectorPayloadThriftToRawConfig{}
case "spRawToCollectorPayloadThrift":
configObject = &transform.RawToCollectorPayloadThriftConfig{}
case "spCollectorPayloadThriftToJSON":
configObject = &transform.CollectorPayloadThriftToJSONConfig{}
case "spJSONToCollectorPayloadThrift":
configObject = &transform.jsonToCollectorPayloadThriftConfig{}
case "js":
configObject = &engine.JSEngineConfig{}
case "lua":
Expand Down
83 changes: 83 additions & 0 deletions pkg/transform/snowplow_collector_payload_thrift_to_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
)

// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation
type CollectorPayloadThriftToJSONConfig struct {
}

type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &CollectorPayloadThriftToJSONConfig{}

return cfg, nil
}

// adapterGenerator returns a spCollectorPayloadThriftToJSON transformation adapter.
func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*CollectorPayloadThriftToJSONConfig)
if !ok {
return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig")
}

return f(cfg)
}
}

// collectorPayloadThriftToJSONConfigFunction returns an spCollectorPayloadThriftToJSON transformation function, from an collectorPayloadThriftToJSONConfig.
func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) {
return SpCollectorPayloadThriftToJSON, nil
}

// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation
var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{
Name: "spCollectorPayloadThriftToJSON",
Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction),
}

// SpCollectorPayloadThriftToJSON is a specific transformation implementation to transform a Thrift encoded Collector Payload
// to a JSON string representation.
func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
ctx := context.Background()

// Deserialize the Collector Payload to a struct
res, deserializeErr := collectorpayload.BinaryDeserializer(ctx, message.Data)
if deserializeErr != nil {
message.SetError(deserializeErr)
return nil, nil, message, nil
}

// Re-encode as a JSON string to be able to leverage it downstream
resJSON, jsonErr := collectorpayload.ToJSON(res)
if jsonErr != nil {
message.SetError(jsonErr)
return nil, nil, message, nil
}

message.Data = resJSON
return message, nil, nil, intermediateState
}
76 changes: 0 additions & 76 deletions pkg/transform/snowplow_collector_payload_thrift_to_raw.go

This file was deleted.

84 changes: 84 additions & 0 deletions pkg/transform/snowplow_json_to_collector_payload_thrift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"encoding/json"
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1"
)

// JSONToCollectorPayloadThriftConfig is a configuration object for the spJSONToCollectorPayloadThrift transformation
type JSONToCollectorPayloadThriftConfig struct {
}

type jsonToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f jsonToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f jsonToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &JSONToCollectorPayloadThriftConfig{}

return cfg, nil
}

// adapterGenerator returns a spJSONToCollectorPayloadThrift transformation adapter.
func jsonToCollectorPayloadThriftAdapterGenerator(f func(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error)) jsonToCollectorPayloadThriftAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*JSONToCollectorPayloadThriftConfig)
if !ok {
return nil, errors.New("invalid input, expected jsonToCollectorPayloadThriftConfig")
}

return f(cfg)
}
}

// jsonToCollectorPayloadThriftConfigFunction returns an spJSONToCollectorPayloadThrift transformation function, from an jsonToCollectorPayloadThriftConfig.
func jsonToCollectorPayloadThriftConfigFunction(c *JSONToCollectorPayloadThriftConfig) (TransformationFunction, error) {
return SpJSONToCollectorPayloadThrift, nil
}

// JSONToCollectorPayloadThriftConfigPair is a configuration pair for the spJSONToCollectorPayloadThrift transformation
var JSONToCollectorPayloadThriftConfigPair = config.ConfigurationPair{
Name: "spJSONToCollectorPayloadThrift",
Handle: jsonToCollectorPayloadThriftAdapterGenerator(jsonToCollectorPayloadThriftConfigFunction),
}

// SpJSONToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload
// so that it can be pushed directly into the egress stream of a Collector.
func SpJSONToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
var p *collectorpayloadmodel1.CollectorPayload
unmarshallErr := json.Unmarshal(message.Data, &p)
if unmarshallErr != nil {
message.SetError(unmarshallErr)
return nil, nil, message, nil
}

ctx := context.Background()

res, serializeErr := collectorpayload.BinarySerializer(ctx, p)
if serializeErr != nil {
message.SetError(serializeErr)
return nil, nil, message, nil
}

message.Data = res
return message, nil, nil, intermediateState
}
84 changes: 0 additions & 84 deletions pkg/transform/snowplow_raw_to_collector_payload_thrift.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/transform/transformconfig/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var SupportedTransformations = []config.ConfigurationPair{
filter.ContextFilterConfigPair,
transform.SetPkConfigPair,
transform.EnrichedToJSONConfigPair,
transform.CollectorPayloadThriftToRawConfigPair,
transform.RawToCollectorPayloadThriftConfigPair,
transform.CollectorPayloadThriftToJSONConfigPair,
transform.JSONToCollectorPayloadThriftConfigPair,
engine.LuaConfigPair,
engine.JSConfigPair,
}
Expand Down
19 changes: 18 additions & 1 deletion third_party/snowplow/collectorpayload/collector_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package collectorpayload

import (
"context"
"encoding/base64"
"encoding/json"

thrift "github.com/apache/thrift/lib/go/thrift"

Expand All @@ -30,6 +32,16 @@ func BinarySerializer(ctx context.Context, collectorPayload *model1.CollectorPay

// BinaryDeserializer deserializes a CollectorPayload byte array back to a struct
func BinaryDeserializer(ctx context.Context, collectorPayloadBytes []byte) (*model1.CollectorPayload, error) {
var inputBytes []byte

// Attempt to decode from base64 as most payloads will arrive with the thrift string re-encoded
base64DecodedCollectorPayload, base64Err := base64.StdEncoding.DecodeString(string(collectorPayloadBytes))
if base64Err != nil {
inputBytes = collectorPayloadBytes
} else {
inputBytes = []byte(base64DecodedCollectorPayload)
}

t := thrift.NewTMemoryBufferLen(1024)
p := thrift.NewTBinaryProtocolFactoryDefault().GetProtocol(t)

Expand All @@ -39,7 +51,12 @@ func BinaryDeserializer(ctx context.Context, collectorPayloadBytes []byte) (*mod
}

collectorPayload := model1.NewCollectorPayload()
err := deserializer.Read(ctx, collectorPayload, collectorPayloadBytes)
err := deserializer.Read(ctx, collectorPayload, inputBytes)

return collectorPayload, err
}

// ToJSON converts the collector payload struct to a JSON representation for simpler portability
func ToJSON(collectorPayload *model1.CollectorPayload) ([]byte, error) {
return json.Marshal(collectorPayload)
}

0 comments on commit f2d34c8

Please sign in to comment.