[chore][tracker]: save most recent (archive) write index to disk #36799

Open · wants to merge 21 commits into main

Changes from 17 commits
8 changes: 5 additions & 3 deletions pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go
@@ -10,6 +10,8 @@
"errors"
"fmt"

"go.opentelemetry.io/collector/extension/experimental/storage"

Check failure on line 13 in pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go, GitHub Actions / govulncheck (pkg): could not import go.opentelemetry.io/collector/extension/experimental/storage (invalid package name: "")

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
@@ -21,7 +23,7 @@
return SaveKey(ctx, persister, rmds, knownFilesKey)
}

func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string) error {
func SaveKey(ctx context.Context, persister operator.Persister, rmds []*reader.Metadata, key string, ops ...storage.Operation) error {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

@@ -37,8 +39,8 @@
errs = append(errs, fmt.Errorf("encode metadata: %w", err))
}
}

if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
ops = append(ops, storage.SetOperation(key, buf.Bytes()))
if err := persister.Batch(ctx, ops...); err != nil {
VihasMakwana (Contributor Author) commented:

For existing usage, this will be a no-op.

errs = append(errs, fmt.Errorf("persist known files: %w", err))
}

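The new variadic `ops` parameter means extra storage operations are committed in the same `Batch` call as the checkpoint itself. A minimal sketch of a caller, assuming imports of `storage`, `strconv`, and the `checkpoint` package (the helper name and `exampleIndexKey` are illustrative, not part of this PR):

```go
// saveWithIndex is a hypothetical caller: the index write is batched with
// the checkpoint, so both land atomically or not at all.
func saveWithIndex(ctx context.Context, p operator.Persister, rmds []*reader.Metadata, idx int) error {
	op := storage.SetOperation("exampleIndexKey", []byte(strconv.Itoa(idx))) // illustrative key
	return checkpoint.SaveKey(ctx, p, rmds, "knownFiles0", op)
}
```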
178 changes: 166 additions & 12 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -4,10 +4,14 @@
package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"

Check failure on line 14 in pkg/stanza/fileconsumer/internal/tracker/tracker.go, GitHub Actions / govulncheck (pkg): could not import go.opentelemetry.io/collector/extension/experimental/storage (invalid package name: "")
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
@@ -52,13 +56,19 @@
archiveIndex int
}

var errInvalidValue = errors.New("invalid value")

var archiveIndexKey = "knownFilesArchiveIndex"
var archivePollsToArchiveKey = "knownFilesPollsToArchive"

func NewFileTracker(set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
return &fileTracker{

t := &fileTracker{
set: set,
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
@@ -68,6 +78,11 @@
persister: persister,
archiveIndex: 0,
}
if t.archiveEnabled() {
t.restoreArchiveIndex(context.Background())
}

return t
}

func (t *fileTracker) Add(reader *reader.Reader) {
@@ -131,7 +146,9 @@
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

// Instead of throwing it away, archive it.
t.archive(t.knownFiles[2])
if t.archiveEnabled() {
t.archive(t.knownFiles[2])
}
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}
@@ -144,6 +161,109 @@
return total
}

func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
byteIndex, err := t.persister.Get(ctx, archiveIndexKey)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
t.archiveIndex = 0
return
}

previousPollsToArchive, err := t.previousPollsToArchive(ctx)
if err != nil {
// if there's an error reading previousPollsToArchive, default to current value
previousPollsToArchive = t.pollsToArchive
}

t.archiveIndex, err = decodeIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error getting read index. Starting from 0", zap.Error(err))
} else if previousPollsToArchive < t.pollsToArchive {
// if archive size has increased, we just increment the index until we encounter a nil value
for t.archiveIndex < t.pollsToArchive && t.isSet(ctx, t.archiveIndex) {
t.archiveIndex++
}
} else if previousPollsToArchive > t.pollsToArchive {
// we will only attempt to rewrite archive if the archive size has shrunk
t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
t.rewriteArchive(ctx, previousPollsToArchive)
}

t.removeExtraKeys(ctx)

// store current pollsToArchive
if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil {
t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err))
}
}
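To make the "archive grew" branch concrete, here is a tiny standalone simulation (all values illustrative; not part of the PR):

```go
package main

import "fmt"

// Simulates the branch where previousPollsToArchive < pollsToArchive: the
// restored index advances past occupied slots, so new writes land in the
// newly added region instead of overwriting the most recent data.
func main() {
	occupied := map[int]bool{0: true, 1: true, 2: true, 3: true, 4: true} // old 5-slot archive, full
	pollsToArchive := 10                                                  // new, larger size
	archiveIndex := 2                                                     // restored from disk
	for archiveIndex < pollsToArchive && occupied[archiveIndex] {
		archiveIndex++
	}
	fmt.Println(archiveIndex) // 5: the first unoccupied slot
}
```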

func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
// Ensure archiveIndex is non-negative
if t.archiveIndex < 0 {
t.archiveIndex = 0
return
}
// Function to swap data between two indices
swapData := func(idx1, idx2 int) error {
val1, err := t.persister.Get(ctx, archiveKey(idx1))
if err != nil {
return err
}
val2, err := t.persister.Get(ctx, archiveKey(idx2))
if err != nil {
return err
}
return t.persister.Batch(ctx, storage.SetOperation(archiveKey(idx1), val2), storage.SetOperation(archiveKey(idx2), val1))
}
// Calculate the least recent index, w.r.t. the new archive size.
leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive)

// Refer to archive.md for the detailed design.
if mod(t.archiveIndex-1, previousPollsToArchive) > t.pollsToArchive {
for i := 0; i < t.pollsToArchive; i++ {
if err := swapData(i, leastRecentIndex); err != nil {
t.set.Logger.Error("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
t.archiveIndex = 0
} else {
if !t.isSet(ctx, t.archiveIndex) {
// If the current index points at an unset key, the archive never rolled over; no need to do anything
return
}
for i := 0; i < t.pollsToArchive-t.archiveIndex; i++ {
if err := swapData(t.archiveIndex+i, leastRecentIndex); err != nil {
t.set.Logger.Warn("error while swapping archive", zap.Error(err))
}
leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
}
}
}
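A worked example of the index arithmetic, using the numbers from the review thread below (an old 15-slot archive shrinking to 7, with the next write slot at 5):

```go
package main

import "fmt"

// mod mirrors the helper at the bottom of tracker.go: a non-negative
// remainder, valid for x >= -y.
func mod(x, y int) int { return (x + y) % y }

func main() {
	previous, current := 15, 7 // old and new polls_to_archive
	archiveIndex := 5          // next write slot; the newest data is at index 4
	// The 7 most recent filesets live at indices 4,3,2,1,0,14,13; the swap
	// loop starts from the least recent of them:
	fmt.Println(mod(archiveIndex-current, previous)) // 13
}
```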

func (t *fileTracker) removeExtraKeys(ctx context.Context) {
for i := t.pollsToArchive; t.isSet(ctx, i); i++ {
if err := t.persister.Delete(ctx, archiveKey(i)); err != nil {
t.set.Logger.Error("error while cleaning extra keys", zap.Error(err))
}
}
}

func (t *fileTracker) previousPollsToArchive(ctx context.Context) (int, error) {
byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey)
if err != nil {
t.set.Logger.Error("error while reading the archiveIndexKey", zap.Error(err))
return 0, err
}
previousPollsToArchive, err := decodeIndex(byteIndex)
if err != nil {
t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
return 0, err
}
djaglowski (Member) commented:

Good idea to check for this case.

However, I wonder if we can handle it better than restarting from zero. What would it take to search the archive for the most recently updated?

I think we could maintain some kind of data structure which notes the time each archive was written. Maybe just map[index]time.Time. Then when we first create the tracker, we can load this up and find the most recent timestamp. We can also check for the case where pollsToArchive has changed and then rewrite the storage to align with the new value.

For example, if we previously saved 10 archives and find that pollsToArchive is now 5, we can find the 5 most recent indices based on the timestamp structure, then rewrite the archive files so that these are 0-4. We should probably even delete the extras from storage as well.
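A rough sketch of this suggestion (hypothetical; the PR ultimately persists the write index instead, as discussed below — assumes `import "time"`):

```go
// Hypothetical sketch of the timestamp idea (not what the PR implements):
// record when each archive slot was written, then recover the most recent
// slot on startup even if polls_to_archive changed in between.
type archiveWrites map[int]time.Time

// mostRecentIndex returns the most recently written slot, or -1 if empty.
func mostRecentIndex(w archiveWrites) int {
	best, bestTime := -1, time.Time{}
	for idx, ts := range w {
		if ts.After(bestTime) {
			best, bestTime = idx, ts
		}
	}
	return best
}
```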

VihasMakwana (Contributor Author) commented on Dec 13, 2024:

@djaglowski This solution does make sense to me, but it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer. We might need to load the filesets in memory. I'll explore a few approaches.

djaglowski (Member) replied:

> it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.

Can you elaborate?

> We might need to load the filesets in memory.

If it's more than one at a time, then it defeats the point of the archive.

VihasMakwana (Contributor Author) replied:

> Can you elaborate?

Consider this archive:

[Screenshot (2024-12-23): a 15-slot archive ring buffer that has rolled over once]

We've rolled over once; the latest data is at index 4 and archiveIndex (i.e. where the next data will be written) is at index 5.

Let's suppose the new polls_to_archive is 7. We now need to construct a new, smaller archive with the 7 most recent elements. These elements are (from most recent to least recent):

14, 13, 12, 11, 10, 9, 8

We cannot simply rewrite the archive in place without caching values. It would be much simpler to convert the archive as shown in the following image, and then delete the excess data:

[Screenshot (2024-12-23): the 7 most recent filesets compacted to the front of the archive]

Wdyt?

VihasMakwana (Contributor Author) added:

> What would it take to search the archive for the most recently updated?

It would always be the data stored at index archiveIndex-1. We will store archiveIndex on disk, so on the next collector run we can load that value and find the most recent data.

archiveIndex points at the next location where data will be written. It can point to either of the following:

  • the least recent data
  • an empty slot (if the archive is only partially filled)

djaglowski (Member) replied:

> It would always be the data stored at index archiveIndex-1.

What if pollsToArchive < archiveIndex-1?

Anyway, this is an edge case we can worry about later. I think we have bigger problems to work through first.

VihasMakwana (Contributor Author) replied:

Correcting my previous comment: the most recently written index is (archiveIndex-1) % previousPollsToArchive.

return previousPollsToArchive, nil
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
// We make use of a ring buffer, where each set of files is stored under a specific index.
// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
@@ -162,19 +282,17 @@
// start
// index

if t.pollsToArchive <= 0 || t.persister == nil {
return
}
if err := t.writeArchive(t.archiveIndex, metadata); err != nil {
index := t.archiveIndex
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata
if err := t.writeArchive(index, metadata, indexOp); err != nil {
t.set.Logger.Error("error faced while saving to the archive", zap.Error(err))
}
t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive // increment the index
}
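For intuition, the write index simply advances modulo polls_to_archive, so the oldest slot is always the next one to be overwritten. A tiny standalone simulation (values illustrative):

```go
package main

import "fmt"

func main() {
	pollsToArchive := 3
	archiveIndex := 0
	for write := 0; write < 5; write++ {
		fmt.Printf("write %d -> slot %d\n", write, archiveIndex)
		archiveIndex = (archiveIndex + 1) % pollsToArchive
	}
	// Output: slots 0, 1, 2, 0, 1 — the ring rolls over after 3 writes.
}
```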

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
key := fmt.Sprintf("knownFiles%d", index)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, key)
metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index))
if err != nil {
return nil, err
}
@@ -184,9 +302,17 @@
}

// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata]) error {
key := fmt.Sprintf("knownFiles%d", index)
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), key)
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...storage.Operation) error {
return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...)
}

func (t *fileTracker) archiveEnabled() bool {
return t.pollsToArchive > 0 && t.persister != nil
}

func (t *fileTracker) isSet(ctx context.Context, index int) bool {
val, err := t.persister.Get(ctx, archiveKey(index))
return val != nil && err == nil
}

// FindFiles goes through archive, one fileset at a time and tries to match all fingerprints against that loaded set.
@@ -295,3 +421,31 @@
func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }

func encodeIndex(val int) []byte {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)

// Encode the index
if err := enc.Encode(val); err != nil {
return nil
}
return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
var index int

// Decode the index
dec := json.NewDecoder(bytes.NewReader(buf))
err := dec.Decode(&index)
// Negative values are invalid for a ring-buffer index; clamp to 0.
return max(index, 0), err
}

func archiveKey(i int) string {
return fmt.Sprintf("knownFiles%d", i)
}

// mod returns a non-negative remainder, unlike Go's % operator, which can
// return negative values. This form assumes x >= -y, which holds for every
// call site in this file.
func mod(x, y int) int {
return (x + y) % y
}
31 changes: 31 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker_test.go
@@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -44,6 +45,36 @@ func TestFindFilesOrder(t *testing.T) {
}
}

func TestIndexInBounds(t *testing.T) {
persister := testutil.NewUnscopedMockPersister()
pollsToArchive := 100
tracker := NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// no index exists. archiveIndex should be 0
require.Equal(t, 0, tracker.archiveIndex)

// run archiving. Each time, index should be in bound.
for i := 0; i < 1099; i++ {
require.Equalf(t, i%pollsToArchive, tracker.archiveIndex, "Index should be %d, but was %d", i%pollsToArchive, tracker.archiveIndex)
tracker.archive(&fileset.Fileset[*reader.Metadata]{})
require.Truef(t, tracker.archiveIndex >= 0 && tracker.archiveIndex < pollsToArchive, "Index should be between 0 and %d, but was %d", pollsToArchive, tracker.archiveIndex)
}
oldIndex := tracker.archiveIndex

// re-create archive
tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// index should exist and new archiveIndex should be equal to oldIndex
require.Equalf(t, oldIndex, tracker.archiveIndex, "New index should be %d, but was %d", oldIndex, tracker.archiveIndex)

// re-create archive, with reduced pollsToArchive
pollsToArchive = 70
tracker = NewFileTracker(componenttest.NewNopTelemetrySettings(), 0, pollsToArchive, persister).(*fileTracker)

// index should exist but it is out of bounds. So it should reset to 0
require.Equalf(t, 0, tracker.archiveIndex, "Index should be reset to 0 but was %d", tracker.archiveIndex)
}

func populatedPersisterData(persister operator.Persister, fps []*fingerprint.Fingerprint) []bool {
md := make([]*reader.Metadata, 0)

10 changes: 10 additions & 0 deletions pkg/stanza/operator/persister.go
@@ -6,13 +6,16 @@
import (
"context"
"fmt"

"go.opentelemetry.io/collector/extension/experimental/storage"

Check failure on line 10 in pkg/stanza/operator/persister.go, GitHub Actions / govulncheck (pkg, processor-0, receiver-1, exporter-1, receiver-0, receiver-3): could not import go.opentelemetry.io/collector/extension/experimental/storage (invalid package name: "")
)

// Persister is an interface used to persist data
type Persister interface {
Get(context.Context, string) ([]byte, error)
Set(context.Context, string, []byte) error
Delete(context.Context, string) error
Batch(ctx context.Context, ops ...storage.Operation) error
}

type scopedPersister struct {
@@ -38,3 +41,10 @@
func (p scopedPersister) Delete(ctx context.Context, key string) error {
return p.Persister.Delete(ctx, fmt.Sprintf("%s.%s", p.scope, key))
}

func (p scopedPersister) Batch(ctx context.Context, ops ...storage.Operation) error {
for _, op := range ops {
op.Key = fmt.Sprintf("%s.%s", p.scope, op.Key)
}
return p.Persister.Batch(ctx, ops...)
}
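Worth noting: mutating op.Key inside the range loop only prefixes the keys seen by the wrapped persister because storage.Operation in the experimental storage package is a pointer type; if it were a plain struct, the loop would rewrite copies and the scope prefix would be silently dropped.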
20 changes: 20 additions & 0 deletions pkg/stanza/testutil/util.go
@@ -8,6 +8,8 @@
"strings"
"sync"

"go.opentelemetry.io/collector/extension/experimental/storage"

Check failure on line 11 in pkg/stanza/testutil/util.go, GitHub Actions / govulncheck (pkg): could not import go.opentelemetry.io/collector/extension/experimental/storage (invalid package name: "")

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

@@ -46,6 +48,24 @@
return nil
}

func (p *mockPersister) Batch(_ context.Context, ops ...storage.Operation) error {
var err error
for _, op := range ops {
switch op.Type {
case storage.Get:
op.Value, err = p.Get(context.Background(), op.Key)
case storage.Set:
err = p.Set(context.Background(), op.Key, op.Value)
case storage.Delete:
err = p.Delete(context.Background(), op.Key)
}
if err != nil {
return err
}
}
return nil
}
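A minimal sketch of a test exercising the mock's Batch round-trip (the test name and key are illustrative; assumes the usual testing, require, context, and storage imports):

```go
// Hypothetical test: a batched Set followed by a batched Get should round-trip.
func TestMockPersisterBatchRoundTrip(t *testing.T) {
	p := NewUnscopedMockPersister()
	require.NoError(t, p.Batch(context.Background(), storage.SetOperation("k", []byte("v"))))

	getOp := storage.GetOperation("k")
	require.NoError(t, p.Batch(context.Background(), getOp))
	require.Equal(t, []byte("v"), getOp.Value) // Operation is a pointer, so Value is visible here
}
```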

// NewUnscopedMockPersister will return a new persister for testing
func NewUnscopedMockPersister() operator.Persister {
data := make(map[string][]byte)