diff --git a/internal/testing/cluster_test.go b/internal/testing/cluster_test.go index e811738f6..ef4b4efd7 100644 --- a/internal/testing/cluster_test.go +++ b/internal/testing/cluster_test.go @@ -891,7 +891,10 @@ func TestUploadDownloadSpending(t *testing.T) { cluster.MineToRenewWindow() // wait for the contract to be renewed - tt.Retry(100, 100*time.Millisecond, func() error { + tt.Retry(10, time.Second, func() error { + // mine a block + cluster.MineBlocks(1) + // fetch contracts cms, err := cluster.Bus.Contracts(context.Background(), api.ContractsOpts{}) tt.OK(err) diff --git a/worker/mocks_test.go b/worker/mocks_test.go index b2597240c..7335a4962 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -8,10 +8,12 @@ import ( "time" rhpv2 "go.sia.tech/core/rhp/v2" + rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/hostdb" "go.sia.tech/renterd/object" + "go.uber.org/zap" "lukechampine.com/frand" ) @@ -23,7 +25,7 @@ type ( sectors map[types.Hash256]*[rhpv2.SectorSize]byte } - mockContractLocker struct { + mockContractStore struct { contracts map[types.FileContractID]*mockContract } @@ -42,18 +44,39 @@ type ( mockMemoryManager struct{} mockObjectStore struct { - mu sync.Mutex - objects map[string]map[string]object.Object + mu sync.Mutex + objects map[string]map[string]object.Object + partials map[string]mockPackedSlab + bufferIDCntr uint // allows marking packed slabs as uploaded + } + + mockPackedSlab struct { + parameterKey string // ([minshards]-[totalshards]-[contractset]) + bufferID uint + slabKey object.EncryptionKey + data []byte + } + + mockWorker struct { + cs *mockContractStore + hm *mockHostManager + mm *mockMemoryManager + os *mockObjectStore + + dl *downloadManager + ul *uploadManager + + contracts map[types.PublicKey]api.ContractMetadata } ) var ( - _ ContractLocker = (*mockContractLocker)(nil) - _ Host = (*mockHost)(nil) - _ HostManager = (*mockHostManager)(nil) - _ Memory = (*mockMemory)(nil) - _ MemoryManager = (*mockMemoryManager)(nil) - _ ObjectStore = (*mockObjectStore)(nil) + _ ContractStore = (*mockContractStore)(nil) + _ Host = (*mockHost)(nil) + _ HostManager = (*mockHostManager)(nil) + _ Memory = (*mockMemory)(nil) + _ MemoryManager = (*mockMemoryManager)(nil) + _ ObjectStore = (*mockObjectStore)(nil) ) var ( @@ -74,6 +97,10 @@ func (mm *mockMemoryManager) AcquireMemory(ctx context.Context, amt uint64) Memo return &mockMemory{} } +func (os *mockContractStore) RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error) { + return api.ContractMetadata{}, api.ErrContractNotFound +} + func (os *mockObjectStore) AddMultipartPart(ctx context.Context, bucket, path, contractSet, ETag, uploadID string, partNumber int, slices []object.SlabSlice) (err error) { return nil } @@ -189,8 +216,8 @@ func (hp *mockHostManager) Host(hk types.PublicKey, fcid types.FileContractID, s return hp.hosts[hk] } -func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) { - if lock, ok := cl.contracts[fcid]; !ok { +func (cs *mockContractStore) AcquireContract(ctx context.Context, fcid types.FileContractID, priority int, d time.Duration) (lockID uint64, err error) { + if lock, ok := cs.contracts[fcid]; !ok { return 0, errContractNotFound } else { lock.mu.Lock() @@ -199,8 +226,8 @@ func (cl *mockContractLocker) AcquireContract(ctx context.Context, fcid types.Fi return 0, nil } -func (cl *mockContractLocker) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) { - if lock, ok := cl.contracts[fcid]; !ok { +func (cs *mockContractStore) ReleaseContract(ctx context.Context, fcid types.FileContractID, lockID uint64) (err error) { + if lock, ok := cs.contracts[fcid]; !ok { return errContractNotFound } else { lock.mu.Unlock() @@ -208,7 +235,7 @@ func (cl *mockContractLocker) ReleaseContract(ctx context.Context, fcid types.Fi return nil } -func (cl *mockContractLocker) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) { +func (cs *mockContractStore) KeepaliveContract(ctx context.Context, fcid types.FileContractID, lockID uint64, d time.Duration) (err error) { return nil } @@ -243,12 +270,12 @@ func newMockContract(fcid types.FileContractID) *mockContract { } } -func newMockContractLocker(contracts []*mockContract) *mockContractLocker { - cl := &mockContractLocker{contracts: make(map[types.FileContractID]*mockContract)} +func newMockContractStore(contracts []*mockContract) *mockContractStore { + cs := &mockContractStore{contracts: make(map[types.FileContractID]*mockContract)} for _, c := range contracts { - cl.contracts[c.rev.ParentID] = c + cs.contracts[c.rev.ParentID] = c } - return cl + return cs } func newMockHostManager(hosts []*mockHost) *mockHostManager { @@ -270,3 +297,48 @@ func newMockSector() (*[rhpv2.SectorSize]byte, types.Hash256) { frand.Read(sector[:]) return §or, rhpv2.SectorRoot(§or) } + +func newMockWorker(numHosts int) *mockWorker { + // create hosts and contracts + hosts := newMockHosts(numHosts) + contracts := newMockContracts(hosts) + + // create dependencies + cs := newMockContractStore(contracts) + hm := newMockHostManager(hosts) + os := newMockObjectStore() + mm := &mockMemoryManager{} + + dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar()) + ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar()) + + // create contract metadata + metadatas := make(map[types.PublicKey]api.ContractMetadata) + for _, h := range hosts { + metadatas[h.hk] = api.ContractMetadata{ + ID: h.c.rev.ParentID, + HostKey: h.hk, + } + } + + return &mockWorker{ + hm: hm, + mm: mm, + os: os, + + dl: dl, + ul: ul, + + contracts: metadatas, + } +} + +func newTestHostPriceTable(expiry time.Time) hostdb.HostPriceTable { + var uid rhpv3.SettingsID + frand.Read(uid[:]) + + return hostdb.HostPriceTable{ + HostPriceTable: rhpv3.HostPriceTable{UID: uid, Validity: time.Minute}, + Expiry: expiry, + } +} diff --git a/worker/upload.go b/worker/upload.go index bab0a34b8..8ce41b342 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -39,7 +39,7 @@ type ( hm HostManager mm MemoryManager os ObjectStore - cl ContractLocker + cs ContractStore logger *zap.SugaredLogger contractLockDuration time.Duration @@ -110,13 +110,15 @@ type ( } sectorUpload struct { - index int - data *[rhpv2.SectorSize]byte - root types.Hash256 - uploaded object.Sector + index int + root types.Hash256 ctx context.Context cancel context.CancelFunc + + mu sync.Mutex + uploaded object.Sector + data *[rhpv2.SectorSize]byte } sectorUploadReq struct { @@ -313,12 +315,12 @@ func (w *worker) uploadPackedSlab(ctx context.Context, rs api.RedundancySettings return nil } -func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cl ContractLocker, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager { +func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os ObjectStore, cs ContractStore, maxOverdrive uint64, overdriveTimeout time.Duration, contractLockDuration time.Duration, logger *zap.SugaredLogger) *uploadManager { return &uploadManager{ hm: hm, mm: mm, os: os, - cl: cl, + cs: cs, logger: logger, contractLockDuration: contractLockDuration, @@ -335,10 +337,11 @@ func newUploadManager(ctx context.Context, hm HostManager, mm MemoryManager, os } } -func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host, c api.ContractMetadata, bh uint64) *uploader { +func (mgr *uploadManager) newUploader(os ObjectStore, cs ContractStore, hm HostManager, c api.ContractMetadata, bh uint64) *uploader { return &uploader{ os: os, - cl: cl, + cs: cs, + hm: hm, logger: mgr.logger, // static @@ -352,7 +355,7 @@ func (mgr *uploadManager) newUploader(os ObjectStore, cl ContractLocker, h Host, statsSectorUploadSpeedBytesPerMS: stats.NoDecay(), // covered by mutex - host: h, + host: hm.Host(c.HostKey, c.ID, c.SiamuxAddr), bh: bh, fcid: c.ID, endHeight: c.WindowEnd, @@ -559,6 +562,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a mem.Release() }(up.rs, data, length, slabIndex) } + slabIndex++ } }() @@ -727,10 +731,9 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh var refreshed []*uploader existing := make(map[types.FileContractID]struct{}) for _, uploader := range mgr.uploaders { - // renew uploaders that got renewed + // refresh uploaders that got renewed if renewal, renewed := renewals[uploader.ContractID()]; renewed { - host := mgr.hm.Host(renewal.HostKey, renewal.ID, renewal.SiamuxAddr) - uploader.Renew(host, renewal, bh) + uploader.Refresh(renewal, bh) } // stop uploaders that expired @@ -751,8 +754,7 @@ func (mgr *uploadManager) refreshUploaders(contracts []api.ContractMetadata, bh // add missing uploaders for _, c := range contracts { if _, exists := existing[c.ID]; !exists { - host := mgr.hm.Host(c.HostKey, c.ID, c.SiamuxAddr) - uploader := mgr.newUploader(mgr.os, mgr.cl, host, c, bh) + uploader := mgr.newUploader(mgr.os, mgr.cs, mgr.hm, c, bh) refreshed = append(refreshed, uploader) go uploader.Start() } @@ -1000,7 +1002,6 @@ func (s *slabUpload) launch(req *sectorUploadReq) error { s.lastOverdrive = time.Now() s.numOverdriving++ } - // update the state s.numInflight++ s.numLaunched++ @@ -1071,18 +1072,15 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { } // store the sector - sector.uploaded = object.Sector{ + sector.finish(object.Sector{ Contracts: map[types.PublicKey][]types.FileContractID{req.hk: {req.fcid}}, LatestHost: req.hk, Root: resp.root, - } + }) // update uploaded sectors s.numUploaded++ - // cancel the sector's context - sector.cancel() - // release all other candidates for this sector for _, candidate := range s.candidates { if candidate.req != nil && candidate.req != req && candidate.req.sector.index == sector.index { @@ -1091,16 +1089,30 @@ func (s *slabUpload) receive(resp sectorUploadResp) (bool, bool) { } // release memory - sector.data = nil s.mem.ReleaseSome(rhpv2.SectorSize) return true, s.numUploaded == s.numSectors } +func (s *sectorUpload) finish(sector object.Sector) { + s.mu.Lock() + defer s.mu.Unlock() + + s.cancel() + s.uploaded = sector + s.data = nil +} + func (s *sectorUpload) isUploaded() bool { return s.uploaded.Root != (types.Hash256{}) } +func (s *sectorUpload) sectorData() *[rhpv2.SectorSize]byte { + s.mu.Lock() + defer s.mu.Unlock() + return s.data +} + func (req *sectorUploadReq) done() bool { select { case <-req.sector.ctx.Done(): diff --git a/worker/upload_test.go b/worker/upload_test.go index 3534d2edf..aff063e00 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -27,14 +27,14 @@ func TestUploadDownload(t *testing.T) { contracts := newMockContracts(hosts) // mock dependencies - cl := newMockContractLocker(contracts) + cs := newMockContractStore(contracts) hm := newMockHostManager(hosts) os := newMockObjectStore() mm := &mockMemoryManager{} // create managers dl := newDownloadManager(context.Background(), hm, mm, os, 0, 0, zap.NewNop().Sugar()) - ul := newUploadManager(context.Background(), hm, mm, os, cl, 0, 0, time.Minute, zap.NewNop().Sugar()) + ul := newUploadManager(context.Background(), hm, mm, os, cs, 0, 0, time.Minute, zap.NewNop().Sugar()) // create test data data := make([]byte, 128) diff --git a/worker/uploader.go b/worker/uploader.go index e85bc260d..bc8f06a5e 100644 --- a/worker/uploader.go +++ b/worker/uploader.go @@ -22,7 +22,8 @@ const ( type ( uploader struct { os ObjectStore - cl ContractLocker + cs ContractStore + hm HostManager logger *zap.SugaredLogger hk types.PublicKey @@ -70,12 +71,12 @@ func (u *uploader) Healthy() bool { return u.consecutiveFailures == 0 } -func (u *uploader) Renew(h Host, c api.ContractMetadata, bh uint64) { +func (u *uploader) Refresh(c api.ContractMetadata, bh uint64) { u.mu.Lock() defer u.mu.Unlock() u.bh = bh - u.host = h + u.host = u.hm.Host(c.HostKey, c.ID, c.SiamuxAddr) u.fcid = c.ID u.siamuxAddr = c.SiamuxAddr u.endHeight = c.WindowEnd @@ -120,8 +121,10 @@ outer: // the uploader's contract got renewed, requeue the request if errors.Is(err, errMaxRevisionReached) { - u.enqueue(req) - continue outer + if u.tryRefresh(req.sector.ctx) { + u.enqueue(req) + continue outer + } } // send the response @@ -196,13 +199,13 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, u.mu.Unlock() // acquire contract lock - lockID, err := u.cl.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration) + lockID, err := u.cs.AcquireContract(req.sector.ctx, fcid, req.contractLockPriority, req.contractLockDuration) if err != nil { return types.Hash256{}, 0, err } // defer the release - lock := newContractLock(u.shutdownCtx, fcid, lockID, req.contractLockDuration, u.cl, u.logger) + lock := newContractLock(u.shutdownCtx, fcid, lockID, req.contractLockDuration, u.cs, u.logger) defer func() { ctx, cancel := context.WithTimeout(u.shutdownCtx, 10*time.Second) lock.Release(ctx) @@ -228,7 +231,7 @@ func (u *uploader) execute(req *sectorUploadReq) (types.Hash256, time.Duration, // upload the sector start := time.Now() - root, err := host.UploadSector(ctx, req.sector.data, rev) + root, err := host.UploadSector(ctx, req.sector.sectorData(), rev) if err != nil { return types.Hash256{}, 0, fmt.Errorf("failed to upload sector to contract %v, err: %v", fcid, err) } @@ -287,3 +290,18 @@ func (u *uploader) tryRecomputeStats() { u.statsSectorUploadEstimateInMS.Recompute() u.statsSectorUploadSpeedBytesPerMS.Recompute() } + +func (u *uploader) tryRefresh(ctx context.Context) bool { + // fetch the renewed contract + renewed, err := u.cs.RenewedContract(ctx, u.ContractID()) + if isError(err, api.ErrContractNotFound) || isError(err, context.Canceled) { + return false + } else if err != nil { + u.logger.Errorf("failed to fetch renewed contract %v, err: %v", u.ContractID(), err) + return false + } + + // renew the uploader with the renewed contract + u.Refresh(renewed, u.BlockHeight()) + return true +} diff --git a/worker/worker.go b/worker/worker.go index 8ba6f03b0..04c947ed9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -69,7 +69,7 @@ type ( webhooks.Broadcaster AccountStore - ContractLocker + ContractStore ObjectStore BroadcastTransaction(ctx context.Context, txns []types.Transaction) error @@ -79,7 +79,6 @@ type ( ContractSize(ctx context.Context, id types.FileContractID) (api.ContractSize, error) ContractRoots(ctx context.Context, id types.FileContractID) ([]types.Hash256, []types.Hash256, error) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) - RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error) RecordHostScans(ctx context.Context, scans []hostdb.HostScan) error RecordPriceTables(ctx context.Context, priceTableUpdate []hostdb.PriceTableUpdate) error @@ -116,6 +115,12 @@ type ( ScheduleSync(ctx context.Context, id rhpv3.Account, hk types.PublicKey) error } + ContractStore interface { + ContractLocker + + RenewedContract(ctx context.Context, renewedFrom types.FileContractID) (api.ContractMetadata, error) + } + ObjectStore interface { // NOTE: used for download DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) error