Skip to content

Commit

Permalink
Merge pull request #908 from SiaFoundation/chris/cherry-pick-upload-deadlock-fix
Browse files Browse the repository at this point in the history

Cherry-pick upload deadlock fix
  • Loading branch information
ChrisSchinnerl authored Jan 19, 2024
2 parents edc2889 + 13d25bc commit 969b48a
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 53 deletions.
5 changes: 4 additions & 1 deletion internal/testing/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,10 @@ func TestUploadDownloadSpending(t *testing.T) {
cluster.MineToRenewWindow()

// wait for the contract to be renewed
tt.Retry(100, 100*time.Millisecond, func() error {
tt.Retry(10, time.Second, func() error {
// mine a block
cluster.MineBlocks(1)

// fetch contracts
cms, err := cluster.Bus.Contracts(context.Background(), api.ContractsOpts{})
tt.OK(err)
Expand Down
108 changes: 90 additions & 18 deletions worker/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"go.sia.tech/renterd/object"
"go.uber.org/zap"
"lukechampine.com/frand"
)

Expand All @@ -23,7 +25,7 @@ type (
sectors map[types.Hash256]*[rhpv2.SectorSize]byte
}

mockContractLocker struct {
mockContractStore struct {
contracts map[types.FileContractID]*mockContract
}

Expand All @@ -42,18 +44,39 @@ type (
mockMemoryManager struct{}

mockObjectStore struct {
mu sync.Mutex
objects map[string]map[string]object.Object
mu sync.Mutex
objects map[string]map[string]object.Object
partials map[string]mockPackedSlab
bufferIDCntr uint // allows marking packed slabs as uploaded
}

mockPackedSlab struct {
parameterKey string // ([minshards]-[totalshards]-[contractset])
bufferID uint
slabKey object.EncryptionKey
data []byte
}

mockWorker struct {
cs *mockContractStore
hm *mockHostManager
mm *mockMemoryManager
os *mockObjectStore

dl *downloadManager
ul *uploadManager

contracts map[types.PublicKey]api.ContractMetadata
}
)

var (
_ ContractLocker = (*mockContractLocker)(nil)
_ Host = (*mockHost)(nil)
_ HostManager = (*mockHostManager)(nil)
_ Memory = (*mockMemory)(nil)
_ MemoryManager = (*mockMemoryManager)(nil)
_ ObjectStore = (*mockObjectStore)(nil)
_ ContractStore = (*mockContractStore)(nil)
_ Host = (*mockHost)(nil)
_ HostManager = (*mockHostManager)(nil)
_ Memory = (*mockMemory)(nil)
_ MemoryManager = (*mockMemoryManager)(nil)
_ ObjectStore = (*mockObjectStore)(nil)
)

var (
Expand All @@ -74,6 +97,10 @@ func (mm *mockMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Memo
return &mockMemory{}
}

// RenewedContract implements the ContractStore interface. The mock store
// does not track renewals, so it always reports that no renewed contract
// exists for the given contract ID.
func (cs *mockContractStore) RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error) {
	return api.ContractMetadata{}, api.ErrContractNotFound
}

// AddMultipartPart implements the ObjectStore interface; the mock does not
// track multipart uploads and simply reports success.
func (os *mockObjectStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) {
	return nil
}
Expand Down Expand Up @@ -189,8 +216,8 @@ func (hp *mockHostManager) Host(hk types.PublicKey, fcid types.FileContractID, s
return hp.hosts[hk]
}

func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) {
if lock, ok := cl.contracts[fcid]; !ok {
func (cs *mockContractStore) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) {
if lock, ok := cs.contracts[fcid]; !ok {
return 0, errContractNotFound
} else {
lock.mu.Lock()
Expand All @@ -199,16 +226,16 @@ func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.Fi
return 0, nil
}

func (cl *mockContractLocker) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) {
if lock, ok := cl.contracts[fcid]; !ok {
// ReleaseContract implements the ContractStore interface. It releases the
// per-contract lock that a prior AcquireContract took, or returns
// errContractNotFound if the contract is unknown to the store. The lockID
// is ignored by the mock.
func (cs *mockContractStore) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) {
	lock, ok := cs.contracts[fcid]
	if !ok {
		return errContractNotFound
	}
	lock.mu.Unlock()
	return nil
}

func (cl *mockContractLocker) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) {
// KeepaliveContract implements the ContractStore interface; the mock's
// contract locks never expire, so extending one is a no-op that always
// succeeds.
func (cs *mockContractStore) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) {
	return nil
}

Expand Down Expand Up @@ -243,12 +270,12 @@ func newMockContract(fcid types.FileContractID) *mockContract {
}
}

func newMockContractLocker(contracts []*mockContract) *mockContractLocker {
cl := &mockContractLocker{contracts: make(map[types.FileContractID]*mockContract)}
func newMockContractStore(contracts []*mockContract) *mockContractStore {
cs := &mockContractStore{contracts: make(map[types.FileContractID]*mockContract)}
for _, c := range contracts {
cl.contracts[c.rev.ParentID] = c
cs.contracts[c.rev.ParentID] = c
}
return cl
return cs
}

func newMockHostManager(hosts []*mockHost) *mockHostManager {
Expand All @@ -270,3 +297,48 @@ func newMockSector() (*[rhpv2.SectorSize]byte, types.Hash256) {
frand.Read(sector[:])
return &sector, rhpv2.SectorRoot(&sector)
}

// newMockWorker wires up a full set of mocked worker dependencies
// (contract store, host manager, object store and memory manager) around
// numHosts freshly created hosts and contracts, and returns them together
// with a download and an upload manager ready for use in tests.
func newMockWorker(numHosts int) *mockWorker {
	// create hosts and contracts
	hosts := newMockHosts(numHosts)
	contracts := newMockContracts(hosts)

	// create dependencies
	cs := newMockContractStore(contracts)
	hm := newMockHostManager(hosts)
	os := newMockObjectStore()
	mm := &mockMemoryManager{}

	// create managers; zero overdrive settings disable overdriving
	dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar())
	ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar())

	// create contract metadata keyed by host key
	metadatas := make(map[types.PublicKey]api.ContractMetadata)
	for _, h := range hosts {
		metadatas[h.hk] = api.ContractMetadata{
			ID:      h.c.rev.ParentID,
			HostKey: h.hk,
		}
	}

	return &mockWorker{
		cs: cs, // previously left unset even though the field is declared
		hm: hm,
		mm: mm,
		os: os,

		dl: dl,
		ul: ul,

		contracts: metadatas,
	}
}

// newTestHostPriceTable returns a host price table with a random UID, a
// one-minute validity window and the given expiry time.
func newTestHostPriceTable(expiry time.Time) hostdb.HostPriceTable {
	pt := rhpv3.HostPriceTable{Validity: time.Minute}
	frand.Read(pt.UID[:])

	return hostdb.HostPriceTable{
		HostPriceTable: pt,
		Expiry:         expiry,
	}
}
56 changes: 34 additions & 22 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type (
hm HostManager
mm MemoryManager
os ObjectStore
cl ContractLocker
cs ContractStore
logger *zap.SugaredLogger

contractLockDuration time.Duration
Expand Down Expand Up @@ -110,13 +110,15 @@ type (
}

sectorUpload struct {
index int
data *[rhpv2.SectorSize]byte
root types.Hash256
uploaded object.Sector
index int
root types.Hash256

ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
uploaded object.Sector
data *[rhpv2.SectorSize]byte
}

sectorUploadReq struct {
Expand Down Expand Up @@ -313,12 +315,12 @@ func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings
return nil
}

func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cl ContractLocker, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager {
func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cs ContractStore, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager {
return &uploadManager{
hm: hm,
mm: mm,
os: os,
cl: cl,
cs: cs,
logger: logger,

contractLockDuration: contractLockDuration,
Expand All @@ -335,10 +337,11 @@ func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os
}
}

func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host, c api.ContractMetadata, bh uint64) *uploader {
func (mgr *uploadManager) newUploader(os ObjectStore, cs ContractStore, hm HostManager, c api.ContractMetadata, bh uint64) *uploader {
return &uploader{
os: os,
cl: cl,
cs: cs,
hm: hm,
logger: mgr.logger,

// static
Expand All @@ -352,7 +355,7 @@ func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host,
statsSectorUploadSpeedBytesPerMS: stats.NoDecay(),

// covered by mutex
host: h,
host: hm.Host(c.HostKey, c.ID, c.SiamuxAddr),
bh: bh,
fcid: c.ID,
endHeight: c.WindowEnd,
Expand Down Expand Up @@ -559,6 +562,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a
mem.Release()
}(up.rs, data, length, slabIndex)
}

slabIndex++
}
}()
Expand Down Expand Up @@ -727,10 +731,9 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
var refreshed []*uploader
existing := make(map[types.FileContractID]struct{})
for _, uploader := range mgr.uploaders {
// renew uploaders that got renewed
// refresh uploaders that got renewed
if renewal, renewed := renewals[uploader.ContractID()]; renewed {
host := mgr.hm.Host(renewal.HostKey, renewal.ID, renewal.SiamuxAddr)
uploader.Renew(host, renewal, bh)
uploader.Refresh(renewal, bh)
}

// stop uploaders that expired
Expand All @@ -751,8 +754,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh
// add missing uploaders
for _, c := range contracts {
if _, exists := existing[c.ID]; !exists {
host := mgr.hm.Host(c.HostKey, c.ID, c.SiamuxAddr)
uploader := mgr.newUploader(mgr.os, mgr.cl, host, c, bh)
uploader := mgr.newUploader(mgr.os, mgr.cs, mgr.hm, c, bh)
refreshed = append(refreshed, uploader)
go uploader.Start()
}
Expand Down Expand Up @@ -1000,7 +1002,6 @@ func (s *slabUpload) launch(req *sectorUploadReq) error {
s.lastOverdrive = time.Now()
s.numOverdriving++
}

// update the state
s.numInflight++
s.numLaunched++
Expand Down Expand Up @@ -1071,18 +1072,15 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
}

// store the sector
sector.uploaded = object.Sector{
sector.finish(object.Sector{
Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}},
LatestHost: req.hk,
Root: resp.root,
}
})

// update uploaded sectors
s.numUploaded++

// cancel the sector's context
sector.cancel()

// release all other candidates for this sector
for _, candidate := range s.candidates {
if candidate.req != nil && candidate.req != req && candidate.req.sector.index == sector.index {
Expand All @@ -1091,16 +1089,30 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) {
}

// release memory
sector.data = nil
s.mem.ReleaseSome(rhpv2.SectorSize)

return true, s.numUploaded == s.numSectors
}

// finish marks the sector as uploaded. It cancels the sector's context to
// signal any remaining in-flight uploads of this sector to stop, records
// the uploaded sector metadata and drops the reference to the sector data
// so it can be garbage collected. All of this happens under s.mu so it is
// atomic with respect to sectorData.
func (s *sectorUpload) finish(sector object.Sector) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.cancel()
	s.uploaded = sector
	s.data = nil
}

// isUploaded returns whether the sector was uploaded successfully, i.e.
// whether finish has recorded a sector with a non-zero root.
// NOTE(review): this reads s.uploaded without holding s.mu while finish
// writes it under the lock — confirm callers serialize access with the
// writer or guard this read with s.mu as well.
func (s *sectorUpload) isUploaded() bool {
	return s.uploaded.Root != (types.Hash256{})
}

// sectorData returns the sector's data, synchronizing with finish, which
// nils the data out once the sector has been uploaded.
func (s *sectorUpload) sectorData() *[rhpv2.SectorSize]byte {
	s.mu.Lock()
	data := s.data
	s.mu.Unlock()
	return data
}

func (req *sectorUploadReq) done() bool {
select {
case <-req.sector.ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions worker/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestUploadDownload(t *testing.T) {
contracts := newMockContracts(hosts)

// mock dependencies
cl := newMockContractLocker(contracts)
cs := newMockContractStore(contracts)
hm := newMockHostManager(hosts)
os := newMockObjectStore()
mm := &mockMemoryManager{}

// create managers
dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cl, 0, 0, time.Minute, zap.NewNop().Sugar())
ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar())

// create test data
data := make([]byte, 128)
Expand Down
Loading

0 comments on commit 969b48a

Please sign in to comment.