Skip to content

Commit

Permalink
Adding a test to ensure that tx commit order is properly tested
Browse files Browse the repository at this point in the history
Tracers can expect that that `OnTxCommit` is called in proper transaction order so that it have the certitude that the current `OnTxCommit` is that correct one to happen relative to all other transaction.

This is already the case, this test case only add a certain confidence that this assertion is going to be respected if any refactor happen in the future.
  • Loading branch information
maoueh committed Jul 10, 2024
1 parent 2a87e89 commit a0b8b7d
Showing 1 changed file with 90 additions and 17 deletions.
107 changes: 90 additions & 17 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func TestProcessAll(t *testing.T) {
http.ListenAndServe("localhost:6060", nil)
}()

var testTxTracerCommitIndexes = new([]int)

tests := []struct {
name string
workers int
Expand Down Expand Up @@ -400,7 +402,7 @@ func TestProcessAll(t *testing.T) {
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250)),
requests: addTxTracerToTxEntries(requestList(250), newTestResetTxTracer),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
Expand All @@ -413,7 +415,7 @@ func TestProcessAll(t *testing.T) {
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok {
if v, ok := ctx.Context().Value("test_tracer").(*testResetTxTracer); ok {
v.OnTxExecute()
}

Expand All @@ -434,6 +436,39 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test tx Commit done in requests order",
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250), func(txIndex int) sdk.TxTracer { return newTestCommitTxTracer(txIndex, testTxTracerCommitIndexes) }),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expectedCommitTxIndexes := make([]int, 250)
for i := 0; i < 250; i++ {
expectedCommitTxIndexes[i] = i
}

require.Equal(t, expectedCommitTxIndexes, *testTxTracerCommitIndexes)
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -468,41 +503,79 @@ func TestProcessAll(t *testing.T) {
}
}

func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry {
func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry, factory func(txIndex int) sdk.TxTracer) []*sdk.DeliverTxEntry {
for _, txEntry := range txEntries {
txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex)
txEntry.TxTracer = factory(txEntry.AbsoluteIndex)
}

return txEntries
}

var _ sdk.TxTracer = &testTxTracer{}

func newTestTxTracer(txIndex int) *testTxTracer {
return &testTxTracer{txIndex: txIndex, canExecute: true}
}
var _ sdk.TxTracer = testTxTracer{}

type testTxTracer struct {
txIndex int
canExecute bool
onCommit func()
onInjectInContext func(ctx sdk.Context) sdk.Context
onReset func()
}

func (t *testTxTracer) Commit() {
t.canExecute = false
func (t testTxTracer) Commit() {
if t.onCommit != nil {
t.onCommit()
}
}

func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
func (t testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
if t.onInjectInContext != nil {
return t.onInjectInContext(ctx)
}

return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t))
}

func (t *testTxTracer) Reset() {
t.canExecute = true
func (t testTxTracer) Reset() {
if t.onReset != nil {
t.onReset()
}
}

func newTestResetTxTracer(txIndex int) sdk.TxTracer {
var tracer *testResetTxTracer
tracer = &testResetTxTracer{
testTxTracer: testTxTracer{
onCommit: func() {
tracer.canExecute = false
},
onReset: func() {
tracer.canExecute = true
},
},

txIndex: txIndex,
canExecute: true,
}
return tracer
}

type testResetTxTracer struct {
testTxTracer

txIndex int
canExecute bool
}

func (t *testTxTracer) OnTxExecute() {
func (t *testResetTxTracer) OnTxExecute() {
if !t.canExecute {
panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex))
}

t.canExecute = false
}

func newTestCommitTxTracer(txIndex int, committedTxIndexes *[]int) sdk.TxTracer {
return &testTxTracer{
onCommit: func() {
*committedTxIndexes = append(*committedTxIndexes, txIndex)
},
}
}

0 comments on commit a0b8b7d

Please sign in to comment.