diff --git a/go/cli/mcap/go.mod b/go/cli/mcap/go.mod index 2d95ce2d49..27c6d66d06 100644 --- a/go/cli/mcap/go.mod +++ b/go/cli/mcap/go.mod @@ -7,7 +7,7 @@ require ( github.com/fatih/color v1.13.0 github.com/foxglove/mcap/go/mcap v0.4.0 github.com/foxglove/mcap/go/ros v0.0.0-20230114025807-456e6a6ca1be - github.com/klauspost/compress v1.15.15 + github.com/klauspost/compress v1.16.7 github.com/mattn/go-sqlite3 v1.14.14 github.com/olekukonko/tablewriter v0.0.5 github.com/pierrec/lz4/v4 v4.1.17 diff --git a/go/cli/mcap/go.sum b/go/cli/mcap/go.sum index cb3400f3fc..5508fe81ea 100644 --- a/go/cli/mcap/go.sum +++ b/go/cli/mcap/go.sum @@ -309,6 +309,8 @@ github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kE github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= diff --git a/go/mcap/indexed_message_iterator.go b/go/mcap/indexed_message_iterator.go index 86cabb9188..ef7ba4b6fa 100644 --- a/go/mcap/indexed_message_iterator.go +++ b/go/mcap/indexed_message_iterator.go @@ -11,6 +11,10 @@ import ( "github.com/pierrec/lz4/v4" ) +const ( + chunkBufferGrowthMultiple = 1.2 +) + // indexedMessageIterator is an iterator over an indexed mcap read seeker (as // seeking is required). It makes reads in alternation from the index data // section, the message index at the end of a chunk, and the chunk's contents. @@ -34,6 +38,8 @@ type indexedMessageIterator struct { zstdDecoder *zstd.Decoder lz4Reader *lz4.Reader hasReadSummarySection bool + + compressedChunkAndMessageIndex []byte } // parseIndexSection parses the index section of the file and populates the @@ -147,12 +153,17 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { if err != nil { return err } - chunk := make([]byte, chunkIndex.ChunkLength+chunkIndex.MessageIndexLength) - _, err = io.ReadFull(it.rs, chunk) + + compressedChunkLength := chunkIndex.ChunkLength + chunkIndex.MessageIndexLength + if len(it.compressedChunkAndMessageIndex) < int(compressedChunkLength) { + newSize := int(float64(compressedChunkLength) * chunkBufferGrowthMultiple) + it.compressedChunkAndMessageIndex = make([]byte, newSize) + } + _, err = io.ReadFull(it.rs, it.compressedChunkAndMessageIndex[:compressedChunkLength]) if err != nil { return fmt.Errorf("failed to read chunk data: %w", err) } - parsedChunk, err := ParseChunk(chunk[9:chunkIndex.ChunkLength]) + parsedChunk, err := ParseChunk(it.compressedChunkAndMessageIndex[9:chunkIndex.ChunkLength]) if err != nil { return fmt.Errorf("failed to parse chunk: %w", err) } @@ -162,18 +173,16 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { case CompressionNone: chunkData = parsedChunk.Records case CompressionZSTD: - var err error if it.zstdDecoder == nil { - it.zstdDecoder, err = zstd.NewReader(bytes.NewReader(parsedChunk.Records)) - } else { - err = it.zstdDecoder.Reset(bytes.NewReader(parsedChunk.Records)) - } - if err != nil { - return fmt.Errorf("failed to read zstd chunk: %w", err) + it.zstdDecoder, err = zstd.NewReader(nil) + if err != nil { + return fmt.Errorf("failed to instantiate zstd decoder: %w", err) + } } - chunkData, err = io.ReadAll(it.zstdDecoder) + chunkData = make([]byte, 0, parsedChunk.UncompressedSize) + chunkData, err = it.zstdDecoder.DecodeAll(parsedChunk.Records, chunkData) if err != nil { - return fmt.Errorf("failed to decompress zstd chunk: %w", err) + return fmt.Errorf("failed to decode chunk data: %w", err) } case CompressionLZ4: if it.lz4Reader == nil { @@ -181,7 +190,8 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { } else { it.lz4Reader.Reset(bytes.NewReader(parsedChunk.Records)) } - chunkData, err = io.ReadAll(it.lz4Reader) + chunkData = make([]byte, parsedChunk.UncompressedSize) + _, err = io.ReadFull(it.lz4Reader, chunkData) if err != nil { return fmt.Errorf("failed to decompress lz4 chunk: %w", err) } @@ -189,7 +199,7 @@ func (it *indexedMessageIterator) loadChunk(chunkIndex *ChunkIndex) error { return fmt.Errorf("unsupported compression %s", parsedChunk.Compression) } // use the message index to find the messages we want from the chunk - messageIndexSection := chunk[chunkIndex.ChunkLength:] + messageIndexSection := it.compressedChunkAndMessageIndex[chunkIndex.ChunkLength:compressedChunkLength] var recordLen uint64 offset := 0 for offset < len(messageIndexSection) {