Skip to content

Commit

Permalink
Added TxTracer to DeliverTxEntry which is the first step to suppo…
Browse files Browse the repository at this point in the history
…rt tracing when using OCC (#478)

## Describe your changes and provide context

This brings in an interface that can be set on `DeliverTxEntry` and
hooks into the `scheduler` so it call's the necessary tracer callbacks
when required.

Refer to `types/tx_tracer.go` for extra details about the patch.

## Testing performed to validate your change

---------

Co-authored-by: Steven Landers <[email protected]>
  • Loading branch information
2 people authored and udpatil committed Apr 19, 2024
1 parent 37b8682 commit 7296878
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 1 deletion.
15 changes: 15 additions & 0 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type deliverTxTask struct {
AbsoluteIndex int
Response *types.ResponseDeliverTx
VersionStores map[sdk.StoreKey]*multiversion.VersionIndexedStore
TxTracer sdk.TxTracer
}

// AppendDependencies appends the given indexes to the task's dependencies
Expand Down Expand Up @@ -84,6 +85,10 @@ func (dt *deliverTxTask) Reset() {
dt.Abort = nil
dt.AbortCh = nil
dt.VersionStores = nil

if dt.TxTracer != nil {
dt.TxTracer.Reset()
}
}

func (dt *deliverTxTask) Increment() {
Expand Down Expand Up @@ -188,7 +193,9 @@ func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTa
AbsoluteIndex: r.AbsoluteIndex,
Status: statusPending,
Dependencies: map[int]struct{}{},
TxTracer: r.TxTracer,
}

tasksMap[r.AbsoluteIndex] = task
allTasks = append(allTasks, task)
}
Expand All @@ -199,6 +206,10 @@ func (s *scheduler) collectResponses(tasks []*deliverTxTask) []types.ResponseDel
res := make([]types.ResponseDeliverTx, 0, len(tasks))
for _, t := range tasks {
res = append(res, *t.Response)

if t.TxTracer != nil {
t.TxTracer.Commit()
}
}
return res
}
Expand Down Expand Up @@ -510,6 +521,10 @@ func (s *scheduler) prepareTask(task *deliverTxTask) {
ctx = ctx.WithMultiStore(ms)
}

if task.TxTracer != nil {
ctx = task.TxTracer.InjectInContext(ctx)
}

task.AbortCh = abortCh
task.Ctx = ctx
}
Expand Down
78 changes: 78 additions & 0 deletions tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,45 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test tx Reset properly before re-execution via tracer",
workers: 10,
runs: 1,
addStores: true,
requests: addTxTracerToTxEntries(requestList(250)),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res types.ResponseDeliverTx) {
defer abortRecoveryFunc(&res)
wait := rand.Intn(10)
time.Sleep(time.Duration(wait) * time.Millisecond)
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))
time.Sleep(time.Duration(wait) * time.Millisecond)
// write to the store with this tx's index
newVal := val + fmt.Sprintf("%d", ctx.TxIndex())
kv.Set(itemKey, []byte(newVal))

if v, ok := ctx.Context().Value("test_tracer").(*testTxTracer); ok {
v.OnTxExecute()
}

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: newVal,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {
expected := ""
for idx, response := range res {
expected = expected + fmt.Sprintf("%d", idx)
require.Equal(t, expected, response.Info)
}
// confirm last write made it to the parent store
latest := ctx.MultiStore().GetKVStore(testStoreKey).Get(itemKey)
require.Equal(t, expected, string(latest))
},
expectedErr: nil,
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -428,3 +467,42 @@ func TestProcessAll(t *testing.T) {
})
}
}

func addTxTracerToTxEntries(txEntries []*sdk.DeliverTxEntry) []*sdk.DeliverTxEntry {
for _, txEntry := range txEntries {
txEntry.TxTracer = newTestTxTracer(txEntry.AbsoluteIndex)
}

return txEntries
}

var _ sdk.TxTracer = &testTxTracer{}

func newTestTxTracer(txIndex int) *testTxTracer {
return &testTxTracer{txIndex: txIndex, canExecute: true}
}

type testTxTracer struct {
txIndex int
canExecute bool
}

func (t *testTxTracer) Commit() {
t.canExecute = false
}

func (t *testTxTracer) InjectInContext(ctx sdk.Context) sdk.Context {
return ctx.WithContext(context.WithValue(ctx.Context(), "test_tracer", t))
}

func (t *testTxTracer) Reset() {
t.canExecute = true
}

func (t *testTxTracer) OnTxExecute() {
if !t.canExecute {
panic(fmt.Errorf("task #%d was asked to execute but the tracer is not in the correct state, most probably due to missing Reset call or over execution", t.txIndex))
}

t.canExecute = false
}
3 changes: 2 additions & 1 deletion types/tx_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
)

// DeliverTxEntry represents an individual transaction's request within a batch.
// This can be extended to include tx-level tracing or metadata
// This can be extended to include tx-level metadata
type DeliverTxEntry struct {
Request abci.RequestDeliverTx
SdkTx Tx
Checksum [32]byte
AbsoluteIndex int
EstimatedWritesets MappedWritesets
TxTracer TxTracer
}

// EstimatedWritesets represents an estimated writeset for a transaction mapped by storekey to the writeset estimate.
Expand Down
44 changes: 44 additions & 0 deletions types/tx_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package types

// TxTracer is an interface for tracing transactions generic
// enough to be used by any transaction processing engine be it
// CoWasm or EVM.
//
// The TxTracer responsibility is to inject itself in the context
// that will be used to process the transaction. How the context
// will be used afterward is up to the transaction processing engine.
//
// Today, only EVM transaction processing engine do something with the
// TxTracer (it inject itself into the EVM execution context for
// go-ethereum level tracing).
//
// The TxTracer receives signals from the scheduler when the tracer
// should be reset because the transaction is being re-executed and
// when the transaction is committed.
type TxTracer interface {
// InjectInContext injects the transaction specific tracer in the context
// that will be used to process the transaction.
//
// For now only the EVM transaction processing engine uses the tracer
// so it only make sense to inject an EVM tracer. Future updates might
// add the possibility to inject a tracer for other transaction kind.
//
// Which tracer implementation to provied and how will be retrieved later on
// from the context is dependent on the transaction processing engine.
InjectInContext(ctx Context) Context

// Reset is called when the transaction is being re-executed and the tracer
// should be reset. A transaction executed by the OCC parallel engine might
// be re-executed multiple times before being committed, each time `Reset`
// will be called.
//
// When Reset is received, it means everything that was traced before should
// be discarded.
Reset()

// Commit is called when the transaction is committed. This is the last signal
// the tracer will receive for a given transaction. After this call, the tracer
// should do whatever it needs to forward the tracing information to the
// appropriate place/collector.
Commit()
}

0 comments on commit 7296878

Please sign in to comment.