Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go: Allocate less in indexed message chunk loading #950

Merged
merged 5 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cli/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/fatih/color v1.13.0
github.com/foxglove/mcap/go/mcap v0.4.0
github.com/foxglove/mcap/go/ros v0.0.0-20230114025807-456e6a6ca1be
github.com/klauspost/compress v1.15.15
github.com/klauspost/compress v1.16.7
github.com/mattn/go-sqlite3 v1.14.14
github.com/olekukonko/tablewriter v0.0.5
github.com/pierrec/lz4/v4 v4.1.17
Expand Down
2 changes: 2 additions & 0 deletions go/cli/mcap/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kE
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down
38 changes: 24 additions & 14 deletions go/mcap/indexed_message_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"github.com/pierrec/lz4/v4"
)

const (
chunkBufferGrowthMultiple = 1.2
)

// indexedMessageIterator is an iterator over an indexed mcap read seeker (as
// seeking is required). It makes reads in alternation from the index data
// section, the message index at the end of a chunk, and the chunk's contents.
Expand All @@ -34,6 +38,8 @@ type indexedMessageIterator struct {
zstdDecoder *zstd.Decoder
lz4Reader *lz4.Reader
hasReadSummarySection bool

compressedChunkAndMessageIndex []byte
}

// parseIndexSection parses the index section of the file and populates the
Expand Down Expand Up @@ -147,12 +153,17 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
if err != nil {
return err
}
chunk := make([]byte, chunkIndex.ChunkLength+chunkIndex.MessageIndexLength)
_, err = io.ReadFull(it.rs, chunk)

compressedChunkLength := chunkIndex.ChunkLength + chunkIndex.MessageIndexLength
if len(it.compressedChunkAndMessageIndex) < int(compressedChunkLength) {
newSize := int(float64(compressedChunkLength) * chunkBufferGrowthMultiple)
it.compressedChunkAndMessageIndex = make([]byte, newSize)
}
_, err = io.ReadFull(it.rs, it.compressedChunkAndMessageIndex[:compressedChunkLength])
if err != nil {
return fmt.Errorf("failed to read chunk data: %w", err)
}
parsedChunk, err := ParseChunk(chunk[9:chunkIndex.ChunkLength])
parsedChunk, err := ParseChunk(it.compressedChunkAndMessageIndex[9:chunkIndex.ChunkLength])
if err != nil {
return fmt.Errorf("failed to parse chunk: %w", err)
}
Expand All @@ -162,34 +173,33 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error {
case CompressionNone:
chunkData = parsedChunk.Records
case CompressionZSTD:
var err error
if it.zstdDecoder == nil {
it.zstdDecoder, err = zstd.NewReader(bytes.NewReader(parsedChunk.Records))
} else {
err = it.zstdDecoder.Reset(bytes.NewReader(parsedChunk.Records))
}
if err != nil {
return fmt.Errorf("failed to read zstd chunk: %w", err)
it.zstdDecoder, err = zstd.NewReader(nil)
if err != nil {
return fmt.Errorf("failed to instantiate zstd decoder: %w", err)
}
}
chunkData, err = io.ReadAll(it.zstdDecoder)
chunkData = make([]byte, 0, parsedChunk.UncompressedSize)
chunkData, err = it.zstdDecoder.DecodeAll(parsedChunk.Records, chunkData)
if err != nil {
return fmt.Errorf("failed to decompress zstd chunk: %w", err)
return fmt.Errorf("failed to decode chunk data: %w", err)
}
case CompressionLZ4:
if it.lz4Reader == nil {
it.lz4Reader = lz4.NewReader(bytes.NewReader(parsedChunk.Records))
} else {
it.lz4Reader.Reset(bytes.NewReader(parsedChunk.Records))
}
chunkData, err = io.ReadAll(it.lz4Reader)
chunkData = make([]byte, parsedChunk.UncompressedSize)
_, err = io.ReadFull(it.lz4Reader, chunkData)
if err != nil {
return fmt.Errorf("failed to decompress lz4 chunk: %w", err)
}
default:
return fmt.Errorf("unsupported compression %s", parsedChunk.Compression)
}
// use the message index to find the messages we want from the chunk
messageIndexSection := chunk[chunkIndex.ChunkLength:]
messageIndexSection := it.compressedChunkAndMessageIndex[chunkIndex.ChunkLength:compressedChunkLength]
var recordLen uint64
offset := 0
for offset < len(messageIndexSection) {
Expand Down
Loading