[feat] - add cancellation to s3 source #3574

Open: wants to merge 1 commit into base: main

@ahrav ahrav commented Nov 8, 2024

Description:

This PR handles cancellation during an S3 scan.

Checklist:

  • Tests passing (make test-community)?
  • Lint passing (make lint; this requires golangci-lint)?

@ahrav ahrav requested review from a team November 8, 2024 18:02
@ahrav ahrav marked this pull request as ready for review November 8, 2024 18:07
@ahrav ahrav requested a review from a team as a code owner November 8, 2024 18:07

@mcastorina mcastorina left a comment

It looks like most of the cancellation happens via contexts, which means the workers we want to cancel should already be context aware.

Can this be done with a standard context.WithCancel?

ahrav commented Nov 9, 2024

> It looks like most of the cancellation happens via contexts, which means the workers we want to cancel should already be context aware.
>
> Can this be done with a standard context.WithCancel?

Would you mind providing an example? I couldn't get it to work :( I was having a problem getting the error out of pageChunker

ahrav commented Nov 9, 2024

If you can get it to work feel free to close this. I'll be out next week, so feel free to replace this PR :)

Just gotta make sure the resumption test in #3570 works.

@mcastorina

I'll be out most of next week too, but for posterity another avenue could be errgroup.WithContext.

ahrav commented Nov 9, 2024

> I'll be out most of next week too, but for posterity another avenue could be errgroup.WithContext.

hmm.. I guess we could do something like:

        // Start a goroutine to watch for errors from pageChunker
        go func() {
            select {
            case err := <-errChan:
                if err != nil {
                    cancel() // Cancel the context if we get an error
                }
            case <-scanCtx.Done():
                return
            }
        }()

        input := &s3.ListObjectsV2Input{Bucket: &bucket}
        if bucket == resumePoint.CurrentBucket && resumePoint.StartAfter != "" {
            input.StartAfter = &resumePoint.StartAfter
        }

        err = regionalClient.ListObjectsV2PagesWithContext(
            scanCtx, // Use the cancellable context
            input,
            func(page *s3.ListObjectsV2Output, _ bool) bool {
                // meta and state avoid shadowing the pageMetadata and processingState types.
                meta := pageMetadata{
                    bucket:     bucket,
                    pageNumber: pageNumber,
                    client:     regionalClient,
                    page:       page,
                }
                state := processingState{
                    errorCount:  &errorCount,
                    objectCount: &objectCount,
                }

                // Process the page and send any errors to errChan
                if err := s.pageChunker(scanCtx, meta, state, chunksChan); err != nil {
                    // Non-blocking send: the error is dropped if errChan cannot accept it.
                    select {
                    case errChan <- err:
                    default:
                    }
                    return false
                }

                if scanCtx.Err() != nil {
                    return false
                }

                pageNumber++
                return true
            })

        // Check if we stopped due to context cancellation
        if scanCtx.Err() != nil {
            return scanCtx.Err()
        }

        if err != nil {
            if role == "" {
                bucketCtx.Logger().Error(err, "could not list objects in bucket")
            } else {
                bucketCtx.Logger().V(3).Info("could not list objects in bucket", "err", err)
            }
        }
    }

Idk why, but that feels more complicated 😅 (assuming this is what you meant?)
