Skip to content

Commit

Permalink
sequencer/script: Clean up conditional stack injection
Browse files Browse the repository at this point in the history
This change corrects a nil dereference if a userscript does not call
configureSource(). The code is restructured to inject the source- or
target-phase acceptors only if required by the userscript. Tests are added to
validate source-only and target-only configurations.
  • Loading branch information
bobvawter committed Nov 1, 2024
1 parent bc5fdfa commit ee8e289
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 43 deletions.
76 changes: 33 additions & 43 deletions internal/sequencer/script/script.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,50 +69,33 @@ func (w *wrapper) Start(
return nil, nil, err
}

// Only inject if the source or any tables have a configuration.
sourceBindings, inject := scr.Sources.Get(opts.Group.Name)
if !inject {
for _, tbl := range opts.Group.Tables {
_, inject = scr.Targets.Get(tbl)
if inject {
// Install the target-phase acceptor into the options chain. This
// will be invoked for mutations which have passed through the
// sequencer stack.
if scr.Targets.Len() > 0 {
// If the userscript has defined an apply function, we need to
// ensure that a database transaction will be available to support
// the api.getTX() function. This is mainly relevant to immediate
// mode, in which the sequencer caller won't necessarily have
// provided a transaction.
ensureTX := false
for tgt := range scr.Targets.Values() {
if tgt.UserAcceptor != nil {
ensureTX = true
break
}
}
}
if !inject {
return w.delegate.Start(ctx, opts)
}

watcher, err := w.watchers.Get(opts.Group.Enclosing)
if err != nil {
return nil, nil, err
}

// If the userscript has defined any apply functions, we will
// need to ensure that a database transaction will be available
// to support the api.getTX() function. This is mainly relevant
// to immediate mode, in which the sequencer caller won't
// necessarily have created a transaction.
ensureTX := false
for target := range scr.Targets.Values() {
if target.UserAcceptor != nil {
ensureTX = true
break
}
opts = opts.Copy()
opts.Delegate = types.OrderedAcceptorFrom(&targetAcceptor{
delegate: opts.Delegate,
ensureTX: ensureTX,
group: opts.Group,
targetPool: w.targetPool,
userScript: scr,
}, w.watchers)
}

// Install the target-phase acceptor into the options chain. This
// will be invoked for mutations which have passed through the
// sequencer stack.
opts = opts.Copy()
opts.Delegate = types.OrderedAcceptorFrom(&targetAcceptor{
delegate: opts.Delegate,
ensureTX: ensureTX,
group: opts.Group,
targetPool: w.targetPool,
userScript: scr,
}, w.watchers)

// Initialize downstream sequencer.
acc, stat, err := w.delegate.Start(ctx, opts)
if err != nil {
Expand All @@ -122,11 +105,18 @@ func (w *wrapper) Start(
// Install the source-phase acceptor. This provides the user with
// the opportunity to rewrite mutations before they are presented to
// the upstream sequencer.
acc = &sourceAcceptor{
delegate: acc,
group: opts.Group,
sourceBindings: sourceBindings,
watcher: watcher,
if sourceBindings, ok := scr.Sources.Get(opts.Group.Name); ok {
watcher, err := w.watchers.Get(opts.Group.Enclosing)
if err != nil {
return nil, nil, err
}

acc = &sourceAcceptor{
delegate: acc,
group: opts.Group,
sourceBindings: sourceBindings,
watcher: watcher,
}
}
return acc, stat, nil
}
Expand Down
137 changes: 137 additions & 0 deletions internal/sequencer/script/script_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package script_test
import (
"encoding/json"
"fmt"
"strings"
"testing"
"testing/fstest"
"time"
Expand All @@ -36,10 +37,146 @@ import (
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/cockroachdb/replicator/internal/util/subfs"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

func TestSourceWithoutTargets(t *testing.T) {
r := require.New(t)

fixture, err := all.NewFixture(t)
r.NoError(err)

ctx := fixture.Context

tbl, err := fixture.CreateTargetTable(ctx,
`CREATE TABLE %s (pk INT PRIMARY KEY, v VARCHAR(2048) NOT NULL)`)
r.NoError(err)

scriptCfg := &script.Config{
MainPath: "/main.ts",
FS: &subfs.SubstitutingFS{
FS: &fstest.MapFS{
"main.ts": &fstest.MapFile{
Data: []byte(`
import * as api from "replicator@v1";
function disp(doc, meta) {
doc.v = "cowbell";
return { "__TABLE__" : [ doc ] };
}
api.configureSource("__SCHEMA__", {
deletesTo: disp,
dispatch: disp,
});
`)}},
Replacer: strings.NewReplacer(
"__SCHEMA__", tbl.Name().Schema().Raw(),
"__TABLE__", tbl.Name().Raw(),
)}}

seqCfg := &sequencer.Config{
Parallelism: 2,
QuiescentPeriod: 100 * time.Millisecond,
}
r.NoError(seqCfg.Preflight())
seqFixture, err := seqtest.NewSequencerFixture(fixture,
seqCfg,
scriptCfg)
r.NoError(err)

wrapped, err := seqFixture.Script.Wrap(ctx, seqFixture.Immediate)
r.NoError(err)
acc, _, err := wrapped.Start(ctx, &sequencer.StartOptions{
Bounds: notify.VarOf[hlc.Range](hlc.RangeEmpty()),
Delegate: types.OrderedAcceptorFrom(fixture.ApplyAcceptor, fixture.Watchers),
Group: &types.TableGroup{
Enclosing: fixture.TargetSchema.Schema(),
Name: ident.New(tbl.Name().Schema().Raw()), // Aligns with configureSource() call.
Tables: []ident.Table{tbl.Name()},
},
})
r.NoError(err)

r.NoError(acc.AcceptTableBatch(ctx, sinktest.TableBatchOf(
ident.NewTable(tbl.Name().Schema(), ident.New("ignored")),
hlc.New(1, 1),
[]types.Mutation{
{Data: json.RawMessage(`{"pk":1}`), Key: json.RawMessage(`[1]`)},
},
), &types.AcceptOptions{}))

ct, err := tbl.RowCount(ctx)
r.NoError(err)
r.Equal(1, ct)
}

func TestTargetsWithoutSource(t *testing.T) {
r := require.New(t)

fixture, err := all.NewFixture(t)
r.NoError(err)

ctx := fixture.Context

tbl, err := fixture.CreateTargetTable(ctx,
`CREATE TABLE %s (pk INT PRIMARY KEY, v VARCHAR(2048) NOT NULL)`)
r.NoError(err)

scriptCfg := &script.Config{
MainPath: "/main.ts",
FS: &subfs.SubstitutingFS{
FS: &fstest.MapFS{
"main.ts": &fstest.MapFile{
Data: []byte(`
import * as api from "replicator@v1";
api.configureTable("__TABLE__", {
map: (doc) => {
doc.v = "cowbell";
return doc;
}
});
`)}},
Replacer: strings.NewReplacer(
"__TABLE__", tbl.Name().Raw(),
)}}

seqCfg := &sequencer.Config{
Parallelism: 2,
QuiescentPeriod: 100 * time.Millisecond,
}
r.NoError(seqCfg.Preflight())
seqFixture, err := seqtest.NewSequencerFixture(fixture,
seqCfg,
scriptCfg)
r.NoError(err)

wrapped, err := seqFixture.Script.Wrap(ctx, seqFixture.Immediate)
r.NoError(err)
acc, _, err := wrapped.Start(ctx, &sequencer.StartOptions{
Bounds: notify.VarOf[hlc.Range](hlc.RangeEmpty()),
Delegate: types.OrderedAcceptorFrom(fixture.ApplyAcceptor, fixture.Watchers),
Group: &types.TableGroup{
Enclosing: fixture.TargetSchema.Schema(),
Name: ident.New("ignored"),
Tables: []ident.Table{tbl.Name()},
},
})
r.NoError(err)

r.NoError(acc.AcceptTableBatch(ctx, sinktest.TableBatchOf(
tbl.Name(),
hlc.New(1, 1),
[]types.Mutation{
{Data: json.RawMessage(`{"pk":1}`), Key: json.RawMessage(`[1]`)},
},
), &types.AcceptOptions{}))

ct, err := tbl.RowCount(ctx)
r.NoError(err)
r.Equal(1, ct)
}

func TestUserScriptSequencer(t *testing.T) {
for mode := switcher.MinMode; mode <= switcher.MaxMode; mode++ {
t.Run(mode.String(), func(t *testing.T) {
Expand Down

0 comments on commit ee8e289

Please sign in to comment.