diff --git a/internal/sequencer/script/script.go b/internal/sequencer/script/script.go index f5f6db6a..0ed509e4 100644 --- a/internal/sequencer/script/script.go +++ b/internal/sequencer/script/script.go @@ -69,50 +69,33 @@ func (w *wrapper) Start( return nil, nil, err } - // Only inject if the source or any tables have a configuration. - sourceBindings, inject := scr.Sources.Get(opts.Group.Name) - if !inject { - for _, tbl := range opts.Group.Tables { - _, inject = scr.Targets.Get(tbl) - if inject { + // Install the target-phase acceptor into the options chain. This + // will be invoked for mutations which have passed through the + // sequencer stack. + if scr.Targets.Len() > 0 { + // If the userscript has defined an apply function, we need to + // ensure that a database transaction will be available to support + // the api.getTX() function. This is mainly relevant to immediate + // mode, in which the sequencer caller won't necessarily have + // provided a transaction. + ensureTX := false + for tgt := range scr.Targets.Values() { + if tgt.UserAcceptor != nil { + ensureTX = true break } } - } - if !inject { - return w.delegate.Start(ctx, opts) - } - - watcher, err := w.watchers.Get(opts.Group.Enclosing) - if err != nil { - return nil, nil, err - } - // If the userscript has defined any apply functions, we will - // need to ensure that a database transaction will be available - // to support the api.getTX() function. This is mainly relevant - // to immediate mode, in which the sequencer caller won't - // necessarily have created a transaction. - ensureTX := false - for target := range scr.Targets.Values() { - if target.UserAcceptor != nil { - ensureTX = true - break - } + opts = opts.Copy() + opts.Delegate = types.OrderedAcceptorFrom(&targetAcceptor{ + delegate: opts.Delegate, + ensureTX: ensureTX, + group: opts.Group, + targetPool: w.targetPool, + userScript: scr, + }, w.watchers) } - // Install the target-phase acceptor into the options chain. This - // will be invoked for mutations which have passed through the - // sequencer stack. - opts = opts.Copy() - opts.Delegate = types.OrderedAcceptorFrom(&targetAcceptor{ - delegate: opts.Delegate, - ensureTX: ensureTX, - group: opts.Group, - targetPool: w.targetPool, - userScript: scr, - }, w.watchers) - // Initialize downstream sequencer. acc, stat, err := w.delegate.Start(ctx, opts) if err != nil { @@ -122,11 +105,18 @@ func (w *wrapper) Start( // Install the source-phase acceptor. This provides the user with // the opportunity to rewrite mutations before they are presented to // the upstream sequencer. - acc = &sourceAcceptor{ - delegate: acc, - group: opts.Group, - sourceBindings: sourceBindings, - watcher: watcher, + if sourceBindings, ok := scr.Sources.Get(opts.Group.Name); ok { + watcher, err := w.watchers.Get(opts.Group.Enclosing) + if err != nil { + return nil, nil, err + } + + acc = &sourceAcceptor{ + delegate: acc, + group: opts.Group, + sourceBindings: sourceBindings, + watcher: watcher, + } } return acc, stat, nil } diff --git a/internal/sequencer/script/script_test.go b/internal/sequencer/script/script_test.go index 7bc34787..7474099d 100644 --- a/internal/sequencer/script/script_test.go +++ b/internal/sequencer/script/script_test.go @@ -21,6 +21,7 @@ package script_test import ( "encoding/json" "fmt" + "strings" "testing" "testing/fstest" "time" @@ -36,10 +37,146 @@ import ( "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" + "github.com/cockroachdb/replicator/internal/util/subfs" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) +func TestSourceWithoutTargets(t *testing.T) { + r := require.New(t) + + fixture, err := all.NewFixture(t) + r.NoError(err) + + ctx := fixture.Context + + tbl, err := fixture.CreateTargetTable(ctx, + `CREATE TABLE %s (pk INT PRIMARY KEY, v VARCHAR(2048) NOT NULL)`) + r.NoError(err) + + scriptCfg := &script.Config{ + MainPath: "/main.ts", + FS: &subfs.SubstitutingFS{ + FS: &fstest.MapFS{ + "main.ts": &fstest.MapFile{ + Data: []byte(` +import * as api from "replicator@v1"; +function disp(doc, meta) { + doc.v = "cowbell"; + return { "__TABLE__" : [ doc ] }; +} +api.configureSource("__SCHEMA__", { + deletesTo: disp, + dispatch: disp, +}); +`)}}, + Replacer: strings.NewReplacer( + "__SCHEMA__", tbl.Name().Schema().Raw(), + "__TABLE__", tbl.Name().Raw(), + )}} + + seqCfg := &sequencer.Config{ + Parallelism: 2, + QuiescentPeriod: 100 * time.Millisecond, + } + r.NoError(seqCfg.Preflight()) + seqFixture, err := seqtest.NewSequencerFixture(fixture, + seqCfg, + scriptCfg) + r.NoError(err) + + wrapped, err := seqFixture.Script.Wrap(ctx, seqFixture.Immediate) + r.NoError(err) + acc, _, err := wrapped.Start(ctx, &sequencer.StartOptions{ + Bounds: notify.VarOf[hlc.Range](hlc.RangeEmpty()), + Delegate: types.OrderedAcceptorFrom(fixture.ApplyAcceptor, fixture.Watchers), + Group: &types.TableGroup{ + Enclosing: fixture.TargetSchema.Schema(), + Name: ident.New(tbl.Name().Schema().Raw()), // Aligns with configureSource() call. + Tables: []ident.Table{tbl.Name()}, + }, + }) + r.NoError(err) + + r.NoError(acc.AcceptTableBatch(ctx, sinktest.TableBatchOf( + ident.NewTable(tbl.Name().Schema(), ident.New("ignored")), + hlc.New(1, 1), + []types.Mutation{ + {Data: json.RawMessage(`{"pk":1}`), Key: json.RawMessage(`[1]`)}, + }, + ), &types.AcceptOptions{})) + + ct, err := tbl.RowCount(ctx) + r.NoError(err) + r.Equal(1, ct) +} + +func TestTargetsWithoutSource(t *testing.T) { + r := require.New(t) + + fixture, err := all.NewFixture(t) + r.NoError(err) + + ctx := fixture.Context + + tbl, err := fixture.CreateTargetTable(ctx, + `CREATE TABLE %s (pk INT PRIMARY KEY, v VARCHAR(2048) NOT NULL)`) + r.NoError(err) + + scriptCfg := &script.Config{ + MainPath: "/main.ts", + FS: &subfs.SubstitutingFS{ + FS: &fstest.MapFS{ + "main.ts": &fstest.MapFile{ + Data: []byte(` +import * as api from "replicator@v1"; +api.configureTable("__TABLE__", { + map: (doc) => { + doc.v = "cowbell"; + return doc; + } +}); +`)}}, + Replacer: strings.NewReplacer( + "__TABLE__", tbl.Name().Raw(), + )}} + + seqCfg := &sequencer.Config{ + Parallelism: 2, + QuiescentPeriod: 100 * time.Millisecond, + } + r.NoError(seqCfg.Preflight()) + seqFixture, err := seqtest.NewSequencerFixture(fixture, + seqCfg, + scriptCfg) + r.NoError(err) + + wrapped, err := seqFixture.Script.Wrap(ctx, seqFixture.Immediate) + r.NoError(err) + acc, _, err := wrapped.Start(ctx, &sequencer.StartOptions{ + Bounds: notify.VarOf[hlc.Range](hlc.RangeEmpty()), + Delegate: types.OrderedAcceptorFrom(fixture.ApplyAcceptor, fixture.Watchers), + Group: &types.TableGroup{ + Enclosing: fixture.TargetSchema.Schema(), + Name: ident.New("ignored"), + Tables: []ident.Table{tbl.Name()}, + }, + }) + r.NoError(err) + + r.NoError(acc.AcceptTableBatch(ctx, sinktest.TableBatchOf( + tbl.Name(), + hlc.New(1, 1), + []types.Mutation{ + {Data: json.RawMessage(`{"pk":1}`), Key: json.RawMessage(`[1]`)}, + }, + ), &types.AcceptOptions{})) + + ct, err := tbl.RowCount(ctx) + r.NoError(err) + r.Equal(1, ct) +} + func TestUserScriptSequencer(t *testing.T) { for mode := switcher.MinMode; mode <= switcher.MaxMode; mode++ { t.Run(mode.String(), func(t *testing.T) {