diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index 556a0e7cbe1..e25e4f51e72 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -171,7 +171,7 @@ func runCmd(ctx *cli.Context) error { } else { genesisConfig = new(types.Genesis) } - agg, err := state2.NewAggregator(context.Background(), datadir.New(os.TempDir()), config3.DefaultStepSize, db, log.New()) + agg, err := state2.NewAggregator(context.Background(), datadir.New(os.TempDir()), config3.DefaultStepSize, db, log.New(), false) if err != nil { return err } diff --git a/cmd/evm/staterunner.go b/cmd/evm/staterunner.go index 6b2270207bc..3e4c0c68384 100644 --- a/cmd/evm/staterunner.go +++ b/cmd/evm/staterunner.go @@ -141,7 +141,7 @@ func aggregateResultsFromStateTests( MustOpen() defer _db.Close() - agg, err := libstate.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, _db, log.New()) + agg, err := libstate.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, _db, log.New(), false) if err != nil { return nil, err } diff --git a/cmd/integration/commands/flags.go b/cmd/integration/commands/flags.go index a61fcaab2d3..d2cfe832c82 100644 --- a/cmd/integration/commands/flags.go +++ b/cmd/integration/commands/flags.go @@ -56,6 +56,8 @@ var ( chainTipMode bool syncCfg = ethconfig.Defaults.Sync + + csvOutput string ) func must(err error) { diff --git a/cmd/integration/commands/idx_compare.go b/cmd/integration/commands/idx_compare.go new file mode 100644 index 00000000000..00cbd98ce5f --- /dev/null +++ b/cmd/integration/commands/idx_compare.go @@ -0,0 +1,194 @@ +package commands + +import ( + "bytes" + "encoding/binary" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/common/hexutility" + "github.com/erigontech/erigon-lib/recsplit" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/seg" + "github.com/spf13/cobra" +) + +func readEliasFanoOrOpt(v []byte, baseTxNum uint64) *eliasfano32.EliasFano { + if v[0]&0b10000000 == 0 { + ef, _ := eliasfano32.ReadEliasFano(v) + return ef + } + + // not eliasfano, decode + count := (len(v) - 1) / 4 + max := uint64(binary.BigEndian.Uint32(v[len(v)-4:])) + baseTxNum + ef := eliasfano32.NewEliasFano(uint64(count), max) + for i := 1; i <= len(v)-4; i += 4 { + n := uint64(binary.BigEndian.Uint32(v[i:i+4])) + baseTxNum + ef.AddOffset(n) + } + ef.Build() + return ef +} + +func compareOpt4(vOrig, vOpt []byte, baseTxNum uint64) bool { + efOrig, _ := eliasfano32.ReadEliasFano(vOrig) + efOpt := readEliasFanoOrOpt(vOpt, baseTxNum) + + if efOpt.Count() > efOrig.Count() { + log.Print("Optimized eliasfano is longer") + return false + } + if efOrig.Count() > efOpt.Count() { + log.Print("Optimized eliasfano is shorter") + return false + } + + itOrig := efOrig.Iterator() + itOpt := efOpt.Iterator() + for itOrig.HasNext() { + nOrig, err := itOrig.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + nOpt, err := itOpt.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + if nOrig != nOpt { + log.Printf("values mismatch: orig=%d new=%d", nOrig, nOpt) + log.Printf("orig=%v new=%v", hexutility.Encode(vOrig), hexutility.Encode(vOpt)) + return false + } + } + + return true +} + +var idxCompare = &cobra.Command{ + Use: "idx_compare", + Short: "After an idx_optimize execution, deep compare original and optimized .ef files", + Run: func(cmd *cobra.Command, args []string) { + ctx, _ := common.RootContext() + + idxPath := 
filepath.Join(datadirCli, "snapshots", "idx") + idxDir := os.DirFS(idxPath) + + files, err := fs.ReadDir(idxDir, ".") + if err != nil { + log.Fatalf("Failed to read directory contents: %v", err) + } + + log.Println("Comparing idx files:") + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") { + continue + } + + log.Printf("Checking file %s...", file.Name()) + + efi, err := recsplit.OpenIndex(datadirCli + "/snapshots/accessor/" + file.Name() + "i.new") + if err != nil { + log.Fatalf("Failed to open index: %v", err) + } + defer efi.Close() + + reader := efi.GetReaderFromPool() + defer reader.Close() + + // original .ef file + idxOrig, err := seg.NewDecompressor(datadirCli + "/snapshots/idx/" + file.Name()) + if err != nil { + log.Fatalf("Failed to open decompressor: %v", err) + } + defer idxOrig.Close() + + // reencoded optimized .ef.new file + idxOpt, err := seg.NewDecompressor(datadirCli + "/snapshots/idx/" + file.Name() + ".new") + if err != nil { + log.Fatalf("Failed to open decompressor: %v", err) + } + defer idxOpt.Close() + + g := idxOrig.MakeGetter() + readerOrig := seg.NewReader(g, seg.CompressNone) + readerOrig.Reset(0) + + g = idxOpt.MakeGetter() + readerOpt := seg.NewReader(g, seg.CompressNone) + readerOpt.Reset(0) + + // .ef.new MUST have a magic kv pair with baseTxNum + if !readerOpt.HasNext() { + log.Fatalf("reader doesn't have magic kv!") + } + k, _ := readerOpt.Next(nil) + if !bytes.Equal(k, MAGIC_KEY_BASE_TX_NUM) { + log.Fatalf("magic k is incorrect: %v", hexutility.Encode(k)) + } + if !readerOpt.HasNext() { + log.Fatalf("reader doesn't have magic number!") + } + v, prevKeyOffset := readerOpt.Next(nil) + if len(v) != 8 { + log.Fatalf("baseTxNum is not a uint64: %v", hexutility.Encode(v)) + } + baseTxNum := binary.BigEndian.Uint64(v) + + for readerOrig.HasNext() { + if !readerOpt.HasNext() { + log.Fatal("opt reader doesn't have next!") + } + + kOrig, _ := readerOrig.Next(nil) + kOpt, _ := readerOpt.Next(nil) + if !bytes.Equal(kOrig, kOpt) { + log.Fatalf("key mismatch!") + } + + if !readerOrig.HasNext() { + log.Fatal("orig reader doesn't have next!") + } + if !readerOpt.HasNext() { + log.Fatal("opt reader doesn't have next!") + } + + // orig/opt value comparison + vOrig, _ := readerOrig.Next(nil) + vOpt, nextKeyOffset := readerOpt.Next(nil) + if !compareOpt4(vOrig, vOpt, baseTxNum) { + log.Fatalf("value mismatch!") + } + + // checks new efi lookup points to the same value + offset, found := reader.TwoLayerLookup(kOpt) + if !found { + log.Fatalf("key %v not found in efi", hexutility.Encode(kOpt)) + } + if offset != prevKeyOffset { + log.Fatalf("offset mismatch: %d != %d", offset, prevKeyOffset) + } + prevKeyOffset = nextKeyOffset + + select { + case <-ctx.Done(): + return + default: + } + } + idxOrig.Close() + idxOpt.Close() + reader.Close() + efi.Close() + } + }, +} + +func init() { + withDataDir(idxCompare) + rootCmd.AddCommand(idxCompare) +} diff --git a/cmd/integration/commands/idx_optimize.go b/cmd/integration/commands/idx_optimize.go new file mode 100644 index 00000000000..7a533ccdd8f --- /dev/null +++ b/cmd/integration/commands/idx_optimize.go @@ -0,0 +1,198 @@ +package commands + +import ( + "encoding/binary" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + + "github.com/erigontech/erigon-lib/common/background" + "github.com/erigontech/erigon-lib/common/hexutility" + "github.com/erigontech/erigon-lib/config3" + lllog "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/recsplit" + 
"github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/state" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/seg" + "github.com/erigontech/erigon/turbo/debug" + "github.com/spf13/cobra" +) + +var MAGIC_KEY_BASE_TX_NUM = hexutility.MustDecodeHex("0x8453FFFFFFFFFFFFFFFFFFFF") + +// Delta encoding starting from 1st elem; only for ef sequences < 16 elems +// +// Encode all elems as deltas from baseTxId; they can fit into uint32 +// because max delta is bounded by 64 * stepSize == 100M +// hence size == count * sizeof(uint32) + 1 byte for encoding type +func doOpt4(baseTxNum uint64, v []byte) ([]byte, error) { + ef, _ := eliasfano32.ReadEliasFano(v) + count := ef.Count() + if count < 16 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return convertEF(baseTxNum, ef) + } + } + + return v, nil // DO NOT OPTIMIZE; plain elias fano +} + +func convertEF(baseTxNum uint64, ef *eliasfano32.EliasFano) ([]byte, error) { + b := make([]byte, 0, 1+ef.Count()*4) + b = append(b, 0b10000000) + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + return nil, err + } + n -= baseTxNum + + bn := make([]byte, 4) + binary.BigEndian.PutUint32(bn, uint32(n)) + b = append(b, bn...) + } + return b, nil +} + +var idxOptimize = &cobra.Command{ + Use: "idx_optimize", + Short: "Scan .ef files, backup them up, reencode and optimize them, rebuild .efi files", + Run: func(cmd *cobra.Command, args []string) { + ctx, _ := common.RootContext() + logger := debug.SetupCobra(cmd, "integration") + + // accessorDir := filepath.Join(datadirCli, "snapshots", "accessor") + idxPath := filepath.Join(datadirCli, "snapshots", "idx") + idxDir := os.DirFS(idxPath) + + files, err := fs.ReadDir(idxDir, ".") + if err != nil { + log.Fatalf("Failed to read directory contents: %v", err) + } + + log.Println("Sumarizing idx files...") + cEF := 0 + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") { + continue + } + cEF++ + } + + log.Println("Optimizing idx files...") + cOpt := 0 + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") { + continue + } + + efInfo, err := parseEFFilename(file.Name()) + if err != nil { + logger.Error("Failed to parse file info: ", err) + } + log.Printf("Optimizing file %s [%d/%d]...", file.Name(), cOpt, cEF) + + // only optimize frozen files for this experiment, because we are not + // implementing collation, merge, etc. 
+var idxOptimize = &cobra.Command{
+	Use:   "idx_optimize",
+	Short: "Scan .ef files, back them up, reencode and optimize them, rebuild .efi files",
+	Run: func(cmd *cobra.Command, args []string) {
+		ctx, _ := common.RootContext()
+		logger := debug.SetupCobra(cmd, "integration")
+
+		// accessorDir := filepath.Join(datadirCli, "snapshots", "accessor")
+		idxPath := filepath.Join(datadirCli, "snapshots", "idx")
+		idxDir := os.DirFS(idxPath)
+
+		files, err := fs.ReadDir(idxDir, ".")
+		if err != nil {
+			log.Fatalf("Failed to read directory contents: %v", err)
+		}
+
+		log.Println("Summarizing idx files...")
+		cEF := 0
+		for _, file := range files {
+			if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") {
+				continue
+			}
+			cEF++
+		}
+
+		log.Println("Optimizing idx files...")
+		cOpt := 0
+		for _, file := range files {
+			if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") {
+				continue
+			}
+
+			efInfo, err := parseEFFilename(file.Name())
+			if err != nil {
+				logger.Error("Failed to parse file info: ", err)
+			}
+			log.Printf("Optimizing file %s [%d/%d]...", file.Name(), cOpt, cEF)
+
+			// only optimize frozen files for this experiment, because we are not
+			// implementing collation, merge, etc. support now
+			// if efInfo.stepSize < 64 {
+			// 	log.Printf("Skipping file %s, step size %d < 64", file.Name(), efInfo.stepSize)
+			// 	continue
+			// }
+			cOpt++
+			baseTxNum := efInfo.startStep * config3.DefaultStepSize
+
+			tmpDir := datadirCli + "/temp"
+
+			idxInput, err := seg.NewDecompressor(datadirCli + "/snapshots/idx/" + file.Name())
+			if err != nil {
+				log.Fatalf("Failed to open decompressor: %v", err)
+			}
+			defer idxInput.Close()
+
+			idxOutput, err := seg.NewCompressor(ctx, "optimizoor", datadirCli+"/snapshots/idx/"+file.Name()+".new", tmpDir, seg.DefaultCfg, lllog.LvlInfo, logger)
+			if err != nil {
+				log.Fatalf("Failed to open compressor: %v", err)
+			}
+			defer idxOutput.Close()
+
+			// Reencode 1 idx file
+			g := idxInput.MakeGetter()
+			reader := seg.NewReader(g, seg.CompressNone)
+			reader.Reset(0)
+
+			writer := seg.NewWriter(idxOutput, seg.CompressNone)
+			writer.AddWord(MAGIC_KEY_BASE_TX_NUM)
+			b := make([]byte, 8)
+			binary.BigEndian.PutUint64(b, baseTxNum)
+			writer.AddWord(b)
+
+			for reader.HasNext() {
+				k, _ := reader.Next(nil)
+				if !reader.HasNext() {
+					log.Fatal("reader doesn't have next!")
+				}
+				if err := writer.AddWord(k); err != nil {
+					log.Fatalf("error while writing key %v", err)
+				}
+
+				v, _ := reader.Next(nil)
+				v, err := doOpt4(baseTxNum, v)
+				if err != nil {
+					log.Fatalf("error while optimizing value %v", err)
+				}
+				if err := writer.AddWord(v); err != nil {
+					log.Fatalf("error while writing value %v", err)
+				}
+
+				select {
+				case <-ctx.Done():
+					return
+				default:
+				}
+			}
+			if err := writer.Compress(); err != nil {
+				log.Fatalf("error while writing optimized file %v", err)
+			}
+			idxInput.Close()
+			writer.Close()
+			idxOutput.Close()
+
+			// rebuild .efi; COPIED FROM InvertedIndex.buildMapAccessor
+			salt, err := state.GetStateIndicesSalt(datadirCli + "/snapshots/")
+			if err != nil {
+				log.Fatalf("Failed to build accessor: %v", err)
+			}
+			idxPath := datadirCli + "/snapshots/accessor/" + file.Name() + "i.new"
+			cfg := recsplit.RecSplitArgs{
+				Enums:              true,
+				LessFalsePositives: true,
+
+				BucketSize: 2000,
+				LeafSize:   8,
+				TmpDir:     tmpDir,
+				IndexFile:  idxPath,
+				Salt:       salt,
+				NoFsync:    false,
+			}
+			data, err := seg.NewDecompressor(datadirCli + "/snapshots/idx/" + file.Name() + ".new")
+			if err != nil {
+				log.Fatalf("Failed to build accessor: %v", err)
+			}
+			ps := background.NewProgressSet()
+			if err := state.BuildAccessor(ctx, data, seg.CompressNone, idxPath, false, cfg, ps, logger); err != nil {
+				log.Fatalf("Failed to build accessor: %v", err)
+			}
+		}
+
+		log.Printf("Optimized %d of %d files!!!", cOpt, cEF)
+	},
+}
+
+func init() {
+	withDataDir(idxOptimize)
+	rootCmd.AddCommand(idxOptimize)
+}
diff --git a/cmd/integration/commands/idx_stat.go b/cmd/integration/commands/idx_stat.go
new file mode 100644
index 00000000000..d3724add3ab
--- /dev/null
+++ b/cmd/integration/commands/idx_stat.go
@@ -0,0 +1,488 @@
+package commands
+
+import (
+	"bufio"
+	"encoding/csv"
+	"io/fs"
+	"log"
+	"math"
+	"os"
+	"path/filepath"
+	"slices"
+	"strconv"
+	"strings"
+
+	"github.com/RoaringBitmap/roaring/roaring64"
+	"github.com/erigontech/erigon-lib/common"
+	"github.com/erigontech/erigon-lib/config3"
+	"github.com/erigontech/erigon-lib/recsplit/eliasfano32"
+	"github.com/erigontech/erigon-lib/seg"
+	"github.com/erigontech/erigon/turbo/cli"
+	"github.com/erigontech/erigon/turbo/debug"
+	"github.com/spf13/cobra"
+)
+
+const SIM_COUNT = 9
+
+var simulations = [SIM_COUNT]simFunc{
+	simOpt1,
+	simOpt2,
+	simOpt3,
+	simOpt4,
+	simOpt5,
+	simOpt6,
+	simOpt7,
+	simOpt8,
+	simOpt9,
+}
+
+type simFunc 
func(v []byte, ef *eliasfano32.EliasFano, baseTxNum uint64) int + +type IdxSummaryEntry struct { + records uint64 + totalBytesK int + totalBytesV int + totalOptBytesV [SIM_COUNT]int + min uint64 + max uint64 +} + +type efFileInfo struct { + prefix string + stepSize uint64 + startStep uint64 + endStep uint64 +} + +func parseEFFilename(fileName string) (*efFileInfo, error) { + parts := strings.Split(fileName, ".") + stepParts := strings.Split(parts[1], "-") + startStep, err := strconv.ParseUint(stepParts[0], 10, 64) + if err != nil { + return nil, err + } + endStep, err := strconv.ParseUint(stepParts[1], 10, 64) + if err != nil { + return nil, err + } + + return &efFileInfo{ + prefix: parts[0], + stepSize: endStep - startStep, + startStep: startStep, + endStep: endStep, + }, nil +} + +// Naive optimization: for ef sequences < 16 elems; concatenate all +// raw uint64, hence size == count * sizeof(uint64) + 1 byte for encoding type +func simOpt1(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + count := ef.Count() + if count < 16 { + return int(count)*8 + 1 // number of txNums * size of uint64 + 1 byte to signal encoding type + } + + return len(v) // DO NOT OPTIMIZE; plain elias fano +} + +// Delta encoding starting from 2nd elem; only for ef sequences < 16 elems +// +// Encode 1st uint64 elem raw; next elems are deltas of the 1st; they can fit into uint32 +// because max delta is bounded by 64 * stepSize == 100M +// hence size == sizeof(uint64) + (count - 1) * sizeof(uint32) + 1 byte for encoding type +func simOpt2(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + count := ef.Count() + if count < 16 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return 8 + int(count-1)*4 + 1 + } + } + + return len(v) // DO NOT OPTIMIZE; plain elias fano +} + +func simOpt3(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + count := ef.Count() + if count < 32 { + if ef.Max()-ef.Min()+1 < uint64(0xff) { + return 8 + int(count-1) + 1 + } else if ef.Max()-ef.Min()+1 < uint64(0xffff) { + return 8 + int(count-1)*2 + 1 + } else if ef.Max()-ef.Min()+1 < uint64(0xffffff) { + return 8 + int(count-1)*3 + 1 + } else if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return 8 + int(count-1)*4 + 1 + } + } + + return len(v) // DO NOT OPTIMIZE; plain elias fano +} + +// Delta encoding starting from 1st elem; only for ef sequences < 16 elems +// +// Encode all elems as deltas from baseTxId; they can fit into uint32 +// because max delta is bounded by 64 * stepSize == 100M +// hence size == count * sizeof(uint32) + 1 byte for encoding type +func simOpt4(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + count := ef.Count() + if count < 16 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return int(count)*4 + 1 + } + } + + return len(v) // DO NOT OPTIMIZE; plain elias fano +} + +// Same as simOpt4, but applies to sequences up to 99 elems +func simOpt5(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + count := ef.Count() + if count < 100 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return int(count)*4 + 1 + } + } + + return len(v) // DO NOT OPTIMIZE; plain elias fano +} + +// Same as simOpt4, but: +// +// - Drops 4 bytes from ef.Count() (uint64) (bc bounded by 64 steps; fits in uint32) +// - Drops 4 bytes from ef.Max() (uint64) (bc bounded by 64 steps; fits in uint32) +// - Rebase ef sequence to baseTxNum +func simOpt6(v []byte, ef *eliasfano32.EliasFano, baseTxNum uint64) int { + count := ef.Count() + if count < 16 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return int(count)*4 + 1 + } 
+ } + + optEf := eliasfano32.NewEliasFano(ef.Count(), ef.Max()-baseTxNum) + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + optEf.AddOffset(n - baseTxNum) + } + optEf.Build() + var b []byte + b = optEf.AppendBytes(b[:0]) + + return len(b) - 8 // rebased ef - 4 bytes (count) - 4 bytes (max) +} + +// Same as simOpt6, but: +// +// - NOT: Drops 4 bytes from ef.Count() (uint64) (bc bounded by 64 steps; fits in uint32) +// - NOT: Drops 4 bytes from ef.Max() (uint64) (bc bounded by 64 steps; fits in uint32) +// - Rebase ef sequence to baseTxNum <- THIS +func simOpt7(v []byte, ef *eliasfano32.EliasFano, baseTxNum uint64) int { + count := ef.Count() + if count < 16 { + if ef.Max()-ef.Min()+1 < uint64(0xffffffff) { + return int(count)*4 + 1 + } + } + + optEf := eliasfano32.NewEliasFano(ef.Count(), ef.Max()-baseTxNum) + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + optEf.AddOffset(n - baseTxNum) + } + optEf.Build() + var b []byte + b = optEf.AppendBytes(b[:0]) + + return len(b) // rebased ef +} + +// reencode everything using roaring bitmaps: ef -> bm +// +// size: 1 byte (encoding type) + len(bm) +func simOpt8(v []byte, ef *eliasfano32.EliasFano, _ uint64) int { + bm := roaring64.NewBitmap() + + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + bm.Add(n) + } + bm.RunOptimize() + + return 1 + int(bm.GetSerializedSizeInBytes()) // 1 byte encoding type + len(serialized bm) +} + +// rebased roaring bitmaps: same as opt-8, but reduce each value +// by baseTxNum stored once somewhere in the file +// +// size: 1 byte (encoding type) + len(bm) +func simOpt9(v []byte, ef *eliasfano32.EliasFano, baseTxNum uint64) int { + bm := roaring64.NewBitmap() + + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + n -= baseTxNum + bm.Add(n) + } + bm.RunOptimize() + + return 1 + int(bm.GetSerializedSizeInBytes()) // 1 byte encoding type + len(serialized bm) +} + +var idxStat = &cobra.Command{ + Use: "idx_stat", + Short: "Scan .ef files, generates statistics about their contents and simulates optimizations", + Run: func(cmd *cobra.Command, args []string) { + ctx, _ := common.RootContext() + logger := debug.SetupCobra(cmd, "integration") + + // accessorDir := filepath.Join(datadirCli, "snapshots", "accessor") + idxPath := filepath.Join(datadirCli, "snapshots", "idx") + idxDir := os.DirFS(idxPath) + + files, err := fs.ReadDir(idxDir, ".") + if err != nil { + log.Fatalf("Failed to read directory contents: %v", err) + } + + output, err := os.Create(csvOutput) + if err != nil { + log.Fatalf("Failed to create output csv: %v", err) + } + defer output.Close() + bufWriter := bufio.NewWriter(output) + csvWriter := csv.NewWriter(bufWriter) + headers := []string{ + "filename", + "type", + "start step", + "end step", + "step size", + "sequence number of elems", + "record count", + "cumulative key size", + "cumulative value size", + } + for i := range SIM_COUNT { + headers = append(headers, "cumulative value optimized size v"+strconv.Itoa(i+1)) + } + csvWriter.Write(headers) + + log.Println("Reading idx files:") + grandTotalRecords := 0 + grandTotalBytesK := 0 + grandTotalBytesV := 0 + + var grandTotalOptBytesV [SIM_COUNT]int + + grandTotalFiles := 0 + grandIdxMap := make(map[uint64]*IdxSummaryEntry) + for _, 
file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") { + continue + } + + info, err := file.Info() + if err != nil { + logger.Error("Failed to get file info: ", err) + } + efInfo, err := parseEFFilename(file.Name()) + if err != nil { + logger.Error("Failed to parse file info: ", err) + } + baseTxNum := efInfo.startStep * config3.DefaultStepSize + + idx, err := seg.NewDecompressor(datadirCli + "/snapshots/idx/" + file.Name()) + if err != nil { + log.Fatalf("Failed to open decompressor: %v", err) + } + defer idx.Close() + + // Summarize 1 idx file + g := idx.MakeGetter() + reader := seg.NewReader(g, seg.CompressNone) + reader.Reset(0) + + idxMap := make(map[uint64]*IdxSummaryEntry) + for reader.HasNext() { + k, _ := reader.Next(nil) + if !reader.HasNext() { + log.Fatal("reader doesn't have next!") + } + + v, _ := reader.Next(nil) + ef, _ := eliasfano32.ReadEliasFano(v) + + select { + case <-ctx.Done(): + return + default: + } + + // group all occurrences >= N + bucket := ef.Count() + if bucket >= 1_000_000 { + bucket = 1_000_000 + } else if bucket >= 100_000 { + bucket = 100_000 + } else if bucket >= 10_000 { + bucket = 10_000 + } else if bucket >= 1_000 { + bucket = 1_000 + } else if bucket >= 100 { + bucket = 100 + } else if bucket >= 16 { + bucket = 16 + } + + t, ok := idxMap[bucket] + if !ok { + t = &IdxSummaryEntry{ + min: math.MaxInt, + } + idxMap[bucket] = t + } + t.records++ + t.totalBytesK += len(k) + t.totalBytesV += len(v) + + for i, sim := range simulations { + t.totalOptBytesV[i] += sim(v, ef, baseTxNum) + } + t.min = min(t.min, ef.Min()) + t.max = max(t.max, ef.Max()) + } + idx.Close() + + // Print idx file summary + logger.Info("Summary for", "idx", file.Name()) + totalRecords := 0 + totalBytesK := 0 + totalBytesV := 0 + var totalOptBytesV [SIM_COUNT]int + keys := make([]uint64, 0, len(idxMap)) + for b, t := range idxMap { + keys = append(keys, b) + + totalRecords += int(t.records) + totalBytesK += t.totalBytesK + totalBytesV += t.totalBytesV + for i := range totalOptBytesV { + totalOptBytesV[i] += t.totalOptBytesV[i] + } + + gt, ok := grandIdxMap[b] + if !ok { + gt = &IdxSummaryEntry{ + min: math.MaxInt, + } + grandIdxMap[b] = gt + } + gt.records += t.records + gt.totalBytesK += t.totalBytesK + gt.totalBytesV += t.totalBytesV + for i := range gt.totalOptBytesV { + gt.totalOptBytesV[i] += t.totalOptBytesV[i] + } + } + + slices.Sort(keys) + for _, b := range keys { + t := idxMap[b] + var effec [SIM_COUNT]string + for i := range effec { + effec[i] = "yes" + if t.totalOptBytesV[i] > t.totalBytesV { + effec[i] = "NO" + } + } + + logger.Info(info.Name(), + "bucket", b, + "recs", t.records, + "k", t.totalBytesK, + "v", t.totalBytesV, + ) + + data := []string{ + info.Name(), + efInfo.prefix, + strconv.FormatUint(efInfo.startStep, 10), + strconv.FormatUint(efInfo.endStep, 10), + strconv.FormatUint(efInfo.stepSize, 10), + strconv.FormatUint(b, 10), + strconv.FormatUint(t.records, 10), + strconv.Itoa(t.totalBytesK), + strconv.Itoa(t.totalBytesV), + } + for _, v := range t.totalOptBytesV { + data = append(data, strconv.Itoa(v)) + } + csvWriter.Write(data) + if err := csvWriter.Error(); err != nil { + log.Fatalf("Error writing csv: %v", err) + } + } + logger.Info("TOTALS", + "totalBytes", totalBytesK+totalBytesV, + "totalBytesFile", info.Size(), + "%keys", 100*float64(totalBytesK)/float64(totalBytesK+totalBytesV), + "%values", 100*float64(totalBytesV)/float64(totalBytesK+totalBytesV), + ) + + grandTotalRecords += totalRecords + grandTotalBytesK += totalBytesK 
+ grandTotalBytesV += totalBytesV + for i, v := range totalOptBytesV { + grandTotalOptBytesV[i] += v + } + grandTotalFiles += int(info.Size()) + } + + // GRAND TOTALS + keys := make([]uint64, 0, len(grandIdxMap)) + for b := range grandIdxMap { + keys = append(keys, b) + } + slices.Sort(keys) + for _, b := range keys { + gt := grandIdxMap[b] + logger.Info("ALL", + "bucket", b, + "recs", gt.records, + "k", gt.totalBytesK, + "v", gt.totalBytesV, + "%value", 100*(float64(gt.totalBytesV)/float64(grandTotalBytesV)), + ) + } + + logger.Info("GRAND TOTALS", + "totalBytes", grandTotalBytesK+grandTotalBytesV, + "totalBytesFile", grandTotalFiles, + "%keys", 100*float64(grandTotalBytesK)/float64(grandTotalBytesK+grandTotalBytesV), + "%values", 100*float64(grandTotalBytesV)/float64(grandTotalBytesK+grandTotalBytesV), + ) + + csvWriter.Flush() + if err := csvWriter.Error(); err != nil { + log.Fatalf("Error writing csv: %v", err) + } + }, +} + +func init() { + withDataDir(idxStat) + idxStat.Flags().StringVar(&csvOutput, "csv", cli.CsvOutput.Value, cli.CsvOutput.Usage) + rootCmd.AddCommand(idxStat) +} diff --git a/cmd/integration/commands/idx_verify.go b/cmd/integration/commands/idx_verify.go new file mode 100644 index 00000000000..a86ee25cb7b --- /dev/null +++ b/cmd/integration/commands/idx_verify.go @@ -0,0 +1,199 @@ +package commands + +import ( + "bytes" + "io/fs" + "log" + "os" + "path/filepath" + "strings" + + "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/common/hexutility" + "github.com/erigontech/erigon-lib/config3" + "github.com/erigontech/erigon-lib/recsplit" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/multiencseq" + "github.com/erigontech/erigon-lib/seg" + "github.com/spf13/cobra" +) + +var idxVerify = &cobra.Command{ + Use: "idx_verify", + Short: "After a genesis sync + snapshot regen, deep compare original and optimized .ef files of 2 E3 instances", + Run: func(cmd *cobra.Command, args []string) { + ctx, _ := common.RootContext() + + sourceIdxPath := filepath.Join(sourceDirCli, "snapshots", "idx") + sourceIdxDir := os.DirFS(sourceIdxPath) + + files, err := fs.ReadDir(sourceIdxDir, ".") + if err != nil { + log.Fatalf("Failed to read directory contents: %v", err) + } + + log.Println("Comparing idx files:") + F: + for _, file := range files { + if file.IsDir() || !strings.HasSuffix(file.Name(), ".ef") { + continue + } + + log.Printf("Deep checking file %s...", file.Name()) + + efInfo, err := parseEFFilename(file.Name()) + if err != nil { + log.Fatalf("Failed to parse file info: %v", err) + } + baseTxNum := efInfo.startStep * config3.DefaultStepSize + + targetEfi, err := recsplit.OpenIndex(targetDirCli + "/snapshots/accessor/" + file.Name() + "i") + if err != nil { + log.Fatalf("Failed to open index: %v", err) + } + defer targetEfi.Close() + + targetEfiReader := targetEfi.GetReaderFromPool() + defer targetEfiReader.Close() + + // original .ef file + sourceIdx, err := seg.NewDecompressor(sourceDirCli + "/snapshots/idx/" + file.Name()) + if err != nil { + log.Fatalf("Failed to open decompressor: %v", err) + } + defer sourceIdx.Close() + + // reencoded optimized .ef file + targetIdx, err := seg.NewDecompressor(targetDirCli + "/snapshots/idx/" + file.Name()) + if err != nil { + log.Fatalf("Failed to open decompressor: %v", err) + } + defer targetIdx.Close() + + g := sourceIdx.MakeGetter() + sourceReader := seg.NewReader(g, seg.CompressNone) + sourceReader.Reset(0) + + g = targetIdx.MakeGetter() + targetReader 
:= seg.NewReader(g, seg.CompressNone) + targetReader.Reset(0) + + prevKeyOffset := uint64(0) + for sourceReader.HasNext() { + if !targetReader.HasNext() { + log.Printf("target reader doesn't have next!") + log.Println("skipping to next file...") + continue F + } + + sourceK, _ := sourceReader.Next(nil) + targetK, _ := targetReader.Next(nil) + if !bytes.Equal(sourceK, targetK) { + log.Printf("key mismatch!") + log.Println("skipping to next file...") + continue F + } + + if !sourceReader.HasNext() { + log.Println("source reader doesn't have next!") + log.Println("skipping to next file...") + continue F + } + if !targetReader.HasNext() { + log.Println("target reader doesn't have next!") + log.Println("skipping to next file...") + continue F + } + + // source/target semantic value comparison + sourceV, _ := sourceReader.Next(nil) + targetV, nextKeyOffset := targetReader.Next(nil) + if !compareSequences(sourceK, sourceV, targetV, baseTxNum) { + log.Println("value mismatch!") + log.Println("skipping to next file...") + continue F + } + + // checks new efi lookup points to the same value + offset, found := targetEfiReader.TwoLayerLookup(targetK) + if !found { + log.Printf("key %v not found in efi", hexutility.Encode(targetK)) + log.Println("skipping to next file...") + continue F + } + if offset != prevKeyOffset { + log.Printf("offset mismatch: %d != %d", offset, prevKeyOffset) + log.Println("skipping to next file...") + continue F + } + prevKeyOffset = nextKeyOffset + + select { + case <-ctx.Done(): + return + default: + } + } + sourceIdx.Close() + targetIdx.Close() + targetEfiReader.Close() + targetEfi.Close() + } + }, +} + +func compareSequences(sourceK, sourceV, targetV []byte, baseTxNum uint64) bool { + sourceEf, _ := eliasfano32.ReadEliasFano(sourceV) + targetSeq := multiencseq.ReadMultiEncSeq(baseTxNum, targetV) + + if targetSeq.Count() > sourceEf.Count() { + log.Print("Optimized eliasfano is longer") + log.Printf("key=%s", hexutility.Encode(sourceK)) + log.Printf("source min=%d max=%d count=%d", sourceEf.Min(), sourceEf.Max(), sourceEf.Count()) + log.Printf("target min=%d max=%d count=%d", targetSeq.Min(), targetSeq.Max(), targetSeq.Count()) + return false + } + if sourceEf.Count() > targetSeq.Count() { + log.Print("Optimized eliasfano is shorter") + log.Printf("key=%s", hexutility.Encode(sourceK)) + log.Printf("source min=%d max=%d count=%d", sourceEf.Min(), sourceEf.Max(), sourceEf.Count()) + log.Printf("target min=%d max=%d count=%d", targetSeq.Min(), targetSeq.Max(), targetSeq.Count()) + return false + } + + sourceIt := sourceEf.Iterator() + targetIt := targetSeq.Iterator(0) + for sourceIt.HasNext() { + sourceN, err := sourceIt.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + targetN, err := targetIt.Next() + if err != nil { + log.Fatalf("Failed to read next: %v", err) + } + if sourceN != targetN { + log.Printf("values mismatch: source=%d target=%d", sourceN, targetN) + log.Printf("key=%s", hexutility.Encode(sourceK)) + log.Printf("source min=%d max=%d count=%d", sourceEf.Min(), sourceEf.Max(), sourceEf.Count()) + log.Printf("target min=%d max=%d count=%d", targetSeq.Min(), targetSeq.Max(), targetSeq.Count()) + return false + } + } + + return true +} + +func init() { + idxVerify.Flags().StringVar(&sourceDirCli, "sourcedir", "", "data directory of original E3 instance") + must(idxVerify.MarkFlagRequired("sourcedir")) + must(idxVerify.MarkFlagDirname("sourcedir")) + + idxVerify.Flags().StringVar(&targetDirCli, "targetdir", "", "data directory of optimized E3 
instance") + must(idxVerify.MarkFlagRequired("targetdir")) + must(idxVerify.MarkFlagDirname("targetdir")) + + rootCmd.AddCommand(idxVerify) +} + +var sourceDirCli, targetDirCli string diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 04ace959944..21c95b473ed 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -1330,7 +1330,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl blockReader := freezeblocks.NewBlockReader(_allSnapshotsSingleton, _allBorSnapshotsSingleton, _heimdallStoreSingleton, _bridgeStoreSingleton) txNums := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader)) - _aggSingleton, err = libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger) + _aggSingleton, err = libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger, false) if err != nil { panic(err) } diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index 30d26464933..d6bc1dcfdc2 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -423,7 +423,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, heimdallStore, bridgeStore) txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader)) - agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.DefaultStepSize, rawDB, logger) + agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.DefaultStepSize, rawDB, logger, false) if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } diff --git a/cmd/state/commands/opcode_tracer.go b/cmd/state/commands/opcode_tracer.go index a5133f2a46c..5ec34c77a7f 100644 --- a/cmd/state/commands/opcode_tracer.go +++ b/cmd/state/commands/opcode_tracer.go @@ -436,7 +436,7 @@ func OpcodeTracer(genesis *types.Genesis, blockNum uint64, chaindata string, num rawChainDb := mdbx.MustOpen(dirs.Chaindata) defer rawChainDb.Close() - agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawChainDb, log.New()) + agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawChainDb, log.New(), false) if err != nil { return err } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b6e63c7e184..6c870f69038 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1098,6 +1098,12 @@ var ( Usage: "Enable 'chaos monkey' to generate spontaneous network/consensus/etc failures. Use ONLY for testing", Value: false, } + + ExperimentalEFOptimizationFlag = cli.BoolFlag{ + Name: "experimental.ef-optimization", + Usage: "Enable experimental elias-fano encoding optimization for indexes. ****FORWARD-COMPATIBLE ONLY****: once enabled on an existing node, new .ef files can't be read if this flag is disabled. 
Enable it on genesis sync to build snapshots 100% using this new format.", + Value: false, + } ) var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag, &DiagDisabledFlag, &DiagEndpointAddrFlag, &DiagEndpointPortFlag, &DiagSpeedTestFlag} @@ -2002,6 +2008,10 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C if ctx.IsSet(TxPoolGossipDisableFlag.Name) { cfg.DisableTxPoolGossip = ctx.Bool(TxPoolGossipDisableFlag.Name) } + + if ctx.IsSet(ExperimentalEFOptimizationFlag.Name) { + cfg.ExperimentalEFOptimization = ctx.Bool(ExperimentalEFOptimizationFlag.Name) + } } // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if diff --git a/core/genesis_write.go b/core/genesis_write.go index 4d6b52767ee..f24387c8717 100644 --- a/core/genesis_write.go +++ b/core/genesis_write.go @@ -505,7 +505,7 @@ func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*ty genesisTmpDB := mdbx.New(kv.TemporaryDB, logger).InMem(dirs.DataDir).MapSize(2 * datasize.GB).GrowthStep(1 * datasize.MB).MustOpen() defer genesisTmpDB.Close() - agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, genesisTmpDB, logger) + agg, err := state2.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, genesisTmpDB, logger, false) if err != nil { return err } diff --git a/core/state/state_test.go b/core/state/state_test.go index 7e3ad911053..d12c5ce0a78 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -122,7 +122,7 @@ func (s *StateSuite) SetUpTest(c *checker.C) { db := memdb.NewStateDB("") defer db.Close() - agg, err := stateLib.NewAggregator(context.Background(), datadir.New(""), 16, db, log.New()) + agg, err := stateLib.NewAggregator(context.Background(), datadir.New(""), 16, db, log.New(), false) if err != nil { panic(err) } @@ -394,7 +394,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.TemporalRwDB, kv.TemporalRwTx, *state. 
db := memdb.NewStateDB(tb.TempDir()) tb.Cleanup(db.Close) - agg, err := state.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New()) + agg, err := state.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New(), false) if err != nil { tb.Fatal(err) } diff --git a/core/test/domains_restart_test.go b/core/test/domains_restart_test.go index 952906fbb6b..dad2549cf7a 100644 --- a/core/test/domains_restart_test.go +++ b/core/test/domains_restart_test.go @@ -66,7 +66,7 @@ func testDbAndAggregatorv3(t *testing.T, fpath string, aggStep uint64) (kv.RwDB, db := mdbx.New(kv.ChainDB, logger).Path(dirs.Chaindata).MustOpen() t.Cleanup(db.Close) - agg, err := state.NewAggregator(context.Background(), dirs, aggStep, db, logger) + agg, err := state.NewAggregator(context.Background(), dirs, aggStep, db, logger, false) require.NoError(t, err) t.Cleanup(agg.Close) err = agg.OpenFolder() diff --git a/core/vm/gas_table_test.go b/core/vm/gas_table_test.go index fc33308a2eb..35f531f058a 100644 --- a/core/vm/gas_table_test.go +++ b/core/vm/gas_table_test.go @@ -102,7 +102,7 @@ func testTemporalDB(t *testing.T) *temporal.DB { t.Cleanup(db.Close) - agg, err := state3.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New()) + agg, err := state3.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New(), false) require.NoError(t, err) t.Cleanup(agg.Close) diff --git a/core/vm/runtime/runtime.go b/core/vm/runtime/runtime.go index 77748306170..50173808493 100644 --- a/core/vm/runtime/runtime.go +++ b/core/vm/runtime/runtime.go @@ -131,7 +131,7 @@ func Execute(code, input []byte, cfg *Config, tempdir string) ([]byte, *state.In if !externalState { db := memdb.NewStateDB(tempdir) defer db.Close() - agg, err := state3.NewAggregator(context.Background(), datadir.New(tempdir), config3.DefaultStepSize, db, log.New()) + agg, err := state3.NewAggregator(context.Background(), datadir.New(tempdir), config3.DefaultStepSize, db, log.New(), false) if err != nil { return nil, nil, err } @@ -193,7 +193,7 @@ func Create(input []byte, cfg *Config, blockNr uint64) ([]byte, libcommon.Addres db := memdb.NewStateDB(tmp) defer db.Close() - agg, err := state3.NewAggregator(context.Background(), datadir.New(tmp), config3.DefaultStepSize, db, log.New()) + agg, err := state3.NewAggregator(context.Background(), datadir.New(tmp), config3.DefaultStepSize, db, log.New(), false) if err != nil { return nil, [20]byte{}, 0, err } diff --git a/core/vm/runtime/runtime_test.go b/core/vm/runtime/runtime_test.go index f3521a1dcf7..a8b38b3cdfc 100644 --- a/core/vm/runtime/runtime_test.go +++ b/core/vm/runtime/runtime_test.go @@ -56,7 +56,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.RwDB, kv.RwTx, *stateLib.Aggregator) { db := memdb.NewStateDB(tb.TempDir()) tb.Cleanup(db.Close) - agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New()) + agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New(), false) if err != nil { tb.Fatal(err) } @@ -177,7 +177,7 @@ func testTemporalDB(t testing.TB) *temporal.DB { t.Cleanup(db.Close) - agg, err := stateLib.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New()) + agg, err := stateLib.NewAggregator(context.Background(), datadir.New(t.TempDir()), 16, db, log.New(), false) require.NoError(t, err) t.Cleanup(agg.Close) diff --git a/erigon-lib/kv/membatchwithdb/memory_mutation_test.go 
b/erigon-lib/kv/membatchwithdb/memory_mutation_test.go
index cd4ce3cd532..48b59eb28d2 100644
--- a/erigon-lib/kv/membatchwithdb/memory_mutation_test.go
+++ b/erigon-lib/kv/membatchwithdb/memory_mutation_test.go
@@ -212,7 +212,7 @@ func NewTestTemporalDb(tb testing.TB) (kv.RwDB, kv.RwTx, *stateLib.Aggregator) {
 	db := memdb.NewStateDB(tb.TempDir())
 	tb.Cleanup(db.Close)
 
-	agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New())
+	agg, err := stateLib.NewAggregator(context.Background(), datadir.New(tb.TempDir()), 16, db, log.New(), false)
 	if err != nil {
 		tb.Fatal(err)
 	}
diff --git a/erigon-lib/kv/temporal/temporaltest/kv_temporal_testdb.go b/erigon-lib/kv/temporal/temporaltest/kv_temporal_testdb.go
index aba7fec9396..481ea313eef 100644
--- a/erigon-lib/kv/temporal/temporaltest/kv_temporal_testdb.go
+++ b/erigon-lib/kv/temporal/temporaltest/kv_temporal_testdb.go
@@ -42,7 +42,7 @@ func NewTestDB(tb testing.TB, dirs datadir.Dirs) (kv.TemporalRwDB, *state.Aggreg
 		rawDB = memdb.New(dirs.DataDir, kv.ChainDB)
 	}
 
-	agg, err := state.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawDB, log.New())
+	agg, err := state.NewAggregator(context.Background(), dirs, config3.DefaultStepSize, rawDB, log.New(), false)
 	if err != nil {
 		panic(err)
 	}
diff --git a/erigon-lib/recsplit/eliasfano32/rebased_elias_fano.go b/erigon-lib/recsplit/eliasfano32/rebased_elias_fano.go
new file mode 100644
index 00000000000..3b7866c359c
--- /dev/null
+++ b/erigon-lib/recsplit/eliasfano32/rebased_elias_fano.go
@@ -0,0 +1,79 @@
+package eliasfano32
+
+// This is a wrapper of "plain" EliasFano for optimizing scenarios where the number sequence
+// is constrained in a closed range [from, to], so we can store the entire sequence as deltas
+// of "from" and save space.
+//
+// This is especially useful when the starting "from" is a huge number, so the binary representation
+// of the Elias Fano sequence can be made smaller.
+//
+// The baseNum stores the base value which is added to each element when it is accessed. It is
+// not meant to be stored together with the serialized data, but derived from some other source,
+// like the start txNum of a snapshot file, so it can be globally applied to all sequences in the
+// same file, resulting in huge space savings.
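+//
+// For example (hypothetical numbers): if a snapshot file starts at
+// txNum 1_000_000_000, the sequence {1_000_000_007, 1_000_000_015} can be
+// stored as {7, 15} with baseNum = 1_000_000_000, so the serialized
+// Elias Fano only has to cover a range of 15 instead of ~10^9.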
+type RebasedEliasFano struct { + baseNum uint64 + ef EliasFano +} + +func (ref *RebasedEliasFano) Get(i uint64) uint64 { + return ref.baseNum + ref.ef.Get(i) +} + +func (ref *RebasedEliasFano) Min() uint64 { + return ref.baseNum + ref.ef.Min() +} + +func (ref *RebasedEliasFano) Max() uint64 { + return ref.baseNum + ref.ef.Max() +} + +func (ref *RebasedEliasFano) Count() uint64 { + return ref.ef.Count() +} + +func (ref *RebasedEliasFano) Reset(baseNum uint64, raw []byte) { + ref.baseNum = baseNum + ref.ef.Reset(raw) +} + +func (ref *RebasedEliasFano) Search(v uint64) (uint64, bool) { + n, found := ref.ef.Search(v - ref.baseNum) + return ref.baseNum + n, found +} + +func (ref *RebasedEliasFano) Iterator() *RebasedIterWrapper { + return &RebasedIterWrapper{ + baseNum: ref.baseNum, + it: ref.ef.Iterator(), + } +} + +func (ref *RebasedEliasFano) ReverseIterator() *RebasedIterWrapper { + return &RebasedIterWrapper{ + baseNum: ref.baseNum, + it: ref.ef.ReverseIterator(), + } +} + +type RebasedIterWrapper struct { + baseNum uint64 + it *EliasFanoIter +} + +func (it *RebasedIterWrapper) HasNext() bool { + return it.it.HasNext() +} + +func (it *RebasedIterWrapper) Next() (uint64, error) { + n, err := it.it.Next() + return it.baseNum + n, err +} + +func (it *RebasedIterWrapper) Seek(v uint64) { + it.it.Seek(v - it.baseNum) +} + +func (it *RebasedIterWrapper) Close() { + it.it.Close() +} diff --git a/erigon-lib/recsplit/multiencseq/sequence_builder.go b/erigon-lib/recsplit/multiencseq/sequence_builder.go new file mode 100644 index 00000000000..d33302b4321 --- /dev/null +++ b/erigon-lib/recsplit/multiencseq/sequence_builder.go @@ -0,0 +1,118 @@ +package multiencseq + +import ( + "encoding/binary" + + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" +) + +// Encode sequences up to this length using simple encoding. +// +// The choice of this constant is tightly coupled with the encoding type; we used +// the least significant bits of the encoding byte type to signal simple encoding + +// sequence size. +// +// In this case, we use the range [0b10000000, 0b10001111] to say the sequence must contain +// (N & 0b00001111) + 1 elements, i.e., 0 means 1 element, 1 means 2 elements, etc... +const SIMPLE_SEQUENCE_MAX_THRESHOLD = 16 + +// A SequenceBuilder is used to build serialized number sequences. +// +// It follows the following pattern: +// +// - New builder: NewBuilder() +// - Add offsets: AddOffset() +// - Build: Build() +// - Serialize: AppendBytes() +// +// It contains decision logic to choose the best encoding for the given sequence. +// +// This is the "writer" counterpart of SequenceReader. +type SequenceBuilder struct { + baseNum uint64 + ef *eliasfano32.EliasFano + optimize bool +} + +// Creates a new builder. The builder is not meant to be reused. The construction +// parameters may or may not be used during the build process depending on the +// encoding being used. +// +// The encoding being used depends on the parameters themselves and the characteristics +// of the number sequence. +// +// baseNum: this is used to calculate the deltas on simple encoding and on "rebased elias fano" +// count: this is the number of elements in the sequence, used in case of elias fano +// max: this is maximum value in the sequence, used in case of elias fano +// optimize: if false, the builder will always output plain elias fano; it is "legacy mode", +// and is used to be backwards compatible with E3 default format. If true, it will output +// forward compatible, optimized multiencoding sequences. 
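+//
+// Minimal usage sketch (illustrative values; with optimize=true and count <= 16,
+// AppendBytes selects the simple encoding):
+//
+//	b := NewBuilder(1000, 3, 1027, true) // baseNum, count, max, optimize
+//	b.AddOffset(1000)
+//	b.AddOffset(1015)
+//	b.AddOffset(1027)
+//	b.Build()
+//	buf := b.AppendBytes(nil) // 1 type byte (0x82) + three big-endian uint32 deltas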
+func NewBuilder(baseNum, count, max uint64, optimize bool) *SequenceBuilder { + return &SequenceBuilder{ + baseNum: baseNum, + ef: eliasfano32.NewEliasFano(count, max), + optimize: optimize, + } +} + +func (b *SequenceBuilder) AddOffset(offset uint64) { + b.ef.AddOffset(offset) +} + +func (b *SequenceBuilder) Build() { + b.ef.Build() +} + +func (b *SequenceBuilder) AppendBytes(buf []byte) []byte { + if !b.optimize { + return b.ef.AppendBytes(buf) + } + + if b.ef.Count() <= SIMPLE_SEQUENCE_MAX_THRESHOLD { + return b.simpleEncoding(buf) + } + + return b.rebasedEliasFano(buf) +} + +func (b *SequenceBuilder) simpleEncoding(buf []byte) []byte { + // Simple encoding type + size: [0x80, 0x8F] + count := b.ef.Count() + enc := byte(count-1) & byte(0b00001111) + enc |= byte(SimpleEncoding) + buf = append(buf, enc) + + // Encode elems + var bn [4]byte + for it := b.ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + // TODO: err + panic(err) + } + n -= b.baseNum + + binary.BigEndian.PutUint32(bn[:], uint32(n)) + buf = append(buf, bn[:]...) + } + + return buf +} + +func (b *SequenceBuilder) rebasedEliasFano(buf []byte) []byte { + // Reserved encoding type 0x90 == rebased elias fano + buf = append(buf, byte(RebasedEliasFano)) + + // Rebased ef + rbef := eliasfano32.NewEliasFano(b.ef.Count(), b.ef.Max()-b.baseNum) + for it := b.ef.Iterator(); it.HasNext(); { + n, err := it.Next() + if err != nil { + panic(err) + } + + rbef.AddOffset(n - b.baseNum) + } + rbef.Build() + return rbef.AppendBytes(buf) +} diff --git a/erigon-lib/recsplit/multiencseq/sequence_builder_test.go b/erigon-lib/recsplit/multiencseq/sequence_builder_test.go new file mode 100644 index 00000000000..72e1e965013 --- /dev/null +++ b/erigon-lib/recsplit/multiencseq/sequence_builder_test.go @@ -0,0 +1,128 @@ +package multiencseq + +import ( + "testing" + + "github.com/erigontech/erigon-lib/common/hexutility" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/stretchr/testify/require" +) + +func TestMultiEncodingSeqBuilder(t *testing.T) { + + t.Run("no optimization always encode to plain elias fano", func(t *testing.T) { + builder := NewBuilder(1000, 5, 1031, false) + builder.AddOffset(1000) + builder.AddOffset(1007) + builder.AddOffset(1015) + builder.AddOffset(1027) + builder.AddOffset(1031) + builder.Build() + + b := make([]byte, 0) + b = builder.AppendBytes(b) + + // plain EF; builder must build identical serialized sequence + ef := eliasfano32.NewEliasFano(5, 1031) + ef.AddOffset(1000) + ef.AddOffset(1007) + ef.AddOffset(1015) + ef.AddOffset(1027) + ef.AddOffset(1031) + ef.Build() + bExpected := make([]byte, 0) + bExpected = ef.AppendBytes(bExpected) + + require.Equal(t, bExpected, b) + }) + + t.Run("singleton sequence", func(t *testing.T) { + builder := NewBuilder(1000, 1, 1005, true) + builder.AddOffset(1005) + builder.Build() + + b := make([]byte, 0) + b = builder.AppendBytes(b) + require.Equal(t, hexutility.MustDecodeHex("0x8000000005"), b) + }) + + t.Run("short sequences must use simple encoding", func(t *testing.T) { + builder := NewBuilder(1000, 16, 1035, true) + builder.AddOffset(1005) + builder.AddOffset(1007) + builder.AddOffset(1009) + builder.AddOffset(1011) + builder.AddOffset(1013) + builder.AddOffset(1015) + builder.AddOffset(1017) + builder.AddOffset(1019) + builder.AddOffset(1021) + builder.AddOffset(1023) + builder.AddOffset(1025) + builder.AddOffset(1027) + builder.AddOffset(1029) + builder.AddOffset(1031) + builder.AddOffset(1033) + builder.AddOffset(1035) + builder.Build() + 
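+		// the 16 offsets above hit count == SIMPLE_SEQUENCE_MAX_THRESHOLD,
+		// the largest sequence AppendBytes still emits with the simple encoding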
+ b := make([]byte, 0) + b = builder.AppendBytes(b) + require.Equal(t, hexutility.MustDecodeHex( + "0x8F"+ + "00000005"+ + "00000007"+ + "00000009"+ + "0000000B"+ + "0000000D"+ + "0000000F"+ + "00000011"+ + "00000013"+ + "00000015"+ + "00000017"+ + "00000019"+ + "0000001B"+ + "0000001D"+ + "0000001F"+ + "00000021"+ + "00000023"), b) + }) + + t.Run("large sequences must use rebased elias fano", func(t *testing.T) { + builder := NewBuilder(1000, 17, 1035, true) + builder.AddOffset(1005) + builder.AddOffset(1007) + builder.AddOffset(1009) + builder.AddOffset(1011) + builder.AddOffset(1013) + builder.AddOffset(1015) + builder.AddOffset(1017) + builder.AddOffset(1019) + builder.AddOffset(1021) + builder.AddOffset(1023) + builder.AddOffset(1025) + builder.AddOffset(1027) + builder.AddOffset(1029) + builder.AddOffset(1031) + builder.AddOffset(1033) + builder.AddOffset(1035) + builder.AddOffset(1037) + builder.Build() + + b := make([]byte, 0) + b = builder.AppendBytes(b) + require.Equal(t, b[0], byte(0x90), "encoding type is not 0x90") + + ef, _ := eliasfano32.ReadEliasFano(b[1:]) + require.Equal(t, uint64(17), ef.Count()) + curr := uint64(5) + for it := ef.Iterator(); it.HasNext(); { + n, err := it.Next() + + require.NoError(t, err) + require.Equal(t, curr, n) + + curr += 2 + } + }) +} diff --git a/erigon-lib/recsplit/multiencseq/sequence_reader.go b/erigon-lib/recsplit/multiencseq/sequence_reader.go new file mode 100644 index 00000000000..148bcd4c93a --- /dev/null +++ b/erigon-lib/recsplit/multiencseq/sequence_reader.go @@ -0,0 +1,170 @@ +package multiencseq + +import ( + "github.com/erigontech/erigon-lib/kv/stream" + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/simpleseq" +) + +type EncodingType uint8 + +const ( + PlainEliasFano EncodingType = 0b0 + SimpleEncoding EncodingType = 0b10000000 + RebasedEliasFano EncodingType = 0b10010000 +) + +// SequenceReader is used to read serialized number sequences. +// +// It is aware of the different encoding types and can read them transparently. +// +// This is the "reader" counterpart of SequenceBuilder. 
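+//
+// The first byte of the raw data selects the encoding (see Reset and Count
+// below): a clear high bit means plain Elias Fano (legacy E3 format, no type
+// byte), 0x90 means rebased Elias Fano, and 0x80..0x8F mean a simple sequence
+// holding (byte & 0x0F) + 1 elements.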
+type SequenceReader struct { + currentEnc EncodingType + ref eliasfano32.RebasedEliasFano + sseq simpleseq.SimpleSequence +} + +func ReadMultiEncSeq(baseNum uint64, raw []byte) *SequenceReader { + var s SequenceReader + s.Reset(baseNum, raw) + return &s +} + +// This is a specialized "fast" Count method that shouldn't allocate new objects, but read the count directly +// from raw data +func Count(baseNum uint64, data []byte) uint64 { + // plain elias fano (legacy) + if data[0]&0b10000000 == 0 { + return eliasfano32.Count(data) + } + + // rebased elias fano + if data[0] == 0x90 { + return eliasfano32.Count(data[1:]) + } + + // simple encoding + if data[0]&0b11110000 == 0b10000000 { + return uint64(data[0]&0b00001111) + 1 + } + + panic("unknown encoding") +} + +// TODO: optimize me - to avoid object allocation (this TODO was inherited from elias_fano.go) +func Seek(baseNum uint64, data []byte, n uint64) (uint64, bool) { + seq := ReadMultiEncSeq(baseNum, data) + return seq.search(n) +} + +func (s *SequenceReader) Get(i uint64) uint64 { + if s.currentEnc == SimpleEncoding { + return s.sseq.Get(i) + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + return s.ref.Get(i) + } + + panic("unknown encoding") +} + +func (s *SequenceReader) Min() uint64 { + if s.currentEnc == SimpleEncoding { + return s.sseq.Min() + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + return s.ref.Min() + } + + panic("unknown encoding") +} + +func (s *SequenceReader) Max() uint64 { + if s.currentEnc == SimpleEncoding { + return s.sseq.Max() + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + return s.ref.Max() + } + + panic("unknown encoding") +} + +func (s *SequenceReader) Count() uint64 { + if s.currentEnc == SimpleEncoding { + return s.sseq.Count() + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + return s.ref.Count() + } + + panic("unknown encoding") +} + +func (s *SequenceReader) Reset(baseNum uint64, raw []byte) { + // plain elias fano (legacy) + if raw[0]&0b10000000 == 0 { + s.currentEnc = PlainEliasFano + s.ref.Reset(0, raw) + return + } + + // rebased elias fano + if raw[0] == 0x90 { + s.currentEnc = RebasedEliasFano + s.ref.Reset(baseNum, raw[1:]) + return + } + + // simple encoding + if raw[0]&0b11110000 == 0b10000000 { + s.currentEnc = SimpleEncoding + s.sseq.Reset(baseNum, raw[1:]) + return + } + + panic("unknown encoding") +} + +func (s *SequenceReader) search(v uint64) (uint64, bool) { + if s.currentEnc == SimpleEncoding { + return s.sseq.Search(v) + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + return s.ref.Search(v) + } + + panic("unknown encoding") +} + +func (s *SequenceReader) Iterator(v int) stream.U64 { + if s.currentEnc == SimpleEncoding { + it := s.sseq.Iterator() + if v > 0 { + it.Seek(uint64(v)) + } + return it + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + it := s.ref.Iterator() + if v > 0 { + it.Seek(uint64(v)) + } + return it + } + + panic("unknown encoding") +} + +func (s *SequenceReader) ReverseIterator(v int) stream.U64 { + if s.currentEnc == SimpleEncoding { + it := s.sseq.ReverseIterator() + if v > 0 { + it.Seek(uint64(v)) + } + return it + } else if s.currentEnc == PlainEliasFano || s.currentEnc == RebasedEliasFano { + it := s.ref.ReverseIterator() + if v > 0 { + it.Seek(uint64(v)) + } + return it + } + + panic("unknown encoding") +} diff --git 
a/erigon-lib/recsplit/multiencseq/sequence_reader_test.go b/erigon-lib/recsplit/multiencseq/sequence_reader_test.go new file mode 100644 index 00000000000..e15e7832251 --- /dev/null +++ b/erigon-lib/recsplit/multiencseq/sequence_reader_test.go @@ -0,0 +1,164 @@ +package multiencseq + +import ( + "testing" + + "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/simpleseq" + "github.com/stretchr/testify/require" +) + +func TestMultiEncSeq(t *testing.T) { + + t.Run("plain elias fano", func(t *testing.T) { + b := make([]byte, 0) + + // append serialized elias fano + ef := eliasfano32.NewEliasFano(3, 1027) + ef.AddOffset(1000) + ef.AddOffset(1015) + ef.AddOffset(1027) + ef.Build() + b = ef.AppendBytes(b) + + // check deserialization + s := ReadMultiEncSeq(1000, b) + require.Equal(t, PlainEliasFano, s.currentEnc) + requireSequenceChecks(t, s) + requireRawDataChecks(t, b) + }) + + t.Run("simple encoding", func(t *testing.T) { + b := make([]byte, 0) + + // type: simple encoding, count: 3 + b = append(b, 0b10000010) + + // append serialized simple sequence + seq := simpleseq.NewSimpleSequence(1000, 3) + seq.AddOffset(1000) + seq.AddOffset(1015) + seq.AddOffset(1027) + b = seq.AppendBytes(b) + + // check deserialization + s := ReadMultiEncSeq(1000, b) + require.Equal(t, SimpleEncoding, s.currentEnc) + requireSequenceChecks(t, s) + requireRawDataChecks(t, b) + }) + + t.Run("rebased elias fano", func(t *testing.T) { + b := make([]byte, 0) + + // type: rebased elias fano + b = append(b, 0b10010000) + + // append serialized elias fano (rebased -1000) + ef := eliasfano32.NewEliasFano(3, 27) + ef.AddOffset(0) + ef.AddOffset(15) + ef.AddOffset(27) + ef.Build() + b = ef.AppendBytes(b) + + // check deserialization + s := ReadMultiEncSeq(1000, b) + require.Equal(t, RebasedEliasFano, s.currentEnc) + requireSequenceChecks(t, s) + requireRawDataChecks(t, b) + }) +} + +func requireSequenceChecks(t *testing.T, s *SequenceReader) { + require.Equal(t, uint64(1000), s.Min()) + require.Equal(t, uint64(1027), s.Max()) + require.Equal(t, uint64(3), s.Count()) + + require.Equal(t, uint64(1000), s.Get(0)) + require.Equal(t, uint64(1015), s.Get(1)) + require.Equal(t, uint64(1027), s.Get(2)) + + // check iterator + it := s.Iterator(0) + require.True(t, it.HasNext()) + n, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1000), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), n) + + require.False(t, it.HasNext()) + + // check iterator + seek + it = s.Iterator(1014) + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), n) + + require.False(t, it.HasNext()) + + // check reverse iterator + it = s.ReverseIterator(2000) + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1000), n) + + require.False(t, it.HasNext()) + + // check reverse iterator + seek + it = s.ReverseIterator(1016) + require.True(t, it.HasNext()) + n, err = 
it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), n) + + require.True(t, it.HasNext()) + n, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1000), n) + + require.False(t, it.HasNext()) +} + +func requireRawDataChecks(t *testing.T, b []byte) { + // check fast count + require.Equal(t, uint64(3), Count(1000, b)) + + // check search + n, found := Seek(1000, b, 1014) + require.True(t, found) + require.Equal(t, uint64(1015), n) + + n, found = Seek(1000, b, 1015) + require.True(t, found) + require.Equal(t, uint64(1015), n) + + _, found = Seek(1000, b, 1028) + require.False(t, found) +} diff --git a/erigon-lib/recsplit/simpleseq/simple_sequence.go b/erigon-lib/recsplit/simpleseq/simple_sequence.go new file mode 100644 index 00000000000..657efaf23ad --- /dev/null +++ b/erigon-lib/recsplit/simpleseq/simple_sequence.go @@ -0,0 +1,182 @@ +package simpleseq + +import ( + "encoding/binary" + "sort" + + "github.com/erigontech/erigon-lib/kv/stream" +) + +// SimpleSequence is a simpler representation of number sequences, meant to be a drop-in +// replacement for EliasFano for short sequences. +// +// It stores a sequence as deltas of a base number. The base number and element values are +// uint64, while deltas are stored as uint32, so this implementation is only suitable for +// short sequences of values close to the base. +type SimpleSequence struct { + baseNum uint64 + raw []byte + pos int +} + +func NewSimpleSequence(baseNum uint64, count uint64) *SimpleSequence { + return &SimpleSequence{ + baseNum: baseNum, + raw: make([]byte, count*4), + pos: 0, + } +} + +// ReadSimpleSequence constructs a SimpleSequence from its serialized representation. +// +// The returned object can be reused by calling Reset() on it. +func ReadSimpleSequence(baseNum uint64, raw []byte) *SimpleSequence { + seq := SimpleSequence{} + seq.Reset(baseNum, raw) + return &seq +} + +func (s *SimpleSequence) Get(i uint64) uint64 { + idx := i * 4 + delta := binary.BigEndian.Uint32(s.raw[idx : idx+4]) + return s.baseNum + uint64(delta) +} + +func (s *SimpleSequence) Min() uint64 { + return s.Get(0) +} + +func (s *SimpleSequence) Max() uint64 { + return s.Get(s.Count() - 1) +} + +func (s *SimpleSequence) Count() uint64 { + return uint64(len(s.raw) / 4) +} + +func (s *SimpleSequence) AddOffset(offset uint64) { + binary.BigEndian.PutUint32(s.raw[s.pos*4:(s.pos+1)*4], uint32(offset-s.baseNum)) + s.pos++ +} + +func (s *SimpleSequence) Reset(baseNum uint64, raw []byte) { + s.baseNum = baseNum + s.raw = raw + s.pos = len(raw) / 4 +} + +func (s *SimpleSequence) AppendBytes(buf []byte) []byte { + return append(buf, s.raw...)
+} + +func (s *SimpleSequence) search(v uint64) (int, bool) { + c := s.Count() + idx := sort.Search(int(c), func(i int) bool { + return s.Get(uint64(i)) >= v + }) + + if idx >= int(c) { + return 0, false + } + return idx, true +} + +func (s *SimpleSequence) reverseSearch(v uint64) (int, bool) { + c := s.Count() + idx := sort.Search(int(c), func(i int) bool { + return s.Get(c-uint64(i)-1) <= v + }) + + if idx >= int(c) { + return 0, false + } + return int(c) - idx - 1, true +} + +func (s *SimpleSequence) Search(v uint64) (uint64, bool) { + idx, found := s.search(v) + if !found { + return 0, false + } + return s.Get(uint64(idx)), true +} + +func (s *SimpleSequence) Iterator() *SimpleSequenceIterator { + return &SimpleSequenceIterator{ + seq: s, + pos: 0, + } +} + +func (s *SimpleSequence) ReverseIterator() *ReverseSimpleSequenceIterator { + return &ReverseSimpleSequenceIterator{ + seq: s, + pos: int(s.Count()) - 1, + } +} + +type SimpleSequenceIterator struct { + seq *SimpleSequence + pos int +} + +func (it *SimpleSequenceIterator) Next() (uint64, error) { + if !it.HasNext() { + return 0, stream.ErrIteratorExhausted + } + + v := it.seq.Get(uint64(it.pos)) + it.pos++ + return v, nil +} + +func (it *SimpleSequenceIterator) HasNext() bool { + return it.pos < int(it.seq.Count()) +} + +func (it *SimpleSequenceIterator) Close() { + // noop +} + +func (it *SimpleSequenceIterator) Seek(v uint64) { + idx, found := it.seq.search(v) + if !found { + it.pos = int(it.seq.Count()) + return + } + + it.pos = idx +} + +type ReverseSimpleSequenceIterator struct { + seq *SimpleSequence + pos int +} + +func (it *ReverseSimpleSequenceIterator) Next() (uint64, error) { + if !it.HasNext() { + return 0, stream.ErrIteratorExhausted + } + + v := it.seq.Get(uint64(it.pos)) + it.pos-- + return v, nil +} + +func (it *ReverseSimpleSequenceIterator) HasNext() bool { + return it.pos >= 0 +} + +func (it *ReverseSimpleSequenceIterator) Close() { + // noop +} + +func (it *ReverseSimpleSequenceIterator) Seek(v uint64) { + idx, found := it.seq.reverseSearch(v) + if !found { + it.pos = -1 + return + } + + it.pos = idx +} diff --git a/erigon-lib/recsplit/simpleseq/simple_sequence_test.go b/erigon-lib/recsplit/simpleseq/simple_sequence_test.go new file mode 100644 index 00000000000..3e641b91d71 --- /dev/null +++ b/erigon-lib/recsplit/simpleseq/simple_sequence_test.go @@ -0,0 +1,241 @@ +package simpleseq + +import ( + "testing" + + "github.com/erigontech/erigon-lib/common/hexutility" + "github.com/erigontech/erigon-lib/kv/stream" + "github.com/stretchr/testify/require" +) + +func TestSimpleSequence(t *testing.T) { + s := NewSimpleSequence(1000, 4) + s.AddOffset(1001) + s.AddOffset(1007) + s.AddOffset(1015) + s.AddOffset(1027) + + t.Run("basic test", func(t *testing.T) { + require.Equal(t, uint64(1001), s.Get(0)) + require.Equal(t, uint64(1007), s.Get(1)) + require.Equal(t, uint64(1015), s.Get(2)) + require.Equal(t, uint64(1027), s.Get(3)) + + require.Equal(t, uint64(1001), s.Min()) + require.Equal(t, uint64(1027), s.Max()) + require.Equal(t, uint64(4), s.Count()) + }) + + t.Run("serialization", func(t *testing.T) { + b := make([]byte, 0) + b = s.AppendBytes(b) + + require.Equal(t, hexutility.MustDecodeHex("0x"+ + "00000001"+ + "00000007"+ + "0000000f"+ + "0000001b"), b) + }) + + t.Run("search", func(t *testing.T) { + // before baseNum + v, found := s.Search(10) + require.True(t, found) + require.Equal(t, uint64(1001), v) + + // at baseNum + v, found = s.Search(1000) + require.True(t, found) + require.Equal(t, uint64(1001), v) + + // 
at elem + v, found = s.Search(1007) + require.True(t, found) + require.Equal(t, uint64(1007), v) + + // between elems + v, found = s.Search(1014) + require.True(t, found) + require.Equal(t, uint64(1015), v) + + // at last + v, found = s.Search(1027) + require.True(t, found) + require.Equal(t, uint64(1027), v) + + // after last + v, found = s.Search(1028) + require.False(t, found) + require.Equal(t, uint64(0), v) + }) + + t.Run("iterator", func(t *testing.T) { + it := s.Iterator() + defer it.Close() + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1001), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1007), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("iterator seek exact", func(t *testing.T) { + it := s.Iterator() + defer it.Close() + + it.Seek(1015) + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("iterator seek", func(t *testing.T) { + it := s.Iterator() + defer it.Close() + + it.Seek(1014) + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("iterator seek not found", func(t *testing.T) { + it := s.Iterator() + defer it.Close() + + it.Seek(1029) + require.False(t, it.HasNext()) + v, err := it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("reverse iterator", func(t *testing.T) { + it := s.ReverseIterator() + defer it.Close() + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1027), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1015), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1007), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1001), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("reverse iterator seek exact", func(t *testing.T) { + it := s.ReverseIterator() + defer it.Close() + + it.Seek(1007) + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1007), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1001), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("reverse iterator seek", func(t *testing.T) { + it := s.ReverseIterator() + defer it.Close() + + it.Seek(1008) + + require.True(t, it.HasNext()) + v, err := it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1007), v) + + require.True(t, it.HasNext()) + v, err = it.Next() + require.NoError(t, err) + require.Equal(t, uint64(1001), v) + + require.False(t, it.HasNext()) + v, err = it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) + + t.Run("reverse iterator seek not found", func(t *testing.T) { + it := s.ReverseIterator() + defer it.Close() + + it.Seek(1000) + require.False(t, it.HasNext()) + v, err := it.Next() + require.ErrorIs(t, err, stream.ErrIteratorExhausted) + require.Equal(t, uint64(0), v) + }) +} diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 17c0b02224a..d67ba7b9e9d 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -99,7 +99,9 @@ type OnFreezeFunc func(frozenFileNames []string) const AggregatorSqueezeCommitmentValues = true const MaxNonFuriousDirtySpacePerTx = 64 * datasize.MB -func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) { +// TODO: experimentalEFOptimization is a temporary flag to enable experimental .ef optimization. +// It should be removed after the optimization is stable and enabled by default. +func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint64, db kv.RoDB, logger log.Logger, experimentalEFOptimization bool) (*Aggregator, error) { tmpdir := dirs.Tmp salt, err := getStateIndicesSalt(dirs.Snap) if err != nil { return nil, err } @@ -167,7 +169,8 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, historyLargeValues: false, iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg, - aggregationStep: aggregationStep, keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx}, + aggregationStep: aggregationStep, keysTable: kv.TblAccountHistoryKeys, valuesTable: kv.TblAccountIdx, + experimentalEFOptimization: experimentalEFOptimization}, }, } if a.d[kv.AccountsDomain], err = NewDomain(cfg, logger); err != nil { @@ -187,7 +190,8 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, historyLargeValues: false, iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg, - aggregationStep: aggregationStep, keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx}, + aggregationStep: aggregationStep, keysTable: kv.TblStorageHistoryKeys, valuesTable: kv.TblStorageIdx, + experimentalEFOptimization: experimentalEFOptimization}, }, } if a.d[kv.StorageDomain], err = NewDomain(cfg, logger); err != nil { @@ -208,7 +212,8 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, historyLargeValues: true, iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg, - aggregationStep: aggregationStep, keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx}, + aggregationStep: aggregationStep, keysTable: kv.TblCodeHistoryKeys, valuesTable: kv.TblCodeIdx, + experimentalEFOptimization: experimentalEFOptimization}, }, } if a.d[kv.CodeDomain], err = NewDomain(cfg, logger); err != nil { @@ -230,7 +235,8 @@ func NewAggregator(ctx
context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, historyLargeValues: false, iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg, - aggregationStep: aggregationStep, keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx}, + aggregationStep: aggregationStep, keysTable: kv.TblCommitmentHistoryKeys, valuesTable: kv.TblCommitmentIdx, + experimentalEFOptimization: experimentalEFOptimization}, }, } if a.d[kv.CommitmentDomain], err = NewDomain(cfg, logger); err != nil { @@ -248,22 +254,23 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 withLocalityIndex: false, historyLargeValues: false, iiCfg: iiCfg{salt: salt, dirs: dirs, db: db, withExistence: false, compressorCfg: seg.DefaultCfg, - aggregationStep: aggregationStep, keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx}, + aggregationStep: aggregationStep, keysTable: kv.TblReceiptHistoryKeys, valuesTable: kv.TblReceiptIdx, + experimentalEFOptimization: experimentalEFOptimization}, }, } if a.d[kv.ReceiptDomain], err = NewDomain(cfg, logger); err != nil { return nil, err } - if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, db, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger); err != nil { + if err := a.registerII(kv.LogAddrIdxPos, salt, dirs, db, aggregationStep, kv.FileLogAddressIdx, kv.TblLogAddressKeys, kv.TblLogAddressIdx, logger, experimentalEFOptimization); err != nil { return nil, err } - if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, db, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger); err != nil { + if err := a.registerII(kv.LogTopicIdxPos, salt, dirs, db, aggregationStep, kv.FileLogTopicsIdx, kv.TblLogTopicsKeys, kv.TblLogTopicsIdx, logger, experimentalEFOptimization); err != nil { return nil, err } - if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger); err != nil { + if err := a.registerII(kv.TracesFromIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesFromIdx, kv.TblTracesFromKeys, kv.TblTracesFromIdx, logger, experimentalEFOptimization); err != nil { return nil, err } - if err := a.registerII(kv.TracesToIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger); err != nil { + if err := a.registerII(kv.TracesToIdxPos, salt, dirs, db, aggregationStep, kv.FileTracesToIdx, kv.TblTracesToKeys, kv.TblTracesToIdx, logger, experimentalEFOptimization); err != nil { return nil, err } a.KeepRecentTxnsOfHistoriesWithDisabledSnapshots(100_000) // ~1k blocks of history @@ -272,6 +279,11 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 return a, nil } +// TODO: exported for idx_optimize.go +func GetStateIndicesSalt(baseDir string) (salt *uint32, err error) { + return getStateIndicesSalt(baseDir) +} + // getStateIndicesSalt - try read salt for all indices from DB. Or fall-back to new salt creation. // if db is Read-Only (for example remote RPCDaemon or utilities) - we will not create new indices - and existing indices have salt in metadata. 
func getStateIndicesSalt(baseDir string) (salt *uint32, err error) { @@ -316,7 +328,7 @@ func getStateIndicesSalt(baseDir string) (salt *uint32, err error) { return salt, nil } -func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, db kv.RoDB, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger) error { +func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadir.Dirs, db kv.RoDB, aggregationStep uint64, filenameBase, indexKeysTable, indexTable string, logger log.Logger, experimentalEFOptimization bool) error { idxCfg := iiCfg{ salt: salt, dirs: dirs, db: db, aggregationStep: aggregationStep, @@ -324,6 +336,8 @@ func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadi keysTable: indexKeysTable, valuesTable: indexTable, compression: seg.CompressNone, + + experimentalEFOptimization: experimentalEFOptimization, } var err error a.iis[idx], err = NewInvertedIndex(idxCfg, logger) diff --git a/erigon-lib/state/aggregator_bench_test.go b/erigon-lib/state/aggregator_bench_test.go index 63fa225af81..c4dc85e9ceb 100644 --- a/erigon-lib/state/aggregator_bench_test.go +++ b/erigon-lib/state/aggregator_bench_test.go @@ -45,7 +45,7 @@ func testDbAndAggregatorBench(b *testing.B, aggStep uint64) (kv.RwDB, *Aggregato dirs := datadir.New(b.TempDir()) db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).MustOpen() b.Cleanup(db.Close) - agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger) + agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger, false) require.NoError(b, err) b.Cleanup(agg.Close) return db, agg diff --git a/erigon-lib/state/aggregator_fuzz_test.go b/erigon-lib/state/aggregator_fuzz_test.go index 559869f66ad..74eba4b07c1 100644 --- a/erigon-lib/state/aggregator_fuzz_test.go +++ b/erigon-lib/state/aggregator_fuzz_test.go @@ -276,7 +276,7 @@ func testFuzzDbAndAggregatorv3(f *testing.F, aggStep uint64) (kv.RwDB, *Aggregat db := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen() f.Cleanup(db.Close) - agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger) + agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger, false) require.NoError(err) f.Cleanup(agg.Close) err = agg.OpenFolder() diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index aef94e7a7d8..4d5226f31d1 100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -154,7 +154,7 @@ func aggregatorV3_RestartOnDatadir(t *testing.T, rc runCfg) { agg.Close() // Start another aggregator on same datadir - anotherAgg, err := NewAggregator(context.Background(), agg.dirs, aggStep, db, logger) + anotherAgg, err := NewAggregator(context.Background(), agg.dirs, aggStep, db, logger, false) require.NoError(t, err) defer anotherAgg.Close() @@ -614,7 +614,7 @@ func TestAggregatorV3_RestartOnFiles(t *testing.T) { newDb := mdbx.New(kv.ChainDB, logger).InMem(dirs.Chaindata).MustOpen() t.Cleanup(newDb.Close) - newAgg, err := NewAggregator(context.Background(), agg.dirs, aggStep, newDb, logger) + newAgg, err := NewAggregator(context.Background(), agg.dirs, aggStep, newDb, logger, false) require.NoError(t, err) require.NoError(t, newAgg.OpenFolder()) @@ -885,7 +885,7 @@ func testDbAndAggregatorv3(tb testing.TB, aggStep uint64) (kv.RwDB, *Aggregator) }).MustOpen() tb.Cleanup(db.Close) - agg, err := NewAggregator(context.Background(), 
dirs, aggStep, db, logger) + agg, err := NewAggregator(context.Background(), dirs, aggStep, db, logger, false) require.NoError(err) tb.Cleanup(agg.Close) err = agg.OpenFolder() diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index 328c25d1f0e..ccb49bd6f06 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -1366,6 +1366,11 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps } } +// TODO: exported for idx_optimize.go +func BuildAccessor(ctx context.Context, d *seg.Decompressor, compressed seg.FileCompression, idxPath string, values bool, cfg recsplit.RecSplitArgs, ps *background.ProgressSet, logger log.Logger) error { + return buildAccessor(ctx, d, compressed, idxPath, values, cfg, ps, logger) +} + func buildAccessor(ctx context.Context, d *seg.Decompressor, compressed seg.FileCompression, idxPath string, values bool, cfg recsplit.RecSplitArgs, ps *background.ProgressSet, logger log.Logger) error { _, fileName := filepath.Split(idxPath) count := d.Count() diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index 13728088c3f..36d0f5fe41b 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -41,7 +41,7 @@ import ( "github.com/erigontech/erigon-lib/kv/stream" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" - "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/multiencseq" "github.com/erigontech/erigon-lib/seg" ) @@ -340,14 +340,14 @@ func (h *History) buildVi(ctx context.Context, item *filesItem, ps *background.P fromStep, toStep := item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep idxPath := h.vAccessorFilePath(fromStep, toStep) - _, err = h.buildVI(ctx, idxPath, item.decompressor, iiItem.decompressor, ps) + _, err = h.buildVI(ctx, idxPath, item.decompressor, iiItem.decompressor, iiItem.startTxNum, ps) if err != nil { return fmt.Errorf("buildVI: %w", err) } return nil } -func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHist *seg.Decompressor, ps *background.ProgressSet) (string, error) { +func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHist *seg.Decompressor, efBaseTxNum uint64, ps *background.ProgressSet) (string, error) { rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{ KeyCount: hist.Count(), Enums: false, @@ -390,10 +390,10 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi // fmt.Printf("ef key %x\n", keyBuf) - ef, _ := eliasfano32.ReadEliasFano(valBuf) - efIt := ef.Iterator() - for efIt.HasNext() { - txNum, err := efIt.Next() + seq := multiencseq.ReadMultiEncSeq(efBaseTxNum, valBuf) + it := seq.Iterator(0) + for it.HasNext() { + txNum, err := it.Next() if err != nil { return "", err } @@ -555,12 +555,14 @@ func (w *historyBufferedWriter) Flush(ctx context.Context, tx kv.RwTx) error { return nil } +// TODO: rename ef* fields type HistoryCollation struct { historyComp *seg.Writer efHistoryComp *seg.Writer historyPath string efHistoryPath string - historyCount int // same as historyComp.Count() + efBaseTxNum uint64 // TODO: is this necessary, or can it be derived from step later?
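+ // efBaseTxNum above is set by collate to step*aggregationStep and is passed to buildVI so it can decode this step's (possibly rebased) .ef sequences.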
+ historyCount int // same as historyComp.Count() } func (c HistoryCollation) Close() { @@ -579,10 +581,10 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k } var ( - historyComp *seg.Writer - efHistoryComp *seg.Writer - txKey [8]byte - err error + historyComp *seg.Writer + seqWriter *seg.Writer + txKey [8]byte + err error historyPath = h.vFilePath(step, step+1) efHistoryPath = h.efFilePath(step, step+1) @@ -595,8 +597,8 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k if historyComp != nil { historyComp.Close() } - if efHistoryComp != nil { - efHistoryComp.Close() + if seqWriter != nil { + seqWriter.Close() } } }() @@ -652,26 +654,27 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k defer cd.Close() } - efComp, err := seg.NewCompressor(ctx, "collate idx "+h.filenameBase, efHistoryPath, h.dirs.Tmp, h.compressorCfg, log.LvlTrace, h.logger) + seqComp, err := seg.NewCompressor(ctx, "collate idx "+h.filenameBase, efHistoryPath, h.dirs.Tmp, h.compressorCfg, log.LvlTrace, h.logger) if err != nil { return HistoryCollation{}, fmt.Errorf("create %s ef history compressor: %w", h.filenameBase, err) } if h.noFsync { - efComp.DisableFsync() + seqComp.DisableFsync() } var ( keyBuf = make([]byte, 0, 256) numBuf = make([]byte, 8) bitmap = bitmapdb.NewBitmap64() - prevEf []byte + prevSeq []byte prevKey []byte initialized bool ) - efHistoryComp = seg.NewWriter(efComp, seg.CompressNone) // coll+build must be fast - no compression + seqWriter = seg.NewWriter(seqComp, seg.CompressNone) // coll+build must be fast - no compression collector.SortAndFlushInBackground(true) defer bitmapdb.ReturnToPool64(bitmap) + baseTxNum := step * h.aggregationStep loadBitmapsFunc := func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { txNum := binary.BigEndian.Uint64(v) if !initialized { @@ -685,7 +688,7 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k return nil } - ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) + seqBuilder := multiencseq.NewBuilder(baseTxNum, bitmap.GetCardinality(), bitmap.Maximum(), h.experimentalEFOptimization) it := bitmap.Iterator() for it.HasNext() { @@ -718,17 +721,17 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k } } - ef.AddOffset(vTxNum) + seqBuilder.AddOffset(vTxNum) } bitmap.Clear() - ef.Build() + seqBuilder.Build() - prevEf = ef.AppendBytes(prevEf[:0]) + prevSeq = seqBuilder.AppendBytes(prevSeq[:0]) - if err = efHistoryComp.AddWord(prevKey); err != nil { + if err = seqWriter.AddWord(prevKey); err != nil { return fmt.Errorf("add %s ef history key [%x]: %w", h.filenameBase, prevKey, err) } - if err = efHistoryComp.AddWord(prevEf); err != nil { + if err = seqWriter.AddWord(prevSeq); err != nil { return fmt.Errorf("add %s ef history val: %w", h.filenameBase, err) } @@ -753,8 +756,9 @@ func (h *History) collate(ctx context.Context, step, txFrom, txTo uint64, roTx k mxCollationSizeHist.SetUint64(uint64(historyComp.Count())) return HistoryCollation{ - efHistoryComp: efHistoryComp, + efHistoryComp: seqWriter, efHistoryPath: efHistoryPath, + efBaseTxNum: step * h.aggregationStep, historyPath: historyPath, historyComp: historyComp, historyCount: historyComp.Count(), @@ -874,7 +878,7 @@ func (h *History) buildFiles(ctx context.Context, step uint64, collation History } historyIdxPath := h.vAccessorFilePath(step, step+1) - historyIdxPath, err = h.buildVI(ctx, historyIdxPath, 
historyDecomp, efHistoryDecomp, ps) + historyIdxPath, err = h.buildVI(ctx, historyIdxPath, historyDecomp, efHistoryDecomp, collation.efBaseTxNum, ps) if err != nil { return HistoryFiles{}, fmt.Errorf("build %s .vi: %w", h.filenameBase, err) } diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 7c351294536..0f794db3dee 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -50,6 +50,7 @@ import ( "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/multiencseq" "github.com/erigontech/erigon-lib/seg" ) @@ -91,6 +92,8 @@ type iiCfg struct { // external checker for integrity of inverted index ranges integrity rangeIntegrityChecker indexList idxList + + experimentalEFOptimization bool } type iiVisible struct { @@ -598,8 +601,8 @@ func (iit *InvertedIndexRoTx) seekInFiles(key []byte, txNum uint64) (found bool, if !bytes.Equal(k, key) { continue } - eliasVal, _ := g.Next(nil) - equalOrHigherTxNum, found = eliasfano32.Seek(eliasVal, txNum) + encodedSeq, _ := g.Next(nil) + equalOrHigherTxNum, found = multiencseq.Seek(iit.files[i].startTxNum, encodedSeq, txNum) if !found { continue } @@ -689,7 +692,7 @@ func (iit *InvertedIndexRoTx) iterateRangeOnFiles(key []byte, startTxNum, endTxN indexTable: iit.ii.valuesTable, orderAscend: asc, limit: limit, - ef: eliasfano32.NewEliasFano(1, 1), + seq: &multiencseq.SequenceReader{}, } if asc { for i := len(iit.files) - 1; i >= 0; i-- { @@ -1076,7 +1079,7 @@ func (ii *InvertedIndex) collate(ctx context.Context, step uint64, roTx kv.Tx) ( coll.writer = seg.NewWriter(comp, ii.compression) var ( - prevEf []byte + prevSeq []byte prevKey []byte initialized bool bitmap = bitmapdb.NewBitmap64() @@ -1096,20 +1099,20 @@ func (ii *InvertedIndex) collate(ctx context.Context, step uint64, roTx kv.Tx) ( return nil } - ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum()) + seqBuilder := multiencseq.NewBuilder(step*ii.aggregationStep, bitmap.GetCardinality(), bitmap.Maximum(), ii.experimentalEFOptimization) it := bitmap.Iterator() for it.HasNext() { - ef.AddOffset(it.Next()) + seqBuilder.AddOffset(it.Next()) } bitmap.Clear() - ef.Build() + seqBuilder.Build() - prevEf = ef.AppendBytes(prevEf[:0]) + prevSeq = seqBuilder.AppendBytes(prevSeq[:0]) if err = coll.writer.AddWord(prevKey); err != nil { return fmt.Errorf("add %s efi index key [%x]: %w", ii.filenameBase, prevKey, err) } - if err = coll.writer.AddWord(prevEf); err != nil { + if err = coll.writer.AddWord(prevSeq); err != nil { return fmt.Errorf("add %s efi index val: %w", ii.filenameBase, err) } diff --git a/erigon-lib/state/inverted_index_stream.go b/erigon-lib/state/inverted_index_stream.go index 5f7b5808f16..5d1b154a2c3 100644 --- a/erigon-lib/state/inverted_index_stream.go +++ b/erigon-lib/state/inverted_index_stream.go @@ -28,6 +28,7 @@ import ( "github.com/erigontech/erigon-lib/kv/order" "github.com/erigontech/erigon-lib/kv/stream" "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/multiencseq" ) // InvertedIdxStreamFiles allows iteration over range of txn numbers @@ -40,7 +41,7 @@ type InvertedIdxStreamFiles struct { limit int orderAscend order.By - efIt stream.Uno[uint64] + seqIt stream.Uno[uint64] indexTable string stack []visibleFile @@ -48,7 +49,7 @@ type InvertedIdxStreamFiles struct { hasNext bool err error - ef *eliasfano32.EliasFano 
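+ // reused across files: advanceInFiles Resets it against each file's raw value instead of allocating a new decoder per lookup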
+ seq *multiencseq.SequenceReader } func (it *InvertedIdxStreamFiles) Close() { @@ -84,7 +85,7 @@ func (it *InvertedIdxStreamFiles) next() uint64 { func (it *InvertedIdxStreamFiles) advanceInFiles() { for { - for it.efIt == nil { + for it.seqIt == nil { if len(it.stack) == 0 { it.hasNext = false return @@ -99,26 +100,23 @@ func (it *InvertedIdxStreamFiles) advanceInFiles() { g.Reset(offset) k, _ := g.NextUncompressed() if bytes.Equal(k, it.key) { - eliasVal, _ := g.NextUncompressed() - it.ef.Reset(eliasVal) - var efiter *eliasfano32.EliasFanoIter + numSeqVal, _ := g.NextUncompressed() + it.seq.Reset(item.startTxNum, numSeqVal) + var seqIt stream.Uno[uint64] if it.orderAscend { - efiter = it.ef.Iterator() + seqIt = it.seq.Iterator(it.startTxNum) } else { - efiter = it.ef.ReverseIterator() + seqIt = it.seq.ReverseIterator(it.startTxNum) } - if it.startTxNum > 0 { - efiter.Seek(uint64(it.startTxNum)) - } - it.efIt = efiter + it.seqIt = seqIt } } //Asc: [from, to) AND from < to //Desc: [from, to) AND from > to if it.orderAscend { - for it.efIt.HasNext() { - n, err := it.efIt.Next() + for it.seqIt.HasNext() { + n, err := it.seqIt.Next() if err != nil { it.err = err return @@ -137,8 +135,8 @@ func (it *InvertedIdxStreamFiles) advanceInFiles() { return } } else { - for it.efIt.HasNext() { - n, err := it.efIt.Next() + for it.seqIt.HasNext() { + n, err := it.seqIt.Next() if err != nil { it.err = err return @@ -157,7 +155,7 @@ func (it *InvertedIdxStreamFiles) advanceInFiles() { return } } - it.efIt = nil // Exhausted this iterator + it.seqIt = nil // Exhausted this iterator } } diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index 3f3f40476d5..2cc9ce82aed 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -35,7 +35,7 @@ import ( "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" - "github.com/erigontech/erigon-lib/recsplit/eliasfano32" + "github.com/erigontech/erigon-lib/recsplit/multiencseq" "github.com/erigontech/erigon-lib/seg" ) @@ -338,28 +338,28 @@ func (ht *HistoryRoTx) staticFilesInRange(r HistoryRanges) (indexFiles, historyF return } -func mergeEfs(preval, val, buf []byte) ([]byte, error) { - preef, _ := eliasfano32.ReadEliasFano(preval) - ef, _ := eliasfano32.ReadEliasFano(val) - preIt := preef.Iterator() - efIt := ef.Iterator() - newEf := eliasfano32.NewEliasFano(preef.Count()+ef.Count(), ef.Max()) +func mergeNumSeqs(preval, val []byte, preBaseNum, baseNum uint64, buf []byte, outBaseNum uint64, experimentalEFOptimization bool) ([]byte, error) { + preSeq := multiencseq.ReadMultiEncSeq(preBaseNum, preval) + seq := multiencseq.ReadMultiEncSeq(baseNum, val) + preIt := preSeq.Iterator(0) + efIt := seq.Iterator(0) + newSeq := multiencseq.NewBuilder(outBaseNum, preSeq.Count()+seq.Count(), seq.Max(), experimentalEFOptimization) for preIt.HasNext() { v, err := preIt.Next() if err != nil { return nil, err } - newEf.AddOffset(v) + newSeq.AddOffset(v) } for efIt.HasNext() { v, err := efIt.Next() if err != nil { return nil, err } - newEf.AddOffset(v) + newSeq.AddOffset(v) } - newEf.Build() - return newEf.AppendBytes(buf), nil + newSeq.Build() + return newSeq.AppendBytes(buf), nil } type valueTransformer func(val []byte, startTxNum, endTxNum uint64) ([]byte, error) @@ -603,12 +603,13 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*filesItem val, _ := g.Next(nil) //fmt.Printf("heap push %s [%d] %x\n", item.decompressor.FilePath(), item.endTxNum, key) 
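+ // each cursor carries its file's startTxNum: values are decoded against the base of the file they came from, then rebased onto the merged file's base below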
heap.Push(&cp, &CursorItem{ - t: FILE_CURSOR, - dg: g, - key: key, - val: val, - endTxNum: item.endTxNum, - reverse: true, + t: FILE_CURSOR, + dg: g, + key: key, + val: val, + startTxNum: item.startTxNum, + endTxNum: item.endTxNum, + reverse: true, }) } } @@ -622,13 +623,28 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*filesItem for cp.Len() > 0 { lastKey := common.Copy(cp[0].key) lastVal := common.Copy(cp[0].val) + + // Pre-rebase the first sequence + preSeq := multiencseq.ReadMultiEncSeq(cp[0].startTxNum, lastVal) + preIt := preSeq.Iterator(0) + newSeq := multiencseq.NewBuilder(startTxNum, preSeq.Count(), preSeq.Max(), iit.ii.experimentalEFOptimization) + for preIt.HasNext() { + v, err := preIt.Next() + if err != nil { + return nil, err + } + newSeq.AddOffset(v) + } + newSeq.Build() + lastVal = newSeq.AppendBytes(nil) + var mergedOnce bool // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := heap.Pop(&cp).(*CursorItem) if mergedOnce { - if lastVal, err = mergeEfs(ci1.val, lastVal, nil); err != nil { + if lastVal, err = mergeNumSeqs(ci1.val, lastVal, ci1.startTxNum, startTxNum, nil, startTxNum, iit.ii.experimentalEFOptimization); err != nil { return nil, fmt.Errorf("merge %s inverted index: %w", iit.ii.filenameBase, err) } } else { @@ -768,13 +784,14 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles key, _ := g.Next(nil) val, _ := g.Next(nil) heap.Push(&cp, &CursorItem{ - t: FILE_CURSOR, - dg: g, - dg2: g2, - key: key, - val: val, - endTxNum: item.endTxNum, - reverse: false, + t: FILE_CURSOR, + dg: g, + dg2: g2, + key: key, + val: val, + startTxNum: item.startTxNum, + endTxNum: item.endTxNum, + reverse: false, }) } } @@ -790,7 +807,7 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := heap.Pop(&cp).(*CursorItem) - count := eliasfano32.Count(ci1.val) + count := multiencseq.Count(ci1.startTxNum, ci1.val) for i := uint64(0); i < count; i++ { if !ci1.dg2.HasNext() { panic(fmt.Errorf("assert: no value??? 
%s, i=%d, count=%d, lastKey=%x, ci1.key=%x", ci1.dg2.FileName(), i, count, lastKey, ci1.key)) @@ -853,10 +870,10 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles for g.HasNext() { keyBuf, _ = g.Next(nil) valBuf, _ = g.Next(nil) - ef, _ := eliasfano32.ReadEliasFano(valBuf) - efIt := ef.Iterator() - for efIt.HasNext() { - txNum, err := efIt.Next() + seq := multiencseq.ReadMultiEncSeq(indexIn.startTxNum, valBuf) + it := seq.Iterator(0) + for it.HasNext() { + txNum, err := it.Next() if err != nil { return nil, nil, err } diff --git a/erigon-lib/state/merge_test.go b/erigon-lib/state/merge_test.go index c7fa5124e06..5161a6fffe8 100644 --- a/erigon-lib/state/merge_test.go +++ b/erigon-lib/state/merge_test.go @@ -18,11 +18,12 @@ package state import ( "context" - "github.com/erigontech/erigon-lib/common/datadir" "os" "sort" "testing" + "github.com/erigontech/erigon-lib/common/datadir" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" btree2 "github.com/tidwall/btree" @@ -498,7 +499,7 @@ func Test_mergeEliasFano(t *testing.T) { require.Contains(t, secondList, int(v)) } - menc, err := mergeEfs(firstBytes, secondBytes, nil) + menc, err := mergeNumSeqs(firstBytes, secondBytes, 0, 0, nil, 0, false) require.NoError(t, err) merged, _ := eliasfano32.ReadEliasFano(menc) diff --git a/eth/backend.go b/eth/backend.go index 85e9c43617a..9a0d65c5156 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1502,7 +1502,15 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } } blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, heimdallStore, bridgeStore) - agg, err := libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger) + if snConfig.ExperimentalEFOptimization { + logger.Info("**************************************************") + logger.Info("****** USING EXPERIMENTAL .EF OPTIMIZATIONS ******") + logger.Info("****** ******") + logger.Info("****** NEW .EF FILES ARE -NOT- BACKWARDS ******") + logger.Info("****** COMPATIBLE ******") + logger.Info("**************************************************") + } + agg, err := libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger, snConfig.ExperimentalEFOptimization) if err != nil { return nil, nil, nil, nil, nil, nil, nil, err } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index a36daa2c1ec..9281aa7dd29 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -259,6 +259,8 @@ type Config struct { SilkwormRpcJsonCompatibility bool DisableTxPoolGossip bool + + ExperimentalEFOptimization bool } type Sync struct { diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index 68100116297..42d3e2c38bd 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -1467,7 +1467,7 @@ func dbCfg(label kv.Label, path string) mdbx.MdbxOpts { Accede(true) // integration tool: open db without creation and without blocking erigon } func openAgg(ctx context.Context, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) *libstate.Aggregator { - agg, err := libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, chainDB, logger) + agg, err := libstate.NewAggregator(ctx, dirs, config3.DefaultStepSize, chainDB, logger, false) if err != nil { panic(err) } diff --git a/turbo/app/squeeze_cmd.go b/turbo/app/squeeze_cmd.go index da096ff2c02..878de49e629 100644 --- a/turbo/app/squeeze_cmd.go +++ b/turbo/app/squeeze_cmd.go @@ -137,7 +137,7 @@ func squeezeStorage(ctx context.Context, dirs 
datadir.Dirs, logger log.Logger) e ac := agg.BeginFilesRo() defer ac.Close() - aggOld, err := state.NewAggregator(ctx, dirsOld, config3.DefaultStepSize, db, logger) + aggOld, err := state.NewAggregator(ctx, dirsOld, config3.DefaultStepSize, db, logger, false) if err != nil { panic(err) } @@ -178,7 +178,7 @@ func squeezeStorage(ctx context.Context, dirs datadir.Dirs, logger log.Logger) e func squeezeCode(ctx context.Context, dirs datadir.Dirs, logger log.Logger) error { db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen() defer db.Close() - agg, err := state.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger) + agg, err := state.NewAggregator(ctx, dirs, config3.DefaultStepSize, db, logger, false) if err != nil { return err } diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index df05ee1b999..1930205daab 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -237,4 +237,5 @@ var DefaultFlags = []cli.Flag{ &SyncParallelStateFlushing, &utils.ChaosMonkeyFlag, + &utils.ExperimentalEFOptimizationFlag, } diff --git a/turbo/cli/flags.go b/turbo/cli/flags.go index f0502679cc3..b7d00709264 100644 --- a/turbo/cli/flags.go +++ b/turbo/cli/flags.go @@ -266,6 +266,12 @@ var ( Usage: "How often transactions should be committed to the storage", Value: txpoolcfg.DefaultConfig.CommitEvery, } + + CsvOutput = cli.StringFlag{ + Name: "csv", + Usage: "Output statistics to a CSV file", + Value: "idx_stat.csv", + } ) func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config, logger log.Logger) {