Skip to content

Commit

Permalink
Merge pull request #519 from SiaFoundation/nate/refactor-volumes
Browse files Browse the repository at this point in the history
Refactor volumes
  • Loading branch information
n8maninger authored Dec 10, 2024
2 parents da67bda + 8c657f6 commit 8403d49
Show file tree
Hide file tree
Showing 25 changed files with 1,012 additions and 1,325 deletions.
7 changes: 7 additions & 0 deletions .changeset/refactor_sector_management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
default: minor
---

# Refactor Sector Management

Improves sector lookups by 50% on average by removing the sector lock tables and moving reference pruning out of the hot path.
2 changes: 1 addition & 1 deletion api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (a *api) handleGETVerifySector(jc jape.Context) {

// if the sector is not referenced return the empty response without
// attempting to read the sector data
if len(refs.Contracts) == 0 && refs.TempStorage == 0 && refs.Locks == 0 {
if len(refs.Contracts) == 0 && refs.TempStorage == 0 {
jc.Encode(resp)
return
}
Expand Down
1 change: 0 additions & 1 deletion host/accounts/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestUsageTotal(t *testing.T) {
for i := 0; i < uv.NumField(); i++ {
v := types.NewCurrency(frand.Uint64n(math.MaxUint64), 0)
total = total.Add(v)
t.Log("setting field", uv.Type().Field(i).Name, "to", v.ExactString())
uv.Field(i).Set(reflect.ValueOf(v))
}

Expand Down
17 changes: 1 addition & 16 deletions host/contracts/contracts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,12 @@ func TestContractUpdater(t *testing.T) {
}
defer updater.Close()

var releaseFuncs []func() error
defer func() {
for _, release := range releaseFuncs {
if err := release(); err != nil {
t.Fatal(err)
}
}
}()

for i := 0; i < test.append; i++ {
root := frand.Entropy256()
release, err := node.Store.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil })
err := node.Store.StoreSector(root, func(loc storage.SectorLocation) error { return nil })
if err != nil {
t.Fatal(err)
}
releaseFuncs = append(releaseFuncs, release)
updater.AppendSector(root)
roots = append(roots, root)
}
Expand All @@ -135,11 +125,6 @@ func TestContractUpdater(t *testing.T) {
} else if err := updater.Close(); err != nil {
t.Fatal(err)
}
for _, release := range releaseFuncs {
if err := release(); err != nil {
t.Fatal(err)
}
}

// check that the sector roots are correct in the database
allRoots, err := node.Store.SectorRoots()
Expand Down
10 changes: 1 addition & 9 deletions host/contracts/integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,14 @@ func TestCheckIntegrity(t *testing.T) {
defer updater.Close()

var roots []types.Hash256
var releases []func() error
for i := 0; i < 5; i++ {
var sector [rhp2.SectorSize]byte
frand.Read(sector[:256])
root := rhp2.SectorRoot(&sector)
release, err := host.Volumes.Write(root, &sector)
err := host.Volumes.Write(root, &sector)
if err != nil {
t.Fatal(err)
}
releases = append(releases, release)
roots = append(roots, root)
updater.AppendSector(root)
}
Expand All @@ -126,12 +124,6 @@ func TestCheckIntegrity(t *testing.T) {
t.Fatal(err)
}

for _, release := range releases {
if err := release(); err != nil {
t.Fatal(err)
}
}

// helper func to serialize integrity check
checkIntegrity := func() (issues, checked, sectors uint64, err error) {
results, sectors, err := host.Contracts.CheckIntegrity(context.Background(), rev.Revision.ParentID)
Expand Down
62 changes: 15 additions & 47 deletions host/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,25 +350,15 @@ func TestContractLifecycle(t *testing.T) {
assertContractStatus(t, node.Contracts, rev.Revision.ParentID, contracts.ContractStatusActive)
assertContractMetrics(t, node.Store, 1, 0, hostCollateral, types.ZeroCurrency)

var releaseFuncs []func() error
defer func() {
for _, release := range releaseFuncs {
if err := release(); err != nil {
t.Fatal(err)
}
}
}()

var roots []types.Hash256
for i := 0; i < 5; i++ {
var sector [rhp2.SectorSize]byte
frand.Read(sector[:256])
root := rhp2.SectorRoot(&sector)
release, err := node.Volumes.Write(root, &sector)
if err != nil {

if err := node.Volumes.Write(root, &sector); err != nil {
t.Fatal(err)
}
releaseFuncs = append(releaseFuncs, release)
roots = append(roots, root)
}

Expand Down Expand Up @@ -404,11 +394,6 @@ func TestContractLifecycle(t *testing.T) {
if err != nil {
t.Fatal(err)
}
for _, release := range releaseFuncs {
if err := release(); err != nil {
t.Fatal(err)
}
}

assertContractMetrics(t, node.Store, 1, 0, hostCollateral, collateral)

Expand Down Expand Up @@ -637,17 +622,15 @@ func TestContractLifecycle(t *testing.T) {
assertContractMetrics(t, node.Store, 1, 0, hostCollateral, types.ZeroCurrency)

// add sectors to the volume manager
var releaseFuncs []func() error
var roots []types.Hash256
for i := 0; i < 5; i++ {
var sector [rhp2.SectorSize]byte
frand.Read(sector[:])
root := rhp2.SectorRoot(&sector)
release, err := node.Volumes.Write(root, &sector)
err := node.Volumes.Write(root, &sector)
if err != nil {
t.Fatal(err)
}
releaseFuncs = append(releaseFuncs, release)
roots = append(roots, root)
}

Expand Down Expand Up @@ -684,13 +667,6 @@ func TestContractLifecycle(t *testing.T) {
t.Fatal(err)
}

// release the sectors
for _, release := range releaseFuncs {
if err := release(); err != nil {
t.Fatal(err)
}
}

assertContractStatus(t, node.Contracts, rev.Revision.ParentID, contracts.ContractStatusActive)
assertContractMetrics(t, node.Store, 1, 0, hostCollateral, collateral)

Expand Down Expand Up @@ -752,6 +728,11 @@ func TestV2ContractLifecycle(t *testing.T) {
assertContractMetrics := func(t *testing.T, locked, risked types.Currency) {
t.Helper()

// ensure any dereferenced sectors have been pruned
if err := node.Store.PruneSectors(context.Background(), time.Now().Add(time.Hour)); err != nil {
t.Fatal(err)
}

m, err := node.Store.Metrics(time.Now())
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -854,11 +835,9 @@ func TestV2ContractLifecycle(t *testing.T) {
root := rhp2.SectorRoot(&sector)
roots := []types.Hash256{root}

release, err := node.Volumes.Write(root, &sector)
if err != nil {
if err := node.Volumes.Write(root, &sector); err != nil {
t.Fatal(err)
}
defer release()

fc.Filesize = proto4.SectorSize
fc.Capacity = proto4.SectorSize
Expand All @@ -873,14 +852,12 @@ func TestV2ContractLifecycle(t *testing.T) {
fc.HostSignature = hostKey.SignHash(sigHash)
fc.RenterSignature = renterKey.SignHash(sigHash)

err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
Storage: cost,
RiskedCollateral: collateral,
})
if err != nil {
t.Fatal(err)
} else if err := release(); err != nil {
t.Fatal(err)
}
// metrics should not have been updated, contract is still pending
assertContractMetrics(t, types.ZeroCurrency, types.ZeroCurrency)
Expand Down Expand Up @@ -914,11 +891,9 @@ func TestV2ContractLifecycle(t *testing.T) {
root := frand.Entropy256() // random root
roots := []types.Hash256{root}

release, err := node.Volumes.Write(root, &sector)
if err != nil {
if err := node.Volumes.Write(root, &sector); err != nil {
t.Fatal(err)
}
defer release()

fc.Filesize = proto4.SectorSize
fc.Capacity = proto4.SectorSize
Expand All @@ -933,14 +908,12 @@ func TestV2ContractLifecycle(t *testing.T) {
fc.HostSignature = hostKey.SignHash(sigHash)
fc.RenterSignature = renterKey.SignHash(sigHash)

err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
Storage: cost,
RiskedCollateral: collateral,
})
if err != nil {
t.Fatal(err)
} else if err := release(); err != nil {
t.Fatal(err)
}
// metrics should not have been updated, contract is still pending
assertContractMetrics(t, types.ZeroCurrency, types.ZeroCurrency)
Expand Down Expand Up @@ -974,11 +947,9 @@ func TestV2ContractLifecycle(t *testing.T) {
root := rhp2.SectorRoot(&sector)
roots := []types.Hash256{root}

release, err := node.Volumes.Write(root, &sector)
if err != nil {
if err := node.Volumes.Write(root, &sector); err != nil {
t.Fatal(err)
}
defer release()

fc.Filesize = proto4.SectorSize
fc.Capacity = proto4.SectorSize
Expand All @@ -993,14 +964,12 @@ func TestV2ContractLifecycle(t *testing.T) {
fc.HostSignature = hostKey.SignHash(sigHash)
fc.RenterSignature = renterKey.SignHash(sigHash)

err = node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
err := node.Contracts.ReviseV2Contract(contractID, fc, roots, proto4.Usage{
Storage: cost,
RiskedCollateral: collateral,
})
if err != nil {
t.Fatal(err)
} else if err := release(); err != nil {
t.Fatal(err)
}

// mine to confirm the contract
Expand Down Expand Up @@ -1264,11 +1233,10 @@ func TestSectorRoots(t *testing.T) {
for i := 0; i < sectors; i++ {
root, err := func() (types.Hash256, error) {
root := frand.Entropy256()
release, err := node.Store.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil })
err := node.Store.StoreSector(root, func(loc storage.SectorLocation) error { return nil })
if err != nil {
return types.Hash256{}, fmt.Errorf("failed to store sector: %w", err)
}
defer release()

updater, err := node.Contracts.ReviseContract(rev.Revision.ParentID)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion host/storage/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package storage

import "go.uber.org/zap"
import (
"time"

"go.uber.org/zap"
)

// A VolumeManagerOption configures a VolumeManager.
type VolumeManagerOption func(*VolumeManager)
Expand All @@ -25,3 +29,11 @@ func WithCacheSize(cacheSize int) VolumeManagerOption {
s.cacheSize = cacheSize
}
}

// WithPruneInterval sets the time between cleaning up dereferenced
// sectors.
func WithPruneInterval(d time.Duration) VolumeManagerOption {
return func(vm *VolumeManager) {
vm.pruneInterval = d
}
}
43 changes: 28 additions & 15 deletions host/storage/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package storage
import (
"context"
"errors"
"time"

"go.sia.tech/core/types"
)

type (
// MigrateFunc is a callback function that is called for each sector that
// needs to be migrated If the function returns an error, the sector should
// be skipped and migration should continue.
MigrateFunc func(location SectorLocation) error
// StoreFunc is called for every sector that needs written
// to disk.
StoreFunc func(loc SectorLocation) error
// MigrateFunc is called for every sector that needs migration.
// The sector should be migrated from 'from' to 'to' during
// migrate func.
MigrateFunc func(from, to SectorLocation) error

// A VolumeStore stores and retrieves information about storage volumes.
VolumeStore interface {
Expand Down Expand Up @@ -42,31 +46,40 @@ type (
// SetAvailable sets the available flag on a volume.
SetAvailable(volumeID int64, available bool) error

// PruneSectors removes all sectors that have not been accessed since
// lastAccess and are no longer referenced by a contract or temp storage.
// If the context is canceled, pruning is stopped and the function returns
// with the error.
PruneSectors(ctx context.Context, lastAccess time.Time) error

// MigrateSectors returns a new location for each occupied sector of a
// volume starting at min. The sector data should be copied to the new
// location and synced to disk during migrateFn. If migrateFn returns an
// error, migration will continue, but that sector is not migrated.
MigrateSectors(ctx context.Context, volumeID int64, min uint64, migrateFn MigrateFunc) (migrated, failed int, err error)
MigrateSectors(ctx context.Context, volumeID int64, min uint64, fn MigrateFunc) (migrated, failed int, err error)
// StoreSector calls fn with an empty location in a writable volume. If
// the sector root already exists, fn is called with the existing
// location and exists is true. Unless exists is true, The sector must
// be written to disk within fn. If fn returns an error, the metadata is
// rolled back. If no space is available, ErrNotEnoughStorage is
// returned. The location is locked until release is called.
//
// The sector should be referenced by either a contract or temp store
// before release is called to prevent Prune() from removing it.
StoreSector(root types.Hash256, fn func(loc SectorLocation, exists bool) error) (release func() error, err error)
// the sector root already exists, nil is returned. The sector should be
// written to disk within fn. If fn returns an error, the metadata is
// rolled back and the error is returned. If no space is available,
// ErrNotEnoughStorage is returned.
StoreSector(root types.Hash256, fn StoreFunc) error
// RemoveSector removes the metadata of a sector and returns its
// location in the volume.
RemoveSector(root types.Hash256) error
// HasSector returns true if the sector is stored by the host.
HasSector(root types.Hash256) (bool, error)
// SectorLocation returns the location of a sector or an error if the
// sector is not found. The location is locked until release is
// called.
SectorLocation(root types.Hash256) (loc SectorLocation, release func() error, err error)
SectorLocation(root types.Hash256) (loc SectorLocation, err error)
// AddTempSector adds a sector to temporary storage. The sectors will be deleted
// after the expiration height
AddTempSector(root types.Hash256, expiration uint64) error
// AddTemporarySectors adds a list of sectors to the temporary store.
// The sectors are not referenced by a contract and will be removed
// at the expiration height.
//
// Deprecated: use AddTempSector
AddTemporarySectors(sectors []TempSector) error
// ExpireTempSectors removes all temporary sectors that expired before
// the given height.
Expand Down
Loading

0 comments on commit 8403d49

Please sign in to comment.