Skip to content

Commit

Permalink
WIP transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Jan 20, 2023
1 parent 9b0d64a commit ffbba78
Showing 1 changed file with 84 additions and 0 deletions.
84 changes: 84 additions & 0 deletions pkg/transform/snowplow_raw_to_collector_payload_thrift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
//
// Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Snowplow Community License Version 1.0,
// and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
// You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0

package transform

import (
"context"
"errors"
"encoding/json"

"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"

collectorpayloadmodel1 "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload/gen-go/model1"
collectorpayload "github.com/snowplow/snowbridge/third_party/snowplow/collectorpayload"
)

// RawToCollectorPayloadThriftConfig is a configuration object for the spRawToCollectorPayloadThrift transformation
type RawToCollectorPayloadThriftConfig struct {
}

type rawToCollectorPayloadThriftAdapter func(i interface{}) (interface{}, error)

// Create implements the ComponentCreator interface.
func (f rawToCollectorPayloadThriftAdapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

// ProvideDefault implements the ComponentConfigurable interface
func (f rawToCollectorPayloadThriftAdapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &RawToCollectorPayloadThriftConfig{}

return cfg, nil
}

// adapterGenerator returns a spRawToCollectorPayloadThrift transformation adapter.
func rawToCollectorPayloadThriftAdapterGenerator(f func(c *RawToCollectorPayloadThriftConfig) (TransformationFunction, error)) rawToCollectorPayloadThriftAdapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*RawToCollectorPayloadThriftConfig)
if !ok {
return nil, errors.New("invalid input, expected rawToCollectorPayloadThriftConfig")
}

return f(cfg)
}
}

// rawToCollectorPayloadThriftConfigFunction returns an spRawToCollectorPayloadThrift transformation function, from an rawToCollectorPayloadThriftConfig.
func rawToCollectorPayloadThriftConfigFunction(c *RawToCollectorPayloadThriftConfig) (TransformationFunction, error) {
return SpRawToCollectorPayloadThrift, nil
}

// RawToCollectorPayloadThriftConfigPair is a configuration pair for the spRawToCollectorPayloadThrift transformation
var RawToCollectorPayloadThriftConfigPair = config.ConfigurationPair{
Name: "spRawToCollectorPayloadThrift",
Handle: rawToCollectorPayloadThriftAdapterGenerator(rawToCollectorPayloadThriftConfigFunction),
}

// SpRawToCollectorPayloadThrift is a specific transformation implementation to transform a raw message into a valid Thrift encoded Collector Payload
// so that it can be pushed directly into the egress stream of a Collector.
func SpRawToCollectorPayloadThrift(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
var p *collectorpayloadmodel1.CollectorPayload
unmarshallErr := json.Unmarshal(message.Data, &p)
if unmarshallErr != nil {
message.SetError(unmarshallErr)
return nil, nil, message, nil
}

ctx := context.Background()

res, serializeErr := collectorpayload.BinarySerializer(ctx, p)
if serializeErr != nil {
message.SetError(serializeErr)
return nil, nil, message, nil
}

message.Data = res
return message, nil, nil, intermediateState
}

0 comments on commit ffbba78

Please sign in to comment.