Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][tracker]: save most recent (archive) write index to disk #36799

Open
wants to merge 21 commits into
base: main
Choose a base branch
from

Conversation

VihasMakwana
Copy link
Contributor

@VihasMakwana VihasMakwana commented Dec 12, 2024

This PR stores the most recent index to disk. Much similar to what happens for persistent queue. It also adds Batch methods to operator.Persister, as saving the metadata and saving the index should be a transaction and it can only be achieved via Batch.

For eg. if user has configured archiving to store 100 poll cycles, let's assume:

  • For first collector run, it stores 10 cycles and archiveIndex is 11 (pointing to the next index).
  • When the collector is restarted, we will restore the archiveIndex from disk and continue from index 11

Link to tracking issue

Closes #32727

Testing

Added UT for checking index


if err := persister.Set(ctx, key, buf.Bytes()); err != nil {
ops = append(ops, storage.SetOperation(key, buf.Bytes()))
if err := persister.Batch(ctx, ops...); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For existing usage, this will be a no-op.

pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
// It's best if we reset the index or else we might end up writing invalid keys
t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
t.archiveIndex = 0
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to check for this case.

However, I wonder if we can handle it better than restarting from zero. What would it take to search the archive for the most recently updated?

I think we could maintain some kind of data structure which notes the time each archive was written. Maybe just map[index]time.Time. Then when we first create the tracker, we can load this up and find the most recent timestamp. We can also check for the case where pollsToArchive has changed and then rewrite the storage to align with the new value.

For example, if we previously saved 10 archives and find that pollsToArchive is now 5, we can find the 5 most recent indices based on the timestamp structure, then rewrite the archive files so that these are 0-4. We should probably even delete the extras from storage as well.

Copy link
Contributor Author

@VihasMakwana VihasMakwana Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djaglowski This solution does makes sense to me, but it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.
We might need to load the filesets in memory.
I'll find a few ways.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it becomes tricky when we eventually overwrite old archive data, as it is a ring buffer.

Can you elaborate?

We might need to load the filesets in memory.

If it's more than one at a time then it defeats the point of the archive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate?

Consider this archive,

Screenshot 2024-12-23 at 8 37 38 PM

We've rolled over once and the latest data is at index 4 and archiveIndex (i.e. where the next data will be written) is at index 5.

Let's suppose that new polls_to_archive is 7.
We now need to construct a new, smaller archive with 7 most recent elements.
These elements are (from most recent to least recent):

14, 13, 12, 11, 10, 9, 8

We cannot simply rewrite archive in-place without caching values.

It would be much simpler to convert archive like following image,

Screenshot 2024-12-23 at 8 41 43 PM

and we would delete excess data.

Wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would it take to search the archive for the most recently updated?

It would always be data stored at archiveIndex-1 index. We will store archiveIndex on disk, so in next collector run, we would load that value and we can find most recent data.

archiveIndex points at the next location where data will be written.
This can point to either of following:

  • Least recent data
  • Pointing to an empty slot (archive is partially filled)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would always be data stored at archiveIndex-1 index.

What if pollsToArchive < archiveIndex-1?

Anyways, this is an edge case we can worry about later. I think we have bigger problems to work through first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correcting my previous comment,
Most recently written index is (archiveIndex-1) % previouspollsToArchive

pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
@mx-psi mx-psi removed their assignment Dec 13, 2024
@VihasMakwana
Copy link
Contributor Author

@djaglowski I've added documentation and implemented the archive restoration.

I think adding a new data structure like map[index]time.Time will add to the complexity if we've rolled over and overwriting older data.

We can accomplish archive restoration without any new data structure and I've added a document to highlight this. Please take a look and let me know your thoughts.

Copy link
Member

@djaglowski djaglowski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the documentation changes to another PR?

@@ -0,0 +1,279 @@
# File Matching
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djaglowski all of it. I've renamed the file and placed it in a new directory.

Anyway, I'll separate it out into a new PR.

@@ -0,0 +1,173 @@
# File archiving

The file consumer now supports archiving. Previously, file offsets older than three poll cycles were discarded, and if such files reappeared (which could happen if they were temporarily removed or if `exclude_older_than` was enabled), the entire file contents would be read again.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to describe previous functionality in this document


## How does archiving work?

- We stores the offsets older than three poll cycles on disk. If we use `polls_to_archive: 10`, the on-disk structure looks like following:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have 3 in memory and 7 on disk? This seems worth calling out explicitly in the example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have 3 in memory and 7 on disk? This seems worth calling out explicitly in the example.

No. We have 3 in memory and 10 on disk.
I will explicitly mention this to clear the difference.

### How does reading from archiving work?

During reader creation, we group all the new (or unmatched) files and try to find a match in archive. From high level, it consists of following steps:
1. We start from most recently writen index on archive and load the data from it.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention in-memory first, then archive is used as a fallback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

@VihasMakwana
Copy link
Contributor Author

@djaglowski new PR for docs #37067

@VihasMakwana
Copy link
Contributor Author

@djaglowski as we'll include docs in new PR, please let me know if you have any comments regarding the implementation.

pkg/stanza/fileconsumer/internal/checkpoint/checkpoint.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
pkg/stanza/fileconsumer/internal/tracker/tracker.go Outdated Show resolved Hide resolved
// It's best if we reset the index or else we might end up writing invalid keys
t.set.Logger.Warn("the read index was found, but it exceeds the bounds. Starting from 0")
t.archiveIndex = 0
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would always be data stored at archiveIndex-1 index.

What if pollsToArchive < archiveIndex-1?

Anyways, this is an edge case we can worry about later. I think we have bigger problems to work through first.

@VihasMakwana
Copy link
Contributor Author

@djaglowski I've added more test coverage. PTAL at your convenience! Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

old logs could be collected again when exclude_older_than is enabled
3 participants