Improve parallel chunking when dealing with large sparse files (#111)
* Add method to advance the chunker without producing chunks

* Optimize parallel chunking of sparse files with large sections of null bytes

* Fix a bug that caused the last chunker to be ignored, resulting in inefficiency
folbricht authored Aug 1, 2019
1 parent d91e287 commit d9ad4b1
Showing 4 changed files with 247 additions and 74 deletions.
24 changes: 24 additions & 0 deletions chunker.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"math/bits"
)

@@ -214,6 +215,29 @@ func (c *Chunker) split(i int, err error) (uint64, []byte, error) {
return start, b, err
}

// Advance n bytes without producing chunks. This can be used if the content of the next
// section in the file is known (i.e. it is known that there are a number of null chunks
// coming). This resets everything in the chunker and behaves as if the stream starts
// at (current position+n).
func (c *Chunker) Advance(n int) error {
// We might still have bytes in the buffer. These count towards the move forward.
// It's possible the advance stays within the buffer and doesn't impact the reader.
c.start += uint64(n)
if n <= len(c.buf) {
c.buf = c.buf[n:]
return nil
}
readerN := int64(n - len(c.buf))
c.buf = nil
rs, ok := c.r.(io.Seeker)
if ok {
_, err := rs.Seek(readerN, io.SeekCurrent)
return err
}
_, err := io.CopyN(ioutil.Discard, c.r, readerN)
return err
}

// Min returns the minimum chunk size
func (c *Chunker) Min() uint64 { return c.min }

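For illustration, a minimal usage sketch of the new Advance method against the API shown above; the reader, the `skip` count, and the function name are hypothetical, and the test in chunker_test.go below exercises the same path:

// Sketch only: skip a region of known content, then keep chunking.
func chunkSkipping(r io.Reader, skip int) error {
	c, err := NewChunker(r, ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault)
	if err != nil {
		return err
	}
	// The next `skip` bytes are known (e.g. a run of null bytes), so don't re-chunk them.
	// Advance moves the underlying reader forward, seeking when the reader supports it.
	if err := c.Advance(skip); err != nil {
		return err
	}
	// Chunking resumes as if the stream started after the skipped region.
	for {
		_, buf, err := c.Next()
		if err != nil {
			return err
		}
		if len(buf) == 0 {
			return nil // end of input
		}
	}
}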
88 changes: 88 additions & 0 deletions chunker_test.go
@@ -174,6 +174,68 @@ func TestChunkerBounds(t *testing.T) {
}
}

// Test to confirm that advancing through the input without producing chunks works.
func TestChunkerAdvance(t *testing.T) {
// Build an input slice that is NullChunk + <dataA> + NullChunk + <dataB>.
// Then skip over the data slices; we should be left with only null chunks.
dataA := make([]byte, 128) // Short slice
for i := range dataA {
dataA[i] = 'a'
}

dataB := make([]byte, 12*ChunkSizeMaxDefault) // Long slice to ensure we read past the chunker-internal buffer
for i := range dataB {
dataB[i] = 'b'
}

nullChunk := NewNullChunk(ChunkSizeMaxDefault)

// Build the input slice consisting of Null+dataA+Null+dataB
input := join(nullChunk.Data, dataA, nullChunk.Data, dataB)

c, err := NewChunker(bytes.NewReader(input), ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault)
if err != nil {
t.Fatal(err)
}

// Chunk the first part, this should be a null chunk
_, buf, err := c.Next()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, nullChunk.Data) {
t.Fatal("expected null chunk")
}

// Now skip the dataA slice
if err := c.Advance(len(dataA)); err != nil {
t.Fatal(err)
}

// Read the 2nd null chunk
_, buf, err = c.Next()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, nullChunk.Data) {
t.Fatal("expected null chunk")
}

// Skip over dataB
if err := c.Advance(len(dataB)); err != nil {
t.Fatal(err)
}

// Should be at the end, nothing more to chunk
_, buf, err = c.Next()
if err != nil {
t.Fatal(err)
}
if len(buf) != 0 {
t.Fatal("expected end of input")
}
}

// Global vars used for results during the benchmark to prevent the optimizer
// from optimizing away some operations
var (
@@ -215,3 +277,29 @@ func chunkFile(b *testing.B, name string) error {
}
return err
}

func benchmarkChunkNull(b *testing.B, size int) {
in := make([]byte, size)
for n := 0; n < b.N; n++ {
c, err := NewChunker(bytes.NewReader(in), ChunkSizeMinDefault, ChunkSizeAvgDefault, ChunkSizeMaxDefault)
if err != nil {
panic(err)
}
for {
start, buf, err := c.Next()
if err != nil {
panic(err)
}
if len(buf) == 0 {
break
}
chunkStart = start
chunkBuf = buf
}
}
}

func BenchmarkChunkNull1M(b *testing.B) { benchmarkChunkNull(b, 1024*1024) }
func BenchmarkChunkNull10M(b *testing.B) { benchmarkChunkNull(b, 10*1024*1024) }
func BenchmarkChunkNull50M(b *testing.B) { benchmarkChunkNull(b, 50*1024*1024) }
func BenchmarkChunkNull100M(b *testing.B) { benchmarkChunkNull(b, 100*1024*1024) }
87 changes: 74 additions & 13 deletions make.go
@@ -74,6 +74,11 @@ func IndexFromFile(ctx context.Context,
defer pb.Finish()
}

// A null chunk is produced when a large section of null bytes is chunked. There are no
// split points in those sections, so it's always of max chunk size. Used as an optimization
// when chunking files with large empty sections.
nullChunk := NewNullChunk(max)
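As a rough sketch of what NewNullChunk(max) amounts to; the digest used for the ID is an assumption here, not taken from the source:

// Conceptual equivalent only; the real constructor is NewNullChunk. Assumes
// "crypto/sha512" is imported and that the chunk ID is the SHA512/256 digest
// of the data, which may differ from the actual implementation.
func makeNullChunk(max uint64) *NullChunk {
	data := make([]byte, max)              // one max-size chunk of zero bytes
	id := ChunkID(sha512.Sum512_256(data)) // ID derived from the all-zero data
	return &NullChunk{Data: data, ID: id}
}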

// Create/initialize the workers
worker := make([]*pChunker, n)
for i := 0; i < n; i++ {
@@ -96,11 +101,12 @@
return index, stats, err
}
p := &pChunker{
chunker: c,
results: make(chan IndexChunk, mChunks),
done: make(chan struct{}),
offset: start,
stats: &stats,
chunker: c,
results: make(chan IndexChunk, mChunks),
done: make(chan struct{}),
offset: start,
stats: &stats,
nullChunk: nullChunk,
}
worker[i] = p
}
@@ -163,6 +169,9 @@ type pChunker struct {
eof bool
sync IndexChunk
stats *ChunkingStats

// Null chunk for optimizing chunking sparse files
nullChunk *NullChunk
}

func (c *pChunker) start(ctx context.Context) {
@@ -200,8 +209,24 @@ func (c *pChunker) start(ctx context.Context) {

// Check if the next worker already has this chunk, at which point we stop
// here and let the next continue
if c.next != nil && c.next.syncWith(chunk) {
return
if c.next != nil {
inSync, zeroes := c.next.syncWith(chunk)
if inSync {
return
}
numNullChunks := int(int(zeroes) / len(c.nullChunk.Data))
if numNullChunks > 0 {
if err := c.chunker.Advance(numNullChunks * len(c.nullChunk.Data)); err != nil {
c.err = err
return
}
nc := chunk
for i := 0; i < numNullChunks; i++ {
nc = IndexChunk{Start: nc.Start + nc.Size, Size: uint64(len(c.nullChunk.Data)), ID: c.nullChunk.ID}
c.results <- nc
zeroes -= uint64(len(c.nullChunk.Data))
}
}
}
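To make the arithmetic above concrete, a small worked example with hypothetical numbers (not from the source):

// Hypothetical figures mirroring the skip-ahead logic above.
func exampleNullSkip() (advanceBy, remainder int) {
	zeroes := 200000  // zero bytes the next worker reports ahead of this one
	nullLen := 65536  // len(nullChunk.Data); a 64KiB max chunk size is assumed for the example
	numNullChunks := zeroes / nullLen   // 3 full null chunks can be emitted without re-chunking
	advanceBy = numNullChunks * nullLen // Advance(196608) and emit 3 max-size null chunks
	remainder = zeroes - advanceBy      // the trailing 3392 zero bytes are chunked normally
	return advanceBy, remainder
}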

// If the next worker has stopped and has no more chunks in its bucket,
Expand All @@ -225,24 +250,60 @@ func (c *pChunker) active() bool {
}
}

// Returns true if the given chunk lines up with one in the current bucket
func (c *pChunker) syncWith(chunk IndexChunk) bool {
// Returns true if the given chunk lines up with one in the current bucket. Also returns
// the number of zero bytes this chunker has found starting at 'chunk'. This helps the previous
// chunker skip chunking over those areas and put null-chunks (always max size) in
// place instead.
func (c *pChunker) syncWith(chunk IndexChunk) (bool, uint64) {
// Read from our bucket until we're past (or match) where the previous worker
// currently is
var prev IndexChunk
for chunk.Start > c.sync.Start {
prev = c.sync
var ok bool
select {
case c.sync, ok = <-c.results:
if !ok {
return false
return false, 0
}
default: // Nothing in my bucket? Move on
return false
return false, 0
}
}
// Did we find a match with the previous worker. If so the previous worker

// Did we find a match with the previous worker? If so, the previous worker
// should stop and this one will keep going
return chunk.Start == c.sync.Start && chunk.Size == c.sync.Size
if chunk.Start == c.sync.Start && chunk.Size == c.sync.Size {
return true, 0
}

// The previous chunker didn't sync up with this one, but perhaps we're in a large area
// of nulls (chunk split points are unlikely to line up). If so we can tell the previous
// chunker how many nulls are coming so it doesn't need to do all the work again and can
// skip ahead, producing null-chunks of max size.
var n uint64
if c.sync.ID == c.nullChunk.ID && prev.ID == c.nullChunk.ID {
// We know there are at least some null chunks in front of the previous chunker. Let's
// see if there are more in our bucket so we can tell the previous chunker how far to
// skip ahead.
n = prev.Start + prev.Size - chunk.Start
for {
var ok bool
select {
case c.sync, ok = <-c.results:
if !ok {
return false, n
}
default: // Nothing more in my bucket? Move on
return false, n
}
if c.sync.ID != c.nullChunk.ID { // Hit the end of the null chunks, stop here
break
}
n += uint64(len(c.nullChunk.Data))
}
}
return false, n
}
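A worked example of the zero-byte accounting above, again with hypothetical offsets:

// Hypothetical offsets; every value here is invented for illustration.
func exampleZeroAccounting() uint64 {
	prev := IndexChunk{Start: 1000000, Size: 65536}  // null chunk already drained from this worker's bucket
	chunk := IndexChunk{Start: 1020000, Size: 70000} // previous worker's latest chunk; no sync match
	n := prev.Start + prev.Size - chunk.Start        // 45536 zero bytes known to lie beyond chunk.Start
	// Every additional null chunk still waiting in the bucket adds len(nullChunk.Data) to n.
	return n
}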

// ChunkingStats is used to report statistics of a parallel chunking operation.
