Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve list query #140

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fcbbef2
First rough draft for batching
marco6 Sep 30, 2024
0423697
Implementing update
marco6 Sep 30, 2024
59cb5ad
Refactoring batching
marco6 Sep 30, 2024
904f5b6
Removing useless TODOs
marco6 Oct 1, 2024
3578459
Moving database-related structs in a single package
marco6 Oct 24, 2024
7a437c6
Testing prepared
marco6 Oct 24, 2024
52abef0
Testing batched
marco6 Oct 24, 2024
d0467e5
Addressing review
marco6 Oct 24, 2024
907ce6c
Fix connection leak in tests
marco6 Oct 25, 2024
1290eac
Removing old prepared package
marco6 Oct 28, 2024
44a01e1
Collapse revisionAfter and listRevision
marco6 Oct 28, 2024
c4b3922
Single path for list
marco6 Oct 28, 2024
8b51a78
Simplify count query
marco6 Oct 28, 2024
708f7f6
Remove dialect support for placeholders
marco6 Oct 28, 2024
f7dd57b
Remove query customization for dialects
marco6 Oct 28, 2024
5a05866
Remove useless adjustRevision
marco6 Oct 28, 2024
54c9815
Wait for log to finish
marco6 Oct 28, 2024
026f23a
Properly checking for revision in After
marco6 Oct 28, 2024
3ad0959
Fix update test scaling
marco6 Oct 29, 2024
4e9dde3
Merge branch 'marco6/simplify-list-query'
marco6 Oct 28, 2024
812fcf2
Merging logstructured package with sqllog
marco6 Oct 28, 2024
5c3b645
Renaming logstructured to sqllog
marco6 Oct 28, 2024
a75ccbd
Make logstructured embed sqllog
marco6 Oct 28, 2024
fdcd83f
Remove forwarding Count
marco6 Oct 28, 2024
c09d043
Moving watch to sqllog
marco6 Oct 29, 2024
da9f8da
Move ttl logic to sqllog
marco6 Oct 29, 2024
bb5a208
Move List to logstructured
marco6 Oct 29, 2024
3e37701
Moving Get to sqllog
marco6 Oct 29, 2024
dd32ab7
Remove logstructured
marco6 Oct 29, 2024
c4f06d4
Remove useless function
marco6 Oct 29, 2024
ebd19f9
Remove useless Get operation
marco6 Oct 29, 2024
81ce8f9
Better start/stop management for sqllog
marco6 Oct 30, 2024
e3c4357
Make broadcaster typed
marco6 Oct 30, 2024
3c76053
Use typed ScanAll to reduce a bit boilerplate
marco6 Oct 30, 2024
302c1ae
Stop GRPC server once done
marco6 Oct 31, 2024
af18aeb
Mark log as started
marco6 Oct 31, 2024
7ff146f
Merge branch 'KU-1750/merge-logstructured-and-sqllog' into KU-1750/be…
marco6 Oct 31, 2024
d13c61c
Improve list query
marco6 Oct 29, 2024
26ae39e
Add TTL query to speed up startup
marco6 Oct 29, 2024
d334e10
Merge branch 'KU-1750/better-get-queries-after-merge'
marco6 Oct 31, 2024
db4e42b
Merge branch 'KU-1744/batching'
marco6 Oct 31, 2024
05b62da
Improve list query
marco6 Jul 22, 2024
0c69291
Fix count query
marco6 Jul 29, 2024
8a2178b
Better index order
marco6 Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions pkg/database/batched.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package database

import (
"context"
"database/sql"
"errors"
"sync"
)

// Execer is the minimal statement-execution surface needed to run a single
// batchJob. Both the wrapped Interface and a transaction started with
// BeginTx are used as Execers (see execQueue), which lets a job run either
// standalone or inside a shared transaction.
type Execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error)
}

// batchStatus tracks the lifecycle of the background batching goroutine.
type batchStatus int

const (
	// batchNotStarted: no worker goroutine exists; the next call to
	// batchedDb.run must spawn one.
	batchNotStarted batchStatus = iota
	// batchStarted: the worker goroutine has been spawned but has not yet
	// acquired the mutex and begun draining the queue.
	batchStarted
	// batchRunning: the worker goroutine is actively draining the queue.
	batchRunning
)

// errDBClosed is returned once Close has been called. The text matches the
// message used by database/sql for a closed *sql.DB.
var errDBClosed = errors.New("sql: database is closed")

// batchedDb wraps an Interface and coalesces concurrent ExecContext calls
// into a single transaction drained by one background worker goroutine.
// All other operations are forwarded to the embedded Interface unchanged.
type batchedDb struct {
	Interface

	mu            sync.Mutex
	workerContext context.Context // cancelled by Close to stop the worker
	stopWorker    func()          // cancels workerContext
	closed        bool            // set by Close; rejects further jobs

	cv     sync.Cond   // broadcast after each drained batch; cv.L is &mu
	status batchStatus // current state of the worker goroutine; guarded by mu

	queue []*batchJob // jobs waiting for the next drain; guarded by mu
	runId int64       // incremented after each drained batch; guarded by mu
}

// NewBatched wraps db so that concurrent ExecContext calls can be grouped
// into shared transactions, which are processed sequentially by a single
// background worker goroutine.
func NewBatched(db Interface) Interface {
	ctx, cancel := context.WithCancel(context.Background())

	batched := &batchedDb{
		Interface:     db,
		workerContext: ctx,
		stopWorker:    cancel,
	}
	batched.cv.L = &batched.mu
	return batched
}

// ExecContext enqueues the statement for batched execution and blocks until
// the drain pass containing it has completed. The returned sql.Result is the
// job itself, which caches LastInsertId/RowsAffected captured at execution
// time.
func (b *batchedDb) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return nil, errDBClosed
	}

	// Tag the job with the drain pass that will execute it. If a drain is
	// already in flight (batchRunning), this job can only be picked up by
	// the *next* pass, hence the +1.
	runId := b.runId
	if b.status == batchRunning {
		// The current run is already taking place.
		runId++
	}

	job := &batchJob{
		ctx:   ctx,
		query: query,
		args:  args,
		runId: runId,
	}
	b.queue = append(b.queue, job)

	b.run()

	// The worker increments b.runId and broadcasts once the pass that
	// contains this job has finished, so wait until runId moves past ours.
	for job.runId >= b.runId {
		b.cv.Wait()
	}

	if job.err != nil {
		return nil, job.err
	}

	return job, nil
}

// run starts the batching job if it has not already been started,
// which will run until queue exhaustion. run does not block other
// goroutines from enqueuing new jobs.
//
// It must be called while holding the Batch's mu lock.
func (b *batchedDb) run() {
	if b.status == batchNotStarted {
		b.status = batchStarted

		go func() {
			b.mu.Lock()
			defer b.mu.Unlock()

			b.status = batchRunning
			// Registered after the Unlock defer above, so (LIFO order)
			// the status reset runs while the lock is still held.
			defer func() { b.status = batchNotStarted }()

			for len(b.queue) > 0 {
				// Take ownership of the current queue; jobs enqueued
				// while we execute are drained on the next iteration.
				queue := b.queue
				b.queue = nil

				// Drop the lock during execution so producers can keep
				// enqueuing concurrently.
				b.mu.Unlock()
				b.execQueue(b.workerContext, queue)
				b.mu.Lock()

				// Mark this pass complete and wake every waiter (both
				// ExecContext callers and a pending Close).
				b.runId++
				b.cv.Broadcast()
			}
		}()
	}
}

// execQueue executes every job in queue. A single job runs directly against
// the underlying Interface; multiple jobs are wrapped in one transaction so
// they are committed together. If the transaction fails, the same error is
// fanned out to every job in the batch.
func (b *batchedDb) execQueue(ctx context.Context, queue []*batchJob) {
	if len(queue) == 0 {
		return // This should never happen.
	}
	if len(queue) == 1 {
		// We don't need to address the error here as it will be
		// handled by the goroutine waiting for this result
		queue[0].exec(queue[0].ctx, b.Interface)
		return
	}

	transaction := func() error {
		tx, err := b.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Rollback is a no-op if Commit below succeeds.
		defer tx.Rollback()

		for _, q := range queue {
			// In the case of SQLITE_FULL, SQLITE_IOERR, SQLITE_BUSY or SQLITE_NOMEM
			// we should explicitly roll back the whole transaction. In all other
			// cases we could keep going with the remaining queries. However, it is
			// a bit unclear what to do next as:
			//  - SQLITE_FULL, SQLITE_IOERR mean that we have problems with the disk,
			//    so even retrying the batch will not work. We might throttle the
			//    max batch size, hoping for a checkpoint?
			//  - SQLITE_BUSY should never happen if we manage to get `IMMEDIATE`
			//    transactions in. Otherwise it only affects the first statement.
			//  - SQLITE_NOMEM: again, we could throttle here? Trigger a GC run?
			// Given the points above, the code below always rolls back the whole
			// batch. It might seem inefficient, but it should almost never happen.
			if err := q.exec(ctx, tx); err != nil {
				return err
			}
		}

		return tx.Commit()
	}
	if err := transaction(); err != nil {
		// Propagate the batch-wide failure to every job so each waiting
		// caller observes the same error.
		for _, q := range queue {
			q.err = err
		}
	}
}

// Close marks the wrapper as closed, cancels the worker context, and waits
// for any in-flight drain to finish before closing the underlying Interface.
// A second call returns errDBClosed.
func (b *batchedDb) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return errDBClosed
	}

	b.closed = true
	b.stopWorker()

	// The worker broadcasts after each drained pass and resets status to
	// batchNotStarted (while still holding mu) just before it exits, so
	// this loop terminates once the worker is fully stopped.
	for b.status != batchNotStarted {
		b.cv.Wait()
	}

	return b.Interface.Close()
}

// batchJob is a single queued statement. It doubles as the sql.Result
// handed back to the caller: the result values are copied onto the job
// right after execution (see exec).
type batchJob struct {
	ctx   context.Context // caller's context; checked just before execution
	query string
	args  []any

	runId        int64 // the drain pass that will execute this job
	lastInsertId int64 // captured from the driver result on success
	rowsAffected int64 // captured from the driver result on success
	err          error // sticky execution error, if any
}

// Compile-time check that batchJob satisfies sql.Result.
var _ sql.Result = &batchJob{}

// exec runs the job's statement via execer and eagerly caches the
// sql.Result values (RowsAffected, LastInsertId) on the job so they stay
// readable after the batch's transaction has moved on. Any failure is
// stored in job.err and also returned.
func (job *batchJob) exec(ctx context.Context, execer Execer) error {
	// Honor a cancellation that happened while the job was queued.
	select {
	case <-job.ctx.Done():
		job.err = job.ctx.Err()
		return job.err
	default:
		// From this point on, the job is not interruptible anymore
		// as interrupting would mean that we would be forced to
		// ROLLBACK the whole transaction.
	}

	var result sql.Result
	result, job.err = execer.ExecContext(ctx, job.query, job.args...)
	if job.err != nil {
		return job.err
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		job.err = err
		return job.err
	}
	job.rowsAffected = rowsAffected

	lastInsertId, err := result.LastInsertId()
	if err != nil {
		job.err = err
		return job.err
	}
	job.lastInsertId = lastInsertId
	return nil
}

// LastInsertId implements sql.Result. It returns the id captured when the
// job executed, or the job's error if execution failed.
func (job *batchJob) LastInsertId() (int64, error) {
	if job.err == nil {
		return job.lastInsertId, nil
	}
	return 0, job.err
}

// RowsAffected implements sql.Result. It returns the row count captured
// when the job executed, or the job's error if execution failed.
func (job *batchJob) RowsAffected() (int64, error) {
	if job.err == nil {
		return job.rowsAffected, nil
	}
	return 0, job.err
}
86 changes: 86 additions & 0 deletions pkg/database/batched_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package database_test

import (
"context"
"database/sql"
"sync"
"testing"

"github.com/canonical/k8s-dqlite/pkg/database"
)

// TestBatchedSequentialLoad issues queries strictly one at a time; because
// there is never more than one job queued, no batching transaction should
// be started.
func TestBatchedSequentialLoad(t *testing.T) {
	ctx := context.Background()
	driver := &testDriver{t: t}
	db := database.NewBatched(database.Wrap(sql.OpenDB(&testConnector{driver: driver})))
	defer db.Close()

	for n := 0; n < 100; n++ {
		if _, err := db.ExecContext(ctx, "query 1"); err != nil {
			t.Error(err)
		}
	}

	if trans := driver.trans.Load(); trans != 0 {
		t.Errorf("unexpected number of transaction: want 0, got %d", trans)
	}
}

// TestBatchedParallelLoad fires many concurrent queries so that at least
// some of them are coalesced and executed inside a shared transaction.
func TestBatchedParallelLoad(t *testing.T) {
	ctx := context.Background()
	driver := &testDriver{t: t}
	db := database.NewBatched(database.Wrap(sql.OpenDB(&testConnector{driver: driver})))
	defer db.Close()

	var wg sync.WaitGroup
	for n := 0; n < 1000; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := db.ExecContext(ctx, "query 1"); err != nil {
				t.Error(err)
			}
		}()
	}
	wg.Wait()

	if trans := driver.trans.Load(); trans == 0 {
		t.Error("unexpected number of transaction: want >0, got 0")
	}
}

// TestBatchedClosed verifies that ExecContext fails once the batched
// wrapper has been closed.
func TestBatchedClosed(t *testing.T) {
	ctx := context.Background()
	driver := &testDriver{t: t}
	db := database.NewBatched(database.Wrap(sql.OpenDB(&testConnector{driver: driver})))
	db.Close()

	if _, err := db.ExecContext(ctx, "query 1"); err == nil {
		t.Error("closed connection did not fail")
	}
}

// TestBatchedCancelled verifies that a job whose context is already
// cancelled is rejected instead of being executed.
func TestBatchedCancelled(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	driver := &testDriver{
		t: t,
	}
	db := database.NewBatched(database.Wrap(sql.OpenDB(&testConnector{driver: driver})))
	defer db.Close()

	_, err := db.ExecContext(ctx, "query 1")
	if err == nil {
		// Fixed message: the previous text ("closed connection did not
		// fail") was copy-pasted from TestBatchedClosed and misdescribed
		// this scenario, which exercises context cancellation.
		t.Error("cancelled context did not fail")
	}
}
Loading
Loading