diff --git a/CHANGELOG.md b/CHANGELOG.md
index c4384ee48..f9b30b91c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,7 +3,7 @@
 **Bug Fixes**
 - Flush shall only sync the blocks to storage and not delete them from local cache.
 - Random write has been re-enabled in block cache.
-- Writing to an uncommitted block which has been deleted from the in-memory cache.
+- Reading or writing to an uncommitted block which has been deleted from the in-memory cache.
 - Check download status of a block before updating and return error if it failed to download.
 
 ## 2.3.1 (Unreleased)
diff --git a/azure-pipeline-templates/verbose-tests.yml b/azure-pipeline-templates/verbose-tests.yml
index 947686c32..901311e5f 100644
--- a/azure-pipeline-templates/verbose-tests.yml
+++ b/azure-pipeline-templates/verbose-tests.yml
@@ -177,7 +177,7 @@ steps:
       distro_name: ${{ parameters.distro_name }}
       quick_test: false
       verbose_log: ${{ parameters.verbose_log }}
-      clone: true
+      clone: false
 
   - ${{ if eq(parameters.test_sas_credential, true) }}:
     - template: e2e-tests.yml
@@ -385,4 +385,4 @@ steps:
       ${{ parameters.working_dir }}/blobfuse2 mount ${{ parameters.mount_dir }} --config-file=${{ parameters.config }} --default-working-dir=${{ parameters.working_dir }}
     displayName: 'HugeList: Mount'
-    continueOnError: false
\ No newline at end of file
+    continueOnError: false
diff --git a/cmd/health-monitor.go b/cmd/health-monitor.go
index dca3c32bf..ce798e9a1 100644
--- a/cmd/health-monitor.go
+++ b/cmd/health-monitor.go
@@ -132,7 +132,7 @@ func validateHMonOptions() error {
     }
 
     if len(errMsg) != 0 {
-        return fmt.Errorf(errMsg)
+        return fmt.Errorf("%s", errMsg)
     }
 
     return nil
diff --git a/cmd/mount.go b/cmd/mount.go
index 22dbbb6ab..56326427c 100644
--- a/cmd/mount.go
+++ b/cmd/mount.go
@@ -754,5 +754,5 @@ func init() {
 
 func Destroy(message string) error {
     _ = log.Destroy()
-    return fmt.Errorf(message)
+    return fmt.Errorf("%s", message)
 }
diff --git a/component/azstorage/azauthmsi.go b/component/azstorage/azauthmsi.go
index 64b403043..ee56da6f4 100644
--- a/component/azstorage/azauthmsi.go
+++ b/component/azstorage/azauthmsi.go
@@ -97,7 +97,7 @@ func (azmsi *azAuthMSI) getTokenCredentialUsingCLI() (azcore.TokenCredential, er
         if msg == "" {
             msg = err.Error()
         }
-        return nil, fmt.Errorf(msg)
+        return nil, fmt.Errorf("%s", msg)
     }
 
     log.Info("azAuthMSI::getTokenCredentialUsingCLI : Successfully logged in using Azure CLI")
diff --git a/component/block_cache/block.go b/component/block_cache/block.go
index 40b600305..415f3edb4 100644
--- a/component/block_cache/block.go
+++ b/component/block_cache/block.go
@@ -51,6 +51,14 @@ const (
     BlockFlagFailed           // Block upload/download has failed
 )
 
+// Status values denoting how the upload/download of a block completed
+const (
+    BlockStatusDownloaded int = iota + 1 // Download of this block is complete
+    BlockStatusUploaded                  // Upload of this block is complete
+    BlockStatusDownloadFailed            // Download of this block has failed
+    BlockStatusUploadFailed              // Upload of this block has failed
+)
+
 // Block is a memory mapped buffer with its state to hold data
 type Block struct {
     offset uint64 // Start offset of the data this block holds
@@ -127,9 +135,9 @@ func (b *Block) Uploading() {
 }
 
 // Ready marks this Block is now ready for reading by its first reader (data download completed)
-func (b *Block) Ready() {
+func (b *Block) Ready(val int) {
     select {
-    case b.state <- 1:
+    case b.state <- val:
         break
     default:
         break
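// Illustrative sketch, not part of the patch above: with Ready(val), the block's state channel
// carries a result code instead of a bare "done" signal, so readers can tell a finished download
// from a finished (or failed) upload. The type and constant names below only mirror the change;
// they are stand-ins, not the real blobfuse2 types.
package main

import "fmt"

const (
	statusDownloaded int = iota + 1
	statusUploaded
	statusDownloadFailed
	statusUploadFailed
)

type block struct {
	state chan int
}

// ready mirrors Block.Ready(val): publish the outcome without blocking if nobody is waiting yet.
func (b *block) ready(val int) {
	select {
	case b.state <- val:
	default:
	}
}

func main() {
	b := &block{state: make(chan int, 1)}

	// Producer side (a download or upload worker) reports how the operation ended.
	b.ready(statusDownloadFailed)

	// Consumer side (getBlock in the patch) switches on the published status.
	switch <-b.state {
	case statusDownloaded:
		fmt.Println("block is ready to read")
	case statusUploaded:
		fmt.Println("block was staged; local data is still valid")
	case statusDownloadFailed:
		fmt.Println("drop the block and let the next read retry the download")
	case statusUploadFailed:
		fmt.Println("keep the dirty buffer and re-queue it for upload")
	}
}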
diff --git a/component/block_cache/block_cache.go b/component/block_cache/block_cache.go
index def3a9988..6b2301616 100644
--- a/component/block_cache/block_cache.go
+++ b/component/block_cache/block_cache.go
@@ -335,12 +335,12 @@ func (bc *BlockCache) OpenFile(options internal.OpenFileOptions) (*handlemap.Han
 
     bc.prepareHandleForBlockCache(handle)
 
-    if options.Flags&os.O_TRUNC != 0 || options.Flags&os.O_WRONLY != 0 {
+    if options.Flags&os.O_TRUNC != 0 || (options.Flags&os.O_WRONLY != 0 && options.Flags&os.O_APPEND == 0) {
         // If file is opened in truncate or wronly mode then we need to wipe out the data consider current file size as 0
         log.Debug("BlockCache::OpenFile : Truncate %v to 0", options.Name)
         handle.Size = 0
         handle.Flags.Set(handlemap.HandleFlagDirty)
-    } else if options.Flags&os.O_RDWR != 0 && handle.Size != 0 {
+    } else if handle.Size != 0 && (options.Flags&os.O_RDWR != 0 || options.Flags&os.O_APPEND != 0) {
         // File is not opened in read-only mode so we need to get the list of blocks and validate the size
         // As there can be a potential write on this file, currently configured block size and block size of the file in container
         // has to match otherwise it will corrupt the file. Fail the open call if this is not the case.
@@ -537,6 +537,22 @@ func (bc *BlockCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, e
     return dataRead, nil
 }
 
+func (bc *BlockCache) addToCooked(handle *handlemap.Handle, block *Block) {
+    if block.node != nil {
+        _ = handle.Buffers.Cooking.Remove(block.node)
+        _ = handle.Buffers.Cooked.Remove(block.node)
+    }
+    block.node = handle.Buffers.Cooked.PushBack(block)
+}
+
+func (bc *BlockCache) addToCooking(handle *handlemap.Handle, block *Block) {
+    if block.node != nil {
+        _ = handle.Buffers.Cooked.Remove(block.node)
+        _ = handle.Buffers.Cooking.Remove(block.node)
+    }
+    block.node = handle.Buffers.Cooking.PushBack(block)
+}
+
 // getBlock: From offset generate the Block index and get the Block corresponding to it
 /*
     Base logic of getBlock:
     Check if the given block is already available or not
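// Illustrative sketch, not part of the patch: addToCooked/addToCooking above centralise the
// "move a block between the cooked and cooking lists" step. The key detail is that the block's
// node is removed from both lists before it is pushed, so the move is safe regardless of which
// list currently owns the node, and calling it twice does not duplicate the entry. A stand-alone
// model of that pattern using container/list (the names here are not the real blobfuse2 types):
package main

import (
	"container/list"
	"fmt"
)

type block struct {
	id   int
	node *list.Element
}

type buffers struct {
	cooking *list.List // blocks with an in-flight download/upload
	cooked  *list.List // blocks ready to be read or reused
}

// moveToCooked mirrors the helper: detach the node from whichever list holds it, then append.
// list.Remove is a no-op when the element belongs to a different list, which is what makes
// removing from both lists safe.
func (bu *buffers) moveToCooked(b *block) {
	if b.node != nil {
		bu.cooking.Remove(b.node)
		bu.cooked.Remove(b.node)
	}
	b.node = bu.cooked.PushBack(b)
}

func main() {
	bu := &buffers{cooking: list.New(), cooked: list.New()}
	b := &block{id: 7}
	b.node = bu.cooking.PushBack(b)

	bu.moveToCooked(b)
	bu.moveToCooked(b) // calling it again does not duplicate the entry

	fmt.Println(bu.cooking.Len(), bu.cooked.Len()) // prints: 0 1
}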
@@ -563,6 +579,20 @@ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Bl
     index := bc.getBlockIndex(readoffset)
     node, found := handle.GetValue(fmt.Sprintf("%v", index))
     if !found {
+
+        // block is not present in the buffer list, check if it is uncommitted
+        // If yes, commit all the uncommitted blocks first and then download this block
+        shouldCommit, _ := shouldCommitAndDownload(int64(index), handle)
+        if shouldCommit {
+            // commit all the uncommitted blocks to storage
+            log.Debug("BlockCache::getBlock : Downloading an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
+            err := bc.commitBlocks(handle)
+            if err != nil {
+                log.Err("BlockCache::getBlock : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
+                return nil, err
+            }
+        }
+
         // If this is the first read request then prefetch all required nodes
         val, _ := handle.GetValue("#")
         if !bc.noPrefetch && val.(uint64) == 0 {
@@ -600,40 +630,52 @@ func (bc *BlockCache) getBlock(handle *handlemap.Handle, readoffset uint64) (*Bl
     block := node.(*Block)
 
     // Wait for this block to complete the download
-    t := int(0)
-    t = <-block.state
+    t, ok := <-block.state
+    if ok {
+        // this block is now open to read and process
+        block.Unblock()
 
-    if t == 1 {
-        block.flags.Clear(BlockFlagDownloading)
+        switch t {
+        case BlockStatusDownloaded:
+            log.Debug("BlockCache::getBlock : Downloaded block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
 
-        if block.IsFailed() {
-            log.Err("BlockCache::getBlock : Failed to download block %v=>%s (offset %v, index %v)", handle.ID, handle.Path, readoffset, index)
+            block.flags.Clear(BlockFlagDownloading)
 
-            // Remove this node from handle so that next read retries to download the block again
-            bc.releaseFailedBlock(handle, block)
-            return nil, fmt.Errorf("failed to download block")
-        }
-
-        // Download complete and you are first reader of this block
-        if handle.OptCnt <= MIN_RANDREAD {
-            // So far this file has been read sequentially so prefetch more
-            val, _ := handle.GetValue("#")
-            if int64(val.(uint64)*bc.blockSize) < handle.Size {
-                _ = bc.startPrefetch(handle, val.(uint64), true)
+            // Download complete and you are first reader of this block
+            if !bc.noPrefetch && handle.OptCnt <= MIN_RANDREAD {
+                // So far this file has been read sequentially so prefetch more
+                val, _ := handle.GetValue("#")
+                if int64(val.(uint64)*bc.blockSize) < handle.Size {
+                    _ = bc.startPrefetch(handle, val.(uint64), true)
+                }
             }
-        }
-        // This block was moved to in-process queue as download is complete lets move it back to normal queue
-        _ = handle.Buffers.Cooking.Remove(block.node)
-        block.node = handle.Buffers.Cooked.PushBack(block)
+            // This block was moved to the in-process queue; download is complete, so move it back to the normal queue
+            bc.addToCooked(handle, block)
 
-        // mark this block as synced so that if it can used for write later
-        // which will move it back to cooking list as per the synced flag
-        block.flags.Set(BlockFlagSynced)
+            // mark this block as synced so that if it is used for a write later
+            // it will move back to the cooking list as per the synced flag
+            block.flags.Set(BlockFlagSynced)
 
-        // Mark this block is now open for everyone to read and process
-        // Once unblocked and moved to original queue, any instance can delete this block to reuse as well
-        block.Unblock()
+        case BlockStatusUploaded:
+            log.Debug("BlockCache::getBlock : Staged block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
+            block.flags.Clear(BlockFlagUploading)
+
+        case BlockStatusDownloadFailed:
+            log.Err("BlockCache::getBlock : Failed to download block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
+
+            // Remove this node from handle so that next read retries to download the block again
+            bc.releaseDownloadFailedBlock(handle, block)
+            return nil, fmt.Errorf("failed to download block")
+
+        case BlockStatusUploadFailed:
+            // Local data is still valid so continue using this buffer
+            log.Err("BlockCache::getBlock : Failed to upload block %v for %v=>%s (read offset %v)", index, handle.ID, handle.Path, readoffset)
+            block.flags.Clear(BlockFlagUploading)
+
+            // Move this block to the end of the queue as it is still modified and un-staged
+            bc.addToCooking(handle, block)
+        }
     }
 
     return block, nil
 }
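// Illustrative sketch, not part of the patch: the new branch in getBlock above commits every
// staged (uncommitted) block before it schedules a download of one of them, because storage can
// only serve committed data. shouldCommitAndDownload and commitBlocks are existing blobfuse2
// helpers; the toy model below only mirrors the decision, with made-up types and names.
package main

import "fmt"

type fileState struct {
	staged    map[int64]bool // blocks written through this handle but not yet committed
	committed map[int64]bool // blocks the service can serve
}

func (f *fileState) read(idx int64) error {
	// Reading a block that is staged but not committed: flush all staged blocks first,
	// then the block can be downloaded like any other committed block.
	if f.staged[idx] && !f.committed[idx] {
		for id := range f.staged {
			f.committed[id] = true
		}
		f.staged = map[int64]bool{}
		fmt.Println("committed all staged blocks before reading block", idx)
	}
	if !f.committed[idx] {
		return fmt.Errorf("block %d not found", idx)
	}
	fmt.Println("downloaded block", idx)
	return nil
}

func main() {
	f := &fileState{staged: map[int64]bool{3: true, 4: true}, committed: map[int64]bool{}}
	_ = f.read(3) // triggers a commit of blocks 3 and 4, then the download
	_ = f.read(4) // already committed by the previous read, plain download
}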
@@ -725,10 +767,24 @@ func (bc *BlockCache) startPrefetch(handle *handlemap.Handle, index uint64, pref
     }
 
     for i := uint32(0); i < cnt; i++ {
-        // Revalidate this node does not exists in the block map
+        // Check if the block exists in the local cache or not
+        // If not, download the block from storage
         _, found := handle.GetValue(fmt.Sprintf("%v", index))
         if !found {
-            // Block not found so lets push it for download
+            // Check if the block is an uncommitted block or not
+            // For uncommitted block we need to commit the block first
+            shouldCommit, _ := shouldCommitAndDownload(int64(index), handle)
+            if shouldCommit {
+                // This shall happen only for the first uncommitted block and shall flush all the uncommitted blocks to storage
+                log.Debug("BlockCache::startPrefetch : Fetching an uncommitted block %v, so committing all the staged blocks for %v=>%s", index, handle.ID, handle.Path)
+                err := bc.commitBlocks(handle)
+                if err != nil {
+                    log.Err("BlockCache::startPrefetch : Failed to commit blocks for %v=>%s [%s]", handle.ID, handle.Path, err.Error())
+                    return err
+                }
+            }
+
+            // push the block for download
             err := bc.refreshBlock(handle, index, prefetch || i > 0)
             if err != nil {
                 return err
@@ -770,6 +826,16 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe
     block := node.Value.(*Block)
     if block.id != -1 {
+        // If the block is being staged, then wait till it is uploaded
+        // and then use it for read
+        if block.flags.IsSet(BlockFlagUploading) {
+            log.Debug("BlockCache::refreshBlock : Waiting for the block %v to upload before using it for block %v read for %v=>%s", block.id, index, handle.ID, handle.Path)
+            _, ok := <-block.state
+            if ok {
+                block.Unblock()
+            }
+        }
+
         // This is a reuse of a block case so we need to remove old entry from the map
         handle.RemoveValue(fmt.Sprintf("%v", block.id))
     }
@@ -783,14 +849,14 @@ func (bc *BlockCache) refreshBlock(handle *handlemap.Handle, index uint64, prefe
         handle.SetValue(fmt.Sprintf("%v", index), block)
         handle.SetValue("#", (index + 1))
 
-        bc.lineupDownload(handle, block, prefetch, true)
+        bc.lineupDownload(handle, block, prefetch)
     }
 
     return nil
 }
 
 // lineupDownload : Create a work item and schedule the download
-func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool, pushFront bool) {
+func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, prefetch bool) {
     item := &workItem{
         handle: handle,
         block:  block,
@@ -800,16 +866,8 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
     }
 
     // Remove this block from free block list and add to in-process list
-    if block.node != nil {
-        _ = handle.Buffers.Cooked.Remove(block.node)
-    }
+    bc.addToCooking(handle, block)
 
-    if pushFront {
-        block.node = handle.Buffers.Cooking.PushFront(block)
-    } else {
-        // push back to cooking list in case of write scenario where a block is downloaded before it is updated
-        block.node = handle.Buffers.Cooking.PushBack(block)
-    }
     block.flags.Set(BlockFlagDownloading)
 
     // Send the work item to worker pool to schedule download
@@ -864,7 +922,7 @@
                 // Just mark the block that download is complete
                 item.block.endIndex = item.block.offset + uint64(n)
-                item.block.Ready()
+                item.block.Ready(BlockStatusDownloaded)
                 return
             }
         }
@@ -882,7 +940,7 @@
             // If we failed to read the data 3 times then just give up
             log.Err("BlockCache::download : 3 attempts to download a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
 
             item.block.Failed()
-            item.block.Ready()
+            item.block.Ready(BlockStatusDownloadFailed)
             return
         }
@@ -924,7 +982,7 @@
     }
 
     // Just mark the block that download is complete
-    item.block.Ready()
+    item.block.Ready(BlockStatusDownloaded)
 }
 
 // WriteFile: Write to the local file
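// Illustrative sketch, not part of the patch: refreshBlock above now refuses to recycle a buffer
// while its previous content still has an upload in flight; it waits on the block's state channel
// first. A stand-alone model of that "drain before reuse" step (types and names are stand-ins):
package main

import (
	"fmt"
	"time"
)

type block struct {
	uploading bool
	state     chan int
	data      []byte
}

// reuse waits for any in-flight upload to finish before the buffer is handed to a new owner,
// mirroring the BlockFlagUploading check added to refreshBlock.
func reuse(b *block) {
	if b.uploading {
		<-b.state // park here until the uploader signals completion
		b.uploading = false
	}
	b.data = b.data[:0] // only now is it safe to recycle the buffer
}

func main() {
	b := &block{uploading: true, state: make(chan int, 1), data: []byte("dirty data")}

	go func() {
		time.Sleep(10 * time.Millisecond) // pretend to upload the old content
		b.state <- 1                      // upload done
	}()

	reuse(b)
	fmt.Println("buffer recycled, len =", len(b.data))
}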
@@ -1023,7 +1081,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
     if shouldDownload || shouldCommit {
         // We are writing somewhere in between so just fetch this block
         log.Debug("BlockCache::getOrCreateBlock : Downloading block %v for %v=>%v", block.id, handle.ID, handle.Path)
-        bc.lineupDownload(handle, block, false, false)
+        bc.lineupDownload(handle, block, false)
 
         // Now wait for download to complete
         <-block.state
@@ -1033,7 +1091,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
             log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)
 
             // Remove this node from handle so that next read retries to download the block again
-            bc.releaseFailedBlock(handle, block)
+            bc.releaseDownloadFailedBlock(handle, block)
             return nil, fmt.Errorf("failed to download block")
         }
     } else {
@@ -1063,11 +1121,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
 
     // If the block was staged earlier then we are overwriting it here so move it back to cooking queue
     if block.flags.IsSet(BlockFlagSynced) {
         log.Debug("BlockCache::getOrCreateBlock : Overwriting back to staged block %v for %v=>%s", block.id, handle.ID, handle.Path)
-        if block.node != nil {
-            _ = handle.Buffers.Cooked.Remove(block.node)
-        }
-        block.node = handle.Buffers.Cooking.PushBack(block)
     } else if block.flags.IsSet(BlockFlagDownloading) {
         log.Debug("BlockCache::getOrCreateBlock : Waiting for download to finish for committed block %v for %v=>%s", block.id, handle.ID, handle.Path)
         <-block.state
@@ -1078,7 +1132,7 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
             log.Err("BlockCache::getOrCreateBlock : Failed to download block %v for %v=>%s", block.id, handle.ID, handle.Path)
 
             // Remove this node from handle so that next read retries to download the block again
-            bc.releaseFailedBlock(handle, block)
+            bc.releaseDownloadFailedBlock(handle, block)
             return nil, fmt.Errorf("failed to download block")
         }
     } else if block.flags.IsSet(BlockFlagUploading) {
@@ -1087,13 +1141,10 @@ func (bc *BlockCache) getOrCreateBlock(handle *handlemap.Handle, offset uint64)
         log.Debug("BlockCache::getOrCreateBlock : Waiting for the block %v to upload for %v=>%s", block.id, handle.ID, handle.Path)
         <-block.state
         block.Unblock()
-
-        if block.node != nil {
-            _ = handle.Buffers.Cooked.Remove(block.node)
-        }
-        block.node = handle.Buffers.Cooking.PushBack(block)
     }
 
+    bc.addToCooking(handle, block)
+
     block.flags.Clear(BlockFlagUploading)
     block.flags.Clear(BlockFlagDownloading)
     block.flags.Clear(BlockFlagSynced)
@@ -1128,9 +1179,14 @@ func (bc *BlockCache) stageBlocks(handle *handlemap.Handle, cnt int) error {
 }
 
 // remove the block which failed to download so that it can be used again
-func (bc *BlockCache) releaseFailedBlock(handle *handlemap.Handle, block *Block) {
-    _ = handle.Buffers.Cooking.Remove(block.node)
+func (bc *BlockCache) releaseDownloadFailedBlock(handle *handlemap.Handle, block *Block) {
+    if block.node != nil {
+        _ = handle.Buffers.Cooking.Remove(block.node)
+        _ = handle.Buffers.Cooked.Remove(block.node)
+    }
+    handle.RemoveValue(fmt.Sprintf("%v", block.id))
+    block.node = nil
     block.ReUse()
     bc.blockPool.Release(block)
 }
@@ -1217,15 +1273,12 @@ func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listM
     // log.Debug("BlockCache::lineupUpload : Upload block %v=>%s (index %v, offset %v, data %v)", handle.ID, handle.Path, block.id, block.offset, (block.endIndex - block.offset))
 
-    // Remove this block from free block list and add to in-process list
-    if block.node != nil {
-        _ = handle.Buffers.Cooking.Remove(block.node)
-    }
-
     block.Uploading()
     block.flags.Clear(BlockFlagFailed)
     block.flags.Set(BlockFlagUploading)
-    block.node = handle.Buffers.Cooked.PushBack(block)
+
+    // Remove this block from free block list and add to in-process list
+    bc.addToCooked(handle, block)
 
     // Send the work item to worker pool to schedule download
     bc.threadPool.Schedule(false, item)
@@ -1249,7 +1302,9 @@ func (bc *BlockCache) waitAndFreeUploadedBlocks(handle *handlemap.Handle, cnt in
         if block.id != -1 {
             // Wait for upload of this block to complete
             _, ok := <-block.state
+            block.flags.Clear(BlockFlagDownloading)
             block.flags.Clear(BlockFlagUploading)
+
             if ok {
                 block.Unblock()
             }
@@ -1259,8 +1314,7 @@ func (bc *BlockCache) waitAndFreeUploadedBlocks(handle *handlemap.Handle, cnt in
         if block.IsFailed() {
             log.Err("BlockCache::waitAndFreeUploadedBlocks : Failed to upload block, posting back to cooking list %v=>%s (index %v, offset %v)", handle.ID, handle.Path, block.id, block.offset)
 
-            _ = handle.Buffers.Cooked.Remove(block.node)
-            block.node = handle.Buffers.Cooking.PushFront(block)
+            bc.addToCooking(handle, block)
             continue
         }
         cnt--
@@ -1301,7 +1355,7 @@
             // If we failed to write the data 3 times then just give up
             log.Err("BlockCache::upload : 3 attempts to upload a block have failed %v=>%s (index %v, offset %v)", item.handle.ID, item.handle.Path, item.block.id, item.block.offset)
 
             item.block.Failed()
-            item.block.Ready()
+            item.block.Ready(BlockStatusUploadFailed)
             return
         }
@@ -1342,7 +1396,7 @@
 return_safe:
     item.block.flags.Set(BlockFlagSynced)
     item.block.NoMoreDirty()
-    item.block.Ready()
+    item.block.Ready(BlockStatusUploaded)
 }
 
 // Stage the given number of blocks from this handle
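// Illustrative sketch, not part of the patch: with Ready(BlockStatusUploadFailed) above, a failed
// upload no longer looks like a failed download. waitAndFreeUploadedBlocks keeps the dirty buffer
// and pushes it back to the cooking list so the data is retried on the next flush. A minimal
// stand-alone model of that decision (names are stand-ins, not the real types):
package main

import "fmt"

const (
	statusUploaded     = 2
	statusUploadFailed = 4
)

type block struct {
	id    int64
	dirty bool
	state chan int
}

// reapUploaded frees blocks whose upload succeeded and re-queues the ones that failed.
func reapUploaded(blocks []*block) (free, retry []*block) {
	for _, b := range blocks {
		switch <-b.state {
		case statusUploaded:
			b.dirty = false
			free = append(free, b)
		case statusUploadFailed:
			// keep the local data, it is still the only valid copy
			retry = append(retry, b)
		}
	}
	return free, retry
}

func main() {
	ok := &block{id: 1, dirty: true, state: make(chan int, 1)}
	bad := &block{id: 2, dirty: true, state: make(chan int, 1)}
	ok.state <- statusUploaded
	bad.state <- statusUploadFailed

	free, retry := reapUploaded([]*block{ok, bad})
	fmt.Println(len(free), "freed,", len(retry), "re-queued for upload")
}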
diff --git a/component/block_cache/block_cache_test.go b/component/block_cache/block_cache_test.go
index 7aa2caffa..864deef17 100644
--- a/component/block_cache/block_cache_test.go
+++ b/component/block_cache/block_cache_test.go
@@ -2322,6 +2322,281 @@ func (suite *blockCacheTestSuite) TestBlockDownloadFailed() {
     suite.assert.Equal(fs.Size(), int64(0))
 }
 
+func (suite *blockCacheTestSuite) TestReadStagedBlock() {
+    cfg := "block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: 12\n parallelism: 10"
+    tobj, err := setupPipeline(cfg)
+    defer tobj.cleanupPipeline()
+
+    suite.assert.Nil(err)
+    suite.assert.NotNil(tobj.blockCache)
+
+    path := getTestFileName(suite.T().Name())
+    storagePath := filepath.Join(tobj.fake_storage_path, path)
+
+    // write using block cache
+    options := internal.CreateFileOptions{Name: path, Mode: 0777}
+    h, err := tobj.blockCache.CreateFile(options)
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(0))
+    suite.assert.False(h.Dirty())
+
+    // write 4MB at offset 0
+    n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:4*_1MB]})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(4*_1MB))
+    suite.assert.True(h.Dirty())
+    suite.assert.Equal(3, h.Buffers.Cooking.Len())
+    suite.assert.Equal(1, h.Buffers.Cooked.Len())
+
+    data := make([]byte, _1MB)
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(_1MB))
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+    suite.assert.Nil(h.Buffers.Cooking)
+    suite.assert.Nil(h.Buffers.Cooked)
+
+    fs, err := os.Stat(storagePath)
+    suite.assert.Nil(err)
+    suite.assert.Equal(fs.Size(), int64(4*_1MB))
+}
+
+func (suite *blockCacheTestSuite) TestReadUncommittedBlockValidation() {
+    prefetch := 12
+    cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 10", prefetch)
+    tobj, err := setupPipeline(cfg)
+    defer tobj.cleanupPipeline()
+
+    suite.assert.Nil(err)
+    suite.assert.NotNil(tobj.blockCache)
+
+    path := getTestFileName(suite.T().Name())
+    storagePath := filepath.Join(tobj.fake_storage_path, path)
+    localPath := filepath.Join(tobj.disk_cache_path, path)
+
+    // ------------------------------------------------------------------
+    // write to local file
+    fh, err := os.Create(localPath)
+    suite.assert.Nil(err)
+
+    defer func(fh *os.File) {
+        err := fh.Close()
+        suite.assert.Nil(err)
+    }(fh)
+
+    // write 62MB data
+    ind := uint64(0)
+    for i := 0; i < prefetch+50; i++ {
+        n, err := fh.WriteAt(dataBuff[ind*_1MB:(ind+1)*_1MB], int64(i*int(_1MB)))
+        suite.assert.Nil(err)
+        suite.assert.Equal(n, int(_1MB))
+        ind = (ind + 1) % 5
+    }
+
+    l, err := computeMD5(fh)
+    suite.assert.Nil(err)
+
+    // ------------------------------------------------------------------
+    // write using block cache
+    options := internal.CreateFileOptions{Name: path, Mode: 0777}
+    h, err := tobj.blockCache.CreateFile(options)
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(0))
+    suite.assert.False(h.Dirty())
+
+    ind = 0
+    for i := 0; i < prefetch+50; i++ {
+        n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
+        suite.assert.Nil(err)
+        suite.assert.Equal(n, int(_1MB))
+        suite.assert.True(h.Dirty())
+        ind = (ind + 1) % 5
+    }
+
+    suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)
+
+    // read blocks 0, 1 and 2 which are uncommitted
+    data := make([]byte, 2*_1MB)
+    n, err := tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(2*_1MB))
+    suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
+    suite.assert.False(h.Dirty())
+
+    // read block 4 which has been committed by the previous read
+    data = make([]byte, _1MB)
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(_1MB))
+    suite.assert.Equal(data[:], dataBuff[4*_1MB:5*_1MB])
+    suite.assert.False(h.Dirty())
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+
+    fs, err := os.Stat(storagePath)
+    suite.assert.Nil(err)
+    suite.assert.Equal(fs.Size(), int64(62*_1MB))
+
+    rfh, err := os.Open(storagePath)
+    suite.assert.Nil(err)
+
+    defer func(fh *os.File) {
+        err := fh.Close()
+        suite.assert.Nil(err)
+    }(rfh)
+
+    r, err := computeMD5(rfh)
+    suite.assert.Nil(err)
+
+    // validate md5sum
+    suite.assert.Equal(l, r)
+}
+
+func (suite *blockCacheTestSuite) TestReadUncommittedPrefetchedBlock() {
+    prefetch := 12
+    cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 10", prefetch)
+    tobj, err := setupPipeline(cfg)
+    defer tobj.cleanupPipeline()
+
+    suite.assert.Nil(err)
+    suite.assert.NotNil(tobj.blockCache)
+
+    path := getTestFileName(suite.T().Name())
+    storagePath := filepath.Join(tobj.fake_storage_path, path)
+
+    // write using block cache
+    options := internal.CreateFileOptions{Name: path, Mode: 0777}
+    h, err := tobj.blockCache.CreateFile(options)
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(0))
+    suite.assert.False(h.Dirty())
+
+    n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:_1MB]})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(_1MB))
+    suite.assert.True(h.Dirty())
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+    suite.assert.False(h.Dirty())
+
+    h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(_1MB))
+    suite.assert.False(h.Dirty())
+
+    ind := uint64(1)
+    for i := 1; i < prefetch+50; i++ {
+        n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
+        suite.assert.Nil(err)
+        suite.assert.Equal(n, int(_1MB))
+        suite.assert.True(h.Dirty())
+        ind = (ind + 1) % 5
+    }
+
+    suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)
+
+    // read blocks 0, 1 and 2 where prefetched blocks 1 and 2 are uncommitted
+    data := make([]byte, 2*_1MB)
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(2*_1MB))
+    suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
+    suite.assert.False(h.Dirty())
+
+    // read block 4 which has been committed by the previous read
+    data = make([]byte, _1MB)
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(_1MB))
+    suite.assert.Equal(data[:], dataBuff[4*_1MB:5*_1MB])
+    suite.assert.False(h.Dirty())
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+
+    fs, err := os.Stat(storagePath)
+    suite.assert.Nil(err)
+    suite.assert.Equal(fs.Size(), int64(62*_1MB))
+}
+
+func (suite *blockCacheTestSuite) TestReadWriteBlockInParallel() {
+    prefetch := 12
+    cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 20\n prefetch: %v\n parallelism: 1", prefetch)
+    tobj, err := setupPipeline(cfg)
+    defer tobj.cleanupPipeline()
+
+    suite.assert.Nil(err)
+    suite.assert.NotNil(tobj.blockCache)
+
+    path := getTestFileName(suite.T().Name())
+    storagePath := filepath.Join(tobj.fake_storage_path, path)
+
+    // write using block cache
+    options := internal.CreateFileOptions{Name: path, Mode: 0777}
+    h, err := tobj.blockCache.CreateFile(options)
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(0))
+    suite.assert.False(h.Dirty())
+
+    n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: dataBuff[:]})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(5*_1MB))
+    suite.assert.True(h.Dirty())
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+    suite.assert.False(h.Dirty())
+
+    h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
+    suite.assert.Nil(err)
+    suite.assert.NotNil(h)
+    suite.assert.Equal(h.Size, int64(5*_1MB))
+    suite.assert.False(h.Dirty())
+
+    ind := uint64(0)
+    for i := 5; i < prefetch+50; i++ {
+        n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(i * int(_1MB)), Data: dataBuff[ind*_1MB : (ind+1)*_1MB]})
+        suite.assert.Nil(err)
+        suite.assert.Equal(n, int(_1MB))
+        suite.assert.True(h.Dirty())
+        ind = (ind + 1) % 5
+    }
+
+    suite.assert.Equal(h.Buffers.Cooking.Len()+h.Buffers.Cooked.Len(), prefetch)
+
+    // read blocks 0, 1 and 2
+    data := make([]byte, 2*_1MB)
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 512, Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(2*_1MB))
+    suite.assert.Equal(data[:], dataBuff[512:2*_1MB+512])
+    suite.assert.True(h.Dirty())
+
+    // read blocks 4 and 5
+    n, err = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: int64(4 * _1MB), Data: data})
+    suite.assert.Nil(err)
+    suite.assert.Equal(n, int(2*_1MB))
+    suite.assert.Equal(data[:_1MB], dataBuff[4*_1MB:])
+    suite.assert.Equal(data[_1MB:], dataBuff[:_1MB])
+    suite.assert.False(h.Dirty())
+
+    err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
+    suite.assert.Nil(err)
+
+    fs, err := os.Stat(storagePath)
+    suite.assert.Nil(err)
+    suite.assert.Equal(fs.Size(), int64(62*_1MB))
+}
+
 // In order for 'go test' to run this suite, we need to create
 // a normal test function and pass our suite to suite.Run
 func TestBlockCacheTestSuite(t *testing.T) {
diff --git a/component/block_cache/block_test.go b/component/block_cache/block_test.go
index 8270118a8..28f905f9d 100644
--- a/component/block_cache/block_test.go
+++ b/component/block_cache/block_test.go
@@ -143,7 +143,7 @@ func (suite *blockTestSuite) TestReady() {
     b.ReUse()
     suite.assert.NotNil(b.state)
 
-    b.Ready()
+    b.Ready(BlockStatusDownloaded)
     suite.assert.Equal(len(b.state), 1)
 
     <-b.state
@@ -167,7 +167,7 @@ func (suite *blockTestSuite) TestUnBlock() {
     suite.assert.NotNil(b.state)
     suite.assert.Nil(b.node)
 
-    b.Ready()
+    b.Ready(BlockStatusDownloaded)
     suite.assert.Equal(len(b.state), 1)
 
     <-b.state
@@ -201,7 +201,7 @@ func (suite *blockTestSuite) TestWriter() {
     suite.assert.Equal(b.id, int64(-1))
     suite.assert.False(b.IsDirty())
 
-    b.Ready()
+    b.Ready(BlockStatusDownloaded)
     suite.assert.Equal(len(b.state), 1)
 
     <-b.state
@@ -223,7 +223,7 @@ func (suite *blockTestSuite) TestWriter() {
     b.NoMoreDirty()
     suite.assert.False(b.IsDirty())
 
-    b.Ready()
+    b.Ready(BlockStatusUploaded)
     suite.assert.Equal(len(b.state), 1)
 
     <-b.state
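// Illustrative sketch, not part of the patch: the new tests above compare an md5 of the file
// written through block cache against an md5 of the locally written copy. computeMD5 is an
// existing test utility in the suite; the code below is only a plausible stand-alone equivalent
// of the shape the tests assume, not its actual source.
package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"os"
)

func computeMD5(fh *os.File) ([]byte, error) {
	// Hash from the start of the file regardless of the current offset.
	if _, err := fh.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	h := md5.New()
	if _, err := io.Copy(h, fh); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

func main() {
	fh, err := os.Open("/etc/hostname") // any readable file works for the demo
	if err != nil {
		panic(err)
	}
	defer fh.Close()

	sum, err := computeMD5(fh)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", sum)
}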