From 2bacb0eb48a81b4075805f5d01ab92026b57f56c Mon Sep 17 00:00:00 2001 From: jules01 Date: Mon, 18 Sep 2023 22:32:48 +0300 Subject: [PATCH] - probable fix --- leveldb/leveldbSerial.go | 67 +++++++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 14 deletions(-) diff --git a/leveldb/leveldbSerial.go b/leveldb/leveldbSerial.go index fbb8c5e6..071cf0f2 100644 --- a/leveldb/leveldbSerial.go +++ b/leveldb/leveldbSerial.go @@ -10,6 +10,7 @@ import ( "time" "github.com/multiversx/mx-chain-core-go/core" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-core-go/core/closing" "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/types" @@ -25,11 +26,14 @@ type SerialDB struct { maxBatchSize int batchDelaySeconds int sizeBatch int - batch types.Batcher - mutBatch sync.RWMutex - dbAccess chan serialQueryer - cancel context.CancelFunc - closer core.SafeCloser + + accessBatch types.Batcher + writingBatch types.Batcher + mutBatch sync.RWMutex + + dbAccess chan serialQueryer + cancel context.CancelFunc + closer core.SafeCloser } // NewSerialDB is a constructor for the leveldb persister @@ -80,7 +84,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi closer: closing.NewSafeChanCloser(), } - dbStore.batch = NewBatch() + dbStore.accessBatch = NewBatch() go dbStore.batchTimeoutHandle(ctx) go dbStore.processLoop(ctx) @@ -142,7 +146,7 @@ func (s *SerialDB) Put(key, val []byte) error { } s.mutBatch.RLock() - err := s.batch.Put(key, val) + err := s.accessBatch.Put(key, val) s.mutBatch.RUnlock() if err != nil { return err @@ -158,12 +162,12 @@ func (s *SerialDB) Get(key []byte) ([]byte, error) { } s.mutBatch.RLock() - if s.batch.IsRemoved(key) { + if s.isRemoved(key) { s.mutBatch.RUnlock() return nil, common.ErrKeyNotFound } - data := s.batch.Get(key) + data := s.getFromBatches(key) s.mutBatch.RUnlock() if data != nil { @@ -200,12 +204,12 @@ func (s *SerialDB) Has(key []byte) error { } s.mutBatch.RLock() - if s.batch.IsRemoved(key) { + if s.isRemoved(key) { s.mutBatch.RUnlock() return common.ErrKeyNotFound } - data := s.batch.Get(key) + data := s.getFromBatches(key) s.mutBatch.RUnlock() if data != nil { @@ -228,6 +232,30 @@ func (s *SerialDB) Has(key []byte) error { return result } +func (s *SerialDB) isRemoved(key []byte) bool { + if s.accessBatch.IsRemoved(key) { + return true + } + if check.IfNil(s.writingBatch) { + return false + } + + return s.writingBatch.IsRemoved(key) +} + +func (s *SerialDB) getFromBatches(key []byte) []byte { + // start testing the access batch as it will contain the most up-to-date variant + data := s.accessBatch.Get(key) + if data != nil { + return data + } + if check.IfNil(s.writingBatch) { + return nil + } + + return s.writingBatch.Get(key) +} + func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error { select { case s.dbAccess <- req: @@ -240,13 +268,20 @@ func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error { // putBatch writes the Batch data into the database func (s *SerialDB) putBatch() error { s.mutBatch.Lock() - dbBatch, ok := s.batch.(*batch) + if !check.IfNil(s.writingBatch) { + s.mutBatch.Unlock() + return nil + } + + s.writingBatch = s.accessBatch + + dbBatch, ok := s.writingBatch.(*batch) if !ok { s.mutBatch.Unlock() return common.ErrInvalidBatch } s.sizeBatch = 0 - s.batch = NewBatch() + s.accessBatch = NewBatch() s.mutBatch.Unlock() ch := make(chan error) @@ -262,6 +297,10 @@ func (s *SerialDB) putBatch() error { result := <-ch close(ch) + s.mutBatch.Lock() + s.writingBatch = nil + s.mutBatch.Unlock() + return result } @@ -287,7 +326,7 @@ func (s *SerialDB) Remove(key []byte) error { } s.mutBatch.Lock() - _ = s.batch.Delete(key) + _ = s.accessBatch.Delete(key) s.mutBatch.Unlock() return s.updateBatchWithIncrement()