Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[extension/db_storage] Add transaction in batch operations #30551

Closed
27 changes: 27 additions & 0 deletions .chloggen/fix_dbstorage_batch_operations.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: extension/db_storage

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add transaction to batch operations to prevent database state changes when an operation fails

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29730]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 8 additions & 1 deletion extension/storage/dbstorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// SQLite driver
_ "github.com/mattn/go-sqlite3"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/multierr"
)

const (
Expand Down Expand Up @@ -85,6 +86,10 @@ func (c *dbStorageClient) Delete(ctx context.Context, key string) error {
// Batch executes the specified operations in order. Get operation results are updated in place
func (c *dbStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
var err error
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
return err
}
for _, op := range ops {
switch op.Type {
case storage.Get:
Expand All @@ -94,13 +99,15 @@ func (c *dbStorageClient) Batch(ctx context.Context, ops ...storage.Operation) e
case storage.Delete:
err = c.Delete(ctx, op.Key)
default:
return errors.New("wrong operation type")
err = errors.New("wrong operation type")
}

if err != nil {
err = multierr.Append(err, tx.Rollback())
return err
}
}
err = tx.Commit()
return err
}

Expand Down
163 changes: 163 additions & 0 deletions extension/storage/dbstorage/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dbstorage // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/dbstorage"

import (
"context"
"database/sql"
"errors"
"os"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

// Define a test database file name
const testDBFile = "test.db"

// TestDBStorageClient verifies the behavior of the dbStorageClient
func TestDBStorageClient(t *testing.T) {
// Set up the test client
client, cleanup := setupTestEnvironment(t, "test_table")
defer cleanup()

// Run tests for Get, Set, Delete, and Batch methods
t.Run("Get", func(t *testing.T) {
// Test Get for a key that doesn't exist
value, err := client.Get(context.Background(), "nonexistent_key")
assert.NoError(t, err)
assert.Nil(t, value)

// Test Get for an existing key
err = client.Set(context.Background(), "existing_key", []byte("some_value"))
assert.NoError(t, err)
value, err = client.Get(context.Background(), "existing_key")
assert.NoError(t, err)
assert.Equal(t, []byte("some_value"), value)
})

t.Run("Set", func(t *testing.T) {
// Test Set for a new key
err := client.Set(context.Background(), "new_key", []byte("new_value"))
assert.NoError(t, err)

// Test update for an existing key
err = client.Set(context.Background(), "existing_key", []byte("updated_value"))
assert.NoError(t, err)

// Verify the updated value
value, err := client.Get(context.Background(), "existing_key")
assert.NoError(t, err)
assert.Equal(t, []byte("updated_value"), value)
})

t.Run("Delete", func(t *testing.T) {
// Test Delete for an existing key
err := client.Delete(context.Background(), "existing_key")
assert.NoError(t, err)

// Verify that the key is deleted
value, err := client.Get(context.Background(), "existing_key")
assert.NoError(t, err)
assert.Nil(t, value)

// Test Delete for a nonexistent key (no error expected)
err = client.Delete(context.Background(), "nonexistent_key")
assert.NoError(t, err)
})

t.Run("Batch", func(t *testing.T) {
// Test Batch for a combination of Get, Set, and Delete operations
ops := []storage.Operation{
{Type: storage.Get, Key: "existing_key"},
{Type: storage.Set, Key: "new_key", Value: []byte("new_value")},
{Type: storage.Delete, Key: "nonexistent_key"},
}

err := client.Batch(context.Background(), ops...)
assert.NoError(t, err)

// Verify the results of the Get operation in the batch
if ops[0].Value != nil {
assert.Equal(t, []byte("updated_value"), ops[0].Value)
}

// Verify that Set and Delete operations in the batch were successful
value, err := client.Get(context.Background(), "new_key")
assert.NoError(t, err)
assert.Equal(t, []byte("new_value"), value)

value, err = client.Get(context.Background(), "nonexistent_key")
assert.NoError(t, err)
assert.Nil(t, value)
})

// Create a test client that embeds the dbStorageClient and overrides the Batch method
testClient := &testDBStorageClient{dbStorageClient: client}

// Run a test to simulate a rollback.
t.Run("RollbackInBatch", func(t *testing.T) {
// Override the Batch method with a custom implementation
testClient.batchOverride = func(ctx context.Context, ops ...storage.Operation) error {
for _, op := range ops {
if op.Type == storage.Set && op.Key == "key5" {
return errors.New("intentional error")
}
}
return testClient.dbStorageClient.Batch(ctx, ops...)
}

ops := []storage.Operation{
{Type: storage.Set, Key: "key1", Value: []byte("value1")},
{Type: storage.Set, Key: "key2", Value: []byte("value2")},
{Type: storage.Set, Key: "key3", Value: []byte("value3")},
{Type: storage.Set, Key: "key4", Value: []byte("value4")},
{Type: storage.Set, Key: "key5", Value: []byte("value5")},
}

err := testClient.Batch(context.Background(), ops...)
assert.Error(t, err) // Expecting an error due to intentional error during Set

// Verify that the values are not stored in the database
for _, op := range ops {
value, getErr := client.Get(context.Background(), op.Key)
assert.NoError(t, getErr) // Expecting no error during Get
assert.Nil(t, value) // Expecting nil value as a result of rollback
}
})
}

type testDBStorageClient struct {
*dbStorageClient
batchOverride func(ctx context.Context, ops ...storage.Operation) error
}

func (c *testDBStorageClient) Batch(ctx context.Context, ops ...storage.Operation) error {
if c.batchOverride != nil {
return c.batchOverride(ctx, ops...)
}
return c.dbStorageClient.Batch(ctx, ops...)
}

// setupTestEnvironment creates a new dbStorageClient for testing, opens a test database
// returns the client and cleanup function.
func setupTestEnvironment(t *testing.T, tableName string) (*dbStorageClient, func()) {
// Open a test database.
db, err := sql.Open("sqlite3", testDBFile)
assert.NoError(t, err)

// Set up the test client.
client, err := newClient(context.Background(), db, tableName)
assert.NoError(t, err)

// Define cleanup function to close the client and database and remove the test database file.
cleanupFunc := func() {
_ = client.Close(context.Background())
_ = db.Close()
_ = os.Remove(testDBFile)
}

return client, cleanupFunc
}