diff --git a/.changeset/refactor_sector_management.md b/.changeset/refactor_sector_management.md new file mode 100644 index 00000000..c69638d2 --- /dev/null +++ b/.changeset/refactor_sector_management.md @@ -0,0 +1,7 @@ +--- +default: minor +--- + +# Refactor Sector Management + +Improves sector lookups by 50% on average by removing the sector lock tables and moving reference pruning out of the hot path. \ No newline at end of file diff --git a/api/volumes.go b/api/volumes.go index 9f77f805..c8552a40 100644 --- a/api/volumes.go +++ b/api/volumes.go @@ -218,7 +218,7 @@ func (a *api) handleGETVerifySector(jc jape.Context) { // if the sector is not referenced return the empty response without // attempting to read the sector data - if len(refs.Contracts) == 0 && refs.TempStorage == 0 && refs.Locks == 0 { + if len(refs.Contracts) == 0 && refs.TempStorage == 0 { jc.Encode(resp) return } diff --git a/host/accounts/budget_test.go b/host/accounts/budget_test.go index 86945af2..df1432b9 100644 --- a/host/accounts/budget_test.go +++ b/host/accounts/budget_test.go @@ -23,7 +23,6 @@ func TestUsageTotal(t *testing.T) { for i := 0; i < uv.NumField(); i++ { v := types.NewCurrency(frand.Uint64n(math.MaxUint64), 0) total = total.Add(v) - t.Log("setting field", uv.Type().Field(i).Name, "to", v.ExactString()) uv.Field(i).Set(reflect.ValueOf(v)) } diff --git a/host/contracts/contracts_test.go b/host/contracts/contracts_test.go index b6492345..d4c3c8ae 100644 --- a/host/contracts/contracts_test.go +++ b/host/contracts/contracts_test.go @@ -96,22 +96,12 @@ func TestContractUpdater(t *testing.T) { } defer updater.Close() - var releaseFuncs []func() error - defer func() { - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } - }() - for i := 0; i < test.append; i++ { root := frand.Entropy256() - release, err := node.Store.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := node.Store.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { t.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) updater.AppendSector(root) roots = append(roots, root) } @@ -135,11 +125,6 @@ func TestContractUpdater(t *testing.T) { } else if err := updater.Close(); err != nil { t.Fatal(err) } - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } // check that the sector roots are correct in the database allRoots, err := node.Store.SectorRoots() diff --git a/host/contracts/integrity_test.go b/host/contracts/integrity_test.go index 6f9155b3..6816a97e 100644 --- a/host/contracts/integrity_test.go +++ b/host/contracts/integrity_test.go @@ -104,16 +104,14 @@ func TestCheckIntegrity(t *testing.T) { defer updater.Close() var roots []types.Hash256 - var releases []func() error for i := 0; i < 5; i++ { var sector [rhp2.SectorSize]byte frand.Read(sector[:256]) root := rhp2.SectorRoot(§or) - release, err := host.Volumes.Write(root, §or) + err := host.Volumes.Write(root, §or) if err != nil { t.Fatal(err) } - releases = append(releases, release) roots = append(roots, root) updater.AppendSector(root) } @@ -126,12 +124,6 @@ func TestCheckIntegrity(t *testing.T) { t.Fatal(err) } - for _, release := range releases { - if err := release(); err != nil { - t.Fatal(err) - } - } - // helper func to serialize integrity check checkIntegrity := func() (issues, checked, sectors uint64, err error) { results, sectors, err := host.Contracts.CheckIntegrity(context.Background(), 
rev.Revision.ParentID) diff --git a/host/contracts/manager_test.go b/host/contracts/manager_test.go index 2da4527b..9e1b26cf 100644 --- a/host/contracts/manager_test.go +++ b/host/contracts/manager_test.go @@ -350,25 +350,15 @@ func TestContractLifecycle(t *testing.T) { assertContractStatus(t, node.Contracts, rev.Revision.ParentID, contracts.ContractStatusActive) assertContractMetrics(t, node.Store, 1, 0, hostCollateral, types.ZeroCurrency) - var releaseFuncs []func() error - defer func() { - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } - }() - var roots []types.Hash256 for i := 0; i < 5; i++ { var sector [rhp2.SectorSize]byte frand.Read(sector[:256]) root := rhp2.SectorRoot(§or) - release, err := node.Volumes.Write(root, §or) - if err != nil { + + if err := node.Volumes.Write(root, §or); err != nil { t.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) roots = append(roots, root) } @@ -404,11 +394,6 @@ func TestContractLifecycle(t *testing.T) { if err != nil { t.Fatal(err) } - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } assertContractMetrics(t, node.Store, 1, 0, hostCollateral, collateral) @@ -637,17 +622,15 @@ func TestContractLifecycle(t *testing.T) { assertContractMetrics(t, node.Store, 1, 0, hostCollateral, types.ZeroCurrency) // add sectors to the volume manager - var releaseFuncs []func() error var roots []types.Hash256 for i := 0; i < 5; i++ { var sector [rhp2.SectorSize]byte frand.Read(sector[:]) root := rhp2.SectorRoot(§or) - release, err := node.Volumes.Write(root, §or) + err := node.Volumes.Write(root, §or) if err != nil { t.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) roots = append(roots, root) } @@ -684,13 +667,6 @@ func TestContractLifecycle(t *testing.T) { t.Fatal(err) } - // release the sectors - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } - assertContractStatus(t, node.Contracts, rev.Revision.ParentID, contracts.ContractStatusActive) assertContractMetrics(t, node.Store, 1, 0, hostCollateral, collateral) @@ -752,6 +728,11 @@ func TestV2ContractLifecycle(t *testing.T) { assertContractMetrics := func(t *testing.T, locked, risked types.Currency) { t.Helper() + // ensure any dereferenced sectors have been pruned + if err := node.Store.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { + t.Fatal(err) + } + m, err := node.Store.Metrics(time.Now()) if err != nil { t.Fatal(err) @@ -854,11 +835,9 @@ func TestV2ContractLifecycle(t *testing.T) { root := rhp2.SectorRoot(§or) roots := []types.Hash256{root} - release, err := node.Volumes.Write(root, §or) - if err != nil { + if err := node.Volumes.Write(root, §or); err != nil { t.Fatal(err) } - defer release() fc.Filesize = proto4.SectorSize fc.Capacity = proto4.SectorSize @@ -873,14 +852,12 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ Storage: cost, RiskedCollateral: collateral, }) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } // metrics should not have been updated, contract is still pending assertContractMetrics(t, types.ZeroCurrency, types.ZeroCurrency) @@ -914,11 +891,9 @@ func TestV2ContractLifecycle(t *testing.T) { root := 
frand.Entropy256() // random root roots := []types.Hash256{root} - release, err := node.Volumes.Write(root, §or) - if err != nil { + if err := node.Volumes.Write(root, §or); err != nil { t.Fatal(err) } - defer release() fc.Filesize = proto4.SectorSize fc.Capacity = proto4.SectorSize @@ -933,14 +908,12 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ Storage: cost, RiskedCollateral: collateral, }) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } // metrics should not have been updated, contract is still pending assertContractMetrics(t, types.ZeroCurrency, types.ZeroCurrency) @@ -974,11 +947,9 @@ func TestV2ContractLifecycle(t *testing.T) { root := rhp2.SectorRoot(§or) roots := []types.Hash256{root} - release, err := node.Volumes.Write(root, §or) - if err != nil { + if err := node.Volumes.Write(root, §or); err != nil { t.Fatal(err) } - defer release() fc.Filesize = proto4.SectorSize fc.Capacity = proto4.SectorSize @@ -993,14 +964,12 @@ func TestV2ContractLifecycle(t *testing.T) { fc.HostSignature = hostKey.SignHash(sigHash) fc.RenterSignature = renterKey.SignHash(sigHash) - err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ + err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{ Storage: cost, RiskedCollateral: collateral, }) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } // mine to confirm the contract @@ -1264,11 +1233,10 @@ func TestSectorRoots(t *testing.T) { for i := 0; i < sectors; i++ { root, err := func() (types.Hash256, error) { root := frand.Entropy256() - release, err := node.Store.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := node.Store.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { return types.Hash256{}, fmt.Errorf("failed to store sector: %w", err) } - defer release() updater, err := node.Contracts.ReviseContract(rev.Revision.ParentID) if err != nil { diff --git a/host/storage/options.go b/host/storage/options.go index 6273a940..b3eed51d 100644 --- a/host/storage/options.go +++ b/host/storage/options.go @@ -1,6 +1,10 @@ package storage -import "go.uber.org/zap" +import ( + "time" + + "go.uber.org/zap" +) // A VolumeManagerOption configures a VolumeManager. type VolumeManagerOption func(*VolumeManager) @@ -25,3 +29,11 @@ func WithCacheSize(cacheSize int) VolumeManagerOption { s.cacheSize = cacheSize } } + +// WithPruneInterval sets the time between cleaning up dereferenced +// sectors. +func WithPruneInterval(d time.Duration) VolumeManagerOption { + return func(vm *VolumeManager) { + vm.pruneInterval = d + } +} diff --git a/host/storage/persist.go b/host/storage/persist.go index b31342a5..7bf248c8 100644 --- a/host/storage/persist.go +++ b/host/storage/persist.go @@ -3,15 +3,19 @@ package storage import ( "context" "errors" + "time" "go.sia.tech/core/types" ) type ( - // MigrateFunc is a callback function that is called for each sector that - // needs to be migrated If the function returns an error, the sector should - // be skipped and migration should continue. - MigrateFunc func(location SectorLocation) error + // StoreFunc is called for every sector that needs written + // to disk. 
+	StoreFunc func(loc SectorLocation) error
+	// MigrateFunc is called for every sector that needs migration.
+	// The sector should be migrated from 'from' to 'to' within the
+	// callback.
+	MigrateFunc func(from, to SectorLocation) error

 	// A VolumeStore stores and retrieves information about storage volumes.
 	VolumeStore interface {
@@ -42,31 +46,40 @@ type (
 		// SetAvailable sets the available flag on a volume.
 		SetAvailable(volumeID int64, available bool) error

+		// PruneSectors removes all sectors that have not been accessed since
+		// lastAccess and are no longer referenced by a contract or temp storage.
+		// If the context is canceled, pruning is stopped and the function returns
+		// with the error.
+		PruneSectors(ctx context.Context, lastAccess time.Time) error
+
 		// MigrateSectors returns a new location for each occupied sector of a
 		// volume starting at min. The sector data should be copied to the new
 		// location and synced to disk during migrateFn. If migrateFn returns an
 		// error, migration will continue, but that sector is not migrated.
-		MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
+		MigrateSectors(ctx context.Context, volumeID int64, min uint64, fn MigrateFunc) (migrated, failed int, err error)
 		// StoreSector calls fn with an empty location in a writable volume. If
-		// the sector root already exists, fn is called with the existing
-		// location and exists is true. Unless exists is true, The sector must
-		// be written to disk within fn. If fn returns an error, the metadata is
-		// rolled back. If no space is available, ErrNotEnoughStorage is
-		// returned. The location is locked until release is called.
-		//
-		// The sector should be referenced by either a contract or temp store
-		// before release is called to prevent Prune() from removing it.
-		StoreSector(root types.Hash256, fn func(loc SectorLocation, exists bool) error) (release func() error, err error)
+		// the sector root already exists, nil is returned. The sector should be
+		// written to disk within fn. If fn returns an error, the metadata is
+		// rolled back and the error is returned. If no space is available,
+		// ErrNotEnoughStorage is returned.
+		StoreSector(root types.Hash256, fn StoreFunc) error
 		// RemoveSector removes the metadata of a sector and returns its
 		// location in the volume.
 		RemoveSector(root types.Hash256) error
+		// HasSector returns true if the sector is stored by the host.
+		HasSector(root types.Hash256) (bool, error)
 		// SectorLocation returns the location of a sector or an error if the
 		// sector is not found. The location is locked until release is
 		// called.
-		SectorLocation(root types.Hash256) (loc SectorLocation, release func() error, err error)
+		SectorLocation(root types.Hash256) (loc SectorLocation, err error)
+		// AddTempSector adds a sector to temporary storage. The sector will be
+		// deleted after the expiration height.
+		AddTempSector(root types.Hash256, expiration uint64) error
 		// AddTemporarySectors adds a list of sectors to the temporary store.
 		// The sectors are not referenced by a contract and will be removed
 		// at the expiration height.
+		//
+		// Deprecated: Use AddTempSector instead.
 		AddTemporarySectors(sectors []TempSector) error
 		// ExpireTempSectors removes all temporary sectors that expired before
 		// the given height.
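For reference, a rough sketch (not part of this diff) of how a caller might use the new callback-based API in place of the old write-then-release flow. The helper name storeAndPin and the write parameter are illustrative; StoreSector, StoreFunc, AddTempSector, and PruneSectors are the interface methods shown above, and the import path assumes the go.sia.tech/hostd module layout. It mirrors what VolumeManager.StoreSector does later in this change: write via the callback, then keep the sector alive by referencing it, since there is no longer a lock/release to hold off the pruner.

package example

import (
	"fmt"

	"go.sia.tech/core/types"
	"go.sia.tech/hostd/host/storage"
)

// storeAndPin stores a sector through the StoreFunc callback and immediately
// references it as temp storage so a later PruneSectors pass won't remove it.
func storeAndPin(vs storage.VolumeStore, write storage.StoreFunc, root types.Hash256, expiration uint64) error {
	// StoreSector picks an empty location in a writable volume and invokes the
	// callback; it returns nil if the root is already stored, and rolls the
	// metadata back if the callback fails.
	if err := vs.StoreSector(root, write); err != nil {
		return fmt.Errorf("failed to store sector: %w", err)
	}
	// with the lock tables removed there is no release func; the sector must be
	// referenced by a contract or temp storage before it becomes prunable.
	return vs.AddTempSector(root, expiration)
}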
diff --git a/host/storage/storage.go b/host/storage/storage.go
index 5f939fbd..6457e93b 100644
--- a/host/storage/storage.go
+++ b/host/storage/storage.go
@@ -10,7 +10,8 @@ import (
 	"time"

 	lru "github.com/hashicorp/golang-lru/v2"
-	rhp2 "go.sia.tech/core/rhp/v2"
+	proto2 "go.sia.tech/core/rhp/v2"
+	proto4 "go.sia.tech/core/rhp/v4"
 	"go.sia.tech/core/types"
 	"go.sia.tech/hostd/alerts"
 	"go.sia.tech/hostd/internal/threadgroup"
@@ -60,14 +61,14 @@ type (
 	SectorReference struct {
 		Contracts   []types.FileContractID `json:"contracts"`
 		TempStorage int                    `json:"tempStorage"`
-		Locks       int                    `json:"locks"`
 	}

 	// A VolumeManager manages storage using local volumes.
 	VolumeManager struct {
-		cacheHits   uint64 // ensure 64-bit alignment on 32-bit systems
-		cacheMisses uint64
-		cacheSize   int
+		cacheHits     uint64 // ensure 64-bit alignment on 32-bit systems
+		cacheMisses   uint64
+		cacheSize     int
+		pruneInterval time.Duration

 		vs       VolumeStore
 		recorder *sectorAccessRecorder
@@ -80,7 +81,7 @@ type (
 		volumes map[int64]*volume
 		// changedVolumes tracks volumes that need to be fsynced
 		changedVolumes map[int64]bool
-		cache          *lru.Cache[types.Hash256, *[rhp2.SectorSize]byte] // Added cache
+		cache          *lru.Cache[types.Hash256, *[proto2.SectorSize]byte] // Added cache
 	}
 )

@@ -156,27 +157,30 @@ func (vm *VolumeManager) loadVolumes() error {
 // migrateSector migrates a sector to a new location. The sector is read from
 // its current location and written to its new location. The volume is
 // immediately synced after the sector is written.
-func (vm *VolumeManager) migrateSector(loc SectorLocation) error {
+func (vm *VolumeManager) migrateSector(from, to SectorLocation) error {
 	// read the sector from the old location
-	sector, err := vm.Read(loc.Root)
+	sector, err := vm.readLocation(from)
 	if err != nil {
 		return fmt.Errorf("failed to read sector: %w", err)
 	}
 	// calculate the returned root
-	root := rhp2.SectorRoot(sector)
+	root := proto2.SectorRoot(sector)
 	// verify the sector is not corrupt
-	if root != loc.Root {
-		return fmt.Errorf("sector corrupt: %v != %v", loc.Root, root)
+	if root != from.Root {
+		return fmt.Errorf("sector corrupt: %v != %v", from.Root, root)
+	} else if root != to.Root {
+		// sanity check
+		panic("migrateSector called with mismatched roots")
 	}

 	vm.mu.Lock()
-	vol, ok := vm.volumes[loc.Volume]
+	vol, ok := vm.volumes[to.Volume]
 	vm.mu.Unlock()
 	if !ok {
-		return fmt.Errorf("volume %v not found", loc.Volume)
+		return fmt.Errorf("volume %v not found", to.Volume)
 	}

 	// write the sector to the new location and sync the volume
-	if err := vol.WriteSector(sector, loc.Index); err != nil {
+	if err := vol.WriteSector(sector, to.Index); err != nil {
 		return fmt.Errorf("failed to write sector: %w", err)
 	} else if err := vol.Sync(); err != nil {
 		return fmt.Errorf("failed to sync volume: %w", err)
@@ -269,8 +273,8 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol
 	// migrate any sectors outside of the target range.
var migrated int - migrated, failed, err := vm.vs.MigrateSectors(ctx, id, newMaxSectors, func(newLoc SectorLocation) error { - if err := vm.migrateSector(newLoc); err != nil { + migrated, failed, err := vm.vs.MigrateSectors(ctx, id, newMaxSectors, func(from, to SectorLocation) error { + if err := vm.migrateSector(from, to); err != nil { return err } migrated++ @@ -321,6 +325,49 @@ func (vm *VolumeManager) shrinkVolume(ctx context.Context, id int64, volume *vol return nil } +// writeSector atomically adds a sector to the database and writes it to disk +func (vm *VolumeManager) writeSector(root types.Hash256, data *[proto4.SectorSize]byte) error { + return vm.vs.StoreSector(root, func(loc SectorLocation) error { + start := time.Now() + + vm.mu.Lock() + vol, ok := vm.volumes[loc.Volume] + vm.mu.Unlock() + if !ok { + return fmt.Errorf("volume %v not found", loc.Volume) + } + + // write the sector to the volume + if err := vol.WriteSector(data, loc.Index); err != nil { + stats := vol.Stats() + vm.alerts.Register(alerts.Alert{ + ID: vol.alertID("write"), + Severity: alerts.SeverityError, + Message: "Failed to write sector", + Data: map[string]interface{}{ + "volume": vol.Location(), + "failedReads": stats.FailedReads, + "failedWrites": stats.FailedWrites, + "sector": root, + "error": err.Error(), + }, + Timestamp: time.Now(), + }) + return err + } + vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start))) + + // Add newly written sector to cache + vm.cache.Add(root, data) + + // mark the volume as changed + vm.mu.Lock() + vm.changedVolumes[loc.Volume] = true + vm.mu.Unlock() + return nil + }) +} + // volumeStats returns the stats for a volume. A lock must be held on the volume // manager before this function is called. 
func (vm *VolumeManager) volumeStats(id int64) VolumeStats { @@ -571,8 +618,8 @@ func (vm *VolumeManager) RemoveVolume(ctx context.Context, id int64, force bool, } doMigration := func() error { - migrated, failed, err = vm.vs.MigrateSectors(ctx, id, 0, func(newLoc SectorLocation) error { - err := vm.migrateSector(newLoc) + migrated, failed, err = vm.vs.MigrateSectors(ctx, id, 0, func(from, to SectorLocation) error { + err := vm.migrateSector(from, to) if err != nil { failed++ } else { @@ -730,21 +777,20 @@ func (vm *VolumeManager) RemoveSector(root types.Hash256) error { } defer done() - // get and lock the sector's current location - loc, release, err := vm.vs.SectorLocation(root) + vm.mu.Lock() + defer vm.mu.Unlock() + + // get the sector's current location + loc, err := vm.vs.SectorLocation(root) if err != nil { return fmt.Errorf("failed to locate sector %v: %w", root, err) } - defer release() // remove the sector from the volume store if err := vm.vs.RemoveSector(root); err != nil { return fmt.Errorf("failed to remove sector %v: %w", root, err) } - vm.mu.Lock() - defer vm.mu.Unlock() - // get the volume from memory vol, ok := vm.volumes[loc.Volume] if !ok { @@ -752,7 +798,7 @@ func (vm *VolumeManager) RemoveSector(root types.Hash256) error { } // zero the sector and immediately sync the volume - var zeroes [rhp2.SectorSize]byte + var zeroes [proto2.SectorSize]byte if err := vol.WriteSector(&zeroes, loc.Index); err != nil { return fmt.Errorf("failed to zero sector %v: %w", root, err) } else if err := vol.Sync(); err != nil { @@ -764,46 +810,12 @@ func (vm *VolumeManager) RemoveSector(root types.Hash256) error { return nil } -// LockSector prevents the sector with the given root from being pruned. If the -// sector does not exist, an error is returned. Release must be called when the -// sector is no longer needed. -func (vm *VolumeManager) LockSector(root types.Hash256) (func() error, error) { - done, err := vm.tg.Add() - if err != nil { - return nil, err - } - defer done() - _, release, err := vm.vs.SectorLocation(root) - return release, err -} - // CacheStats returns the number of cache hits and misses. 
func (vm *VolumeManager) CacheStats() (hits, misses uint64) { return atomic.LoadUint64(&vm.cacheHits), atomic.LoadUint64(&vm.cacheMisses) } -// Read reads the sector with the given root -func (vm *VolumeManager) Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) { - done, err := vm.tg.Add() - if err != nil { - return nil, err - } - defer done() - - // Check the cache first - if sector, ok := vm.cache.Get(root); ok { - vm.recorder.AddCacheHit() - atomic.AddUint64(&vm.cacheHits, 1) - return sector, nil - } - - // Cache miss, read from disk - loc, release, err := vm.vs.SectorLocation(root) - if err != nil { - return nil, fmt.Errorf("failed to locate sector: %w", err) - } - defer release() - +func (vm *VolumeManager) readLocation(loc SectorLocation) (*[proto2.SectorSize]byte, error) { vm.mu.Lock() v, ok := vm.volumes[loc.Volume] if !ok { @@ -822,7 +834,7 @@ func (vm *VolumeManager) Read(root types.Hash256) (*[rhp2.SectorSize]byte, error "volume": v.Location(), "failedReads": stats.FailedReads, "failedWrites": stats.FailedWrites, - "sector": root, + "sector": loc.Root, "error": err.Error(), }, Timestamp: time.Now(), @@ -831,12 +843,35 @@ func (vm *VolumeManager) Read(root types.Hash256) (*[rhp2.SectorSize]byte, error } // Add sector to cache - vm.cache.Add(root, sector) + vm.cache.Add(loc.Root, sector) vm.recorder.AddCacheMiss() atomic.AddUint64(&vm.cacheMisses, 1) return sector, nil } +// Read reads the sector with the given root +func (vm *VolumeManager) Read(root types.Hash256) (*[proto2.SectorSize]byte, error) { + done, err := vm.tg.Add() + if err != nil { + return nil, err + } + defer done() + + // Check the cache first + if sector, ok := vm.cache.Get(root); ok { + vm.recorder.AddCacheHit() + atomic.AddUint64(&vm.cacheHits, 1) + return sector, nil + } + + // Cache miss, read from disk + loc, err := vm.vs.SectorLocation(root) + if err != nil { + return nil, fmt.Errorf("failed to locate sector: %w", err) + } + return vm.readLocation(loc) +} + // Sync syncs the data files of changed volumes. func (vm *VolumeManager) Sync() error { done, err := vm.tg.Add() @@ -869,57 +904,36 @@ func (vm *VolumeManager) Sync() error { return nil } -// Write writes a sector to a volume. release should only be called after the -// contract roots have been committed to prevent the sector from being deleted. 
-func (vm *VolumeManager) Write(root types.Hash256, data *[rhp2.SectorSize]byte) (func() error, error) { +// HasSector returns true if the host is storing a sector +func (vm *VolumeManager) HasSector(root types.Hash256) (bool, error) { done, err := vm.tg.Add() if err != nil { - return nil, err + return false, err } defer done() - release, err := vm.vs.StoreSector(root, func(loc SectorLocation, exists bool) error { - if exists { - return nil - } - start := time.Now() - vm.mu.Lock() - vol, ok := vm.volumes[loc.Volume] - vm.mu.Unlock() - if !ok { - return fmt.Errorf("volume %v not found", loc.Volume) - } + return vm.vs.HasSector(root) +} - // write the sector to the volume - if err := vol.WriteSector(data, loc.Index); err != nil { - stats := vol.Stats() - vm.alerts.Register(alerts.Alert{ - ID: vol.alertID("write"), - Severity: alerts.SeverityError, - Message: "Failed to write sector", - Data: map[string]interface{}{ - "volume": vol.Location(), - "failedReads": stats.FailedReads, - "failedWrites": stats.FailedWrites, - "sector": root, - "error": err.Error(), - }, - Timestamp: time.Now(), - }) - return err - } - vm.log.Debug("wrote sector", zap.String("root", root.String()), zap.Int64("volume", loc.Volume), zap.Uint64("index", loc.Index), zap.Duration("elapsed", time.Since(start))) +// StoreSector writes a sector to disk and adds it to temporary storage +func (vm *VolumeManager) StoreSector(root types.Hash256, data *[proto4.SectorSize]byte, expiration uint64) error { + if err := vm.writeSector(root, data); err != nil { + return fmt.Errorf("failed to store sector: %w", err) + } else if err := vm.vs.AddTempSector(root, expiration); err != nil { + return fmt.Errorf("failed to reference temporary sector: %w", err) + } + return nil +} - // Add newly written sector to cache - vm.cache.Add(root, data) +// Write writes a sector to a volume. +func (vm *VolumeManager) Write(root types.Hash256, data *[proto2.SectorSize]byte) error { + done, err := vm.tg.Add() + if err != nil { + return err + } + defer done() - // mark the volume as changed - vm.mu.Lock() - vm.changedVolumes[loc.Volume] = true - vm.mu.Unlock() - return nil - }) - return release, err + return vm.writeSector(root, data) } // AddTemporarySectors adds sectors to the temporary store. The sectors are not @@ -958,7 +972,8 @@ func (vm *VolumeManager) ProcessActions(index types.ChainIndex) error { // NewVolumeManager creates a new VolumeManager. 
func NewVolumeManager(vs VolumeStore, opts ...VolumeManagerOption) (*VolumeManager, error) { vm := &VolumeManager{ - vs: vs, + pruneInterval: 5 * time.Minute, + vs: vs, log: zap.NewNop(), alerts: alerts.NewNop(), @@ -972,8 +987,28 @@ func NewVolumeManager(vs VolumeStore, opts ...VolumeManagerOption) (*VolumeManag opt(vm) } + go func() { + ctx, cancel, err := vm.tg.AddContext(context.Background()) + if err != nil { + vm.log.Debug("failed to start pruning thread", zap.Error(err)) + return + } + defer cancel() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(vm.pruneInterval): + if err := vm.vs.PruneSectors(ctx, time.Now().Add(-1*vm.pruneInterval)); err != nil && !errors.Is(err, context.Canceled) { + vm.log.Error("failed to prune sectors", zap.Error(err)) + } + } + } + }() + // Initialize cache with LRU eviction and a max capacity of 64 - cache, err := lru.New[types.Hash256, *[rhp2.SectorSize]byte](64) + cache, err := lru.New[types.Hash256, *[proto2.SectorSize]byte](64) if err != nil { return nil, fmt.Errorf("failed to initialize cache: %w", err) } diff --git a/host/storage/storage_test.go b/host/storage/storage_test.go index d4f69b71..11e4e1da 100644 --- a/host/storage/storage_test.go +++ b/host/storage/storage_test.go @@ -57,13 +57,10 @@ func TestVolumeLoad(t *testing.T) { var sector [rhp2.SectorSize]byte frand.Read(sector[:]) root := rhp2.SectorRoot(§or) - release, err := vm.Write(root, §or) - if err != nil { + if err = vm.Write(root, §or); err != nil { t.Fatal(err) } else if err := vm.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 1}}); err != nil { // must add a temp sector to prevent pruning t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } // close the volume manager @@ -103,10 +100,7 @@ func TestVolumeLoad(t *testing.T) { // write a new sector frand.Read(sector[:]) root = rhp2.SectorRoot(§or) - release, err = vm.Write(root, §or) - if err != nil { - t.Fatal(err) - } else if err := release(); err != nil { + if err = vm.Write(root, §or); err != nil { t.Fatal(err) } } @@ -193,13 +187,11 @@ func TestRemoveVolume(t *testing.T) { roots[i] = rhp2.SectorRoot(§or) // write the sector - release, err := vm.Write(roots[i], §or) - if err != nil { + + if err := vm.Write(roots[i], §or); err != nil { t.Fatal(err) } else if err := vm.AddTemporarySectors([]storage.TempSector{{Root: roots[i], Expiration: 1}}); err != nil { // must add a temp sector to prevent pruning t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } } @@ -232,8 +224,7 @@ func TestRemoveVolume(t *testing.T) { if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { // blocking error should be nil t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { - // async error should be ErrMigrationFailed + } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } @@ -260,7 +251,7 @@ func TestRemoveVolume(t *testing.T) { // but some sectors should be migrated to the second volume. 
if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { + } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatal(err) } @@ -309,6 +300,20 @@ func TestRemoveCorrupt(t *testing.T) { } defer db.Close() + assertMetrics := func(t *testing.T, physical, lost, total uint64) { + t.Helper() + + if m, err := db.Metrics(time.Now()); err != nil { + t.Fatal(err) + } else if m.Storage.TotalSectors != total { + t.Fatalf("expected %v total sectors, got %v", total, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != physical { + t.Fatalf("expected %v used sectors, got %v", physical, m.Storage.PhysicalSectors) + } else if m.Storage.LostSectors != lost { + t.Fatalf("expected %v lost sectors, got %v", lost, m.Storage.LostSectors) + } + } + // initialize the storage manager vm, err := storage.NewVolumeManager(db, storage.WithLogger(log.Named("volumes"))) if err != nil { @@ -316,6 +321,8 @@ func TestRemoveCorrupt(t *testing.T) { } defer vm.Close() + assertMetrics(t, 0, 0, 0) + result := make(chan error, 1) volumePath := filepath.Join(t.TempDir(), "hostdata.dat") volume, err := vm.AddVolume(context.Background(), volumePath, expectedSectors, result) @@ -325,59 +332,44 @@ func TestRemoveCorrupt(t *testing.T) { t.Fatal(err) } - for i := 0; i < 10; i++ { - if _, err := storeRandomSector(vm, 1); err != nil { + assertVolume := func(t *testing.T, volumeID int64, used, total uint64) { + t.Helper() + + if vol, err := vm.Volume(volumeID); err != nil { t.Fatal(err) + } else if vol.Status != storage.VolumeStatusReady { + t.Fatal("volume should be ready") + } else if vol.UsedSectors != used { + t.Fatalf("expected %v used sectors, got %v", used, vol.UsedSectors) + } else if vol.TotalSectors != total { + t.Fatalf("expected %v total sectors, got %v", total, vol.TotalSectors) } } - // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) - } + assertVolume(t, volume.ID, 0, expectedSectors) + assertMetrics(t, 0, 0, expectedSectors) - if vol, err := vm.Volume(volume.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusReady { - t.Fatal("volume should be ready") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) + for i := 0; i < 10; i++ { + if _, err := storeRandomSector(vm, 1); err != nil { + t.Fatal(err) + } } - // attempt to remove the volume. Should return ErrMigrationFailed since + assertMetrics(t, 10, 0, expectedSectors) + assertVolume(t, volume.ID, 10, expectedSectors) + + // attempt to remove the volume. Should return ErrNotEnoughStorage since // there is only one volume. 
if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { // blocking error should be nil t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { - // async error should be ErrMigrationFailed - t.Fatalf("expected ErrMigrationFailed, got %v", err) - } - - // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) + } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { + t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } - if vol, err := vm.Volume(volume.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusReady { - t.Fatal("volume should be ready") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) - } + // check that the metrics did not change + assertMetrics(t, 10, 0, expectedSectors) + assertVolume(t, volume.ID, 10, expectedSectors) f, err := os.OpenFile(volumePath, os.O_RDWR, 0) if err != nil { @@ -394,34 +386,6 @@ func TestRemoveCorrupt(t *testing.T) { t.Fatal(err) } - // remove the volume - if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { - t.Fatal(err) // blocking error should be nil - } else if err := <-result; err == nil { - t.Fatal("expected error when removing corrupt volume", err) - } else if !errors.Is(err, storage.ErrMigrationFailed) { - t.Fatalf("expected ErrMigrationFailed, got %v", err) - } - - // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) - } - - if vol, err := vm.Volume(volume.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusReady { - t.Fatal("volume should be ready") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) - } - // add a second volume to accept the data volume2, err := vm.AddVolume(context.Background(), filepath.Join(t.TempDir(), "hostdata.dat"), expectedSectors, result) if err != nil { @@ -430,15 +394,25 @@ func TestRemoveCorrupt(t *testing.T) { t.Fatal(err) } - // check that the volume metrics doubled - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors*2 { - t.Fatalf("expected %v total sectors, got %v", expectedSectors*2, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) + // check that the total metrics doubled, but the volume metrics are unchanged + assertMetrics(t, 10, 0, expectedSectors*2) + assertVolume(t, volume.ID, 10, expectedSectors) + assertVolume(t, volume2.ID, 0, expectedSectors) + + // remove the volume + if 
err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { + t.Fatal(err) // blocking error should be nil + } else if err := <-result; err == nil { + t.Fatal("expected error when removing corrupt volume", err) + } else if !errors.Is(err, storage.ErrMigrationFailed) { + t.Fatalf("expected ErrMigrationFailed, got %v", err) } + // check that only the one failed sector is left in the original volume + assertMetrics(t, 10, 0, expectedSectors*2) + assertVolume(t, volume.ID, 1, expectedSectors) + assertVolume(t, volume2.ID, 9, expectedSectors) + // force remove the volume if err := vm.RemoveVolume(context.Background(), volume.ID, true, result); err != nil { t.Fatal(err) @@ -449,24 +423,11 @@ func TestRemoveCorrupt(t *testing.T) { } // check that the corrupt sector was removed from the volume metrics - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors-1, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 9 { - t.Fatalf("expected 9 used sectors, got %v", m.Storage.PhysicalSectors) - } else if m.Storage.LostSectors != 1 { - t.Fatalf("expected 1 lost sectors, got %v", m.Storage.LostSectors) - } + assertMetrics(t, 9, 1, expectedSectors) + assertVolume(t, volume2.ID, 9, expectedSectors) - if vol, err := vm.Volume(volume2.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusReady { - t.Fatal("volume should be ready") - } else if vol.UsedSectors != 9 { - t.Fatalf("expected 9 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) + if _, err := vm.Volume(volume.ID); !errors.Is(err, storage.ErrVolumeNotFound) { + t.Fatalf("expected ErrVolumeNotFound, got %v", err) } } @@ -482,6 +443,20 @@ func TestRemoveMissing(t *testing.T) { } defer db.Close() + assertMetrics := func(t *testing.T, physical, lost, total uint64) { + t.Helper() + + if m, err := db.Metrics(time.Now()); err != nil { + t.Fatal(err) + } else if m.Storage.TotalSectors != total { + t.Fatalf("expected %v total sectors, got %v", total, m.Storage.TotalSectors) + } else if m.Storage.PhysicalSectors != physical { + t.Fatalf("expected %v used sectors, got %v", physical, m.Storage.PhysicalSectors) + } else if m.Storage.LostSectors != lost { + t.Fatalf("expected %v lost sectors, got %v", lost, m.Storage.LostSectors) + } + } + // initialize the storage manager vm, err := storage.NewVolumeManager(db, storage.WithLogger(log.Named("volumes"))) if err != nil { @@ -489,6 +464,8 @@ func TestRemoveMissing(t *testing.T) { } defer vm.Close() + assertMetrics(t, 0, 0, 0) + result := make(chan error, 1) volumePath := filepath.Join(t.TempDir(), "hostdata.dat") volume, err := vm.AddVolume(context.Background(), volumePath, expectedSectors, result) @@ -498,38 +475,44 @@ func TestRemoveMissing(t *testing.T) { t.Fatal(err) } + assertVolume := func(t *testing.T, volumeID int64, used, total uint64) { + t.Helper() + + if vol, err := vm.Volume(volumeID); err != nil { + t.Fatal(err) + } else if vol.UsedSectors != used { + t.Fatalf("expected %v used sectors, got %v", used, vol.UsedSectors) + } else if vol.TotalSectors != total { + t.Fatalf("expected %v total sectors, got %v", total, vol.TotalSectors) + } + } + + assertMetrics(t, 0, 0, expectedSectors) + assertVolume(t, volume.ID, 0, expectedSectors) + + assertVolume(t, volume.ID, 0, expectedSectors) + 
assertMetrics(t, 0, 0, expectedSectors) + for i := 0; i < 10; i++ { if _, err := storeRandomSector(vm, 1); err != nil { t.Fatal(err) } } - // attempt to remove the volume. Should return ErrMigrationFailed since + assertMetrics(t, 10, 0, expectedSectors) + assertVolume(t, volume.ID, 10, expectedSectors) + + // attempt to remove the volume. Should return ErrNotEnoughStorage since // there is only one volume. if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { - t.Fatalf("expected ErrMigrationFailed, got %v", err) + } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { + t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) - } - - if vol, err := vm.Volume(volume.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusReady { - t.Fatal("volume should be ready") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) - } + assertMetrics(t, 10, 0, expectedSectors) + assertVolume(t, volume.ID, 10, expectedSectors) // close the volume manager if err := vm.Close(); err != nil { @@ -549,24 +532,22 @@ func TestRemoveMissing(t *testing.T) { defer vm.Close() // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) - } + assertMetrics(t, 10, 0, expectedSectors) + assertVolume(t, volume.ID, 10, expectedSectors) - if vol, err := vm.Volume(volume.ID); err != nil { + // add a volume to accept the data + volume2, err := vm.AddVolume(context.Background(), filepath.Join(t.TempDir(), "hostdata.dat"), expectedSectors, result) + if err != nil { + t.Fatal(err) + } else if err := <-result; err != nil { t.Fatal(err) - } else if vol.Status != storage.VolumeStatusUnavailable { - t.Fatal("volume should be unavailable") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) } + // check that the total metrics doubled and the volume metrics did not change + assertMetrics(t, 10, 0, expectedSectors*2) + assertVolume(t, volume.ID, 10, expectedSectors) + assertVolume(t, volume2.ID, 0, expectedSectors) + // remove the volume if err := vm.RemoveVolume(context.Background(), volume.ID, false, result); err != nil { t.Fatal(err) @@ -575,40 +556,20 @@ func TestRemoveMissing(t *testing.T) { } // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, 
m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", m.Storage.PhysicalSectors) - } - - if vol, err := vm.Volume(volume.ID); err != nil { - t.Fatal(err) - } else if vol.Status != storage.VolumeStatusUnavailable { - t.Fatal("volume should be unavailable") - } else if vol.UsedSectors != 10 { - t.Fatalf("expected 10 used sectors, got %v", vol.UsedSectors) - } else if vol.TotalSectors != expectedSectors { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, vol.TotalSectors) - } + assertMetrics(t, 10, 0, expectedSectors*2) + assertVolume(t, volume.ID, 10, expectedSectors) + assertVolume(t, volume2.ID, 0, expectedSectors) + // force remve the volume if err := vm.RemoveVolume(context.Background(), volume.ID, true, result); err != nil { t.Fatal(err) } else if err := <-result; err != nil { t.Fatal(err) } - // check that the volume metrics did not change - if m, err := db.Metrics(time.Now()); err != nil { - t.Fatal(err) - } else if m.Storage.TotalSectors != 0 { - t.Fatalf("expected %v total sectors, got %v", expectedSectors, m.Storage.TotalSectors) - } else if m.Storage.PhysicalSectors != 0 { - t.Fatalf("expected 0 used sectors, got %v", m.Storage.PhysicalSectors) - } else if m.Storage.LostSectors != 10 { - t.Fatalf("expected 10 lost sectors, got %v", m.Storage.LostSectors) - } + // check that the sectors were marked as lost + assertMetrics(t, 0, 10, expectedSectors) + assertVolume(t, volume2.ID, 0, expectedSectors) if _, err := vm.Volume(volume.ID); !errors.Is(err, storage.ErrVolumeNotFound) { t.Fatalf("expected ErrVolumeNotFound, got %v", err) @@ -722,7 +683,7 @@ func TestVolumeConcurrency(t *testing.T) { } // try to write a sector to the volume, which should fail - if _, err := vm.Write(root, §or); !errors.Is(err, storage.ErrNotEnoughStorage) { + if err := vm.Write(root, §or); !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatalf("expected %v, got %v", storage.ErrNotEnoughStorage, err) } @@ -748,7 +709,7 @@ func TestVolumeConcurrency(t *testing.T) { } // write the sector again, which should succeed - if _, err := vm.Write(root, §or); err != nil { + if err := vm.Write(root, §or); err != nil { t.Fatal(err) } } @@ -907,11 +868,9 @@ func TestVolumeShrink(t *testing.T) { } sectorLocation := func(root types.Hash256) (storage.SectorLocation, error) { - loc, release, err := db.SectorLocation(root) + loc, err := db.SectorLocation(root) if err != nil { return loc, err - } else if err := release(); err != nil { - return loc, err } return loc, nil } @@ -934,8 +893,8 @@ func TestVolumeShrink(t *testing.T) { remainingSectors := uint64(sectors - toRemove) if err := vm.ResizeVolume(context.Background(), volume.ID, remainingSectors, result); err != nil { t.Fatal(err) - } else if err := <-result; !errors.Is(err, storage.ErrMigrationFailed) { - t.Fatalf("expected ErrMigrationFailed, got %v", err) + } else if err := <-result; !errors.Is(err, storage.ErrNotEnoughStorage) { + t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } // remove some sectors from the beginning of the volume @@ -1061,17 +1020,16 @@ func storeRandomSector(vm *storage.VolumeManager, expiration uint64) (types.Hash return types.Hash256{}, fmt.Errorf("failed to generate random sector: %w", err) } root := rhp2.SectorRoot(§or) - release, err := vm.Write(root, §or) + err := vm.Write(root, §or) if err != nil { return types.Hash256{}, fmt.Errorf("failed to write sector: %w", err) } - defer release() err = vm.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 
expiration}}) if err != nil { return types.Hash256{}, fmt.Errorf("failed to add temporary sector: %w", err) } - return root, release() + return root, nil } func TestSectorCache(t *testing.T) { @@ -1180,6 +1138,91 @@ func TestSectorCache(t *testing.T) { } } +func TestStoragePrune(t *testing.T) { + const sectors = 10 + dir := t.TempDir() + + // create the database + log := zaptest.NewLogger(t) + db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // initialize the storage manager + vm, err := storage.NewVolumeManager(db, storage.WithLogger(log.Named("volumes")), storage.WithCacheSize(0), storage.WithPruneInterval(500*time.Millisecond)) + if err != nil { + t.Fatal(err) + } + defer vm.Close() + + result := make(chan error, 1) + volumeFilePath := filepath.Join(t.TempDir(), "hostdata.dat") + vol, err := vm.AddVolume(context.Background(), volumeFilePath, sectors, result) + if err != nil { + t.Fatal(err) + } else if err := <-result; err != nil { + t.Fatal(err) + } + + assertUsedSectors := func(t *testing.T, used uint64) { + t.Helper() + + time.Sleep(2 * time.Second) // note: longer than prune interval for timing issues + volume, err := vm.Volume(vol.ID) + if err != nil { + t.Fatal(err) + } else if volume.UsedSectors != used { + t.Fatalf("expected %v used sectors, got %v", used, volume.UsedSectors) + } + + m, err := db.Metrics(time.Now()) + if err != nil { + t.Fatal(err) + } else if m.Storage.PhysicalSectors != used { + t.Fatalf("expected %v used sectors, got %v", used, m.Storage.PhysicalSectors) + } + } + + assertUsedSectors(t, 0) + + storeRandomSector := func(t *testing.T, expiration uint64) types.Hash256 { + t.Helper() + + var sector [rhp2.SectorSize]byte + if _, err := frand.Read(sector[:256]); err != nil { + t.Fatal("failed to generate random sector:", err) + } + root := rhp2.SectorRoot(§or) + if err = vm.StoreSector(root, §or, expiration); err != nil { + t.Fatal("failed to store sector:", err) + } + return root + } + + roots := make([]types.Hash256, 0, sectors) + // fill the volume + for i := 0; i < cap(roots); i++ { + storeRandomSector(t, uint64(i+1)) + } + + // ensure the sectors are not pruned immediately + assertUsedSectors(t, sectors) + + // expire half the sectors + if err := db.ExpireTempSectors(5); err != nil { + t.Fatal(err) + } + assertUsedSectors(t, 5) + + // expire the remaining sectors + if err := db.ExpireTempSectors(10); err != nil { + t.Fatal(err) + } + assertUsedSectors(t, 0) +} + func BenchmarkVolumeManagerWrite(b *testing.B) { dir := b.TempDir() @@ -1221,11 +1264,9 @@ func BenchmarkVolumeManagerWrite(b *testing.B) { // fill the volume for i := 0; i < b.N; i++ { root, sector := roots[i], sectors[i] - release, err := vm.Write(root, §or) + err := vm.Write(root, §or) if err != nil { b.Fatal(i, err) - } else if err := release(); err != nil { - b.Fatal(i, err) } } } @@ -1298,11 +1339,9 @@ func BenchmarkVolumeManagerRead(b *testing.B) { var sector [rhp2.SectorSize]byte frand.Read(sector[:256]) root := rhp2.SectorRoot(§or) - release, err := vm.Write(root, §or) + err := vm.Write(root, §or) if err != nil { b.Fatal(i, err) - } else if err := release(); err != nil { - b.Fatal(i, err) } written = append(written, root) } @@ -1350,11 +1389,9 @@ func BenchmarkVolumeRemove(b *testing.B) { var sector [rhp2.SectorSize]byte frand.Read(sector[:256]) root := rhp2.SectorRoot(§or) - release, err := vm.Write(root, §or) + err := vm.Write(root, §or) if err != nil { b.Fatal(i, err) - } else if err := release(); 
err != nil { - b.Fatal(i, err) } } diff --git a/persist/sqlite/contracts.go b/persist/sqlite/contracts.go index 9e995157..4b604743 100644 --- a/persist/sqlite/contracts.go +++ b/persist/sqlite/contracts.go @@ -25,7 +25,7 @@ type ( var _ contracts.ContractStore = (*Store)(nil) -func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed []types.Hash256, err error) { +func (s *Store) batchExpireContractSectors(height uint64) (expired int, err error) { err = s.transaction(func(tx *txn) (err error) { sectorIDs, err := deleteExpiredContractSectors(tx, height) if err != nil { @@ -37,14 +37,12 @@ func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil { return fmt.Errorf("failed to decrement contract sectors: %w", err) } - - removed, err = pruneSectors(tx, sectorIDs) - return err + return nil }) return } -func (s *Store) batchExpireV2ContractSectors(height uint64) (expired int, removed []types.Hash256, err error) { +func (s *Store) batchExpireV2ContractSectors(height uint64) (expired int, err error) { err = s.transaction(func(tx *txn) (err error) { sectorIDs, err := deleteExpiredV2ContractSectors(tx, height) if err != nil { @@ -56,9 +54,7 @@ func (s *Store) batchExpireV2ContractSectors(height uint64) (expired int, remove if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil { return fmt.Errorf("failed to decrement contract sectors: %w", err) } - - removed, err = pruneSectors(tx, sectorIDs) - return err + return nil }) return } @@ -492,13 +488,13 @@ func (s *Store) ExpireContractSectors(height uint64) error { log := s.log.Named("ExpireContractSectors").With(zap.Uint64("height", height)) // delete in batches to avoid holding a lock on the database for too long for i := 0; ; i++ { - expired, removed, err := s.batchExpireContractSectors(height) + expired, err := s.batchExpireContractSectors(height) if err != nil { return fmt.Errorf("failed to prune sectors: %w", err) } else if expired == 0 { return nil } - log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i)) + log.Debug("removed sectors", zap.Int("expired", expired), zap.Int("batch", i)) jitterSleep(time.Millisecond) // allow other transactions to run } } @@ -509,13 +505,13 @@ func (s *Store) ExpireV2ContractSectors(height uint64) error { log := s.log.Named("ExpireV2ContractSectors").With(zap.Uint64("height", height)) // delete in batches to avoid holding a lock on the database for too long for i := 0; ; i++ { - expired, removed, err := s.batchExpireV2ContractSectors(height) + expired, err := s.batchExpireV2ContractSectors(height) if err != nil { return fmt.Errorf("failed to prune sectors: %w", err) } else if expired == 0 { return nil } - log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i)) + log.Debug("removed sectors", zap.Int("expired", expired), zap.Int("batch", i)) jitterSleep(time.Millisecond) // allow other transactions to run } } @@ -573,10 +569,6 @@ WHERE contract_id=$1 AND root_index=$2`, contractID, index) if err != nil { return types.Hash256{}, fmt.Errorf("failed to update sector ID: %w", err) } - // prune the old sector ID - if _, err := pruneSectors(tx, []int64{ref.sectorID}); err != nil { - return types.Hash256{}, fmt.Errorf("failed to prune old sector: %w", err) - } return ref.root, nil } @@ -653,7 +645,6 @@ LIMIT 1`) return nil, 
fmt.Errorf("failed to prepare delete statement: %w", err) } - sectorIDs := make([]int64, 0, n) roots := make([]types.Hash256, n) for i := 0; i < int(n); i++ { var contractSectorID int64 @@ -670,7 +661,6 @@ LIMIT 1`) return nil, fmt.Errorf("expected 1 row affected, got %v", n) } - sectorIDs = append(sectorIDs, sectorID) roots[len(roots)-i-1] = root // reverse order } @@ -678,12 +668,7 @@ LIMIT 1`) return nil, fmt.Errorf("failed to decrement contract sectors: %w", err) } - removed, err := pruneSectors(tx, sectorIDs) - if err != nil { - return nil, fmt.Errorf("failed to prune sectors: %w", err) - } - - log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed)) + log.Debug("trimmed sectors", zap.Stringers("trimmed", roots)) return roots, nil } diff --git a/persist/sqlite/contracts_test.go b/persist/sqlite/contracts_test.go index 5cd7d0f3..fa61abdb 100644 --- a/persist/sqlite/contracts_test.go +++ b/persist/sqlite/contracts_test.go @@ -1,6 +1,7 @@ package sqlite import ( + "context" "errors" "fmt" "path/filepath" @@ -113,6 +114,11 @@ func TestReviseContract(t *testing.T) { // checkConsistency is a helper function that verifies the expected sector // roots are consistent with the database checkConsistency := func(roots []types.Hash256, expected int) error { + // prune all possible sectors + if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { + return fmt.Errorf("failed to prune sectors: %w", err) + } + dbRoot, err := db.dbRoots(contract.Revision.ParentID) if err != nil { return fmt.Errorf("failed to get sector roots: %w", err) @@ -133,11 +139,9 @@ func TestReviseContract(t *testing.T) { return fmt.Errorf("sector root mismatch: expected %v, got %v", roots[i], root) } - _, release, err := db.SectorLocation(root) + _, err = db.SectorLocation(root) if err != nil { return fmt.Errorf("failed to get sector location: %w", err) - } else if err := release(); err != nil { - return fmt.Errorf("failed to release sector location: %w", err) } } @@ -279,35 +283,25 @@ func TestReviseContract(t *testing.T) { func() { t.Log("revising contract:", test.name) oldRoots := append([]types.Hash256(nil), roots...) 
- // update the expected roots - var releaseFuncs []func() error - defer func() { - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } - }() + for i, change := range test.changes { switch change.Action { case contracts.SectorActionAppend: // add a random sector root root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { t.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) test.changes[i].Root = root roots = append(roots, root) case contracts.SectorActionUpdate: // replace with a random sector root root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { t.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) test.changes[i].Root = root if test.errors && change.A >= uint64(len(roots)) { // test failure @@ -337,12 +331,6 @@ func TestReviseContract(t *testing.T) { t.Fatal("expected error") } - for _, release := range releaseFuncs { - if err := release(); err != nil { - t.Fatal(err) - } - } - if err := checkConsistency(roots, test.sectors); err != nil { t.Fatal(err) } @@ -580,28 +568,20 @@ WHERE c.contract_id=$1 AND csr.root_index= $2`) appendSectors := func(t *testing.T, n int) { t.Helper() - var releaseFn []func() error var appended []types.Hash256 for i := 0; i < n; i++ { root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { t.Fatal("failed to store sector:", err) } appended = append(appended, root) - releaseFn = append(releaseFn, release) } newRoots := append(append([]types.Hash256(nil), roots...), appended...) 
if err := db.ReviseV2Contract(contract.ID, contract.V2FileContract, roots, newRoots, proto4.Usage{}); err != nil { t.Fatal("failed to revise contract:", err) } - for _, fn := range releaseFn { - if err := fn(); err != nil { - t.Fatal("failed to release sector:", err) - } - } - checkRootConsistency(t, newRoots) checkMetricConsistency(t, uint64(len(newRoots))) roots = newRoots @@ -720,28 +700,21 @@ func BenchmarkTrimSectors(b *testing.B) { roots := make([]types.Hash256, 0, b.N) appendActions := make([]contracts.SectorChange, 0, b.N) - releaseFuncs := make([]func() error, 0, b.N) for i := 0; i < b.N; i++ { root := frand.Entropy256() roots = append(roots, root) appendActions = append(appendActions, contracts.SectorChange{Action: contracts.SectorActionAppend, Root: root}) - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { b.Fatal(err) } - releaseFuncs = append(releaseFuncs, release) } if err := db.ReviseContract(contract, nil, contracts.Usage{}, appendActions); err != nil { b.Fatal(err) } - for _, fn := range releaseFuncs { - if err := fn(); err != nil { - b.Fatal(err) - } - } b.ResetTimer() b.ReportAllocs() @@ -787,13 +760,11 @@ func BenchmarkV2AppendSectors(b *testing.B) { root := types.Hash256(frand.Entropy256()) roots = append(roots, root) - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { b.Fatal(err) } else if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 100}}); err != nil { b.Fatal(err) - } else if err := release(); err != nil { - b.Fatal(err) } } @@ -841,13 +812,11 @@ func BenchmarkV2TrimSectors(b *testing.B) { root := types.Hash256(frand.Entropy256()) roots = append(roots, root) - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) if err != nil { b.Fatal(err) } else if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: 100}}); err != nil { b.Fatal(err) - } else if err := release(); err != nil { - b.Fatal(err) } } diff --git a/persist/sqlite/init.sql b/persist/sqlite/init.sql index 4b347250..1015a34a 100644 --- a/persist/sqlite/init.sql +++ b/persist/sqlite/init.sql @@ -30,12 +30,6 @@ CREATE TABLE stored_sectors ( CREATE INDEX stored_sectors_sector_root ON stored_sectors(sector_root); CREATE INDEX stored_sectors_last_access ON stored_sectors(last_access_timestamp); -CREATE TABLE locked_sectors ( -- should be cleared at startup. currently persisted for simplicity, but may be moved to memory - id INTEGER PRIMARY KEY, - sector_id INTEGER NOT NULL REFERENCES stored_sectors(id) -); -CREATE INDEX locked_sectors_sector_id ON locked_sectors(sector_id); - CREATE TABLE storage_volumes ( id INTEGER PRIMARY KEY, disk_path TEXT UNIQUE NOT NULL, @@ -61,12 +55,6 @@ CREATE INDEX volume_sectors_volume_id ON volume_sectors(volume_id); CREATE INDEX volume_sectors_volume_index ON volume_sectors(volume_index ASC); CREATE INDEX volume_sectors_sector_id ON volume_sectors(sector_id); -CREATE TABLE locked_volume_sectors ( -- should be cleared at startup. 
currently persisted for simplicity, but may be moved to memory - id INTEGER PRIMARY KEY, - volume_sector_id INTEGER REFERENCES volume_sectors(id) ON DELETE CASCADE -); -CREATE INDEX locked_volume_sectors_sector_id ON locked_volume_sectors(volume_sector_id); - CREATE TABLE contract_renters ( id INTEGER PRIMARY KEY, public_key BLOB UNIQUE NOT NULL diff --git a/persist/sqlite/migrations.go b/persist/sqlite/migrations.go index 0a2ab6af..e38d215e 100644 --- a/persist/sqlite/migrations.go +++ b/persist/sqlite/migrations.go @@ -10,6 +10,18 @@ import ( "go.uber.org/zap" ) +// migrateVersion34 removes the lock tables +func migrateVersion34(tx *txn, log *zap.Logger) error { + _, err := tx.Exec(`DROP TABLE locked_sectors; +DROP TABLE locked_volume_sectors;`) + if err != nil { + return fmt.Errorf("failed to remove lock tables: %w", err) + } else if err := recalcVolumeMetrics(tx, log); err != nil { + return fmt.Errorf("failed to recalculate volume metrics: %w", err) + } + return nil +} + // migrateVersion33 adds the contract_v2_account_funding table. func migrateVersion33(tx *txn, _ *zap.Logger) error { _, err := tx.Exec(`CREATE TABLE contract_v2_account_funding ( @@ -974,4 +986,5 @@ var migrations = []func(tx *txn, log *zap.Logger) error{ migrateVersion31, migrateVersion32, migrateVersion33, + migrateVersion34, } diff --git a/persist/sqlite/recalc.go b/persist/sqlite/recalc.go index 91dc7e70..42a5d797 100644 --- a/persist/sqlite/recalc.go +++ b/persist/sqlite/recalc.go @@ -82,6 +82,58 @@ func recalcContractAccountFunding(tx *txn, _ *zap.Logger) error { return nil } +func recalcVolumeMetrics(tx *txn, log *zap.Logger) error { + const query = `SELECT volume_id, COUNT(*) AS total_sectors, COUNT(CASE WHEN sector_id IS NOT NULL THEN 1 END) AS used_sectors +FROM volume_sectors +GROUP BY volume_id` + + type volCount struct { + ID int64 + UsedSectors uint64 + TotalSectors uint64 + } + rows, err := tx.Query(query) + if err != nil { + return fmt.Errorf("failed to query volume metrics: %w", err) + } + defer rows.Close() + + var totalSectors, physicalSectors uint64 + var volumes []volCount + for rows.Next() { + var vol volCount + if err := rows.Scan(&vol.ID, &vol.TotalSectors, &vol.UsedSectors); err != nil { + return fmt.Errorf("failed to scan volume: %w", err) + } + volumes = append(volumes, vol) + totalSectors += vol.TotalSectors + physicalSectors += vol.UsedSectors + log.Debug("calculated volume metrics", zap.Int64("id", vol.ID), zap.Uint64("used", vol.UsedSectors), zap.Uint64("total", vol.TotalSectors)) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to get rows: %w", err) + } + + stmt, err := tx.Prepare(`UPDATE storage_volumes SET used_sectors=$1, total_sectors=$2 WHERE id=$3`) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + for _, vol := range volumes { + if _, err := stmt.Exec(vol.UsedSectors, vol.TotalSectors, vol.ID); err != nil { + return fmt.Errorf("failed to update volume: %w", err) + } + } + + if err := setNumericStat(tx, metricTotalSectors, totalSectors, time.Now()); err != nil { + return fmt.Errorf("failed to set total sectors: %w", err) + } else if err := setNumericStat(tx, metricPhysicalSectors, physicalSectors, time.Now()); err != nil { + return fmt.Errorf("failed to set physical sectors: %w", err) + } + return nil +} + func recalcContractMetrics(tx *txn, log *zap.Logger) error { rows, err := tx.Query(`SELECT contract_status, locked_collateral, risked_collateral, rpc_revenue, storage_revenue, ingress_revenue, 
egress_revenue, account_funding, registry_read, registry_write FROM contracts WHERE contract_status IN (?, ?);`, contracts.ContractStatusActive, contracts.ContractStatusSuccessful) if err != nil { diff --git a/persist/sqlite/sectors.go index 50c6b6e0..bcac9a13 100644 --- a/persist/sqlite/sectors.go +++ b/persist/sqlite/sectors.go @@ -1,6 +1,7 @@ package sqlite import ( + "context" "database/sql" "errors" "fmt" @@ -12,7 +13,7 @@ import ( ) func deleteTempSectors(tx *txn, height uint64) (sectorIDs []int64, err error) { - const query = `DELETE FROM temp_storage_sector_roots + const query = `DELETE FROM temp_storage_sector_roots WHERE id IN (SELECT id FROM temp_storage_sector_roots WHERE expiration_height <= $1 LIMIT $2) RETURNING sector_id;` @@ -46,8 +47,6 @@ func (s *Store) batchExpireTempSectors(height uint64) (expired int, pruned []typ if err := incrementNumericStat(tx, metricTempSectors, -len(sectorIDs), time.Now()); err != nil { return fmt.Errorf("failed to update metric: %w", err) } - - pruned, err = pruneSectors(tx, sectorIDs) return err }) return } @@ -81,12 +80,9 @@ func (s *Store) RemoveSector(root types.Hash256) (err error) { } // SectorLocation returns the location of a sector or an error if the -// sector is not found. The sector is locked until release is -// called. -func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func() error, error) { - var lockID int64 - var location storage.SectorLocation - err := s.transaction(func(tx *txn) error { +// sector is not found. +func (s *Store) SectorLocation(root types.Hash256) (location storage.SectorLocation, err error) { + err = s.transaction(func(tx *txn) error { sectorID, err := sectorDBID(tx, root) if errors.Is(err, sql.ErrNoRows) { return storage.ErrSectorNotFound @@ -97,25 +93,36 @@ func (s *Store) SectorLocation(root types.Hash256) (storage.SectorLocation, func if err != nil { return fmt.Errorf("failed to get sector location: %w", err) } - lockID, err = lockSector(tx, sectorID) - if err != nil { - return fmt.Errorf("failed to lock sector: %w", err) - } return nil }) - if err != nil { - return storage.SectorLocation{}, nil, err - } - unlock := func() error { - return s.transaction(func(tx *txn) error { - return unlockSector(tx, s.log.Named("SectorLocation"), lockID) - }) - } - return location, unlock, nil + return +} + +// AddTempSector adds a sector to temporary storage. The sector will be deleted +// after the expiration height. +func (s *Store) AddTempSector(root types.Hash256, expiration uint64) error { + return s.transaction(func(tx *txn) error { + // ensure the sector is written to a volume + var sectorID int64 + err := tx.QueryRow(`SELECT ss.id FROM stored_sectors ss +INNER JOIN volume_sectors vs ON (vs.sector_id = ss.id) +WHERE ss.sector_root=$1`, encode(root)).Scan(&sectorID) + if errors.Is(err, sql.ErrNoRows) { + return storage.ErrSectorNotFound + } else if err != nil { + return fmt.Errorf("failed to find sector: %w", err) + } else if err := incrementNumericStat(tx, metricTempSectors, 1, time.Now()); err != nil { + return fmt.Errorf("failed to update metric: %w", err) + } + _, err = tx.Exec(`INSERT INTO temp_storage_sector_roots (sector_id, expiration_height) VALUES ($1, $2)`, sectorID, expiration) + return err + }) } // AddTemporarySectors adds the roots of sectors that are temporarily stored // on the host. The sectors will be deleted after the expiration height.
+// +// Deprecated: use AddTempSector func (s *Store) AddTemporarySectors(sectors []storage.TempSector) error { return s.transaction(func(tx *txn) error { stmt, err := tx.Prepare(`INSERT INTO temp_storage_sector_roots (sector_id, expiration_height) SELECT id, $1 FROM stored_sectors WHERE sector_root=$2 RETURNING id;`) @@ -173,234 +180,149 @@ func (s *Store) SectorReferences(root types.Hash256) (refs storage.SectorReferen if err != nil { return fmt.Errorf("failed to get temp storage: %w", err) } - - // check if the sector is locked - refs.Locks, err = getSectorLockCount(tx, dbID) - if err != nil { - return fmt.Errorf("failed to get locks: %w", err) - } return nil }) return } -func contractSectorRefs(tx *txn, sectorID int64) (contractIDs []types.FileContractID, err error) { - rows, err := tx.Query(`SELECT DISTINCT contract_id FROM contract_sector_roots WHERE sector_id=$1;`, sectorID) - if err != nil { - return nil, fmt.Errorf("failed to select contracts: %w", err) - } - defer rows.Close() +// HasSector returns true if the sector root is stored on the host and is referenced by a contract, v2 contract, or temp storage +func (s *Store) HasSector(root types.Hash256) (exists bool, err error) { + err = s.transaction(func(tx *txn) error { + const query = `SELECT ss.id +FROM stored_sectors ss +WHERE ss.sector_root=$1 AND (EXISTS (SELECT 1 FROM contract_sector_roots csr WHERE ss.id = csr.sector_id) + OR EXISTS (SELECT 1 FROM contract_v2_sector_roots csr2 WHERE ss.id = csr2.sector_id) + OR EXISTS (SELECT 1 FROM temp_storage_sector_roots tsr WHERE ss.id = tsr.sector_id));` - for rows.Next() { - var contractID types.FileContractID - if err := rows.Scan(decode(&contractID)); err != nil { - return nil, fmt.Errorf("failed to scan contract id: %w", err) + var sectorID int64 + err := tx.QueryRow(query, encode(root)).Scan(&sectorID) + if err == nil { + exists = true + return nil + } else if errors.Is(err, sql.ErrNoRows) { + return nil } - contractIDs = append(contractIDs, contractID) - } + return err + }) return } -func getTempStorageCount(tx *txn, sectorID int64) (n int, err error) { - err = tx.QueryRow(`SELECT COUNT(*) FROM temp_storage_sector_roots WHERE sector_id=$1;`, sectorID).Scan(&n) - return +type volumeSectorRef struct { + VolumeID int64 + VolumeSectorID int64 } -func getSectorLockCount(tx *txn, sectorID int64) (n int, err error) { - err = tx.QueryRow(`SELECT COUNT(*) FROM locked_sectors WHERE sector_id=$1;`, sectorID).Scan(&n) - return -} +func pruneableVolumeSectors(tx *txn, lastAccess time.Time) (refs []volumeSectorRef, err error) { + const query = `SELECT vs.id, vs.volume_id FROM volume_sectors vs +INNER JOIN stored_sectors ss ON vs.sector_id=ss.id +LEFT JOIN contract_sector_roots csr ON ss.id=csr.sector_id +LEFT JOIN contract_v2_sector_roots csr2 ON ss.id=csr2.sector_id +LEFT JOIN temp_storage_sector_roots tsr ON ss.id=tsr.sector_id +WHERE ss.last_access_timestamp < $1 AND csr.sector_id IS NULL AND csr2.sector_id IS NULL AND tsr.sector_id IS NULL +LIMIT $2;` -func incrementVolumeUsage(tx *txn, volumeID int64, delta int) error { - var used int64 - err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors=used_sectors+$1 WHERE id=$2 RETURNING used_sectors;`, delta, volumeID).Scan(&used) + rows, err := tx.Query(query, encode(lastAccess), sqlSectorBatchSize) if err != nil {
return nil, fmt.Errorf("failed to select volume sectors: %w", err) } - return nil -} - -func pruneSectors(tx *txn, ids []int64) (pruned []types.Hash256, err error) { - hasContractRefStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM contract_sector_roots WHERE sector_id=$1)`) - if err != nil { - return nil, fmt.Errorf("failed to prepare contract reference query: %w", err) - } - defer hasContractRefStmt.Close() - - hasV2ContractRefStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM contract_v2_sector_roots WHERE sector_id=$1)`) - if err != nil { - return nil, fmt.Errorf("failed to prepare v2 contract reference query: %w", err) - } - defer hasV2ContractRefStmt.Close() - - hasTempRefStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM temp_storage_sector_roots WHERE sector_id=$1)`) - if err != nil { - return nil, fmt.Errorf("failed to prepare temp reference query: %w", err) - } - defer hasTempRefStmt.Close() - - hasLockStmt, err := tx.Prepare(`SELECT EXISTS(SELECT 1 FROM locked_sectors WHERE sector_id=$1)`) - if err != nil { - return nil, fmt.Errorf("failed to prepare lock reference query: %w", err) - } - defer hasLockStmt.Close() - - clearVolumeStmt, err := tx.Prepare(`UPDATE volume_sectors SET sector_id=NULL WHERE sector_id=$1 RETURNING volume_id`) - if err != nil { - return nil, fmt.Errorf("failed to prepare volume reference query: %w", err) - } - defer clearVolumeStmt.Close() + defer rows.Close() - deleteSectorStmt, err := tx.Prepare(`DELETE FROM stored_sectors WHERE id=$1 RETURNING sector_root`) - if err != nil { - return nil, fmt.Errorf("failed to prepare delete sector query: %w", err) + for rows.Next() { + var ref volumeSectorRef + if err := rows.Scan(&ref.VolumeSectorID, &ref.VolumeID); err != nil { + return nil, fmt.Errorf("failed to scan volume sector: %w", err) + } + refs = append(refs, ref) } - defer deleteSectorStmt.Close() + return +} - volumeDelta := make(map[int64]int) - for _, id := range ids { - var exists bool - err := hasContractRefStmt.QueryRow(id).Scan(&exists) - if err != nil { - return nil, fmt.Errorf("failed to check contract references: %w", err) - } else if exists { - continue // sector has a contract reference +// PruneSectors removes volume references for sectors that have not been accessed since the provided +// timestamp and are no longer referenced by a contract or temp storage. 
+func (s *Store) PruneSectors(ctx context.Context, lastAccess time.Time) error { + // note: last access can be removed after v2 when sectors are immediately committed to temp storage + for i := 0; ; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: } - err = hasV2ContractRefStmt.QueryRow(id).Scan(&exists) - if err != nil { - return nil, fmt.Errorf("failed to check v2 contract references: %w", err) - } else if exists { - continue // sector has a contract reference - } + var done bool + err := s.transaction(func(tx *txn) error { + refs, err := pruneableVolumeSectors(tx, lastAccess) + if err != nil { + return fmt.Errorf("failed to select volume sectors: %w", err) + } else if len(refs) == 0 { + done = true + return nil + } - err = hasTempRefStmt.QueryRow(id).Scan(&exists) - if err != nil { - return nil, fmt.Errorf("failed to check temp references: %w", err) - } else if exists { - continue // sector has a temp storage reference - } + updateSectorStmt, err := tx.Prepare(`UPDATE volume_sectors SET sector_id=null WHERE id=$1;`) + if err != nil { + return fmt.Errorf("failed to prepare update: %w", err) + } + defer updateSectorStmt.Close() + + volumeDeltas := make(map[int64]int) + for _, ref := range refs { + if res, err := updateSectorStmt.Exec(ref.VolumeSectorID); err != nil { + return fmt.Errorf("failed to update sector: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return fmt.Errorf("failed to update sector: %w", storage.ErrSectorNotFound) + } + + volumeDeltas[ref.VolumeID]-- + } - err = hasLockStmt.QueryRow(id).Scan(&exists) + for volumeID, delta := range volumeDeltas { + if err := incrementVolumeUsage(tx, volumeID, delta); err != nil { + return fmt.Errorf("failed to update volume %d usage: %w", volumeID, err) + } + } + return nil + }) if err != nil { - return nil, fmt.Errorf("failed to check lock references: %w", err) - } else if exists { - continue // sector is locked - } - - var volumeDBID int64 - err = clearVolumeStmt.QueryRow(id).Scan(&volumeDBID) - if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found - return nil, fmt.Errorf("failed to clear volume references: %w", err) - } else if err == nil { - volumeDelta[volumeDBID]-- // sector was removed from a volume - } - - var root types.Hash256 - err = deleteSectorStmt.QueryRow(id).Scan(decode(&root)) - if err != nil && !errors.Is(err, sql.ErrNoRows) { // ignore rows not found - return nil, fmt.Errorf("failed to delete sector: %w", err) - } else if err == nil { - pruned = append(pruned, root) - } - } - - // decrement the usage of all changed volumes - for volumeDBID, delta := range volumeDelta { - if err := incrementVolumeUsage(tx, volumeDBID, delta); err != nil { - return nil, fmt.Errorf("failed to update volume usage: %w", err) + return fmt.Errorf("failed to prune sectors: %w", err) + } else if done { + return nil } } - return -} - -// lockSector locks a sector root. The lock must be released by calling -// unlockSector. A sector must be locked when it is being read or written -// to prevent it from being removed by prune sector. -func lockSector(tx *txn, sectorDBID int64) (lockID int64, err error) { - err = tx.QueryRow(`INSERT INTO locked_sectors (sector_id) VALUES ($1) RETURNING id;`, sectorDBID).Scan(&lockID) - return } -// deleteLocks removes the lock records with the given ids and returns the -// sector ids of the deleted locks. 
-func deleteLocks(tx *txn, ids []int64) (sectorIDs []int64, err error) { - if len(ids) == 0 { - return nil, nil - } - - query := `DELETE FROM locked_sectors WHERE id IN (` + queryPlaceHolders(len(ids)) + `) RETURNING sector_id;` - rows, err := tx.Query(query, queryArgs(ids)...) +func contractSectorRefs(tx *txn, sectorID int64) (contractIDs []types.FileContractID, err error) { + rows, err := tx.Query(`SELECT DISTINCT contract_id FROM contract_sector_roots WHERE sector_id=$1;`, sectorID) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to select contracts: %w", err) } defer rows.Close() for rows.Next() { - var sectorID int64 - if err := rows.Scan(&sectorID); err != nil { - return nil, fmt.Errorf("failed to scan sector id: %w", err) + var contractID types.FileContractID + if err := rows.Scan(decode(&contractID)); err != nil { + return nil, fmt.Errorf("failed to scan contract id: %w", err) } - sectorIDs = append(sectorIDs, sectorID) + contractIDs = append(contractIDs, contractID) } return } -// unlockSector unlocks a sector root. -func unlockSector(tx *txn, log *zap.Logger, lockIDs ...int64) error { - if len(lockIDs) == 0 { - return nil - } - - sectorIDs, err := deleteLocks(tx, lockIDs) - if err != nil { - return fmt.Errorf("failed to delete locks: %w", err) - } - - pruned, err := pruneSectors(tx, sectorIDs) - if err != nil { - return fmt.Errorf("failed to prune sectors: %w", err) - } - log.Debug("unlocked sectors", zap.Int("unlocked", len(lockIDs)), zap.Stringers("removed", pruned)) - return nil -} - -// lockLocations locks multiple sector locations and returns a list of lock -// IDs. The lock ids must be unlocked by unlockLocations. Volume locations -// should be locked during writes to prevent the location from being written -// to by another goroutine. -func lockLocations(tx *txn, locations []storage.SectorLocation) (locks []int64, err error) { - if len(locations) == 0 { - return nil, nil - } - stmt, err := tx.Prepare(`INSERT INTO locked_volume_sectors (volume_sector_id) VALUES ($1) RETURNING id;`) - if err != nil { - return nil, fmt.Errorf("failed to prepare query: %w", err) - } - defer stmt.Close() - for _, location := range locations { - var lockID int64 - err := stmt.QueryRow(location.ID).Scan(&lockID) - if err != nil { - return nil, fmt.Errorf("failed to lock location %v:%v: %w", location.Volume, location.Index, err) - } - locks = append(locks, lockID) - } +func getTempStorageCount(tx *txn, sectorID int64) (n int, err error) { + err = tx.QueryRow(`SELECT COUNT(*) FROM temp_storage_sector_roots WHERE sector_id=$1;`, sectorID).Scan(&n) return } -// unlockLocations unlocks multiple locked sector locations. It is safe to -// call multiple times. -func unlockLocations(tx *txn, ids []int64) error { - if len(ids) == 0 { - return nil +func incrementVolumeUsage(tx *txn, volumeID int64, delta int) error { + var used int64 + err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors=used_sectors+$1 WHERE id=$2 RETURNING used_sectors;`, delta, volumeID).Scan(&used) + if err != nil { + return fmt.Errorf("failed to update volume: %w", err) + } else if used < 0 { + panic("volume usage is negative") // developer error + } else if err = incrementNumericStat(tx, metricPhysicalSectors, delta, time.Now()); err != nil { + return fmt.Errorf("failed to update metric: %w", err) } - - query := `DELETE FROM locked_volume_sectors WHERE id IN (` + queryPlaceHolders(len(ids)) + `);` - _, err := tx.Exec(query, queryArgs(ids)...)
- return err + return nil } diff --git a/persist/sqlite/store.go index 6ec1de4b..0b2859e6 100644 --- a/persist/sqlite/store.go +++ b/persist/sqlite/store.go @@ -120,44 +120,6 @@ func doTransaction(db *sql.DB, log *zap.Logger, fn func(tx *txn) error) error { return nil } -func clearLockedSectors(tx *txn, log *zap.Logger) error { - rows, err := tx.Query(`DELETE FROM locked_sectors RETURNING sector_id`) - if err != nil { - return err - } - defer rows.Close() - var sectorIDs []int64 - for rows.Next() { - var sectorID int64 - if err := rows.Scan(&sectorID); err != nil { - return fmt.Errorf("failed to scan sector id: %w", err) - } - } - - removed, err := pruneSectors(tx, sectorIDs) - if err != nil { - return fmt.Errorf("failed to prune sectors: %w", err) - } - log.Debug("cleared locked sectors", zap.Int("locked", len(sectorIDs)), zap.Stringers("removed", removed)) - return nil -} - -func clearLockedLocations(tx *txn) error { - _, err := tx.Exec(`DELETE FROM locked_volume_sectors`) - return err -} - -func (s *Store) clearLocks() error { - return s.transaction(func(tx *txn) error { - if err := clearLockedLocations(tx); err != nil { - return fmt.Errorf("failed to clear locked locations: %w", err) - } else if err = clearLockedSectors(tx, s.log.Named("clearLockedSectors")); err != nil { - return fmt.Errorf("failed to clear locked sectors: %w", err) - } - return nil - }) -} - func sqlConn(ctx context.Context, db *sql.DB) (c *sqlite3.SQLiteConn, err error) { if err := db.PingContext(ctx); err != nil { return nil, fmt.Errorf("failed to ping database: %w", err) } @@ -285,10 +247,6 @@ func OpenDatabase(fp string, log *zap.Logger) (*Store, error) { } if err := store.init(); err != nil { return nil, err - } else if err = store.clearLocks(); err != nil { - // clear any locked sectors, metadata not synced to disk is safe to - // overwrite.
- return nil, fmt.Errorf("failed to clear locked sectors table: %w", err) } sqliteVersion, _, _ := sqlite3.Version() log.Debug("database initialized", zap.String("sqliteVersion", sqliteVersion), zap.Int("schemaVersion", len(migrations)+1), zap.String("path", fp)) diff --git a/persist/sqlite/store_test.go b/persist/sqlite/store_test.go index 7d360c5a..ece67051 100644 --- a/persist/sqlite/store_test.go +++ b/persist/sqlite/store_test.go @@ -3,7 +3,6 @@ package sqlite import ( "context" "errors" - "fmt" "path/filepath" "reflect" "testing" @@ -11,9 +10,7 @@ import ( "github.com/mattn/go-sqlite3" "go.sia.tech/hostd/host/settings" - "go.sia.tech/hostd/host/storage" "go.uber.org/zap/zaptest" - "lukechampine.com/frand" ) func TestTransactionRetry(t *testing.T) { @@ -119,86 +116,6 @@ func TestTransactionRetry(t *testing.T) { }) } -func TestClearLockedSectors(t *testing.T) { - const sectors = 100 - db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), zaptest.NewLogger(t)) - if err != nil { - t.Fatal(err) - } - defer db.Close() - - id, err := db.AddVolume("foo", false) - if err != nil { - t.Fatal(err) - } else if err = db.GrowVolume(id, sectors); err != nil { - t.Fatal(err) - } else if err = db.SetAvailable(id, true); err != nil { - t.Fatal(err) - } - - assertSectors := func(locked, temp int) { - t.Helper() - // check that the sectors are locked - var dbLocked, dbTemp int - err := db.transaction(func(tx *txn) error { - if err := tx.QueryRow(`SELECT COUNT(*) FROM locked_volume_sectors`).Scan(&dbLocked); err != nil { - return fmt.Errorf("query locked sectors: %w", err) - } else if err := tx.QueryRow(`SELECT COUNT(*) FROM temp_storage_sector_roots`).Scan(&dbTemp); err != nil { - return fmt.Errorf("query temp sectors: %w", err) - } - return nil - }) - if err != nil { - t.Fatal(err) - } else if dbLocked != locked { - t.Fatalf("expected %v locked sectors, got %v", locked, dbLocked) - } else if dbTemp != temp { - t.Fatalf("expected %v temp sectors, got %v", temp, dbTemp) - } - - m, err := db.Metrics(time.Now()) - if err != nil { - t.Fatal(err) - } else if m.Storage.TempSectors != uint64(temp) { - t.Fatalf("expected %v temp sectors, got %v", temp, m.Storage.TempSectors) - } - } - - // write temp sectors to the database - for i := 1; i <= sectors; i++ { - sectorRoot := frand.Entropy256() - _, err := db.StoreSector(sectorRoot, func(storage.SectorLocation, bool) error { - return nil - }) - if err != nil { - t.Fatal("add sector", i, err) - } - - // only store the first half of the sectors as temp sectors - if i > sectors/2 { - continue - } - - err = db.AddTemporarySectors([]storage.TempSector{ - {Root: sectorRoot, Expiration: uint64(i + 1)}, - }) - if err != nil { - t.Fatal("add temp sector", i, err) - } - } - - // check that the sectors have been stored and locked - assertSectors(sectors, sectors/2) - - // clear the locked sectors - if err = db.clearLocks(); err != nil { - t.Fatal(err) - } - - // check that all the locks were removed and half the sectors deleted - assertSectors(0, sectors/2) -} - func TestBackup(t *testing.T) { srcPath := filepath.Join(t.TempDir(), "test.db") db, err := OpenDatabase(srcPath, zaptest.NewLogger(t)) diff --git a/persist/sqlite/volumes.go b/persist/sqlite/volumes.go index e36597fa..4510df92 100644 --- a/persist/sqlite/volumes.go +++ b/persist/sqlite/volumes.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "math" "time" "go.sia.tech/core/types" @@ -13,123 +12,6 @@ import ( "go.uber.org/zap" ) -func (s *Store) migrateSector(volumeID int64, minIndex uint64, marker 
int64, migrateFn storage.MigrateFunc, log *zap.Logger) (int64, bool, error) { - start := time.Now() - - var locationLocks []int64 - var sectorLock int64 - var oldLoc, newLoc storage.SectorLocation - err := s.transaction(func(tx *txn) (err error) { - oldLoc, err = sectorForMigration(tx, volumeID, marker) - if errors.Is(err, sql.ErrNoRows) { - marker = math.MaxInt64 - return nil - } else if err != nil { - return fmt.Errorf("failed to get sector for migration: %w", err) - } - marker = int64(oldLoc.Index) - - sectorDBID, err := sectorDBID(tx, oldLoc.Root) - if err != nil { - return fmt.Errorf("failed to get sector id: %w", err) - } - - sectorLock, err = lockSector(tx, sectorDBID) - if err != nil { - return fmt.Errorf("failed to lock sector: %w", err) - } - - newLoc, err = emptyLocationForMigration(tx, volumeID) - if errors.Is(err, storage.ErrNotEnoughStorage) && minIndex > 0 { - // if there is no space in other volumes, try to migrate within the - // same volume - newLoc, err = locationWithinVolume(tx, volumeID, uint64(minIndex)) - if err != nil { - return fmt.Errorf("failed to get empty location in volume: %w", err) - } - } else if err != nil { - return fmt.Errorf("failed to get empty location: %w", err) - } - - newLoc.Root = oldLoc.Root - - // lock the old and new locations - locationLocks, err = lockLocations(tx, []storage.SectorLocation{oldLoc, newLoc}) - if err != nil { - return fmt.Errorf("failed to lock sectors: %w", err) - } - return nil - }) - if errors.Is(err, storage.ErrNotEnoughStorage) { - return marker, false, nil - } else if err != nil { - return 0, false, fmt.Errorf("failed to get new location: %w", err) - } else if marker == math.MaxInt64 { - return marker, false, nil - } - // unlock the locations - defer func() { - err = s.transaction(func(tx *txn) error { - if err := unlockLocations(tx, locationLocks); err != nil { - return fmt.Errorf("failed to unlock sector locations: %w", err) - } else if err := unlockSector(tx, log.Named("unlock"), sectorLock); err != nil { - return fmt.Errorf("failed to unlock sector: %w", err) - } - return nil - }) - if err != nil { - log.Error("failed to unlock sectors", zap.Error(err)) - } - }() - - // call the migrateFn with the new location, data should be copied to the - // new location and synced to disk - if err := migrateFn(newLoc); err != nil { - log.Error("failed to migrate sector data", zap.Error(err)) - return marker, false, nil - } - - // update the sector location in a separate transaction - err = s.transaction(func(tx *txn) error { - // get the sector ID - var sectorID int64 - err := tx.QueryRow(`SELECT sector_id FROM volume_sectors WHERE id=$1`, oldLoc.ID).Scan(&sectorID) - if err != nil { - return fmt.Errorf("failed to get sector id: %w", err) - } - - // clear the old sector - var oldVolumeID int64 - err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=null WHERE id=$1 AND sector_id=$2 RETURNING volume_id`, oldLoc.ID, sectorID).Scan(&oldVolumeID) - if err != nil { - return fmt.Errorf("failed to clear sector location: %w", err) - } - - // update the old volume metadata - if err := incrementVolumeUsage(tx, oldVolumeID, -1); err != nil { - return fmt.Errorf("failed to update old volume metadata: %w", err) - } - - // add the sector to the new location - var newVolumeID int64 - err = tx.QueryRow(`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2 RETURNING volume_id`, sectorID, newLoc.ID).Scan(&newVolumeID) - if err != nil { - return fmt.Errorf("failed to update sector location: %w", err) - } - - // update the new volume metadata - if err :=
incrementVolumeUsage(tx, newVolumeID, 1); err != nil { - return fmt.Errorf("failed to update new volume metadata: %w", err) - } - return nil - }) - if err != nil { - return 0, false, fmt.Errorf("failed to update sector metadata: %w", err) - } - log.Debug("migrated sector", zap.Uint64("oldIndex", oldLoc.Index), zap.Stringer("root", newLoc.Root), zap.Int64("newVolume", newLoc.Volume), zap.Uint64("newIndex", newLoc.Index), zap.Duration("elapsed", time.Since(start))) - return marker, true, nil -} - func forceDeleteVolumeSectors(tx *txn, volumeID int64) (removed, lost int64, err error) { const query = `DELETE FROM volume_sectors WHERE id IN (SELECT id FROM volume_sectors WHERE volume_id=$1 LIMIT $2) RETURNING sector_id IS NULL AS empty` @@ -265,55 +147,34 @@ WHERE v.id=$1` } // StoreSector calls fn with an empty location in a writable volume. If -// the sector root already exists, fn is called with the existing -// location and exists is true. Unless exists is true, The sector must -// be written to disk within fn. If fn returns an error, the metadata is -// rolled back. If no space is available, ErrNotEnoughStorage is -// returned. The location is locked until release is called. -// -// The sector should be referenced by either a contract or temp store -// before release is called to prevent it from being pruned -func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocation, exists bool) error) (func() error, error) { - var sectorLockID int64 - var locationLocks []int64 +// the sector root already exists, nil is returned. The sector should be +// written to disk within fn. If fn returns an error, the metadata is +// rolled back and the error is returned. If no space is available, +// ErrNotEnoughStorage is returned. +func (s *Store) StoreSector(root types.Hash256, fn storage.StoreFunc) error { var location storage.SectorLocation var exists bool - log := s.log.Named("StoreSector").With(zap.Stringer("root", root)) err := s.transaction(func(tx *txn) error { sectorID, err := insertSectorDBID(tx, root) if err != nil { return fmt.Errorf("failed to get sector id: %w", err) } - // lock the sector - sectorLockID, err = lockSector(tx, sectorID) - if err != nil { - return fmt.Errorf("failed to lock sector: %w", err) - } - // check if the sector is already stored on disk location, err = sectorLocation(tx, sectorID, root) - exists = err == nil - if errors.Is(err, storage.ErrSectorNotFound) { - location, err = emptyLocation(tx) - if err != nil { - return fmt.Errorf("failed to get empty location: %w", err) - } - } else if err != nil { + if err != nil && !errors.Is(err, storage.ErrSectorNotFound) { return fmt.Errorf("failed to check existing sector location: %w", err) + } else if err == nil { + exists = true + return nil } - // lock the location - locationLocks, err = lockLocations(tx, []storage.SectorLocation{location}) + location, err = emptyLocation(tx) if err != nil { - return fmt.Errorf("failed to lock sector location: %w", err) + return fmt.Errorf("failed to get empty location: %w", err) } - // if the sector already exists, return the existing location - if exists { - return nil - } res, err := tx.Exec(`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2`, sectorID, location.ID) if err != nil { return fmt.Errorf("failed to commit sector location: %w", err) @@ -330,27 +191,29 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati return nil }) if err != nil { - return nil, err + return err + } else if exists { + return nil } - log = 
log.With(zap.Int64("volume", location.Volume), zap.Uint64("index", location.Index)) - log.Debug("stored sector") - unlock := func() error { - return s.transaction(func(tx *txn) error { - if err := unlockLocations(tx, locationLocks); err != nil { - return fmt.Errorf("failed to unlock sector location: %w", err) - } else if err := unlockSector(tx, log.Named("unlock"), sectorLockID); err != nil { - return fmt.Errorf("failed to unlock sector: %w", err) + + // call fn with the location + if err := fn(location); err != nil { + rollbackErr := s.transaction(func(tx *txn) error { + _, err := tx.Exec(`UPDATE volume_sectors SET sector_id=null WHERE id=$1`, location.ID) + if err != nil { + return fmt.Errorf("failed to rollback sector location: %w", err) + } else if err := incrementVolumeUsage(tx, location.Volume, -1); err != nil { + return fmt.Errorf("failed to update volume metadata: %w", err) } return nil }) + if rollbackErr != nil { + // rollbacks are best-effort. The dangling reference will be picked up by prune regardless. + s.log.Error("failed to rollback volume metadata: %w", zap.NamedError("rollbackErr", rollbackErr), zap.Stringer("root", root), zap.Error(err)) + } + return err } - - // call fn with the location - if err := fn(location, exists); err != nil { - unlock() - return nil, fmt.Errorf("failed to store sector: %w", err) - } - return unlock, nil + return nil } // MigrateSectors migrates each occupied sector of a volume starting at @@ -358,37 +221,92 @@ func (s *Store) StoreSector(root types.Hash256, fn func(loc storage.SectorLocati // The sector data should be copied to the new location and synced // to disk immediately. If migrateFn returns an error, that sector will be // considered failed and the migration will continue. If the context is -// canceled, the migration will stop and the error will be returned. The -// number of sectors migrated and failed will always be returned, even if an +// canceled, the migration will stop and the error will be returned. If the host +// runs out of writable storage, migration will stop and ErrNotEnoughStorage will +// be returned. The number of sectors migrated and failed will always be returned, even if an // error occurs. func (s *Store) MigrateSectors(ctx context.Context, volumeID int64, startIndex uint64, migrateFn storage.MigrateFunc) (migrated, failed int, err error) { - log := s.log.Named("migrate").With(zap.Int64("oldVolume", volumeID), zap.Uint64("startIndex", startIndex)) - // the migration function is called in a loop until all sectors are migrated - // marker is used to skip sectors that tried to migrate but failed. 
- // when removing a volume, marker is -1 to also migrate the first sector - marker := int64(startIndex) - 1 - for i := 0; ; i++ { + log := s.log.Named("migrate").With(zap.Uint64("startIndex", startIndex)) + for index := startIndex; ; index++ { if ctx.Err() != nil { err = ctx.Err() return } - var successful bool - marker, successful, err = s.migrateSector(volumeID, startIndex, marker, migrateFn, log) + var done bool + err = s.transaction(func(tx *txn) error { + const query = `SELECT vs.id, vs.volume_id, vs.volume_index, ss.sector_root, vs.sector_id FROM volume_sectors vs +LEFT JOIN stored_sectors ss ON vs.sector_id=ss.id +WHERE vs.volume_id=$1 AND vs.volume_index=$2 +LIMIT 1;` + + var nullSectorID sql.NullInt64 + var from storage.SectorLocation + err := tx.QueryRow(query, volumeID, index).Scan(&from.ID, &from.Volume, &from.Index, decodeNullable(&from.Root), &nullSectorID) + if errors.Is(err, sql.ErrNoRows) { + done = true + return nil + } else if err != nil { + return fmt.Errorf("failed to get sector: %w", err) + } else if !nullSectorID.Valid { + return nil // skip empty sectors + } + sectorID := nullSectorID.Int64 + + to, err := emptyLocationForMigration(tx, volumeID, startIndex) + if err != nil { + return fmt.Errorf("failed to get empty location: %w", err) + } + to.Root = from.Root + + // this does introduce a performance concern where the database is now locked while + // waiting on disk I/O. This is acceptable since it's extremely important that migrations + // are atomic. + if migrateErr := migrateFn(from, to); migrateErr != nil { + log.Error("failed to migrate sector", zap.Error(migrateErr), zap.Uint64("index", index), zap.Stringer("root", from.Root)) + failed++ + return nil + } + + res, err := tx.Exec(`UPDATE volume_sectors SET sector_id=NULL WHERE id=$1 AND sector_id=$2`, from.ID, sectorID) + if err != nil { + return fmt.Errorf("failed to clear old sector location: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return errors.New("failed to clear old sector location: no rows affected") + } + + res, err = tx.Exec(`UPDATE volume_sectors SET sector_id=$1 WHERE id=$2`, sectorID, to.ID) + if err != nil { + return fmt.Errorf("failed to update sector location: %w", err) + } else if n, err := res.RowsAffected(); err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } else if n != 1 { + return errors.New("failed to update sector location: no rows affected") + } + + migrated++ + log.Debug("migrated sector", zap.Uint64("fromIndex", from.Index), zap.Int64("fromVolume", from.Volume), zap.Uint64("toIndex", to.Index), zap.Int64("toVolume", to.Volume), zap.Stringer("root", from.Root)) + if from.Volume == to.Volume { + return nil // skip updating metrics if the volume is not changing + } + + if err := incrementVolumeUsage(tx, from.Volume, -1); err != nil { + return fmt.Errorf("failed to update old volume metadata: %w", err) + } else if err := incrementVolumeUsage(tx, to.Volume, 1); err != nil { + return fmt.Errorf("failed to update new volume metadata: %w", err) + } + return nil + }) if err != nil { - err = fmt.Errorf("failed to migrate sector: %w", err) + err = fmt.Errorf("failed to migrate sector: %w", err) return - } else if marker == math.MaxInt64 { + } else if done { return } - if successful { - migrated++ - } else { - failed++ - } - - if i%256 == 0 { + if index%256 == 0 { jitterSleep(time.Millisecond) // allow other transactions to run } } @@ -520,7 +438,7 @@ func (s
*Store) SetAvailable(volumeID int64, available bool) error { // sectorDBID returns the ID of a sector root in the stored_sectors table. func sectorDBID(tx *txn, root types.Hash256) (id int64, err error) { - err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, encode(root)).Scan(&id) + err = tx.QueryRow(`UPDATE stored_sectors SET last_access_timestamp=$1 WHERE sector_root=$2 RETURNING id`, encode(time.Now()), encode(root)).Scan(&id) if errors.Is(err, sql.ErrNoRows) { err = storage.ErrSectorNotFound } @@ -531,12 +449,7 @@ func sectorDBID(tx *txn, root types.Hash256) (id int64, err error) { // does not already exist. If the sector root already exists, the ID is // returned. func insertSectorDBID(tx *txn, root types.Hash256) (id int64, err error) { - id, err = sectorDBID(tx, root) - if errors.Is(err, storage.ErrSectorNotFound) { - // insert the sector root - err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) RETURNING id`, encode(root), encode(time.Now())).Scan(&id) - return - } + err = tx.QueryRow(`INSERT INTO stored_sectors (sector_root, last_access_timestamp) VALUES ($1, $2) ON CONFLICT (sector_root) DO UPDATE SET last_access_timestamp=EXCLUDED.last_access_timestamp RETURNING id`, encode(root), encode(time.Now())).Scan(&id) return } @@ -595,11 +508,10 @@ WHERE v.sector_id=$1` // emptyLocation returns an empty location in a writable volume. If there is no // space available, ErrNotEnoughStorage is returned. func emptyLocation(tx *txn) (loc storage.SectorLocation, err error) { - const query = `SELECT vs.id, vs.volume_id, vs.volume_index - FROM volume_sectors vs INDEXED BY volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound - LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id) + const query = `SELECT vs.id, vs.volume_id, vs.volume_index + FROM volume_sectors vs INNER JOIN storage_volumes sv ON (sv.id=vs.volume_id) - WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND sv.available=true AND sv.read_only=false + WHERE vs.sector_id IS NULL AND sv.available=true AND sv.read_only=false ORDER BY vs.sector_writes ASC LIMIT 1;` err = tx.QueryRow(query).Scan(&loc.ID, &loc.Volume, &loc.Index) @@ -615,15 +527,14 @@ func emptyLocation(tx *txn) (loc storage.SectorLocation, err error) { // emptyLocationForMigration returns an empty location in a writable volume. If there is no // space available, ErrNotEnoughStorage is returned. 
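+// The new maxIndex parameter also permits empty locations below maxIndex within the volume being migrated, replacing the removed locationWithinVolume fallback.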
-func emptyLocationForMigration(tx *txn, volumeID int64) (loc storage.SectorLocation, err error) { - const query = `SELECT vs.id, vs.volume_id, vs.volume_index - FROM volume_sectors vs INDEXED BY volume_sectors_sector_writes_volume_id_sector_id_volume_index_compound - LEFT JOIN locked_volume_sectors lvs ON (lvs.volume_sector_id=vs.id) +func emptyLocationForMigration(tx *txn, volumeID int64, maxIndex uint64) (loc storage.SectorLocation, err error) { + const query = `SELECT vs.id, vs.volume_id, vs.volume_index + FROM volume_sectors vs INNER JOIN storage_volumes sv ON (sv.id=vs.volume_id) - WHERE vs.sector_id IS NULL AND lvs.volume_sector_id IS NULL AND sv.available=true AND sv.read_only=false AND vs.volume_id <> $1 + WHERE vs.sector_id IS NULL AND sv.available=true AND (vs.volume_id <> $1 AND sv.read_only=false OR (vs.volume_id=$1 AND vs.volume_index < $2)) ORDER BY vs.sector_writes ASC LIMIT 1;` - err = tx.QueryRow(query, volumeID).Scan(&loc.ID, &loc.Volume, &loc.Index) + err = tx.QueryRow(query, volumeID, maxIndex).Scan(&loc.ID, &loc.Volume, &loc.Index) if errors.Is(err, sql.ErrNoRows) { err = storage.ErrNotEnoughStorage return @@ -634,37 +545,6 @@ func emptyLocationForMigration(tx *txn, volumeID int64) (loc storage.SectorLocat return } -// sectorForMigration returns the location of the first occupied sector in the -// volume starting at minIndex and greater than marker. -func sectorForMigration(tx *txn, volumeID int64, marker int64) (loc storage.SectorLocation, err error) { - const query = `SELECT vs.id, vs.volume_id, vs.volume_index, s.sector_root - FROM volume_sectors vs - INNER JOIN stored_sectors s ON (s.id=vs.sector_id) - WHERE vs.sector_id IS NOT NULL AND vs.volume_id=$1 AND vs.volume_index > $2 - ORDER BY vs.volume_index ASC - LIMIT 1` - - err = tx.QueryRow(query, volumeID, marker).Scan(&loc.ID, &loc.Volume, &loc.Index, decode(&loc.Root)) - return -} - -// locationWithinVolume returns an empty location within the same volume as -// the given volumeID. If there is no space in the volume, ErrNotEnoughStorage -// is returned. 
-func locationWithinVolume(tx *txn, volumeID int64, maxIndex uint64) (loc storage.SectorLocation, err error) { - const query = `SELECT vs.id, vs.volume_id, vs.volume_index - FROM volume_sectors vs - WHERE vs.sector_id IS NULL AND vs.id NOT IN (SELECT volume_sector_id FROM locked_volume_sectors) - AND vs.volume_id=$1 AND vs.volume_index<$2 - LIMIT 1;` - - err = tx.QueryRow(query, volumeID, maxIndex).Scan(&loc.ID, &loc.Volume, &loc.Index) - if errors.Is(err, sql.ErrNoRows) { - return storage.SectorLocation{}, storage.ErrNotEnoughStorage - } - return -} - func scanVolume(s scanner) (volume storage.Volume, err error) { err = s.Scan(&volume.ID, &volume.LocalPath, &volume.ReadOnly, &volume.Available, &volume.TotalSectors, &volume.UsedSectors) return diff --git a/persist/sqlite/volumes_test.go b/persist/sqlite/volumes_test.go index 68cfd6ce..cdb0a835 100644 --- a/persist/sqlite/volumes_test.go +++ b/persist/sqlite/volumes_test.go @@ -44,11 +44,9 @@ func TestVolumeSetReadOnly(t *testing.T) { } // try to add a sector to the volume - release, err := db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation, exists bool) error { return nil }) + err = db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation) error { return nil }) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { // immediately release the sector so it can be used again - t.Fatal(err) } // set the volume to read-only @@ -58,7 +56,7 @@ func TestVolumeSetReadOnly(t *testing.T) { // try to add another sector to the volume, should fail with // ErrNotEnoughStorage - _, err = db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation, exists bool) error { return nil }) + err = db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation) error { return nil }) if !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } @@ -82,7 +80,7 @@ func TestAddSector(t *testing.T) { root := frand.Entropy256() // try to store a sector in the empty volume, should return // ErrNotEnoughStorage - _, err = db.StoreSector(root, func(storage.SectorLocation, bool) error { return nil }) + err = db.StoreSector(root, func(storage.SectorLocation) error { return nil }) if !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } @@ -92,7 +90,7 @@ func TestAddSector(t *testing.T) { t.Fatal(err) } // store the sector - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { + err = db.StoreSector(root, func(loc storage.SectorLocation) error { // check that the sector was stored in the expected location if loc.Volume != volumeID { t.Fatalf("expected volume ID %v, got %v", volumeID, loc.Volume) @@ -104,18 +102,11 @@ func TestAddSector(t *testing.T) { if err != nil { t.Fatal(err) } - defer func() { - if err := release(); err != nil { - t.Fatal("failed to release sector1:", err) - } - }() // check the location was added - loc, release, err := db.SectorLocation(root) + loc, err := db.SectorLocation(root) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } if loc.Volume != volumeID { @@ -134,22 +125,13 @@ func TestAddSector(t *testing.T) { t.Fatalf("expected 1 used sector, got %v", volumes[0].UsedSectors) } - // store the sector again, exists should be true - release, err = db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { - switch { - case !exists: - t.Fatal("sector does not exist") - case loc.Volume != volumeID: - t.Fatalf("expected 
volume ID %v, got %v", volumeID, loc.Volume) - case loc.Index != 0: - t.Fatalf("expected sector index 0, got %v", loc.Index) - } + // store the sector again, should be a no-op + err = db.StoreSector(root, func(loc storage.SectorLocation) error { + t.Fatal("store function called twice") return nil }) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } volumes, err = db.Volumes() @@ -164,12 +146,78 @@ func TestAddSector(t *testing.T) { // try to store another sector in the volume, should return // ErrNotEnoughStorage - _, err = db.StoreSector(frand.Entropy256(), func(storage.SectorLocation, bool) error { return nil }) + err = db.StoreSector(frand.Entropy256(), func(storage.SectorLocation) error { return nil }) if !errors.Is(err, storage.ErrNotEnoughStorage) { t.Fatalf("expected ErrNotEnoughStorage, got %v", err) } } +func TestHasSector(t *testing.T) { + log := zaptest.NewLogger(t) + db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + volumeID, err := db.AddVolume("test", false) + if err != nil { + t.Fatal(err) + } else if err := db.SetAvailable(volumeID, true); err != nil { + t.Fatal(err) + } else if err := db.GrowVolume(volumeID, 1); err != nil { + t.Fatal(err) + } + + root := frand.Entropy256() + // store the sector + err = db.StoreSector(root, func(loc storage.SectorLocation) error { + // check that the sector was stored in the expected location + if loc.Volume != volumeID { + t.Fatalf("expected volume ID %v, got %v", volumeID, loc.Volume) + } else if loc.Index != 0 { + t.Fatalf("expected sector index 0, got %v", loc.Index) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + // check the location was added + loc, err := db.SectorLocation(root) + if err != nil { + t.Fatal(err) + } + + if loc.Volume != volumeID { + t.Fatalf("expected volume ID %v, got %v", volumeID, loc.Volume) + } else if loc.Index != 0 { + t.Fatalf("expected sector index 0, got %v", loc.Index) + } + + // the sector should not exist since it is not referenced + exists, err := db.HasSector(root) + if err != nil { + t.Fatal(err) + } else if exists { + t.Fatal("expected sector to not exist") + } + + // add a temporary sector + if err = db.AddTempSector(root, 1); err != nil { + t.Fatal(err) + } + + // the sector should now exist since it is referenced by a temp sector + exists, err = db.HasSector(root) + if err != nil { + t.Fatal(err) + } else if !exists { + t.Fatal("expected sector to exist") + } +} + func TestVolumeAdd(t *testing.T) { log := zaptest.NewLogger(t) db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) @@ -337,22 +385,18 @@ func TestShrinkVolume(t *testing.T) { } // add a few sectors - var releaseFns []func() error for i := 0; i < 5; i++ { - release, err := db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation, exists bool) error { + err := db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation) error { if loc.Volume != volume.ID { t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) } else if loc.Index != uint64(i) { t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if exists { - t.Fatal("sector exists") } return nil }) if err != nil { t.Fatal(err) } - releaseFns = append(releaseFns, release) } // check that the volume cannot be shrunk below the used sectors @@ -368,12 +412,6 @@ func TestShrinkVolume(t *testing.T) { } else if m.Storage.TotalSectors != 5 { t.Fatalf("expected %v total sectors, got %v", 5, 
m.Storage.TotalSectors) } - - for _, fn := range releaseFns { - if err := fn(); err != nil { - t.Fatal(err) - } - } } func TestRemoveVolume(t *testing.T) { @@ -404,13 +442,11 @@ func TestRemoveVolume(t *testing.T) { // add a few sectors for i := 0; i < 5; i++ { sectorRoot := frand.Entropy256() - release, err := db.StoreSector(sectorRoot, func(loc storage.SectorLocation, exists bool) error { + err := db.StoreSector(sectorRoot, func(loc storage.SectorLocation) error { if loc.Volume != volume.ID { t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) } else if loc.Index != uint64(i) { t.Fatalf("expected sector index 0, got %v", loc.Index) - } else if exists { - t.Fatal("sector exists") } return nil }) @@ -421,8 +457,6 @@ func TestRemoveVolume(t *testing.T) { err = db.AddTemporarySectors([]storage.TempSector{{Root: sectorRoot, Expiration: uint64(i)}}) if err != nil { t.Fatal(err) - } else if err := release(); err != nil { - t.Fatal(err) } } @@ -443,6 +477,8 @@ func TestRemoveVolume(t *testing.T) { // expire all of the temporary sectors if err := db.ExpireTempSectors(5); err != nil { t.Fatal(err) + } else if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { + t.Fatal(err) } // check that the volume can be removed @@ -479,23 +515,17 @@ func TestMigrateSectors(t *testing.T) { for i := range roots { root := frand.Entropy256() roots[i] = root - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { + err := db.StoreSector(root, func(loc storage.SectorLocation) error { if loc.Volume != volume.ID { t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) } else if loc.Index != uint64(i) { t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if exists { - t.Fatal("sector already exists") } return nil }) if err != nil { t.Fatal(err) - } - - if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: uint64(i)}}); err != nil { - t.Fatal(err) - } else if err := release(); err != nil { + } else if err := db.AddTemporarySectors([]storage.TempSector{{Root: root, Expiration: uint64(i)}}); err != nil { t.Fatal(err) } } @@ -508,16 +538,15 @@ func TestMigrateSectors(t *testing.T) { } roots = roots[initialSectors/2:] - // migrate the remaining sectors to the first half of the volume var i int - migrated, failed, err := db.MigrateSectors(context.Background(), volume.ID, initialSectors/2, func(loc storage.SectorLocation) error { - if loc.Volume != volume.ID { - t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) - } else if loc.Index != uint64(i) { - t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if loc.Root != roots[i] { - t.Fatalf("expected sector root index %d %v, got %v", i, roots[i], loc.Root) + migrated, failed, err := db.MigrateSectors(context.Background(), volume.ID, initialSectors/2, func(from, to storage.SectorLocation) error { + if to.Volume != volume.ID { + t.Fatalf("expected volume ID %v, got %v", volume.ID, to.Volume) + } else if to.Index != uint64(i) { + t.Fatalf("expected sector index %v, got %v", i, to.Index) + } else if to.Root != roots[i] { + t.Fatalf("expected sector root index %d %v, got %v", i, roots[i], to.Root) } i++ return nil @@ -534,14 +563,12 @@ func TestMigrateSectors(t *testing.T) { // check that the sector metadata has been updated for i, root := range roots { - if loc, release, err := db.SectorLocation(root); err != nil { + if loc, err := db.SectorLocation(root); err != nil { t.Fatal(err) } else if loc.Volume != volume.ID { 
t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) } else if loc.Index != uint64(i) { t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if err := release(); err != nil { - t.Fatal(err) } } @@ -554,14 +581,14 @@ func TestMigrateSectors(t *testing.T) { } // migrate the remaining sectors from the first volume; should partially complete - migrated, failed, err = db.MigrateSectors(context.Background(), volume.ID, 0, func(loc storage.SectorLocation) error { + migrated, failed, err = db.MigrateSectors(context.Background(), volume.ID, 0, func(from, to storage.SectorLocation) error { return nil }) - if err != nil { - t.Fatal(err) + if !errors.Is(err, storage.ErrNotEnoughStorage) { + t.Fatalf("expected not enough storage error, got %v", err) } else if migrated != initialSectors/4 { t.Fatalf("expected %v migrated sectors, got %v", initialSectors/4, migrated) - } else if failed != len(roots)-(initialSectors/4) { + } else if failed != 0 { t.Fatalf("expected %v failed sectors, got %v", initialSectors-(initialSectors/4), failed) } @@ -587,7 +614,7 @@ func TestMigrateSectors(t *testing.T) { } func TestPrune(t *testing.T) { - const sectors = 100 + const sectors = 75 log := zaptest.NewLogger(t) db, err := OpenDatabase(filepath.Join(t.TempDir(), "test.db"), log) @@ -603,16 +630,13 @@ func TestPrune(t *testing.T) { // store enough sectors to fill the volume roots := make([]types.Hash256, 0, sectors) - releaseFns := make([]func() error, 0, sectors) for i := 0; i < sectors; i++ { root := frand.Entropy256() - release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { + err := db.StoreSector(root, func(loc storage.SectorLocation) error { if loc.Volume != volume.ID { t.Fatalf("expected volume ID %v, got %v", volume.ID, loc.Volume) } else if loc.Index != uint64(i) { t.Fatalf("expected sector index %v, got %v", i, loc.Index) - } else if exists { - t.Fatal("sector already exists") } return nil }) @@ -620,48 +644,61 @@ func TestPrune(t *testing.T) { t.Fatal(err) } roots = append(roots, root) - releaseFns = append(releaseFns, release) } renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32)) - // add a contract to store the sectors - contractUnlockConditions := types.UnlockConditions{ - PublicKeys: []types.UnlockKey{ - renterKey.PublicKey().UnlockKey(), - hostKey.PublicKey().UnlockKey(), - }, - SignaturesRequired: 2, - } - c := contracts.SignedRevision{ - Revision: types.FileContractRevision{ - UnlockConditions: contractUnlockConditions, - ParentID: types.FileContractID(frand.Entropy256()), - FileContract: types.FileContract{ - UnlockHash: contractUnlockConditions.UnlockHash(), - WindowStart: 90, - WindowEnd: 100, + addContract := func(t *testing.T, expiration uint64) contracts.SignedRevision { + // add a contract to store the sectors + contractUnlockConditions := types.UnlockConditions{ + PublicKeys: []types.UnlockKey{ + renterKey.PublicKey().UnlockKey(), + hostKey.PublicKey().UnlockKey(), }, - }, - } - if err := db.AddContract(c, []types.Transaction{}, types.MaxCurrency, contracts.Usage{}, 100); err != nil { - t.Fatal(err) - } - contractSectors, tempSectors, lockedSectors, deletedSectors := roots[:20], roots[20:40], roots[40:60], roots[60:] - // append the contract sectors to the contract - var changes []contracts.SectorChange - for _, root := range contractSectors { - changes = append(changes, contracts.SectorChange{ - Root: root, - Action: contracts.SectorActionAppend, - }) + SignaturesRequired: 
2, + } + c := contracts.SignedRevision{ + Revision: types.FileContractRevision{ + ParentID: types.FileContractID(frand.Entropy256()), + UnlockConditions: contractUnlockConditions, + FileContract: types.FileContract{ + UnlockHash: contractUnlockConditions.UnlockHash(), + WindowStart: expiration, + WindowEnd: expiration, + }, + }, + } + if err := db.AddContract(c, []types.Transaction{}, types.MaxCurrency, contracts.Usage{}, 100); err != nil { + t.Fatal(err) + } + return c } - err = db.ReviseContract(c, []types.Hash256{}, contracts.Usage{}, changes) - if err != nil { - t.Fatal(err) + + addSectorsToContract := func(t *testing.T, c *contracts.SignedRevision, sectors []types.Hash256) { + // append the contract sectors to the contract + var changes []contracts.SectorChange + for _, root := range sectors { + changes = append(changes, contracts.SectorChange{ + Root: root, + Action: contracts.SectorActionAppend, + }) + } + err = db.ReviseContract(*c, []types.Hash256{}, contracts.Usage{}, changes) + if err != nil { + t.Fatal(err) + } } + contract1Sectors, contract2Sectors, tempSectors, unreferencedSectors := roots[:10], roots[10:25], roots[25:50], roots[50:] + + c1 := addContract(t, 100) + c2 := addContract(t, 110) + + // add the sectors to the contract + addSectorsToContract(t, &c1, contract1Sectors) + addSectorsToContract(t, &c2, contract2Sectors) + // add the temporary sectors var tempRoots []storage.TempSector for _, root := range tempSectors { @@ -674,133 +711,72 @@ func TestPrune(t *testing.T) { t.Fatal(err) } - // lock the remaining sectors - var locks []int64 - for _, root := range lockedSectors { - err := db.transaction(func(tx *txn) error { - sectorID, err := sectorDBID(tx, root) - if err != nil { - return err - } - lockID, err := lockSector(tx, sectorID) - if err != nil { - return err - } - locks = append(locks, lockID) - return nil - }) - if err != nil { - t.Fatal(err) - } - } - - // remove the initial locks - for _, fn := range releaseFns { - if err := fn(); err != nil { - t.Fatal(err) - } - } - - checkConsistency := func(contract, temp, locked, deleted []types.Hash256) error { - for _, root := range contract { - if _, release, err := db.SectorLocation(root); err != nil { - return fmt.Errorf("sector %v not found: %w", root, err) - } else if err := release(); err != nil { - return fmt.Errorf("failed to release sector %v: %w", root, err) - } - } - - for _, root := range temp { - if _, release, err := db.SectorLocation(root); err != nil { - return fmt.Errorf("sector %v not found: %w", root, err) - } else if err := release(); err != nil { - return fmt.Errorf("failed to release sector %v: %w", root, err) - } - } + assertSectors := func(t *testing.T, contract, temp uint64, available, deleted []types.Hash256) { + t.Helper() - for _, root := range locked { - if _, release, err := db.SectorLocation(root); err != nil { - return fmt.Errorf("sector %v not found: %w", root, err) - } else if err := release(); err != nil { - return fmt.Errorf("failed to release sector %v: %w", root, err) + for _, root := range available { + if _, err := db.SectorLocation(root); err != nil { + t.Fatalf("sector %v not found: %s", available, err) } } - - for i, root := range deleted { - if _, _, err := db.SectorLocation(root); !errors.Is(err, storage.ErrSectorNotFound) { - return fmt.Errorf("expected ErrSectorNotFound for sector %d %q, got %v", i, root, err) + for _, root := range deleted { + if _, err := db.SectorLocation(root); !errors.Is(err, storage.ErrSectorNotFound) { + t.Fatalf("expected ErrSectorNotFound, got %v", 
err) } } - // check the volume usage - expectedSectors := uint64(len(contract) + len(temp) + len(locked)) used, _, err := db.StorageUsage() if err != nil { - return fmt.Errorf("failed to get storage usage: %w", err) - } else if used != expectedSectors { - return fmt.Errorf("expected %v sectors, got %v", expectedSectors, used) + t.Fatalf("failed to get storage usage: %v", err) + } else if used != uint64(len(available)) { + t.Fatalf("expected %v sectors, got %v", used, len(available)) } - // check the metrics m, err := db.Metrics(time.Now()) if err != nil { - return fmt.Errorf("failed to get metrics: %w", err) - } else if m.Storage.PhysicalSectors != expectedSectors { - return fmt.Errorf("expected %v total sectors, got %v", expectedSectors, m.Storage.PhysicalSectors) - } else if m.Storage.ContractSectors != uint64(len(contract)) { - return fmt.Errorf("expected %v contract sectors, got %v", len(contract), m.Storage.ContractSectors) - } else if m.Storage.TempSectors != uint64(len(temp)) { - return fmt.Errorf("expected %v temporary sectors, got %v", len(temp), m.Storage.TempSectors) + t.Fatalf("failed to get metrics: %v", err) + } else if m.Storage.PhysicalSectors != uint64(len(available)) { + t.Fatalf("expected %v physical sectors, got %v", len(available), m.Storage.PhysicalSectors) + } else if m.Storage.ContractSectors != contract { + t.Fatalf("expected %v contract sectors, got %v", contract, m.Storage.ContractSectors) + } else if m.Storage.TempSectors != temp { + t.Fatalf("expected %v temporary sectors, got %v", temp, m.Storage.TempSectors) } - return nil } + assertSectors(t, 25, 25, roots, nil) - if err := checkConsistency(contractSectors, tempSectors, lockedSectors, deletedSectors); err != nil { + // prune unreferenced sectors + available, deleted := roots[:len(roots)-len(unreferencedSectors)], unreferencedSectors + if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { t.Fatal(err) } + assertSectors(t, 25, 25, available, deleted) - // unlock locked sectors - err = db.transaction(func(tx *txn) error { - return unlockSector(tx, log.Named("unlockSector"), locks...) - }) - if err != nil { + // expire one of the contract's sectors + if err := db.ExpireContractSectors(101); err != nil { t.Fatal(err) - } - - if err := checkConsistency(contractSectors, tempSectors, nil, roots[60:]); err != nil { + } else if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { t.Fatal(err) } + available, deleted = append(contract2Sectors, tempSectors...), append(deleted, contract1Sectors...) + assertSectors(t, 15, 25, available, deleted) // expire the temp sectors - if err := db.ExpireTempSectors(100); err != nil { + if err := db.ExpireTempSectors(101); err != nil { t.Fatal(err) - } - - if err := checkConsistency(contractSectors, nil, nil, roots[40:]); err != nil { - t.Fatal(err) - } - - // trim half of the contract sectors - changes = []contracts.SectorChange{ - {Action: contracts.SectorActionTrim, A: uint64(len(contractSectors) / 2)}, - } - if err := db.ReviseContract(c, contractSectors, contracts.Usage{}, changes); err != nil { + } else if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { t.Fatal(err) } - contractSectors = contractSectors[:len(contractSectors)/2] + available, deleted = contract2Sectors, append(deleted, tempSectors...) 
+ assertSectors(t, 15, 0, available, deleted) - if err := checkConsistency(contractSectors, nil, nil, roots[50:]); err != nil { + if err := db.ExpireContractSectors(111); err != nil { t.Fatal(err) - } - - // expire the rest of the contract sectors - if err := db.ExpireContractSectors(c.Revision.WindowEnd + 1); err != nil { - t.Fatal(err) - } - - if err := checkConsistency(nil, nil, nil, roots); err != nil { + } else if err := db.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { t.Fatal(err) } + available, deleted = nil, append(deleted, contract1Sectors...) + assertSectors(t, 0, 0, available, deleted) } func BenchmarkVolumeGrow(b *testing.B) { @@ -870,11 +846,9 @@ func BenchmarkVolumeMigrate(b *testing.B) { roots := make([]types.Hash256, b.N) for i := range roots { roots[i] = frand.Entropy256() - release, err := db.StoreSector(roots[i], func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(roots[i], func(loc storage.SectorLocation) error { return nil }) if err != nil { b.Fatalf("failed to store sector %v: %v", i, err) - } else if err := release(); err != nil { - b.Fatal(err) } } @@ -888,7 +862,7 @@ func BenchmarkVolumeMigrate(b *testing.B) { b.ReportMetric(float64(b.N), "sectors") // migrate all sectors from the first volume to the second - migrated, failed, err := db.MigrateSectors(context.Background(), volume1.ID, 0, func(loc storage.SectorLocation) error { + migrated, failed, err := db.MigrateSectors(context.Background(), volume1.ID, 0, func(from, to storage.SectorLocation) error { return nil }) if err != nil { @@ -918,9 +892,39 @@ func BenchmarkStoreSector(b *testing.B) { b.ReportMetric(float64(b.N), "sectors") for i := 0; i < b.N; i++ { - _, err := db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation, exists bool) error { return nil }) + err := db.StoreSector(frand.Entropy256(), func(loc storage.SectorLocation) error { return nil }) if err != nil { b.Fatal(err) } } } + +func BenchmarkReadSector(b *testing.B) { + log := zaptest.NewLogger(b) + db, err := OpenDatabase(filepath.Join(b.TempDir(), "test.db"), log) + if err != nil { + b.Fatal(err) + } + defer db.Close() + + _, err = addTestVolume(db, "test", uint64(b.N*2)) + if err != nil { + b.Fatal(err) + } + + root := frand.Entropy256() + err = db.StoreSector(root, func(loc storage.SectorLocation) error { return nil }) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + b.ReportMetric(float64(b.N), "sectors") + + for i := 0; i < b.N; i++ { + if _, err := db.SectorLocation(root); err != nil { + b.Fatal(err) + } + } +} diff --git a/rhp/v2/rhp.go b/rhp/v2/rhp.go index 40884e4b..813f44b9 100644 --- a/rhp/v2/rhp.go +++ b/rhp/v2/rhp.go @@ -50,10 +50,8 @@ type ( // Sectors reads and writes sectors to persistent storage Sectors interface { - // Write writes a sector to persistent storage. release should only be - // called after the contract roots have been committed to prevent the - // sector from being deleted. - Write(root types.Hash256, data *[rhp2.SectorSize]byte) (release func() error, _ error) + // Write writes a sector to persistent storage + Write(root types.Hash256, data *[rhp2.SectorSize]byte) error // Read reads the sector with the given root from the manager. Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) // Sync syncs the data files of changed volumes. 
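The hunks above drop the release funcs from StoreSector and from the RHP2 Sectors.Write: a sector now stays on disk only while a contract or temp-storage reference exists, and PruneSectors reclaims anything left unreferenced. A minimal sketch of the resulting caller pattern, written in the style of the volumes tests above; the *Store parameter type, the callback body, and the surrounding imports are assumptions, only the method names and signatures are taken from this diff.

// sketch only: stores a sector and immediately gives it a temp-storage
// reference so it survives the next PruneSectors pass
func storeTempSector(db *Store, expiration uint64) (types.Hash256, error) {
	root := frand.Entropy256()

	// StoreSector no longer returns a release func; the callback only has to
	// persist the sector data at the location it is handed
	err := db.StoreSector(root, func(loc storage.SectorLocation) error {
		// write the sector data to loc.Volume at loc.Index here (omitted)
		return nil
	})
	if err != nil {
		return types.Hash256{}, fmt.Errorf("failed to store sector: %w", err)
	}

	// an unreferenced sector is invisible to HasSector and is removed by the
	// next PruneSectors pass, so add a reference to keep it
	if err := db.AddTempSector(root, expiration); err != nil {
		return types.Hash256{}, fmt.Errorf("failed to add temp reference: %w", err)
	}
	return root, nil
}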
diff --git a/rhp/v2/rpc.go b/rhp/v2/rpc.go index d9476dd0..e18363ee 100644 --- a/rhp/v2/rpc.go +++ b/rhp/v2/rpc.go @@ -614,15 +614,6 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage } defer contractUpdater.Close() - var releaseSectors []func() error - defer func() { - for _, release := range releaseSectors { - if err := release(); err != nil { - log.Warn("failed to release sector", zap.Error(err)) - } - } - }() - oldRoots := contractUpdater.SectorRoots() for _, action := range req.Actions { switch action.Type { @@ -634,13 +625,12 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage } sector := (*[rhp2.SectorSize]byte)(action.Data) root := rhp2.SectorRoot(sector) - release, err := sh.sectors.Write(root, sector) + err := sh.sectors.Write(root, sector) if err != nil { err := fmt.Errorf("append action: failed to write sector: %w", err) s.t.WriteResponseErr(err) return contracts.Usage{}, err } - releaseSectors = append(releaseSectors, release) contractUpdater.AppendSector(root) case rhp2.RPCWriteActionTrim: if err := contractUpdater.TrimSectors(action.A); err != nil { @@ -687,13 +677,12 @@ func (sh *SessionHandler) rpcWrite(s *session, log *zap.Logger) (contracts.Usage s.t.WriteResponseErr(err) return contracts.Usage{}, err } - release, err := sh.sectors.Write(root, sector) - if err != nil { + + if err = sh.sectors.Write(root, sector); err != nil { err := fmt.Errorf("append action: failed to write sector: %w", err) s.t.WriteResponseErr(err) return contracts.Usage{}, err } - releaseSectors = append(releaseSectors, release) } } diff --git a/rhp/v3/execute.go b/rhp/v3/execute.go index 45185c80..1d139110 100644 --- a/rhp/v3/execute.go +++ b/rhp/v3/execute.go @@ -45,12 +45,11 @@ type ( updater *contracts.ContractUpdater tempSectors []storage.TempSector - finalize bool - releaseFuncs []func() error + finalize bool log *zap.Logger contracts ContractManager - storage StorageManager + sectors Sectors registry RegistryManager committed bool @@ -103,13 +102,12 @@ func (pe *programExecutor) executeAppendSector(instr *rhp3.InstrAppendSector, lo return nil, nil, fmt.Errorf("failed to pay for instruction: %w", err) } - release, err := pe.storage.Write(root, sector) + err = pe.sectors.Write(root, sector) if errors.Is(err, storage.ErrNotEnoughStorage) { return nil, nil, err } else if err != nil { return nil, nil, ErrHostInternalError } - pe.releaseFuncs = append(pe.releaseFuncs, release) pe.updater.AppendSector(root) if !instr.ProofRequired { @@ -134,12 +132,12 @@ func (pe *programExecutor) executeAppendSectorRoot(instr *rhp3.InstrAppendSector return nil, nil, fmt.Errorf("failed to pay for instruction: %w", err) } - // lock the sector to prevent it from being garbage collected - release, err := pe.storage.LockSector(root) + exists, err := pe.sectors.HasSector(root) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) + } else if !exists { + return nil, nil, storage.ErrSectorNotFound } - pe.releaseFuncs = append(pe.releaseFuncs, release) pe.updater.AppendSector(root) if !instr.ProofRequired { return nil, nil, nil @@ -188,17 +186,13 @@ func (pe *programExecutor) executeHasSector(instr *rhp3.InstrHasSector) ([]byte, return nil, nil, fmt.Errorf("failed to pay for instruction: %w", err) } - var has bool - release, err := pe.storage.LockSector(root) - if err != nil && !errors.Is(err, storage.ErrSectorNotFound) { + exists, err := pe.sectors.HasSector(root) + if err != nil { return nil, nil, fmt.Errorf("failed to locate sector 
%q: %w", root, err) - } else if err == nil { - has = true - pe.releaseFuncs = append(pe.releaseFuncs, release) } output := make([]byte, 1) - if has { + if exists { output[0] = 1 } return output, nil, nil @@ -227,7 +221,7 @@ func (pe *programExecutor) executeReadOffset(instr *rhp3.InstrReadOffset, log *z return nil, nil, fmt.Errorf("failed to get root: %w", err) } - sector, err := pe.storage.Read(root) + sector, err := pe.sectors.Read(root) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) } @@ -276,7 +270,7 @@ func (pe *programExecutor) executeReadSector(instr *rhp3.InstrReadSector, log *z } // read the sector - sector, err := pe.storage.Read(root) + sector, err := pe.sectors.Read(root) if errors.Is(err, storage.ErrSectorNotFound) { log.Debug("failed to read sector", zap.String("root", root.String()), zap.Error(err)) return nil, nil, storage.ErrSectorNotFound @@ -365,7 +359,7 @@ func (pe *programExecutor) executeUpdateSector(instr *rhp3.InstrUpdateSector, _ return nil, nil, fmt.Errorf("failed to get root: %w", err) } - sector, err := pe.storage.Read(oldRoot) + sector, err := pe.sectors.Read(oldRoot) if err != nil { return nil, nil, fmt.Errorf("failed to read sector: %w", err) } @@ -378,11 +372,9 @@ func (pe *programExecutor) executeUpdateSector(instr *rhp3.InstrUpdateSector, _ // store the new sector newRoot := rhp2.SectorRoot((*[rhp2.SectorSize]byte)(sector)) - release, err := pe.storage.Write(newRoot, sector) - if err != nil { + if err := pe.sectors.Write(newRoot, sector); err != nil { return nil, nil, fmt.Errorf("failed to write sector: %w", err) } - pe.releaseFuncs = append(pe.releaseFuncs, release) if err := pe.updater.UpdateSector(newRoot, sectorIndex); err != nil { return nil, nil, fmt.Errorf("failed to update sector: %w", err) } @@ -411,11 +403,9 @@ func (pe *programExecutor) executeStoreSector(instr *rhp3.InstrStoreSector, log } // store the sector - release, err := pe.storage.Write(root, sector) - if err != nil { + if err := pe.sectors.Write(root, sector); err != nil { return nil, fmt.Errorf("failed to write sector: %w", err) } - pe.releaseFuncs = append(pe.releaseFuncs, release) // add the sector to the program state pe.tempSectors = append(pe.tempSectors, storage.TempSector{ @@ -631,31 +621,12 @@ func (pe *programExecutor) executeProgram(ctx context.Context) <-chan rhp3.RPCEx return outputs } -func (pe *programExecutor) release() error { - for len(pe.releaseFuncs) > 0 { - release := pe.releaseFuncs[0] - if err := release(); err != nil { - return err - } - pe.releaseFuncs = pe.releaseFuncs[1:] - } - return nil -} - func (pe *programExecutor) rollback() error { if pe.committed { return nil } pe.committed = true - defer func() { - // release all of the locked sectors. Any sectors not referenced by a - // contract or temporary storage will eventually be garbage collected. - if err := pe.release(); err != nil { - pe.log.Error("failed to release sectors", zap.Error(err)) - } - }() - if pe.updater != nil { pe.updater.Close() } @@ -677,15 +648,7 @@ func (pe *programExecutor) commit(s *rhp3.Stream) error { } pe.committed = true - defer func() { - // release all of the locked sectors. Any sectors not referenced by a - // contract or temporary storage will eventually be garbage collected. 
- if err := pe.release(); err != nil { - pe.log.Error("failed to release sectors", zap.Error(err)) - } - }() - - if err := pe.storage.Sync(); err != nil { + if err := pe.sectors.Sync(); err != nil { s.WriteResponseErr(fmt.Errorf("failed to commit storage: %w", ErrHostInternalError)) return fmt.Errorf("failed to sync storage: %w", err) } @@ -767,7 +730,7 @@ func (pe *programExecutor) commit(s *rhp3.Stream) error { // commit the temporary sectors if len(pe.tempSectors) > 0 { - if err := pe.storage.AddTemporarySectors(pe.tempSectors); err != nil { + if err := pe.sectors.AddTemporarySectors(pe.tempSectors); err != nil { return fmt.Errorf("failed to commit temporary sectors: %w", err) } } @@ -878,7 +841,7 @@ func (sh *SessionHandler) newExecutor(instructions []rhp3.Instruction, data []by log: log, contracts: sh.contracts, - storage: sh.storage, + sectors: sh.sectors, registry: sh.registry, } diff --git a/rhp/v3/rhp.go b/rhp/v3/rhp.go index 536f6bd4..3b6ff796 100644 --- a/rhp/v3/rhp.go +++ b/rhp/v3/rhp.go @@ -49,16 +49,12 @@ type ( ReviseContract(contractID types.FileContractID) (*contracts.ContractUpdater, error) } - // A StorageManager manages the storage of sectors on disk. - StorageManager interface { - // LockSector locks the sector with the given root. If the sector does not - // exist, an error is returned. Release must be called when the sector is no - // longer needed. - LockSector(root types.Hash256) (func() error, error) - // Write writes a sector to persistent storage. release should only be - // called after the contract roots have been committed to prevent the - // sector from being deleted. - Write(root types.Hash256, data *[rhp2.SectorSize]byte) (release func() error, _ error) + // Sectors reads and writes sectors to persistent storage. + Sectors interface { + // HasSector returns true if the sector with the given root is stored + HasSector(root types.Hash256) (bool, error) + // Write writes a sector to persistent storage. + Write(root types.Hash256, data *[rhp2.SectorSize]byte) error // Read reads the sector with the given root from the manager. Read(root types.Hash256) (*[rhp2.SectorSize]byte, error) // Sync syncs the data files of changed volumes. 
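With LockSector removed, the v3 program executor above only asks whether a sector is referenced before appending its root; the sector is then kept alive by the contract or temp-storage reference committed at the end of the program rather than by a lock. A condensed sketch of that pattern, assuming the Sectors interface defined just above and the ContractUpdater from host/contracts; it is illustrative, not the executor's actual code path.

// sketch only: mirrors executeAppendSectorRoot without the payment and proof
// branches; the identifiers come from the surrounding hunks
func appendExistingSector(sectors Sectors, updater *contracts.ContractUpdater, root types.Hash256) error {
	exists, err := sectors.HasSector(root)
	if err != nil {
		return fmt.Errorf("failed to locate sector: %w", err)
	} else if !exists {
		// the renter referenced a root the host is not storing
		return storage.ErrSectorNotFound
	}
	// no lock/release bookkeeping: the appended root is pinned once the
	// contract revision referencing it is committed
	updater.AppendSector(root)
	return nil
}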
@@ -118,7 +114,7 @@ type ( accounts AccountManager contracts ContractManager registry RegistryManager - storage StorageManager + sectors Sectors settings SettingsReporter chain ChainManager @@ -270,7 +266,7 @@ func (sh *SessionHandler) LocalAddr() string { } // NewSessionHandler creates a new SessionHandler -func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainManager, syncer Syncer, wallet Wallet, accounts AccountManager, contracts ContractManager, registry RegistryManager, storage StorageManager, settings SettingsReporter, log *zap.Logger) *SessionHandler { +func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainManager, syncer Syncer, wallet Wallet, accounts AccountManager, contracts ContractManager, registry RegistryManager, sectors Sectors, settings SettingsReporter, log *zap.Logger) *SessionHandler { sh := &SessionHandler{ privateKey: hostKey, @@ -284,7 +280,7 @@ func NewSessionHandler(l net.Listener, hostKey types.PrivateKey, chain ChainMana contracts: contracts, registry: registry, settings: settings, - storage: storage, + sectors: sectors, log: log, tg: threadgroup.New(), diff --git a/rhp/v3/rpc_test.go b/rhp/v3/rpc_test.go index 270fd5a3..393019de 100644 --- a/rhp/v3/rpc_test.go +++ b/rhp/v3/rpc_test.go @@ -9,6 +9,7 @@ import ( "reflect" "strings" "testing" + "time" crhp2 "go.sia.tech/core/rhp/v2" crhp3 "go.sia.tech/core/rhp/v3" @@ -325,6 +326,10 @@ func TestStoreSector(t *testing.T) { // mine until the sector expires testutil.MineAndSync(t, node, node.Wallet.Address(), 10) + // ensure the dereferenced sector has been pruned + if err := node.Store.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil { + t.Fatal(err) + } // check that the sector was deleted usage = pt.ReadSectorCost(crhp2.SectorSize)
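Taken together, the changes replace explicit lock/release bookkeeping with an expire-then-prune flow: dropping the last reference (contract expiry or temp-sector expiry) only dereferences a sector, and PruneSectors performs the actual deletion in a separate pass. A sketch of that flow using the store methods exercised in the hunks above; the function itself, the *sqlite.Store parameter type, and the choice of cutoff are assumptions for illustration.

// sketch only: the expire-then-prune sequence the updated tests exercise
func reclaimAt(ctx context.Context, store *sqlite.Store, height uint64) error {
	// drop contract references for contracts whose proof window has passed
	if err := store.ExpireContractSectors(height); err != nil {
		return fmt.Errorf("failed to expire contract sectors: %w", err)
	}
	// drop temp-storage references whose expiration height has passed
	if err := store.ExpireTempSectors(height); err != nil {
		return fmt.Errorf("failed to expire temp sectors: %w", err)
	}
	// delete sectors left without any reference; the tests pass a future
	// cutoff so everything dereferenced so far is pruned immediately
	if err := store.PruneSectors(ctx, time.Now().Add(time.Hour)); err != nil {
		return fmt.Errorf("failed to prune sectors: %w", err)
	}
	return nil
}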