-
Notifications
You must be signed in to change notification settings - Fork 2
/
txngen.go
127 lines (114 loc) · 3.43 KB
/
txngen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"context"
"fmt"
"log/slog"
"strings"
"time"
"github.com/jackc/pgx/v5"
"golang.org/x/sync/errgroup"
)
// txngenConfig carries the settings of the transaction generator.
//
//   - parallelism: number of insert workers started by txngenMain.
//   - iterations:  number of INSERT statements each worker executes.
//   - batchSize:   number of rows inserted by each INSERT statement.
//   - delayMs:     pause between two iterations of a worker, in milliseconds.
//   - noTxn:       when true, a worker does not wrap its inserts in a transaction.
type txngenConfig struct {
	parallelism, iterations, batchSize, delayMs int

	noTxn bool
}
// txngenProcessor runs a single insert worker. It opens its own connection and
// executes txngenConfig.iterations INSERT statements of txngenConfig.batchSize
// rows each against config.tablename, pausing txngenConfig.delayMs milliseconds
// between iterations. Unless txngenConfig.noTxn is set, all inserts run inside
// one transaction that is committed after the final iteration and rolled back
// if the worker returns early.
//
// threadID is only used to label log output. The returned error wraps the
// first connection, statement or commit failure, or is ctx.Err() when ctx is
// cancelled during an inter-iteration delay.
func txngenProcessor(ctx context.Context, threadID int,
	config *polorexConfig, txngenConfig *txngenConfig) error {
	conn, err := config.createPostgresConn(ctx, false)
	if err != nil {
		return fmt.Errorf("unable to establish connection: %w", err)
	}
	// The worker owns this connection, so release it on every return path.
	// NOTE(review): assumes createPostgresConn returns a *pgx.Conn — confirm.
	defer func() {
		if closeErr := conn.Close(ctx); closeErr != nil {
			slog.Error("error closing connection for inserting records", "err", closeErr)
		}
	}()

	stmt := fmt.Sprintf("INSERT INTO %s VALUES%s", config.tablename,
		strings.TrimSuffix(strings.Repeat("(default,default),", txngenConfig.batchSize), ","))
	// Slightly inefficient variant of stmt, used only on the first and last
	// iteration to report the smallest and largest generated id.
	boundsStmt := fmt.Sprintf("WITH ir AS (%s RETURNING id) SELECT min(id), max(id) FROM ir", stmt)

	var minID, maxID int64
	lastIter := txngenConfig.iterations - 1

	// runBatch executes iteration i. The first iteration records both bounds so
	// that a single-iteration run still reports maxID; the last iteration then
	// overwrites maxID with the upper bound of the final batch.
	runBatch := func(i int) error {
		switch i {
		case 0:
			return conn.QueryRow(ctx, boundsStmt).Scan(&minID, &maxID)
		case lastIter:
			var batchMin int64 // lower bound of the last batch is not reported
			return conn.QueryRow(ctx, boundsStmt).Scan(&batchMin, &maxID)
		default:
			_, execErr := conn.Exec(ctx, stmt)
			return execErr
		}
	}

	// The statements are always issued on conn. In txn mode they are issued
	// between Begin and Commit on that same connection.
	var tx pgx.Tx
	txnLabel := "outside txn"
	if !txngenConfig.noTxn {
		txnLabel = "in txn"
		tx, err = conn.Begin(ctx)
		if err != nil {
			return fmt.Errorf("failed to begin transaction for inserting records: %w", err)
		}
		defer func() {
			// After a successful Commit, Rollback reports ErrTxClosed; that is expected.
			rollbackErr := tx.Rollback(ctx)
			if rollbackErr != nil && rollbackErr != pgx.ErrTxClosed {
				slog.Error("error rolling back transaction for inserting records", "err", rollbackErr)
			}
		}()
	}

	totalRecords := txngenConfig.iterations * txngenConfig.batchSize
	delay := time.Duration(txngenConfig.delayMs) * time.Millisecond
	startTime := time.Now()
	for i := 0; i < txngenConfig.iterations; i++ {
		slog.Info("Inserting records into table",
			"thread", threadID,
			"iterations", fmt.Sprintf("%d/%d", i, txngenConfig.iterations),
			"records", fmt.Sprintf("%d/%d", i*txngenConfig.batchSize, totalRecords),
		)
		if err := runBatch(i); err != nil {
			return fmt.Errorf("failed to execute INSERT statement %s: %w", txnLabel, err)
		}
		// No delay after the final iteration. The wait is abandoned as soon as
		// ctx is cancelled instead of sleeping through the shutdown request.
		if delay > 0 && i < lastIter {
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				timer.Stop()
				return ctx.Err()
			case <-timer.C:
			}
		}
	}

	if tx != nil {
		if err := tx.Commit(ctx); err != nil {
			return fmt.Errorf("failed to commit transaction for inserting records: %w", err)
		}
	}

	// Compute the rate in floating point and guard against a zero duration:
	// truncating the elapsed time to whole seconds would divide by zero for
	// any run that finishes in under a second.
	elapsed := time.Since(startTime)
	rps := 0
	if secs := elapsed.Seconds(); secs > 0 {
		rps = int(float64(totalRecords) / secs)
	}
	slog.Info("Finished inserting records",
		"thread", threadID,
		"totalRecords", totalRecords,
		"minID", minID,
		"maxID", maxID,
		"totalTime", elapsed,
		"rps", rps,
	)
	return nil
}
// txngenMain starts txngenConfig.parallelism insert workers, each running
// txngenProcessor with its own thread id, and blocks until all of them have
// returned. It returns the first non-nil error reported by a worker, if any.
func txngenMain(ctx context.Context, config *polorexConfig, txngenConfig *txngenConfig) error {
	// The group's derived context is deliberately discarded: workers run on the
	// caller's ctx, so cancellation only comes from the quit logic in main.
	workers, _ := errgroup.WithContext(ctx)
	workers.SetLimit(txngenConfig.parallelism)

	// launch binds threadID per call so each goroutine sees its own value.
	launch := func(threadID int) {
		workers.Go(func() error {
			return txngenProcessor(ctx, threadID, config, txngenConfig)
		})
	}
	for threadID := 0; threadID < txngenConfig.parallelism; threadID++ {
		launch(threadID)
	}

	return workers.Wait()
}