Skip to content

Commit

Permalink
Add custom transformation layer (closes #146)
Browse files Browse the repository at this point in the history
  • Loading branch information
adatzer authored and colmsnowplow committed Jul 1, 2022
1 parent ef3cd6a commit 2e319b5
Show file tree
Hide file tree
Showing 22 changed files with 5,639 additions and 125 deletions.
3 changes: 2 additions & 1 deletion cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/target/targetiface"
"github.com/snowplow-devops/stream-replicator/pkg/telemetry"
"github.com/snowplow-devops/stream-replicator/pkg/transform"
"github.com/snowplow-devops/stream-replicator/pkg/transform/transformconfig"
)

const (
Expand Down Expand Up @@ -80,7 +81,7 @@ func RunCli(supportedSourceConfigPairs []sourceconfig.ConfigPair) {
return err
}

tr, err := cfg.GetTransformations()
tr, err := transformconfig.GetTransformations(cfg)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/serverless.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/snowplow-devops/stream-replicator/pkg/models"
"github.com/snowplow-devops/stream-replicator/pkg/transform/transformconfig"
)

// ServerlessRequestHandler is a common function for all
Expand All @@ -34,7 +35,7 @@ func ServerlessRequestHandler(messages []*models.Message) error {
}
t.Open()

tr, err := cfg.GetTransformations()
tr, err := transformconfig.GetTransformations(cfg)
if err != nil {
return err
}
Expand Down
93 changes: 39 additions & 54 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/statsreceiver/statsreceiveriface"
"github.com/snowplow-devops/stream-replicator/pkg/target"
"github.com/snowplow-devops/stream-replicator/pkg/target/targetiface"
"github.com/snowplow-devops/stream-replicator/pkg/transform"
)

// Config holds the configuration data along with the decoder to decode them
Expand All @@ -36,15 +35,15 @@ type Config struct {

// ConfigurationData for holding all configuration options
type ConfigurationData struct {
Source *Component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *Component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *FailureConfig `hcl:"failure_target,block"`
Sentry *SentryConfig `hcl:"sentry,block"`
StatsReceiver *StatsConfig `hcl:"stats_receiver,block"`
Transformation string `hcl:"message_transformation,optional" env:"MESSAGE_TRANSFORMATION"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
GoogleServiceAccountB64 string `hcl:"google_application_credentials_b64,optional" env:"GOOGLE_APPLICATION_CREDENTIALS_B64"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
Source *Component `hcl:"source,block" envPrefix:"SOURCE_"`
Target *Component `hcl:"target,block" envPrefix:"TARGET_"`
FailureTarget *FailureConfig `hcl:"failure_target,block"`
Sentry *SentryConfig `hcl:"sentry,block"`
StatsReceiver *StatsConfig `hcl:"stats_receiver,block"`
Transform *TransformConfig `hcl:"transform,block"`
LogLevel string `hcl:"log_level,optional" env:"LOG_LEVEL"`
GoogleServiceAccountB64 string `hcl:"google_application_credentials_b64,optional" env:"GOOGLE_APPLICATION_CREDENTIALS_B64"`
UserProvidedID string `hcl:"user_provided_id,optional" env:"USER_PROVIDED_ID"`
}

// Component is a type to abstract over configuration blocks.
Expand Down Expand Up @@ -80,6 +79,12 @@ type StatsConfig struct {
BufferSec int `hcl:"buffer_sec,optional" env:"STATS_RECEIVER_BUFFER_SEC"`
}

// TransformConfig holds configuration for transformations.
// It is decoded from the `transform` block of the configuration.
type TransformConfig struct {
// Message is the message transformation string (HCL attribute
// `message_transformation`, env var MESSAGE_TRANSFORMATION).
// defaultConfigData initialises it to "none".
Message string `hcl:"message_transformation,optional" env:"MESSAGE_TRANSFORMATION"`
// Layer is the `use` block naming the custom transformation layer and
// carrying its layer-specific body. defaultConfigData initialises it to an
// empty Use, so its Name is "" unless a `use` block is configured.
Layer *Use `hcl:"use,block" envPrefix:"TRANSFORMATION_LAYER_"`
}

// defaultConfigData returns the initial main configuration target.
func defaultConfigData() *ConfigurationData {
return &ConfigurationData{
Expand All @@ -98,8 +103,11 @@ func defaultConfigData() *ConfigurationData {
TimeoutSec: 1,
BufferSec: 15,
},
Transformation: "none",
LogLevel: "info",
Transform: &TransformConfig{
Message: "none",
Layer: &Use{},
},
LogLevel: "info",
}
}

Expand Down Expand Up @@ -299,48 +307,6 @@ func (c *Config) GetFailureTarget(AppName string, AppVersion string) (failureifa
return nil, fmt.Errorf("could not interpret failure target configuration for %q", useFailureTarget.Name)
}

// GetTransformations builds and returns transformationApplyFunction from the transformations configured.
//
// The configured value is split on "," into individual transformations, and
// each transformation is split on ":" into a function name and its option.
// The entry "none" contributes no function. An unknown function name, a
// missing option for a function that requires one, or an error returned by
// one of the filter constructors results in a nil function and a non-nil
// error.
func (c *Config) GetTransformations() (transform.TransformationApplyFunction, error) {
	// Parse list of transformations
	transformations := strings.Split(c.Data.Transformation, ",")
	funcs := make([]transform.TransformationFunction, 0, len(transformations))

	for _, transformation := range transformations {
		// Parse function name-option sets
		funcOpts := strings.Split(transformation, ":")
		name := funcOpts[0]

		// These functions read funcOpts[1]; reject entries without an option
		// up front instead of panicking with an index out of range.
		switch name {
		case "spEnrichedSetPk", "spEnrichedFilter", "spEnrichedFilterContext", "spEnrichedFilterUnstructEvent":
			if len(funcOpts) < 2 {
				return nil, fmt.Errorf("invalid transformation %q: expected the form '%s:{option}'", transformation, name)
			}
		}

		switch name {
		case "spEnrichedToJson":
			funcs = append(funcs, transform.SpEnrichedToJSON)
		case "spEnrichedSetPk":
			funcs = append(funcs, transform.NewSpEnrichedSetPkFunction(funcOpts[1]))
		case "spEnrichedFilter":
			filterFunc, err := transform.NewSpEnrichedFilterFunction(funcOpts[1])
			if err != nil {
				return nil, err
			}
			funcs = append(funcs, filterFunc)
		case "spEnrichedFilterContext":
			filterFunc, err := transform.NewSpEnrichedFilterFunctionContext(funcOpts[1])
			if err != nil {
				return nil, err
			}
			funcs = append(funcs, filterFunc)
		case "spEnrichedFilterUnstructEvent":
			filterFunc, err := transform.NewSpEnrichedFilterFunctionUnstructEvent(funcOpts[1])
			if err != nil {
				return nil, err
			}
			funcs = append(funcs, filterFunc)
		case "none":
			// Explicit no-op entry: nothing to append.
		default:
			// Message text is kept verbatim; callers and tests match on it.
			return nil, fmt.Errorf("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got '%s'", c.Data.Transformation)
		}
	}
	return transform.NewTransformation(funcs...), nil
}

// GetTags returns a list of tags to use in identifying this instance of stream-replicator with enough
// entropy so as to avoid collisions as it should not be possible to have both the host and process_id be
// the same.
Expand Down Expand Up @@ -398,3 +364,22 @@ func (c *Config) GetStatsReceiver(tags map[string]string) (statsreceiveriface.St
return nil, errors.New(fmt.Sprintf("Invalid stats receiver found; expected one of 'statsd' and got '%s'", useReceiver.Name))
}
}

// ProvideTransformMessage implements transformconfig.configProvider.
// It returns the Message field of the transform configuration.
func (c *Config) ProvideTransformMessage() string {
	transformCfg := c.Data.Transform
	return transformCfg.Message
}

// ProvideTransformLayerName implements transformconfig.configProvider.
// It returns the Name of the `use` block held in the transform configuration.
func (c *Config) ProvideTransformLayerName() string {
	layer := c.Data.Transform.Layer
	return layer.Name
}

// ProvideTransformComponent implements transformconfig.configProvider.
// It passes the body of the transform layer's `use` block as decoder input
// to CreateComponent for the given Pluggable, and returns its result unchanged.
func (c *Config) ProvideTransformComponent(p Pluggable) (interface{}, error) {
	opts := new(DecoderOptions)
	opts.Input = c.Data.Transform.Layer.Body

	return c.CreateComponent(p, opts)
}
117 changes: 66 additions & 51 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"testing"

"github.com/snowplow-devops/stream-replicator/pkg/transform"
"github.com/stretchr/testify/assert"
)

Expand All @@ -26,7 +27,7 @@ func TestNewConfig(t *testing.T) {

assert.Equal("info", c.Data.LogLevel)
assert.Equal("stdout", c.Data.Target.Use.Name)
assert.Equal("none", c.Data.Transform.Transformation)
assert.Equal("none", c.Data.Transform.Message)
assert.Equal("stdin", c.Data.Source.Use.Name)

// Tests on sources moved to the source package.
Expand All @@ -35,10 +36,6 @@ func TestNewConfig(t *testing.T) {
assert.NotNil(target)
assert.Nil(err)

transformation, err := c.GetTransformations()
assert.NotNil(transformation)
assert.Nil(err)

failureTarget, err := c.GetFailureTarget("testAppName", "0.0.0")
assert.NotNil(failureTarget)
assert.Nil(err)
Expand Down Expand Up @@ -83,44 +80,6 @@ func TestNewConfig_FromEnvInvalid(t *testing.T) {
assert.NotNil(err)
}

// TestNewConfig_InvalidTransformation checks that an unknown transformation
// name supplied through MESSAGE_TRANSFORMATION is accepted by NewConfig but
// rejected by GetTransformations with the expected error message.
func TestNewConfig_InvalidTransformation(t *testing.T) {
	assert := assert.New(t)

	// t.Setenv restores the previous value when the test finishes, rather
	// than unconditionally unsetting the variable.
	t.Setenv("MESSAGE_TRANSFORMATION", "fake")

	c, err := NewConfig()
	assert.NotNil(c)
	if err != nil {
		t.Fatalf("function NewConfig failed with error: %q", err.Error())
	}

	transformation, err := c.GetTransformations()
	assert.Nil(transformation)
	assert.NotNil(err)
	assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fake'", err.Error())
}

// TestNewConfig_FilterFailure checks that a spEnrichedFilter transformation
// with a malformed option is accepted by NewConfig but makes
// GetTransformations fail with the filter-format error message.
func TestNewConfig_FilterFailure(t *testing.T) {
	assert := assert.New(t)

	// t.Setenv restores the previous value when the test finishes, rather
	// than unconditionally unsetting the variable.
	t.Setenv("MESSAGE_TRANSFORMATION", "spEnrichedFilter:incompatibleArg")

	c, err := NewConfig()
	assert.NotNil(c)
	if err != nil {
		t.Fatalf("function NewConfig failed with error: %q", err.Error())
	}

	transformation, err := c.GetTransformations()
	assert.Nil(transformation)
	assert.NotNil(err)
	assert.Equal(`invalid filter function config, must be of the format {field name}=={value}[|{value}|...] or {field name}!={value}[|{value}|...]`, err.Error())
}

func TestNewConfig_InvalidTarget(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -230,13 +189,6 @@ func TestNewConfig_Hcl_invalids(t *testing.T) {
t.Fatalf("function NewConfig failed with error: %q", err.Error())
}

t.Run("invalid_transformation", func(t *testing.T) {
transformation, err := c.GetTransformations()
assert.Nil(transformation)
assert.NotNil(err)
assert.Equal("Invalid transformation found; expected one of 'spEnrichedToJson', 'spEnrichedSetPk:{option}', spEnrichedFilter:{option} and got 'fakeHCL'", err.Error())
})

t.Run("invalid_target", func(t *testing.T) {
target, err := c.GetTarget()
assert.Nil(target)
Expand Down Expand Up @@ -273,7 +225,7 @@ func TestNewConfig_Hcl_defaults(t *testing.T) {
assert.Equal(c.Data.Sentry.Debug, false)
assert.Equal(c.Data.StatsReceiver.TimeoutSec, 1)
assert.Equal(c.Data.StatsReceiver.BufferSec, 15)
assert.Equal(c.Data.Transform.Transformation, "none")
assert.Equal(c.Data.Transform.Message, "none")
assert.Equal(c.Data.LogLevel, "info")
}

Expand All @@ -293,3 +245,66 @@ func TestNewConfig_Hcl_sentry(t *testing.T) {
assert.Equal(c.Data.Sentry.Tags, "{\"testKey\":\"testValue\"}")
assert.Equal(c.Data.Sentry.Dsn, "testDsn")
}

// TestDefaultTransformation verifies the transform defaults: with the config
// file path and MESSAGE_TRANSFORMATION both set to the empty string, the
// message transformation is "none" and the layer name is empty.
func TestDefaultTransformation(t *testing.T) {
	assert := assert.New(t)

	// Blank out both variables for the duration of this test.
	for _, key := range []string{"STREAM_REPLICATOR_CONFIG_FILE", "MESSAGE_TRANSFORMATION"} {
		t.Setenv(key, "")
	}

	cfg, err := NewConfig()
	assert.NotNil(cfg)
	if err != nil {
		t.Fatalf("function NewConfig failed with error: %q", err.Error())
	}

	assert.Equal("none", cfg.Data.Transform.Message)
	assert.Equal("none", cfg.ProvideTransformMessage())
	assert.Equal("", cfg.ProvideTransformLayerName())
}

// TestTransformationProviderImplementation loads each transform fixture via
// STREAM_REPLICATOR_CONFIG_FILE and checks that the Config provider methods
// report the expected message and layer name, and that a component can be
// created for the matching Pluggable layer.
func TestTransformationProviderImplementation(t *testing.T) {
	const fixturesDir = "./test-fixtures"

	type providerCase struct {
		File      string
		Plug      Pluggable
		Message   string
		LayerName string
	}

	cases := []providerCase{
		{
			File:      "transform-lua-simple.hcl",
			Plug:      transform.LuaLayer().(Pluggable),
			Message:   "lua:fun",
			LayerName: "lua",
		},
		{
			File:      "transform-js-simple.hcl",
			Plug:      transform.JSLayer().(Pluggable),
			Message:   "js:fun",
			LayerName: "js",
		},
	}

	for _, tc := range cases {
		t.Run(tc.File, func(t *testing.T) {
			assert := assert.New(t)

			// Point NewConfig at this case's fixture file.
			t.Setenv("STREAM_REPLICATOR_CONFIG_FILE", filepath.Join(fixturesDir, tc.File))

			cfg, err := NewConfig()
			assert.NotNil(cfg)
			if err != nil {
				t.Fatalf("function NewConfig failed with error: %q", err.Error())
			}

			assert.Equal(tc.Message, cfg.ProvideTransformMessage())
			assert.Equal(tc.LayerName, cfg.ProvideTransformLayerName())

			created, err := cfg.ProvideTransformComponent(tc.Plug)
			assert.Nil(err)
			assert.NotNil(created)
		})
	}
}
18 changes: 16 additions & 2 deletions config/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ stats_receiver {
// block for configuring sentry
sentry {}
// string to configure message transformation (default: "none")
message_transformation = "none"
// block for configuring transformations
transform {
// string to configure message transformation (default: "none")
message_transformation = "none"
}
// log level configuration (default: "info")
log_level = "info"
Expand Down Expand Up @@ -99,6 +102,17 @@ sentry {
debug = true
}
transform {
message_transformation = "spEnrichedFilter:app_id==myApp,js:customFunction"
use "js" {
source_b64 = "CmZ1bmN0aW9uIGN1c3RvbUZ1bmN0aW9uKGlucHV0KSB7CiAgICByZXR1cm4gaW5wdXQ7Cn0K"
timeout_sec = 2
disable_source_maps = false
snowplow_mode = true
}
}
log_level = "debug"
user_provided_id = "my-example-id"
Expand Down
4 changes: 3 additions & 1 deletion config/test-fixtures/invalids.hcl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# configuration with various invalid options

message_transformation = "fakeHCL"
transform {
message_transformation = "fakeHCL"
}

target {
use "fakeHCL" {}
Expand Down
7 changes: 7 additions & 0 deletions config/test-fixtures/transform-invalid-layer-js.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# transform configuration

transform {
message_transformation = "js:fun"

use "fake" {}
}
7 changes: 7 additions & 0 deletions config/test-fixtures/transform-invalid-layer-lua.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# transform configuration

transform {
message_transformation = "lua:fun"

use "fake" {}
}
12 changes: 12 additions & 0 deletions config/test-fixtures/transform-js-extended.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# transform configuration - js - extended

transform {
message_transformation = "js:fun"

use "js" {
source_b64 = "CglmdW5jdGlvbiBmb28oeCkgewoJICAgIHJldHVybiB4OwoJfQoJ"
timeout_sec = 10
disable_source_maps = false
snowplow_mode = true
}
}
Loading

0 comments on commit 2e319b5

Please sign in to comment.