Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/dataflux): add worksteal algorithm to fast-listing #10913

Merged
merged 12 commits into from
Sep 29, 2024
82 changes: 72 additions & 10 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"runtime"
"strings"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
Expand All @@ -41,18 +43,22 @@ type ListerInput struct {
// BucketName is the name of the bucket to list objects from. Required.
BucketName string

// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
// Parallelism is number of parallel workers to use for listing.
// Default value is 10x number of available CPU. Optional.
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
Parallelism int

// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
// The number of objects returned will be rounded up to a multiple of gcs page size.
// BatchSize is the number of objects to list. Default value returns all
// objects at once. Optional. The number of objects returned will be
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// rounded up to a multiple of gcs page size.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil. Optional.
// Use ProjectionNoACL for faster listing. Including ACLs increases latency while fetching objects.
// Query is the query to filter objects for listing. Default value is nil.
// Optional. Use ProjectionNoACL for faster listing. Including ACLs increases
// latency while fetching objects.
Query storage.Query

// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
// SkipDirectoryObjects is to indicate whether to list directory objects.
// Default value is false. Optional.
SkipDirectoryObjects bool
}

Expand All @@ -62,32 +68,53 @@ type Lister struct {
// method indicates the listing method (open, sequential, worksteal) to be used for listing.
method listingMethod

// pageToken is the token to use for sequential listing.
pageToken string

// bucket is the bucket handle to list objects from.
bucket *storage.BucketHandle

// batchSize is the number of objects to list.
batchSize int

// parallelism is number of parallel workers to use for listing.
parallelism int

// query is the query to filter objects for listing.
query storage.Query

// pageToken is the token to use for sequential listing.
pageToken string

// ranges is the channel to store the start and end ranges to be listed
// by the workers in worksteal listing.
ranges chan *listRange

// skipDirectoryObjects is to indicate whether to list directory objects.
skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the give bucket.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
bucket := c.Bucket(in.BucketName)

// If parallelism is not given, set default value to 10x the number of available CPU.
if in.Parallelism == 0 {
in.Parallelism = runtime.NumCPU() * 10
}
// Initialize range channel with entire namespace of object for given prefix,
// startoffset and endoffset. For the default range to list is entire namespace,
// start and end will be empty.
rangeChannel := make(chan *listRange, in.Parallelism*2)
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
rangeChannel <- &listRange{startRange: start, endRange: end}

lister := &Lister{
method: open,
parallelism: in.Parallelism,
pageToken: "",
bucket: bucket,
batchSize: in.BatchSize,
query: in.Query,
skipDirectoryObjects: in.SkipDirectoryObjects,
ranges: rangeChannel,
}
return lister
}
Expand All @@ -108,7 +135,9 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

// To start listing method is Open and runs both worksteal and sequential listing in parallel.
// The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {

Expand Down Expand Up @@ -150,6 +179,39 @@ func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error)

// Close closes the Lister's range channel, if one was created.
func (c *Lister) Close() {
	if c.ranges == nil {
		return
	}
	close(c.ranges)
}

// TODO: Close range channel for worksteal lister.
// updateStartEndOffset rewrites the start and end offsets to be relative to
// the given prefix. For example, if start is "prefix_a" and prefix is
// "prefix_", start becomes "_a". Returning two identical values signals an
// empty range (nothing to list).
func updateStartEndOffset(start, end, prefix string) (string, string) {
	if prefix == "" {
		return start, end
	}
	// An inverted window ([start, end) with start >= end) lists nothing.
	if start != "" && end != "" && start >= end {
		return start, start
	}

	switch {
	case start == "":
		// Nothing to adjust; list from the beginning of the prefix range.
	case start <= prefix:
		// Every object under the prefix sorts at or after start.
		start = ""
	case strings.HasPrefix(start, prefix):
		start = start[len(prefix):]
	default:
		// start sorts after every object with the prefix: empty range.
		return start, start
	}

	switch {
	case end == "":
		// Nothing to adjust; list to the end of the prefix range.
	case len(end) > len(prefix) && strings.HasPrefix(end, prefix):
		end = end[len(prefix):]
	case end > prefix:
		// The entire prefix range ends before end.
		end = ""
	default:
		// end sorts before every object with the prefix: empty range.
		return end, end
	}
	return start, end
}
4 changes: 2 additions & 2 deletions storage/dataflux/range_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func (rs *rangeSplitter) isRangeEqualWithPadding(startRange, endRange []rune) bo
return true
}

// charAtOrDefault returns the character at the specified position, or the default character if
// the position is out of bounds.
// charAtOrDefault returns the character at the specified position, or the default
// character if the position is out of bounds.
func charAtOrDefault(charArray []rune, position int, defaultChar rune) rune {
if position < 0 || position >= len(charArray) {
return defaultChar
Expand Down
4 changes: 2 additions & 2 deletions storage/dataflux/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs,
objectIterator.PageInfo().MaxSize = defaultPageSize

for {
objects, nextToken, numObjects, err := doListing(objectIterator, c.skipDirectoryObjects)
objects, nextToken, numObjects, err := doSeqListing(objectIterator, c.skipDirectoryObjects)
if err != nil {
return nil, "", fmt.Errorf("failed while listing objects: %w", err)
}
Expand All @@ -55,7 +55,7 @@ func (c *Lister) sequentialListing(ctx context.Context) ([]*storage.ObjectAttrs,
return result, lastToken, nil
}

func doListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) {
func doSeqListing(objectIterator *storage.ObjectIterator, skipDirectoryObjects bool) (result []*storage.ObjectAttrs, token string, objectsListed int, err error) {

for {
attrs, errObjectIterator := objectIterator.Next()
Expand Down
184 changes: 179 additions & 5 deletions storage/dataflux/worksteal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ package dataflux

import (
"context"
"fmt"
"sync"
"time"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
)

const (
// defaultAlphabet is used to initialize the range splitter.
defaultAlphabet = "ab"
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
// sleepDurationWhenIdle is the milliseconds we want each worker to sleep before checking
// the next update if it is idle.
sleepDurationWhenIdle = 200
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
)

// workerStatus indicates the status of a worker.
Expand All @@ -45,16 +56,171 @@ type worker struct {
idleChannel chan int
result *listerResult
generation int64
lister *Lister
}

// workstealListing is the main entry point of the worksteal algorithm.
// It performs worksteal to achieve highly dynamic object listing.
func (c *Lister) workstealListing(ctx context.Context) []*storage.ObjectAttrs {
// workstealListing creates multiple (parallelism) workers that simultaneosly lists
// objects from the buckets.
func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs, error) {
var workerErr []error
// Idle channel is used to track number of idle workers.
idleChannel := make(chan int, c.parallelism)
// Result channel is used to track the results from each worker.
result := &listerResult{
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
objects: []*storage.ObjectAttrs{},
}

rs, err := newRangeSplitter(defaultAlphabet)
if err != nil {
return nil, fmt.Errorf("creating new range splitter: %w", err)
}

g, ctx := errgroup.WithContext(ctx)
// Initialize all workers as idle.
for i := 0; i < c.parallelism; i++ {
idleWorker := &worker{
goroutineID: i,
startRange: "",
endRange: "",
status: idle,
rangesplitter: rs,
result: result,
idleChannel: idleChannel,
generation: int64(0),
lister: c,
}
idleChannel <- 1
g.Go(func() error {
if err := idleWorker.doWorkstealListing(ctx); err != nil {
workerErr = append(workerErr, err)
return fmt.Errorf("listing worker ID %q: %w", i, err)
}
return nil
})
}

if err := g.Wait(); err != nil {
return nil, fmt.Errorf("failed waiting for multiple workers : %w", err)
}
if len(workerErr) > 0 {
return nil, fmt.Errorf("failure in workers : %v", workerErr)
}

close(idleChannel)

return result.objects, nil
}

// doWorkstealListing implements the listing loop for a single worker.
//
// An active worker lists the next page of objects within its assigned range
// and, if idle workers are available, splits the remaining range in half,
// keeping the first half and publishing the second half on the lister's range
// channel for an idle worker to pick up. An idle worker polls the range
// channel until it finds work, then becomes active. The loop exits when the
// shutdown signal fires (all workers idle with no pending ranges, or the
// batch size has been reached) or the context is canceled.
func (w *worker) doWorkstealListing(ctx context.Context) error {
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		// If shutdown signal is true, safely terminate the worker.
		if w.shutDownSignal() {
			// If the worker is active, return its unfinished range to the
			// channel so that no part of the namespace is lost.
			if w.status == active {
				w.lister.ranges <- &listRange{startRange: w.startRange, endRange: w.endRange}
				// Worker is now idle.
				w.status = idle
			}
			break
		}

		// An idle worker sleeps briefly between polls of the range channel,
		// and becomes active once it receives a range to list.
		if w.status == idle {
			if len(w.lister.ranges) == 0 {
				time.Sleep(time.Millisecond * time.Duration(sleepDurationWhenIdle))
				continue
			} else {
				newRange := <-w.lister.ranges
				<-w.idleChannel
				w.updateWorker(newRange.startRange, newRange.endRange, active)
			}
		}
		// Active worker lists the next page of objects within its range.
		nextPageResult, err := objectLister(ctx, nextPageOpts{
			startRange:           w.startRange,
			endRange:             w.endRange,
			bucketHandle:         w.lister.bucket,
			query:                w.lister.query,
			skipDirectoryObjects: w.lister.skipDirectoryObjects,
			generation:           w.generation,
		})
		if err != nil {
			return fmt.Errorf("listing next page for worker ID %v, err: %w", w.goroutineID, err)
		}

		// Append objects listed by objectLister to the shared result, under
		// the result lock since all workers append concurrently.
		w.result.mu.Lock()
		w.result.objects = append(w.result.objects, nextPageResult.items...)
		w.result.mu.Unlock()

		// Listing completed for default page size for the given range.
		// Advance this worker's start range past the listed page, and carry
		// forward the generation of the last listed object (used when listing
		// object versions).
		w.startRange = nextPageResult.nextStartRange
		w.generation = nextPageResult.generation

		// If listing is complete for the range, make worker idle and continue.
		if nextPageResult.doneListing {
			w.status = idle
			w.idleChannel <- 1
			w.generation = int64(0)
			continue
		}
		// If listing is not complete and idle workers are available, split the
		// range and hand the second half to an idle worker.
		if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
			// Split range and publish the upper half for an idle worker.
			splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1)
			if err != nil {
				return fmt.Errorf("splitting range for worker ID:%v, err: %w", w.goroutineID, err)
			}
			// If no split point was found, keep the whole range.
			if len(splitPoint) < 1 {
				continue
			}
			w.lister.ranges <- &listRange{startRange: splitPoint[0], endRange: w.endRange}

			// Keep the lower half of the range for this worker.
			w.endRange = splitPoint[0]
		}
	}
	return nil
}

// newObjectListerOpts specifies options for instantiating the NewObjectLister.
type newObjectListerOpts struct {
// shutDownSignal returns true if all the workers are idle and the or number of objects listed is equal to page size.
func (w *worker) shutDownSignal() bool {
// If all the workers are idle and range channel is empty, no more objects to list.
if len(w.idleChannel) == w.lister.parallelism && len(w.lister.ranges) == 0 {
return true
}
// If number of objects listed is equal to the given batchSize, then shutdown.
// If batch size is not given i.e. 0, then list until all objects have been listed.
if w.lister.batchSize > 0 && len(w.result.objects) >= w.lister.batchSize {
return true
}
return false
akansha1812 marked this conversation as resolved.
Show resolved Hide resolved
}

// updateWorker assigns a new range to the worker, sets its status, and resets
// its generation so listing starts fresh on the new range.
func (w *worker) updateWorker(startRange, endRange string, status workerStatus) {
	w.startRange, w.endRange = startRange, endRange
	w.status = status
	w.generation = 0
}

// nextPageOpts specifies options for next page of listing result .
type nextPageOpts struct {
// startRange is the start offset of the objects to be listed.
startRange string
// endRange is the end offset of the objects to be listed.
Expand All @@ -69,8 +235,8 @@ type newObjectListerOpts struct {
generation int64
}

// nextPageResult holds the next page of object names and indicates whether the
// lister has completed listing (no more objects to retrieve).
// nextPageResult holds the next page of object names, start of the next page
// and indicates whether the lister has completed listing (no more objects to retrieve).
type nextPageResult struct {
// items is the list of objects listed.
items []*storage.ObjectAttrs
Expand All @@ -82,6 +248,14 @@ type nextPageResult struct {
generation int64
}

// objectLister lists the next page of objects within the range described by
// opts. It is intended to report the listed items, the next start range, the
// last object's generation, and whether listing for the range is complete.
// Currently an unimplemented stub that returns (nil, nil).
func objectLister(ctx context.Context, opts nextPageOpts) (*nextPageResult, error) {

	// TODO: Implement objectLister.

	return nil, nil
}

func addPrefix(name, prefix string) string {
if name != "" {
return prefix + name
Expand Down
Loading