Skip to content

Commit

Permalink
- probable fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Sep 18, 2023
1 parent 100b248 commit 2bacb0e
Showing 1 changed file with 53 additions and 14 deletions.
67 changes: 53 additions & 14 deletions leveldb/leveldbSerial.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/core/closing"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
Expand All @@ -25,11 +26,14 @@ type SerialDB struct {
maxBatchSize int
batchDelaySeconds int
sizeBatch int
batch types.Batcher
mutBatch sync.RWMutex
dbAccess chan serialQueryer
cancel context.CancelFunc
closer core.SafeCloser

accessBatch types.Batcher
writingBatch types.Batcher
mutBatch sync.RWMutex

dbAccess chan serialQueryer
cancel context.CancelFunc
closer core.SafeCloser
}

// NewSerialDB is a constructor for the leveldb persister
Expand Down Expand Up @@ -80,7 +84,7 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi
closer: closing.NewSafeChanCloser(),
}

dbStore.batch = NewBatch()
dbStore.accessBatch = NewBatch()

go dbStore.batchTimeoutHandle(ctx)
go dbStore.processLoop(ctx)
Expand Down Expand Up @@ -142,7 +146,7 @@ func (s *SerialDB) Put(key, val []byte) error {
}

s.mutBatch.RLock()
err := s.batch.Put(key, val)
err := s.accessBatch.Put(key, val)
s.mutBatch.RUnlock()
if err != nil {
return err
Expand All @@ -158,12 +162,12 @@ func (s *SerialDB) Get(key []byte) ([]byte, error) {
}

s.mutBatch.RLock()
if s.batch.IsRemoved(key) {
if s.isRemoved(key) {
s.mutBatch.RUnlock()
return nil, common.ErrKeyNotFound
}

data := s.batch.Get(key)
data := s.getFromBatches(key)
s.mutBatch.RUnlock()

if data != nil {
Expand Down Expand Up @@ -200,12 +204,12 @@ func (s *SerialDB) Has(key []byte) error {
}

s.mutBatch.RLock()
if s.batch.IsRemoved(key) {
if s.isRemoved(key) {
s.mutBatch.RUnlock()
return common.ErrKeyNotFound
}

data := s.batch.Get(key)
data := s.getFromBatches(key)
s.mutBatch.RUnlock()

if data != nil {
Expand All @@ -228,6 +232,30 @@ func (s *SerialDB) Has(key []byte) error {
return result
}

func (s *SerialDB) isRemoved(key []byte) bool {
if s.accessBatch.IsRemoved(key) {
return true
}
if check.IfNil(s.writingBatch) {
return false
}

return s.writingBatch.IsRemoved(key)
}

func (s *SerialDB) getFromBatches(key []byte) []byte {
// start testing the access batch as it will contain the most up-to-date variant
data := s.accessBatch.Get(key)
if data != nil {
return data
}
if check.IfNil(s.writingBatch) {
return nil
}

return s.writingBatch.Get(key)
}

func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error {
select {
case s.dbAccess <- req:
Expand All @@ -240,13 +268,20 @@ func (s *SerialDB) tryWriteInDbAccessChan(req serialQueryer) error {
// putBatch writes the Batch data into the database
func (s *SerialDB) putBatch() error {
s.mutBatch.Lock()
dbBatch, ok := s.batch.(*batch)
if !check.IfNil(s.writingBatch) {
s.mutBatch.Unlock()
return nil
}

s.writingBatch = s.accessBatch

dbBatch, ok := s.writingBatch.(*batch)
if !ok {
s.mutBatch.Unlock()
return common.ErrInvalidBatch
}
s.sizeBatch = 0
s.batch = NewBatch()
s.accessBatch = NewBatch()
s.mutBatch.Unlock()

ch := make(chan error)
Expand All @@ -262,6 +297,10 @@ func (s *SerialDB) putBatch() error {
result := <-ch
close(ch)

s.mutBatch.Lock()
s.writingBatch = nil
s.mutBatch.Unlock()

return result
}

Expand All @@ -287,7 +326,7 @@ func (s *SerialDB) Remove(key []byte) error {
}

s.mutBatch.Lock()
_ = s.batch.Delete(key)
_ = s.accessBatch.Delete(key)
s.mutBatch.Unlock()

return s.updateBatchWithIncrement()
Expand Down

0 comments on commit 2bacb0e

Please sign in to comment.