Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][tracker]: save most recent (archive) write index to disk #36799

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

const knownFilesKey = "knownFiles"
Expand All @@ -21,7 +22,7 @@ func Save(ctx context.Context, persister operator.Persister, rmds []*reader.Meta
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...storage.Operation) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

Expand All @@ -37,8 +38,8 @@ func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.M
errs = append(errs, fmt.Errorf("encode metadata: %w", err))
}
}

if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
ops = append(ops, storage.SetOperation(key, buf.Bytes()))
if err := persister.Batch(ctx, ops...); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For existing usage, this will be a no-op.

errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

Expand Down
51 changes: 46 additions & 5 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ package tracker // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"context"
"encoding/binary"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
Expand Down Expand Up @@ -52,12 +55,33 @@ type fileTracker struct {
archiveIndex int
}

// errInvalidValue is returned by byteToIndex when a persisted archive index is
// present but too short to hold an encoded uint64.
var errInvalidValue = errors.New("invalid value")

// archiveIndexKey is the persister key under which the archive write index is
// stored. It is a constant so that it cannot be reassigned at runtime, and the
// spelling is fixed here before the key becomes part of the on-disk format.
const archiveIndexKey = "knownFilesArchiveIndex"
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
archiveIndex := 0
if persister != nil && pollsToArchive > 0 {
byteIndex, err := persister.Get(context.Background(), archiveIndexKey)
if err != nil {
set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
}
archiveIndex, err = byteToIndex(byteIndex)
if err != nil {
set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
archiveIndex = 0
} else if archiveIndex < 0 || archiveIndex >= pollsToArchive {
// safety check. It can happen if `polls_to_archive` was changed.
// It's best if we reset the index or else we might end up writing invalid keys
set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
archiveIndex = 0
}
}
return &fileTracker{
set: set,
maxBatchFiles: maxBatchFiles,
Expand All @@ -66,7 +90,7 @@ func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToA
knownFiles: knownFiles,
pollsToArchive: pollsToArchive,
persister: persister,
archiveIndex: 0,
archiveIndex: archiveIndex,
}
}

Expand Down Expand Up @@ -165,10 +189,12 @@ func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
if t.pollsToArchive <= 0 || t.persister == nil {
return
}
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
index := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
indexOp := storage.SetOperation(archiveIndexKey, intToByte(t.archiveIndex)) // batch the updated index with metadata
if err := t.writeArchive(index, metadata, indexOp); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
Expand All @@ -184,9 +210,9 @@ func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata]
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key, ops...)
}

// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
Expand Down Expand Up @@ -295,3 +321,18 @@ func (t *noStateTracker) EndPoll() {}
// TotalReaders always reports zero for the no-state tracker.
func (t *noStateTracker) TotalReaders() int {
	return 0
}

// FindFiles ignores the supplied fingerprints and always returns nil for the
// no-state tracker.
func (t *noStateTracker) FindFiles(_ []*fingerprint.Fingerprint) []*reader.Metadata {
	return nil
}

// intToByte encodes val as an 8-byte little-endian uint64.
// It is the inverse of byteToIndex for values that fit in an int.
func intToByte(val int) []byte {
	encoded := make([]byte, 8)
	binary.LittleEndian.PutUint64(encoded, uint64(val))
	return encoded
}

// byteToIndex decodes an index previously encoded as a little-endian uint64.
//
// A nil buffer is treated as "nothing stored" and yields 0 with no error.
// Any other buffer shorter than 8 bytes yields errInvalidValue. Only the first
// 8 bytes are read; extra trailing bytes are ignored.
func byteToIndex(buf []byte) (int, error) {
	switch {
	case buf == nil:
		return 0, nil
	case len(buf) < 8: // a uint64 occupies 8 bytes
		return 0, errInvalidValue
	default:
		return int(binary.LittleEndian.Uint64(buf)), nil
	}
}
VihasMakwana marked this conversation as resolved.
Show resolved Hide resolved
31 changes: 31 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -44,6 +45,36 @@ func TestFindFilesOrder(t *testing.T) {
}
}

// TestIndexInBounds checks that the archive write index stays within
// [0, pollsToArchive) while archiving, that a newly created tracker picks the
// index up again from the persister, and that the index is reset to 0 when the
// stored value no longer fits a reduced pollsToArchive.
func TestIndexInBounds(t *testing.T) {
	persister := testutil.NewUnscopedMockPersister()
	pollsToArchive := 100
	tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

	// no index exists. archiveIndex should be 0
	require.Equal(t, 0, tracker.archiveIndex)

	// run archiving. Each time, index should be in bound.
	for i := 0; i < 1075; i++ {
		require.Equalf(t, i%pollsToArchive, tracker.archiveIndex, "Index should be %d, but was %d", i%pollsToArchive, tracker.archiveIndex)
		tracker.archive(&fileset.Fileset[*reader.Metadata]{})
		require.Truef(t, tracker.archiveIndex >= 0 && tracker.archiveIndex < pollsToArchive, "Index should be between 0 and %d, but was %d", pollsToArchive, tracker.archiveIndex)
	}
	// 1075 archive calls with pollsToArchive=100 leave the index at 75.
	oldIndex := tracker.archiveIndex

	// re-create archive
	tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

	// index should exist and new archiveIndex should be equal to oldIndex
	require.Equalf(t, oldIndex, tracker.archiveIndex, "New index should be %d, but was %d", oldIndex, tracker.archiveIndex)

	// re-create archive, with reduced pollsToArchive
	pollsToArchive = 70
	tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

	// index should exist but it is out of bounds (75 >= 70). So it should reset to 0
	require.Equalf(t, 0, tracker.archiveIndex, "Index should be reset to 0 but was %d", tracker.archiveIndex)
}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
md := make([]*reader.Metadata, 0)

Expand Down
7 changes: 7 additions & 0 deletions pkg/stanza/operator/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ package operator // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"context"
"fmt"

"go.opentelemetry.io/collector/extension/experimental/storage"
)

// Persister is an interface used to persist data
// Persister is an interface used to persist data
type Persister interface {
	// Get looks up the value associated with key.
	Get(ctx context.Context, key string) ([]byte, error)
	// Set associates value with key.
	Set(ctx context.Context, key string, value []byte) error
	// Delete drops the entry associated with key.
	Delete(ctx context.Context, key string) error
	// Batch submits several storage operations in a single call.
	Batch(ctx context.Context, ops ...storage.Operation) error
}

type scopedPersister struct {
Expand All @@ -38,3 +41,7 @@ func (p scopedPersister) Set(ctx context.Context, key string, value []byte) erro
// Delete forwards to the wrapped persister's Delete, using the key prefixed
// with this persister's scope ("<scope>.<key>").
func (p scopedPersister) Delete(ctx context.Context, key string) error {
	scopedKey := fmt.Sprintf("%s.%s", p.scope, key)
	return p.Persister.Delete(ctx, scopedKey)
}

// Batch forwards the operations to the wrapped persister after prefixing every
// operation key with this persister's scope ("<scope>.<key>"), the same way
// Delete does. Without the prefix, batched writes from a scoped persister would
// land in the unscoped key space and could collide with other users of the
// underlying persister.
//
// The keys are rewritten in place on the supplied operations, so callers must
// not reuse an operation across Batch calls.
func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error {
	// Index into the slice so the rewrite reaches the caller's operation
	// regardless of whether storage.Operation is a pointer or a value type.
	for i := range ops {
		ops[i].Key = fmt.Sprintf("%s.%s", p.scope, ops[i].Key)
	}
	return p.Persister.Batch(ctx, ops...)
}
19 changes: 19 additions & 0 deletions pkg/stanza/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

type mockPersister struct {
Expand Down Expand Up @@ -46,6 +47,24 @@ func (p *mockPersister) Delete(_ context.Context, k string) error {
return nil
}

// Batch runs the given operations one after another by delegating to Get, Set
// and Delete, passing the caller's context through to each call. It stops and
// returns the first error encountered; operations after a failing one are not
// run. Operations whose Type is not Get, Set or Delete are skipped.
func (p *mockPersister) Batch(ctx context.Context, ops ...storage.Operation) error {
	for _, op := range ops {
		var err error
		switch op.Type {
		case storage.Get:
			// NOTE(review): the caller only sees this result if storage.Operation
			// is a pointer type; confirm against the storage package.
			op.Value, err = p.Get(ctx, op.Key)
		case storage.Set:
			err = p.Set(ctx, op.Key, op.Value)
		case storage.Delete:
			err = p.Delete(ctx, op.Key)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// NewUnscopedMockPersister will return a new persister for testing
func NewUnscopedMockPersister() operator.Persister {
data := make(map[string][]byte)
Expand Down
Loading