Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Add Collector Payload Thrift encoding / decoding transformations #251

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Selects the spCollectorPayloadThriftToJSON transformation; no options are set.
transform {
use "spCollectorPayloadThriftToJSON" {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Selects the spCollectorPayloadThriftToJSON transformation; no options are set.
transform {
use "spCollectorPayloadThriftToJSON" {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Selects the spJSONToCollectorPayloadThrift transformation with base64 encoding of the output enabled.
transform {
use "spJSONToCollectorPayloadThrift" {
# Whether the output thrift should be further encoded with base64
base_64_encode = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Selects the spJSONToCollectorPayloadThrift transformation; no options are set.
transform {
use "spJSONToCollectorPayloadThrift" {
}
}
10 changes: 6 additions & 4 deletions docs/configuration_transformations_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,10 @@ func TestScriptTransformationCustomScripts(t *testing.T) {
testLuaScriptCompiles(t, file)
case ".hcl":
isFull := strings.Contains(file, "full-example")

testTransformationConfig(t, file, isFull)
case "":
// If there's no extension, fail the test.
assert.Fail("File with no extension found: %v", file)

default:
// Otherwise it's likely a typo or error.
assert.Fail("unexpected file extension found: %v", file)
Expand Down Expand Up @@ -141,6 +139,12 @@ func testTransformationConfig(t *testing.T, filepath string, fullExample bool) {
configObject = &transform.SetPkConfig{}
case "spEnrichedToJson":
configObject = &transform.EnrichedToJSONConfig{}
case "spCollectorPayloadThriftToJSON":
configObject = &transform.CollectorPayloadThriftToJSONConfig{}
case "spJSONToCollectorPayloadThrift":
configObject = &transform.JSONToCollectorPayloadThriftConfig{}
case "JSONManipulator":
configObject = &transform.JSONManipulatorConfig{}
case "js":
configObject = &engine.JSEngineConfig{}
case "lua":
Expand Down Expand Up @@ -183,9 +187,7 @@ func testJSScriptCompiles(t *testing.T, scriptPath string) {
assert.NotNil(jsTransformationFunc, scriptPath)
if err != nil {
t.Fatalf("JSConfigFunction failed with error: %s. Script: %s", err.Error(), string(scriptPath))

}

}

func testLuaScriptCompiles(t *testing.T, scriptPath string) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
)

require (
github.com/apache/thrift v0.17.0
github.com/davecgh/go-spew v1.1.1
github.com/dop251/goja v0.0.0-20220722151623-4765a9872229
github.com/hashicorp/hcl/v2 v2.13.0
Expand Down
52 changes: 2 additions & 50 deletions go.sum

Large diffs are not rendered by default.

212 changes: 212 additions & 0 deletions pkg/transform/json_manipulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
)

const (
// snowplowPayloadDataSchema is the Iglu schema URI written into the "schema"
// field of the payload built by toSnowplowPayloadData.
snowplowPayloadDataSchema = "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4"
)

// JSONManipulatorConfig is a configuration object for the JSONManipulator transformation
type JSONManipulatorConfig struct {
// KeyRename maps an existing top-level JSON key to the name it should be renamed to.
KeyRename map[string]string `hcl:"key_rename"`
// KeyValueFunc maps a top-level JSON key to the name of the value function to run
// on its value ("toEpochMillis" or "toSnowplowPayloadData"). Value functions run
// after renaming, so keys here refer to the post-rename names.
KeyValueFunc map[string]string `hcl:"key_value_func"`
}

// JSONManipulatorAdapter is a function type that carries the Create and ProvideDefault
// methods for the JSONManipulator transformation; Create simply invokes the function.
type JSONManipulatorAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
// It calls the adapter function with i and returns its result and error unchanged.
func (f JSONManipulatorAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface.
// It returns a *JSONManipulatorConfig whose KeyRename and KeyValueFunc maps are
// empty but non-nil; the returned error is always nil.
func (f JSONManipulatorAdapter) ProvideDefault() (interface{}, error) {
	return &JSONManipulatorConfig{
		KeyRename:    map[string]string{},
		KeyValueFunc: map[string]string{},
	}, nil
}

// JSONManipulatorAdapterGenerator wraps f in a JSONManipulatorAdapter.
// The returned adapter accepts only a *JSONManipulatorConfig: it forwards that
// config to f and returns f's result, and reports an error for any other input.
func JSONManipulatorAdapterGenerator(f func(c *JSONManipulatorConfig) (TransformationFunction, error)) JSONManipulatorAdapter {
	return func(i interface{}) (interface{}, error) {
		switch cfg := i.(type) {
		case *JSONManipulatorConfig:
			return f(cfg)
		default:
			return nil, errors.New("invalid input, expected JSONManipulatorConfig")
		}
	}
}

// JSONManipulatorConfigFunction builds a JSONManipulator transformation function
// from a JSONManipulatorConfig by passing its KeyRename and KeyValueFunc maps to
// NewJSONManipulator.
func JSONManipulatorConfigFunction(c *JSONManipulatorConfig) (TransformationFunction, error) {
	renames, valueFuncs := c.KeyRename, c.KeyValueFunc
	return NewJSONManipulator(renames, valueFuncs)
}

// JSONManipulatorConfigPair is a configuration pair for the JSONManipulator transformation.
// It associates the name "JSONManipulator" with the adapter generated from
// JSONManipulatorConfigFunction.
var JSONManipulatorConfigPair = config.ConfigurationPair{
Name: "JSONManipulator",
Handle: JSONManipulatorAdapterGenerator(JSONManipulatorConfigFunction),
}

// --- Manipulator Value Functions

// toEpochMillis converts an RFC3339 timestamp string into a Unix timestamp in
// milliseconds. Sub-millisecond precision is discarded.
//
// It returns -1 and an error when v is not a string or cannot be parsed as
// RFC3339.
func toEpochMillis(v interface{}) (int64, error) {
	s, ok := v.(string)
	if !ok {
		return -1, fmt.Errorf("input value for 'toEpochMillis' must be a 'string' was '%T'", v)
	}

	parsed, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return -1, err
	}

	// UnixMilli is used instead of dividing UnixNano: UnixNano's result is
	// undefined when the instant does not fit in an int64 of nanoseconds
	// (dates before 1678 or after 2262), which valid RFC3339 input can exceed.
	return parsed.UnixMilli(), nil
}

// stringValueFromMap looks up key in input and returns its value as a string.
//
// It returns an error when the key is absent, or when the value stored under
// the key is not of type string (including a nil value).
func stringValueFromMap(input map[string]interface{}, key string) (string, error) {
	raw, found := input[key]
	if !found {
		return "", fmt.Errorf("key '%s' does not exist in input map", key)
	}

	str, isString := raw.(string)
	if !isString {
		return "", fmt.Errorf("key '%s' must be a 'string' was '%T'", key, raw)
	}
	return str, nil
}

// toSnowplowPayloadData converts an input []map[string]string which contains
// a set of key-value pair objects into a valid Snowplow Payload data structure
// encoded as a JSON string
func toSnowplowPayloadData(v interface{}) (string, error) {
switch v.(type) {
case []interface{}:
dataMap := make(map[string]string)
for _, param := range v.([]interface{}) {
paramMap, ok := param.(map[string]interface{})
if !ok {
return "", errors.New(fmt.Sprintf("input values for 'toSnowplowPayloadData' within array must be a 'map[string]interface {}' was '%T'", param))
}
name, err := stringValueFromMap(paramMap, "name")
if err != nil {
return "", err
}
value, err := stringValueFromMap(paramMap, "value")
if err != nil {
return "", err
}
dataMap[name] = value
}

dataArray := make([]map[string]string, 0)
dataArray = append(dataArray, dataMap)

snowplowPayload := make(map[string]interface{})
snowplowPayload["schema"] = snowplowPayloadDataSchema
snowplowPayload["data"] = dataArray

snowplowPayloadStr, err := json.Marshal(snowplowPayload)
if err != nil {
return "", err
}
return string(snowplowPayloadStr), nil
default:
return "", errors.New(fmt.Sprintf("input value for 'toSnowplowPayloadData' must be a '[]interface {}' was '%T'", v))
}
}

// --- Manipulator Functions

// mapKeyRename renames top-level keys of input in place according to keyRename
// (old name -> new name) and returns the same map. Keys listed in keyRename
// that are not present in input are ignored.
//
// All renames are resolved against the map as it was on entry, so the result
// does not depend on map iteration order: renaming a key to itself keeps it,
// and swaps or chains (a->b, b->c) move each original value exactly once.
// If two different old keys map to the same new name, which value wins is
// unspecified.
func mapKeyRename(input map[string]interface{}, keyRename map[string]string) map[string]interface{} {
	if len(keyRename) == 0 {
		return input
	}

	// Snapshot the values to move before mutating anything, so a rename never
	// observes the effect of another rename.
	moved := make(map[string]interface{}, len(keyRename))
	for oldKey, newKey := range keyRename {
		if val, ok := input[oldKey]; ok {
			moved[newKey] = val
		}
	}

	// Remove every old name first, then write the new names; doing it in this
	// order means a new name that equals some old name is not deleted again.
	for oldKey := range keyRename {
		delete(input, oldKey)
	}
	for newKey, val := range moved {
		input[newKey] = val
	}
	return input
}

// mapKeyValueFunc applies named value functions to top-level values of input,
// in place, and returns the same map. keyValueFunc maps a key to the name of
// the function to run on that key's value; supported names are "toEpochMillis"
// and "toSnowplowPayloadData".
//
// Keys that are not present in input are skipped, so no new entries are added.
// It returns a nil map and an error when a present key is configured with an
// unknown function name or when the value function itself fails; in that case
// input may already have been partially modified.
func mapKeyValueFunc(input map[string]interface{}, keyValueFunc map[string]string) (map[string]interface{}, error) {
	for key, funcToRun := range keyValueFunc {
		val, ok := input[key]
		if !ok {
			// Nothing to transform. Writing a result here would inject a
			// spurious null entry, and would panic if input is a nil map
			// (e.g. when the decoded JSON document was `null`).
			continue
		}

		var (
			valNew interface{}
			err    error
		)
		switch funcToRun {
		case "toEpochMillis":
			valNew, err = toEpochMillis(val)
		case "toSnowplowPayloadData":
			valNew, err = toSnowplowPayloadData(val)
		default:
			return nil, fmt.Errorf("value func '%s' is not defined", funcToRun)
		}
		if err != nil {
			return nil, err
		}
		input[key] = valNew
	}
	return input, nil
}

// NewJSONManipulator builds a transformation function that decodes the message
// data as a JSON object, renames keys per keyRename, applies the value functions
// in keyValueFunc to the renamed object, and writes the re-encoded JSON back to
// the message. The returned error is always nil.
//
// On success the transformation returns the message in the first position along
// with the unchanged intermediateState. If decoding, a value function, or
// encoding fails, the error is set on the message and the message is returned in
// the third position with a nil intermediate state.
func NewJSONManipulator(keyRename map[string]string, keyValueFunc map[string]string) (TransformationFunction, error) {
	manipulate := func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
		// Decode the inbound payload into a generic map.
		var decoded map[string]interface{}
		if err := json.Unmarshal(message.Data, &decoded); err != nil {
			message.SetError(err)
			return nil, nil, message, nil
		}

		// Rename keys first, then run the value functions on the renamed map.
		processed, err := mapKeyValueFunc(mapKeyRename(decoded, keyRename), keyValueFunc)
		if err != nil {
			message.SetError(err)
			return nil, nil, message, nil
		}

		// Encode the result back to JSON and store it on the message.
		encoded, err := json.Marshal(processed)
		if err != nil {
			message.SetError(err)
			return nil, nil, message, nil
		}

		message.Data = encoded
		return message, nil, nil, intermediateState
	}
	return manipulate, nil
}
83 changes: 83 additions & 0 deletions pkg/transform/snowplow_collector_payload_thrift_to_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"errors"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
)

// CollectorPayloadThriftToJSONConfig is a configuration object for the spCollectorPayloadThriftToJSON transformation.
// It currently declares no fields, so the transformation takes no options.
type CollectorPayloadThriftToJSONConfig struct {
}

// collectorPayloadThriftToJSONAdapter is a function type that carries the Create and
// ProvideDefault methods for the spCollectorPayloadThriftToJSON transformation.
type collectorPayloadThriftToJSONAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
// It calls the adapter function with i and returns its result and error unchanged.
func (f collectorPayloadThriftToJSONAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface.
// It returns a pointer to a zero-value CollectorPayloadThriftToJSONConfig; the
// returned error is always nil.
func (f collectorPayloadThriftToJSONAdapter) ProvideDefault() (interface{}, error) {
	return &CollectorPayloadThriftToJSONConfig{}, nil
}

// collectorPayloadThriftToJSONAdapterGenerator wraps f in a
// collectorPayloadThriftToJSONAdapter. The returned adapter accepts only a
// *CollectorPayloadThriftToJSONConfig: it forwards that config to f and returns
// f's result, and reports an error for any other input.
func collectorPayloadThriftToJSONAdapterGenerator(f func(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error)) collectorPayloadThriftToJSONAdapter {
	return func(i interface{}) (interface{}, error) {
		switch cfg := i.(type) {
		case *CollectorPayloadThriftToJSONConfig:
			return f(cfg)
		default:
			return nil, errors.New("invalid input, expected collectorPayloadThriftToJSONConfig")
		}
	}
}

// collectorPayloadThriftToJSONConfigFunction returns the spCollectorPayloadThriftToJSON
// transformation function for a CollectorPayloadThriftToJSONConfig.
// The config c is not read: the function always returns SpCollectorPayloadThriftToJSON
// and a nil error.
func collectorPayloadThriftToJSONConfigFunction(c *CollectorPayloadThriftToJSONConfig) (TransformationFunction, error) {
return SpCollectorPayloadThriftToJSON, nil
}

// CollectorPayloadThriftToJSONConfigPair is a configuration pair for the spCollectorPayloadThriftToJSON transformation.
// It associates the name "spCollectorPayloadThriftToJSON" with the adapter generated from
// collectorPayloadThriftToJSONConfigFunction.
var CollectorPayloadThriftToJSONConfigPair = config.ConfigurationPair{
Name: "spCollectorPayloadThriftToJSON",
Handle: collectorPayloadThriftToJSONAdapterGenerator(collectorPayloadThriftToJSONConfigFunction),
}

// SpCollectorPayloadThriftToJSON transforms a Thrift-encoded Collector Payload
// held in message.Data into its JSON representation.
//
// The data is deserialized with collectorpayload.BinaryDeserializer and then
// re-encoded with collectorpayload.ToJSON. On success the message, now carrying
// the JSON bytes, is returned in the first position together with the unchanged
// intermediateState. If either step fails, the error is set on the message and
// the message is returned in the third position with a nil intermediate state.
func SpCollectorPayloadThriftToJSON(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
	// Decode the Thrift bytes into the collector payload struct.
	payload, err := collectorpayload.BinaryDeserializer(context.Background(), message.Data)
	if err != nil {
		message.SetError(err)
		return nil, nil, message, nil
	}

	// Encode the struct as JSON so downstream steps can work with it.
	encoded, err := collectorpayload.ToJSON(payload)
	if err != nil {
		message.SetError(err)
		return nil, nil, message, nil
	}

	message.Data = encoded
	return message, nil, nil, intermediateState
}
Loading