From 4134caf4c5b5c7ebc7d5f53f0be8690a986f755c Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 26 Sep 2024 19:50:21 +0400 Subject: [PATCH 01/10] migration command --- .github/image/Dockerfile | 2 +- .gitignore | 1 + Makefile | 8 +- cmd/migrate-curio/leveldb.go | 19 ++ cmd/migrate-curio/main.go | 88 ++++++ cmd/migrate-curio/migrate.go | 557 ++++++++++++++++++++++++++++++++++ cmd/migrate-curio/yugabyte.go | 98 ++++++ go.mod | 16 +- go.sum | 28 +- go.work | 2 +- go.work.sum | 6 + 11 files changed, 802 insertions(+), 23 deletions(-) create mode 100644 cmd/migrate-curio/leveldb.go create mode 100644 cmd/migrate-curio/main.go create mode 100644 cmd/migrate-curio/migrate.go create mode 100644 cmd/migrate-curio/yugabyte.go diff --git a/.github/image/Dockerfile b/.github/image/Dockerfile index 2c7981f34..7c808d9ef 100644 --- a/.github/image/Dockerfile +++ b/.github/image/Dockerfile @@ -18,7 +18,7 @@ RUN git submodule update --init RUN go mod download # Download Go dependencies for Curio -WORKDIR /app/boost +WORKDIR /app/curio RUN git submodule update --init RUN go mod download diff --git a/.gitignore b/.gitignore index ee9bf7cf3..9f2c5e3fc 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ /booster-bitswap /docgen-md /docgen-openrpc +/migrate-curio extern/filecoin-ffi/rust/target extern/boostd-data/boostd-data **/*.a diff --git a/Makefile b/Makefile index 1e77c8f34..6968c7204 100644 --- a/Makefile +++ b/Makefile @@ -133,6 +133,11 @@ boostci: $(BUILD_DEPS) $(GOCC) build $(GOFLAGS) -o boostci ./cmd/boostci .PHONY: boostci +migrate-curio: $(BUILD_DEPS) + rm -f migrate-curio + $(GOCC) build $(GOFLAGS) -o migrate-curio ./cmd/migrate-curio +.PHONY: boostci + react: validate-node-version npm_config_legacy_peer_deps=yes npm ci --no-audit --prefix react npm run --prefix react build @@ -143,7 +148,7 @@ update-react: validate-node-version npm run --prefix react build .PHONY: react -build-go: boost boostd boostx boostd-data booster-http booster-bitswap devnet migrate-lid +build-go: boost boostd boostx boostd-data booster-http booster-bitswap devnet migrate-lid migrate-curio .PHONY: build-go build: react build-go @@ -162,6 +167,7 @@ install-boost: install -C ./booster-http /usr/local/bin/booster-http install -C ./booster-bitswap /usr/local/bin/booster-bitswap install -C ./migrate-lid /usr/local/bin/migrate-lid + install -C ./migrate-curio /usr/local/bin/migrate-curio install-devnet: install -C ./devnet /usr/local/bin/devnet diff --git a/cmd/migrate-curio/leveldb.go b/cmd/migrate-curio/leveldb.go new file mode 100644 index 000000000..d4b7f8ebd --- /dev/null +++ b/cmd/migrate-curio/leveldb.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + + "github.com/urfave/cli/v2" +) + +var cleanupLevelDBCmd = &cli.Command{ + Name: "leveldb", + Description: "Removes the indexes and other metadata leveldb based LID store", + Usage: "migrate-curio cleanup leveldb", + Before: before, + Action: func(cctx *cli.Context) error { + fmt.Println("Please remove the directory called 'LID' in the boost repo path to remove leveldb based LID") + fmt.Println("This directory can also be present outside of Boost repo if 'boostd-data' was running with a custom repo path") + return nil + }, +} diff --git a/cmd/migrate-curio/main.go b/cmd/migrate-curio/main.go new file mode 100644 index 000000000..3aa8399d6 --- /dev/null +++ b/cmd/migrate-curio/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "os" + + "github.com/filecoin-project/boost/build" + logging "github.com/ipfs/go-log/v2" + "github.com/urfave/cli/v2" +) + +var log = logging.Logger("migrate-curio") + +const ( + FlagBoostRepo = "boost-repo" +) + +var FlagRepo = &cli.StringFlag{ + Name: FlagBoostRepo, + EnvVars: []string{"BOOST_PATH"}, + Usage: "boost repo path", + Value: "~/.boost", +} + +var IsVeryVerbose bool + +var FlagVeryVerbose = &cli.BoolFlag{ + Name: "vv", + Usage: "enables very verbose mode, useful for debugging the CLI", + Destination: &IsVeryVerbose, +} + +func main() { + app := &cli.App{ + Name: "migrate-curio", + Usage: "Migrate boost to Curio", + EnableBashCompletion: true, + Version: build.UserVersion(), + Flags: []cli.Flag{ + FlagRepo, + FlagVeryVerbose, + &cli.StringFlag{ + Name: "db-host", + EnvVars: []string{"CURIO_DB_HOST", "CURIO_HARMONYDB_HOSTS"}, + Usage: "Command separated list of hostnames for yugabyte cluster", + Value: "127.0.0.1", + }, + &cli.StringFlag{ + Name: "db-name", + EnvVars: []string{"CURIO_DB_NAME", "CURIO_HARMONYDB_NAME"}, + Value: "yugabyte", + }, + &cli.StringFlag{ + Name: "db-user", + EnvVars: []string{"CURIO_DB_USER", "CURIO_HARMONYDB_USERNAME"}, + Value: "yugabyte", + }, + &cli.StringFlag{ + Name: "db-password", + EnvVars: []string{"CURIO_DB_PASSWORD", "CURIO_HARMONYDB_PASSWORD"}, + Value: "yugabyte", + }, + &cli.StringFlag{ + Name: "db-port", + EnvVars: []string{"CURIO_DB_PORT", "CURIO_HARMONYDB_PORT"}, + Value: "5433", + }, + }, + Commands: []*cli.Command{ + migrateCmd, + cleanupLIDCmd, + }, + } + app.Setup() + + if err := app.Run(os.Args); err != nil { + os.Stderr.WriteString("Error: " + err.Error() + "\n") + } +} + +func before(cctx *cli.Context) error { + _ = logging.SetLogLevel("migrate-curio", "INFO") + + if IsVeryVerbose { + _ = logging.SetLogLevel("migrate-curio", "DEBUG") + } + + return nil +} diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go new file mode 100644 index 000000000..d9fcbcdc5 --- /dev/null +++ b/cmd/migrate-curio/migrate.go @@ -0,0 +1,557 @@ +package main + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "path" + "time" + + "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/lib/legacy" + "github.com/filecoin-project/boost/node/repo" + "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + "github.com/filecoin-project/boost/storagemarket/types/legacytypes" + transportTypes "github.com/filecoin-project/boost/transport/types" + "github.com/filecoin-project/curio/deps" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + vfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm" + "github.com/filecoin-project/go-state-types/abi" + verifreg9types "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" + "github.com/filecoin-project/go-statemachine/fsm" + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + ltypes "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" + lotus_repo "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + "golang.org/x/net/context" + "golang.org/x/xerrors" +) + +var migrateCmd = &cli.Command{ + Name: "migrate", + Description: "Migrate boost metadata to Curio", + Usage: "migrate-curio migrate", + Before: before, + Action: func(cctx *cli.Context) error { + repoDir, err := homedir.Expand(cctx.String(FlagBoostRepo)) + if err != nil { + return err + } + + return migrate(cctx, repoDir) + }, +} + +func migrate(cctx *cli.Context, repoDir string) error { + ctx := cctx.Context + + r, err := lotus_repo.NewFS(repoDir) + if err != nil { + return err + } + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { + return fmt.Errorf("repo at '%s' is not initialized", cctx.String(FlagBoostRepo)) + } + + lr, err := r.Lock(repo.Boost) + if err != nil { + return err + } + + mds, err := lr.Datastore(ctx, "/metadata") + if err != nil { + return err + } + + maddrb, err := mds.Get(context.TODO(), datastore.NewKey("miner-address")) + if err != nil { + return err + } + + maddr, err := address.NewFromBytes(maddrb) + if err != nil { + return err + } + + // Connect to full Node + full, closer, err := lcli.GetFullNodeAPIV1(cctx) + if err != nil { + return xerrors.Errorf("failed to connect to full node API: %w", err) + } + defer closer() + + // Connect to Harmony DB + hdb, err := deps.MakeDB(cctx) + if err != nil { + return xerrors.Errorf("failed to connect to harmony DB: %w", err) + } + + dbPath := path.Join(repoDir, "boost.db?cache=shared") + sqldb, err := db.SqlDB(dbPath) + if err != nil { + return fmt.Errorf("opening boost sqlite db: %w", err) + } + + mdbPath := path.Join(repoDir, "migrate-curio.db?cache=shared") + mdb, err := db.SqlDB(mdbPath) + if err != nil { + return fmt.Errorf("opening migrate sqlite db: %w", err) + } + + _, err = mdb.Exec(`CREATE TABLE IF NOT EXISTS Deals ( + ID TEXT UNIQUE, + DB BOOL, + LID BOOL, + Pipeline BOOL + );`) + if err != nil { + return fmt.Errorf("failed to create migration table: %w", err) + } + + mActor, err := full.StateGetActor(ctx, maddr, ltypes.EmptyTSK) + if err != nil { + return fmt.Errorf("getting actor for the miner %s: %w", maddr, err) + } + astore := adt.WrapStore(ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(full))) + mas, err := miner.Load(astore, mActor) + if err != nil { + return fmt.Errorf("loading miner actor state %s: %w", maddr, err) + } + liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors) + if err != nil { + return fmt.Errorf("getting live sector sets for miner %s: %w", maddr, err) + } + unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + if err != nil { + return fmt.Errorf("getting unproven sector sets for miner %s: %w", maddr, err) + } + activeSectors, err := bitfield.MergeBitFields(liveSectors, unProvenSectors) + if err != nil { + return fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", maddr, err) + } + + // Migrate Boost deals + if err := migrateBoostDeals(ctx, activeSectors, maddr, hdb, sqldb, mdb); err != nil { + return xerrors.Errorf("failed to migrate boost deals: %w", err) + } + + // Migrate Legacy deal + if err := migrateLegacyDeals(ctx, full, activeSectors, maddr, hdb, mds, mdb); err != nil { + return xerrors.Errorf("failed to migrate legacy deals: %w", err) + } + + // Migrate Direct deals + if err := migrateDDODeals(ctx, full, activeSectors, maddr, hdb, sqldb, mdb); err != nil { + return xerrors.Errorf("failed to migrate DDO deals: %w", err) + } + return nil +} + +func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, maddr address.Address, hdb *harmonydb.DB, sqldb, mdb *sql.DB) error { + sdb := db.NewDealsDB(sqldb) + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return fmt.Errorf("address.IDFromAddress: %s", err) + } + + aDeals, err := sdb.ListActive(ctx) + if err != nil { + return err + } + + cDeals, err := sdb.ListCompleted(ctx) + if err != nil { + return err + } + + deals := append(aDeals, cDeals...) + + for _, deal := range deals { + llog := log.With("Boost Deal", deal.DealUuid.String()) + // Skip deals which are before add piece + if deal.Checkpoint < dealcheckpoints.AddedPiece { + llog.Infow("Skipping as checkpoint is below add piece") + continue + } + + // Skip deals which do not have chain deal ID + if deal.ChainDealID == 0 { + llog.Infow("Skipping as chain deal ID is 0") + continue + } + + // Skip deals which do not have retryable error + if deal.Retry == types.DealRetryFatal { + llog.Infow("Skipping as deal retry is fatal") + continue + } + + // SKip sector 0. This might cause some deals to not migrate but + // that is better than migrating faulty deals + if deal.SectorID == 0 { + llog.Infow("Skipping as sector ID is 0") + continue + } + + // Skip if the sector for the deal is not alive + ok, err := activeSectors.IsSet(uint64(deal.SectorID)) + if err != nil { + return err + } + if !ok { + llog.Infof("Skipping as sector %d is not alive anymore", deal.SectorID) + continue + } + + // Skip if already migrated + var a, b, c bool + err = mdb.QueryRow(`SELECT DB, LID, Pipeline FROM Deals WHERE ID = ?`, deal.DealUuid.String()).Scan(&a, &b, &c) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("deal: %s: failed to check migration status: %w", deal.DealUuid.String(), err) + } + } + if a && b && c { + llog.Infow("Skipped as this deal is already migrated") + continue + } + + propJson, err := json.Marshal(deal.ClientDealProposal.Proposal) + if err != nil { + return fmt.Errorf("deal: %s: json.Marshal(piece.DealProposal): %s", deal.DealUuid.String(), err) + } + + sigByte, err := deal.ClientDealProposal.ClientSignature.MarshalBinary() + if err != nil { + return fmt.Errorf("deal: %s: marshal client signature: %s", deal.DealUuid.String(), err) + } + + prop := deal.ClientDealProposal.Proposal + + sProp, err := deal.SignedProposalCid() + if err != nil { + return err + } + + // de-serialize transport opaque token + tInfo := &transportTypes.HttpRequest{} + if err := json.Unmarshal(deal.Transfer.Params, tInfo); err != nil { + return fmt.Errorf("deal: %s: failed to de-serialize transport params bytes '%s': %s", deal.DealUuid.String(), string(deal.Transfer.Params), err) + } + + headers, err := json.Marshal(tInfo.Headers) + if err != nil { + return fmt.Errorf("deal: %s: failed to marshal headers: %s", deal.DealUuid.String(), err) + } + + // Add deal to HarmonyDB + if !a { + _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid, + proposal_signature, proposal, piece_cid, + piece_size, offline, verified, start_epoch, end_epoch, + client_peer_id, fast_retrieval, announce_to_ipni, url, url_headers, chain_deal_id, publish_cid, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) + ON CONFLICT (uuid) DO NOTHING`, + deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(), + prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(), + deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt) + + if err != nil { + return fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String()) + if err != nil { + return fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err) + } + } + + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false) + if err != nil { + return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String()) + if err != nil { + return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err) + } + } + + if !c { + var proof abi.RegisteredSealProof + err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) + } + + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, + sealed, should_index, indexing_created_at, announce) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, + deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + } + } + } + + return nil +} + +func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors bitfield.BitField, maddr address.Address, hdb *harmonydb.DB, ds datastore.Batching, mdb *sql.DB) error { + mid, err := address.IDFromAddress(maddr) + if err != nil { + return fmt.Errorf("address.IDFromAddress: %s", err) + } + + // Get the deals FSM + provDS := namespace.Wrap(ds, datastore.NewKey("/deals/provider")) + deals, migrate, err := vfsm.NewVersionedFSM(provDS, fsm.Parameters{ + StateType: legacytypes.MinerDeal{}, + StateKeyField: "State", + }, nil, "2") + if err != nil { + return fmt.Errorf("reading legacy deals from datastore: %w", err) + } + + err = migrate(ctx) + if err != nil { + return fmt.Errorf("running provider fsm migration script: %w", err) + } + + lm := legacy.NewLegacyDealsManager(deals) + go lm.Run(ctx) + // Wait for 5 seconds + time.Sleep(time.Second * 5) + + legacyDeals, err := lm.ListDeals() + if err != nil { + return fmt.Errorf("getting legacy deals: %w", err) + } + + head, err := full.ChainHead(ctx) + if err != nil { + return err + } + + for _, deal := range legacyDeals { + llog := log.With("Boost Deal", deal.ProposalCid.String()) + // Skip deals which do not have chain deal ID + if deal.DealID == 0 { + llog.Infow("Skipping as chain deal ID is 0") + continue + } + + // SKip sector 0. This might cause some deals to not migrate but + // that is better than migrating faulty deals + if deal.SectorNumber == 0 { + llog.Infow("Skipping as sector ID is 0") + continue + } + + // Skip expired legacy deals + if deal.ClientDealProposal.Proposal.EndEpoch < head.Height() { + llog.Infow("Deal end epoch is lower than current height") + continue + } + + // Skip if the sector for the deal is not alive + ok, err := activeSectors.IsSet(uint64(deal.SectorNumber)) + if err != nil { + return err + } + if !ok { + llog.Infof("Skipping as sector %d is not alive anymore", deal.SectorNumber) + continue + } + + // Skip if already migrated + var a, b, c bool + err = mdb.QueryRow(`SELECT DB, LID, Pipeline FROM Deals WHERE ID = ?`, deal.ProposalCid.String()).Scan(&a, &b, &c) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("deal: %s: failed to check migration status: %w", deal.ProposalCid.String(), err) + } + } + if a && b && c { + llog.Infow("Skipped as this deal is already migrated") + continue + } + + propJson, err := json.Marshal(deal.ClientDealProposal.Proposal) + if err != nil { + return fmt.Errorf("deal: %s: json.Marshal(piece.DealProposal): %s", deal.ProposalCid.String(), err) + } + + sigByte, err := deal.ClientDealProposal.ClientSignature.MarshalBinary() + if err != nil { + return fmt.Errorf("deal: %s: marshal client signature: %s", deal.ProposalCid.String(), err) + } + + prop := deal.ClientDealProposal.Proposal + + _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (signed_proposal_cid, sp_id, client_peer_id, + proposal_signature, proposal, piece_cid, + piece_size, verified, start_epoch, end_epoch, + publish_cid, chain_deal_id, fast_retrieval, created_at, sector_num) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + ON CONFLICT (signed_proposal_cid) DO NOTHING`, + deal.ProposalCid, mid, deal.Client.String(), sigByte, propJson, prop.PieceCID.String(), + prop.PieceSize, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.PublishCid.String(), + deal.FastRetrieval, deal.CreationTime, deal.SectorNumber) + + if err != nil { + return fmt.Errorf("deal: %s: failed to add the legacy deal to harmonyDB: %w", deal.ProposalCid.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, TRUE) ON CONFLICT(ID) DO NOTHING`, deal.ProposalCid.String()) + if err != nil { + return fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.ProposalCid.String(), err) + } + } + + return nil +} + +func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bitfield.BitField, maddr address.Address, hdb *harmonydb.DB, sqldb, mdb *sql.DB) error { + ddb := db.NewDirectDealsDB(sqldb) + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return fmt.Errorf("address.IDFromAddress: %s", err) + } + + deals, err := ddb.ListAll(ctx) + if err != nil { + return fmt.Errorf("failed to get all DDO deals: %w", err) + } + + for _, deal := range deals { + llog := log.With("Boost Deal", deal.ID.String()) + if deal.Err != "" && deal.Retry == types.DealRetryFatal { + llog.Infow("Skipping as deal retry is fatal") + continue + } + + if deal.Checkpoint < dealcheckpoints.AddedPiece { + llog.Infow("Skipping as checkpoint is below add piece") + continue + } + + claim, err := full.StateGetClaim(ctx, maddr, verifreg9types.ClaimId(deal.AllocationID), ltypes.EmptyTSK) + if err != nil { + return fmt.Errorf("deal: %s: error getting the claim status: %w", deal.ID.String(), err) + } + if claim == nil { + llog.Infow("Skipping as checkpoint is below add piece") + continue + } + if claim.Sector != deal.SectorID { + return fmt.Errorf("deal: %s: sector mismatch for deal", deal.ID.String()) + } + + // Skip if the sector for the deal is not alive + ok, err := activeSectors.IsSet(uint64(deal.SectorID)) + if err != nil { + return err + } + if !ok { + llog.Infow("Skipping as sector ID is 0") + continue + } + + // Skip if already migrated + var a, b, c bool + err = mdb.QueryRow(`SELECT DB, LID, Pipeline FROM Deals WHERE ID = ?`, deal.ID.String()).Scan(&a, &b, &c) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("deal: %s: failed to check migration status: %w", deal.ID.String(), err) + } + } + if a && b && c { + continue + } + + if !a { + // Add DDO deal to harmonyDB + _, err = hdb.Exec(ctx, `INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified, + start_epoch, end_epoch, allocation_id, piece_cid, piece_size, fast_retrieval, announce_to_ipni) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (uuid) DO NOTHING`, + deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID, + deal.PieceCID.String(), deal.PieceSize, true, true) + + if err != nil { + return fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String()) + if err != nil { + return fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err) + } + } + + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false) + if err != nil { + return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String()) + if err != nil { + return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err) + } + } + + // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals + if !c { + var proof abi.RegisteredSealProof + err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) + } + + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, + sealed, should_index, indexing_created_at, announce) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, + deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + } + } + } + + return nil +} diff --git a/cmd/migrate-curio/yugabyte.go b/cmd/migrate-curio/yugabyte.go new file mode 100644 index 000000000..856e034f5 --- /dev/null +++ b/cmd/migrate-curio/yugabyte.go @@ -0,0 +1,98 @@ +package main + +import ( + "fmt" + + "github.com/filecoin-project/boost/extern/boostd-data/yugabyte" + "github.com/urfave/cli/v2" + "github.com/yugabyte/pgx/v4/pgxpool" +) + +var cleanupLIDCmd = &cli.Command{ + Name: "cleanup", + Description: "Removes the indexes and other metadata from LID", + Usage: "migrate-curio cleanup", + Subcommands: []*cli.Command{ + cleanupLevelDBCmd, + cleanupYugabyteDBCmd, + }, +} + +var cleanupYugabyteDBCmd = &cli.Command{ + Name: "yugabyte", + Description: "Removes the indexes and other metadata from Yugabyte based LID store", + Usage: "migrate-curio cleanup yugabyte", + Before: before, + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "hosts", + Usage: "yugabyte hosts to connect to over cassandra interface eg '127.0.0.1'", + Required: true, + }, + &cli.StringFlag{ + Name: "username", + Usage: "yugabyte username to connect to over cassandra interface eg 'cassandra'", + }, + &cli.StringFlag{ + Name: "password", + Usage: "yugabyte password to connect to over cassandra interface eg 'cassandra'", + }, + &cli.StringFlag{ + Name: "connect-string", + Usage: "postgres connect string eg 'postgresql://postgres:postgres@localhost'", + Required: true, + }, + &cli.BoolFlag{ + Name: "i-know-what-i-am-doing", + Usage: "confirmation flag", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Bool("i-know-what-i-am-doing") { + return fmt.Errorf("please use --i-know-what-i-am-doing flag to confirm. THIS CANNOT BE UNDONE.") + } + + // Create a yugabyte data service + settings := yugabyte.DBSettings{ + Hosts: cctx.StringSlice("hosts"), + Username: cctx.String("username"), + Password: cctx.String("password"), + ConnectString: cctx.String("connect-string"), + } + + settings.CQLTimeout = 60 + + cluster := yugabyte.NewCluster(settings) + + session, err := cluster.CreateSession() + if err != nil { + return fmt.Errorf("creating yugabyte cluster: %w", err) + } + + query := `DROP KEYSPACE idx` + log.Debug(query) + err = session.Query(query).WithContext(cctx.Context).Exec() + if err != nil { + return err + } + + // Create connection pool to postgres interface + db, err := pgxpool.Connect(cctx.Context, settings.ConnectString) + if err != nil { + return err + } + + _, err = db.Exec(cctx.Context, `DROP TABLE IF EXISTS PieceTracker CASCADE;`) + if err != nil { + return err + } + + _, err = db.Exec(cctx.Context, `DROP TABLE IF EXISTS PieceFlagged CASCADE;`) + if err != nil { + return err + } + + return nil + }, +} diff --git a/go.mod b/go.mod index 4b2f79b40..c34934c48 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/filecoin-project/boost -go 1.22 +go 1.22.3 replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi @@ -21,6 +21,7 @@ require ( github.com/fatih/color v1.16.0 github.com/filecoin-project/boost-graphsync v0.13.12 github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f + github.com/filecoin-project/curio v1.23.2-0.20241017101155-587dd6361137 github.com/filecoin-project/dagstore v0.7.0 github.com/filecoin-project/go-address v1.2.0 github.com/filecoin-project/go-bitfield v0.2.4 @@ -30,13 +31,13 @@ require ( github.com/filecoin-project/go-ds-versioning v0.1.2 github.com/filecoin-project/go-fil-commcid v0.2.0 github.com/filecoin-project/go-fil-commp-hashhash v0.2.0 - github.com/filecoin-project/go-jsonrpc v0.6.0 + github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.15.0-rc1 github.com/filecoin-project/go-statemachine v1.0.3 github.com/filecoin-project/go-statestore v0.2.0 - github.com/filecoin-project/lotus v1.30.0-rc2 + github.com/filecoin-project/lotus v1.30.0-rc2.0.20241016173451-c07d2f73e436 github.com/filecoin-project/specs-actors v0.9.15 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/gbrlsnchs/jwt/v3 v3.0.1 @@ -107,6 +108,7 @@ require ( github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 github.com/whyrusleeping/cbor-gen v0.1.2 + github.com/yugabyte/pgx/v4 v4.14.5 go.opencensus.io v0.24.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 @@ -174,6 +176,7 @@ require ( github.com/filecoin-project/go-amt-ipld/v3 v3.1.0 // indirect github.com/filecoin-project/go-amt-ipld/v4 v4.4.0 // indirect github.com/filecoin-project/go-clock v0.1.0 // indirect + github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8 // indirect github.com/filecoin-project/go-commp-utils/v2 v2.1.0 // indirect github.com/filecoin-project/go-crypto v0.1.0 // indirect github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc8 // indirect @@ -266,7 +269,7 @@ require ( github.com/jessevdk/go-flags v1.4.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect github.com/josharian/intern v1.0.0 // indirect - github.com/kilic/bls12-381 v0.1.0 // indirect + github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect @@ -347,7 +350,7 @@ require ( github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect - github.com/triplewz/poseidon v0.0.1 // indirect + github.com/triplewz/poseidon v0.0.2-0.20240407130934-5265fab9d889 // indirect github.com/twmb/murmur3 v1.1.6 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect @@ -359,7 +362,6 @@ require ( github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/yugabyte/gocql v0.0.0-20230831121436-1e2272bb6bb6 // indirect - github.com/yugabyte/pgx/v4 v4.14.5 // indirect github.com/yugabyte/pgx/v5 v5.5.3-yb-2 // indirect github.com/zondax/hid v0.9.2 // indirect github.com/zondax/ledger-filecoin-go v0.11.1 // indirect @@ -390,5 +392,3 @@ require ( lukechampine.com/blake3 v1.3.0 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) - -replace github.com/KarpelesLab/reflink => github.com/magik6k/reflink v1.0.2-patch1 diff --git a/go.sum b/go.sum index 87ea76bd7..6bf37f9e8 100644 --- a/go.sum +++ b/go.sum @@ -332,6 +332,8 @@ github.com/filecoin-project/boost-graphsync v0.13.12 h1:fAGaHRIYoN6cPMs2ChVymio8 github.com/filecoin-project/boost-graphsync v0.13.12/go.mod h1:bc2M5ZLZJtXHl8kjnqtn4L1MsdEqpJErDaIeY0bJ9wk= github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f h1:8dd0yAadyeOL5Qd42XhEwD60UKvIFkY2MLhef/IaeOk= github.com/filecoin-project/boost/extern/boostd-data v0.0.0-20231124125934-3233c510357f/go.mod h1:MyzvfYWAH0OAyf95TLUWYq3cO3vm/TVzDS57GKQi47o= +github.com/filecoin-project/curio v1.23.2-0.20241017101155-587dd6361137 h1:vb2R0gbX8yN/Qod0CKL+u2xtu/KuR817AhObMnDYkXY= +github.com/filecoin-project/curio v1.23.2-0.20241017101155-587dd6361137/go.mod h1:mQ4yfT238Yu0bsRBfxsqnQb9uRshkvUM5wIllu0QzV8= github.com/filecoin-project/dagstore v0.7.0 h1:IS0R+69za8dguYWeqz/MI+nb7ONpk03tAkxPCBXEKm0= github.com/filecoin-project/dagstore v0.7.0/go.mod h1:YKn4qXih+/2xQWpfJsaKGOi4POw5vH5grDmfPCCnx8g= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -357,6 +359,8 @@ github.com/filecoin-project/go-clock v0.1.0 h1:SFbYIM75M8NnFm1yMHhN9Ahy3W5bEZV9g github.com/filecoin-project/go-clock v0.1.0/go.mod h1:4uB/O4PvOjlx1VCMdZ9MyDZXRm//gkj1ELEbxfI1AZs= github.com/filecoin-project/go-commp-utils v0.1.4 h1:/WSsrAb0xupo+aRWRyD80lRUXAXJvYoTgDQS1pYZ1Mk= github.com/filecoin-project/go-commp-utils v0.1.4/go.mod h1:Sekocu5q9b4ECAUFu853GFUbm8I7upAluummHFe2kFo= +github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8 h1:jAG2g1Fs/qoDSSaI8JaP/KmqR+QQ8IVQ6k9xKONa72M= +github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20240802040721-2a04ffc8ffe8/go.mod h1:kU2KuSPLB+Xz4FEbVE0abzSN4l6irZ8tqgcYWPVDftU= github.com/filecoin-project/go-commp-utils/v2 v2.1.0 h1:KWNRalUp2bhN1SW7STsJS2AHs9mnfGKk9LnQgzDe+gI= github.com/filecoin-project/go-commp-utils/v2 v2.1.0/go.mod h1:NbxJYlhxtWaNhlVCj/gysLNu26kYII83IV5iNrAO9iI= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= @@ -382,8 +386,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 h1:nYs6OPUF8KbZ3E8o9p9HJnQaE8iugjHR5WYVMcicDJc= github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0/go.mod h1:s0qiHRhFyrgW0SvdQMSJFQxNa4xEIG5XvqCBZUEgcbc= -github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= -github.com/filecoin-project/go-jsonrpc v0.6.0/go.mod h1:/n/niXcS4ZQua6i37LcVbY1TmlJR0UIK9mDFQq2ICek= +github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f h1:0FMH/uwBH7RinWrE+TkiOotYoqxSM54teKx/olJ/cWs= +github.com/filecoin-project/go-jsonrpc v0.6.1-0.20240820160949-2cfe810e5d2f/go.mod h1:/n/niXcS4ZQua6i37LcVbY1TmlJR0UIK9mDFQq2ICek= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= github.com/filecoin-project/go-paramfetch v0.0.4 h1:H+Me8EL8T5+79z/KHYQQcT8NVOzYVqXIi7nhb48tdm8= @@ -405,8 +409,8 @@ github.com/filecoin-project/go-statestore v0.2.0 h1:cRRO0aPLrxKQCZ2UOQbzFGn4WDNd github.com/filecoin-project/go-statestore v0.2.0/go.mod h1:8sjBYbS35HwPzct7iT4lIXjLlYyPor80aU7t7a/Kspo= github.com/filecoin-project/go-storedcounter v0.1.0 h1:Mui6wSUBC+cQGHbDUBcO7rfh5zQkWJM/CpAZa/uOuus= github.com/filecoin-project/go-storedcounter v0.1.0/go.mod h1:4ceukaXi4vFURIoxYMfKzaRF5Xv/Pinh2oTnoxpv+z8= -github.com/filecoin-project/lotus v1.30.0-rc2 h1:LLzMnb6dqxN5QHj4IAvDpFPYp8InXY8fvcTGr4uhpnw= -github.com/filecoin-project/lotus v1.30.0-rc2/go.mod h1:gXQFTK6OpJIjg2yWnYsf0awszREDffb/X+LPCDmZkwI= +github.com/filecoin-project/lotus v1.30.0-rc2.0.20241016173451-c07d2f73e436 h1:hr/BnP02MP5I00CTvhed3eJILDksobzhmbi9zwwvylg= +github.com/filecoin-project/lotus v1.30.0-rc2.0.20241016173451-c07d2f73e436/go.mod h1:SzG9YCzgmShS36hGG8PUyASa6pf91zlMsiIARxuCXQQ= github.com/filecoin-project/pubsub v1.0.0 h1:ZTmT27U07e54qV1mMiQo4HDr0buo8I1LDHBYLXlsNXM= github.com/filecoin-project/pubsub v1.0.0/go.mod h1:GkpB33CcUtUNrLPhJgfdy4FDx4OMNR9k+46DHx/Lqrg= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= @@ -1075,8 +1079,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= -github.com/kilic/bls12-381 v0.1.0 h1:encrdjqKMEvabVQ7qYOKu1OvhqpK4s47wDYtNiPtlp4= -github.com/kilic/bls12-381 v0.1.0/go.mod h1:vDTTHJONJ6G+P2R74EhnyotQDTliQDnFEwhdmfzw1ig= +github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4 h1:xWK4TZ4bRL05WQUU/3x6TG1l+IYAqdXpAeSLt/zZJc4= +github.com/kilic/bls12-381 v0.1.1-0.20220929213557-ca162e8a70f4/go.mod h1:tlkavyke+Ac7h8R3gZIjI5LKBcvMlSWnXNMgT3vZXo8= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -1742,8 +1746,8 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= @@ -1817,6 +1821,8 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= +github.com/snadrus/must v0.0.0-20240605044437-98cedd57f8eb h1:78YgPq3NbWnO4xyNhLsn2zitc7NiZpjQZ560rsxVLm4= +github.com/snadrus/must v0.0.0-20240605044437-98cedd57f8eb/go.mod h1:YnCEHk4UlWROjM3YzC2pbSTq+iynM3ZaLoVDUI8QGpE= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= @@ -1881,8 +1887,8 @@ github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0h github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/triplewz/poseidon v0.0.1 h1:G5bdkTzb9R5K5Dd3DIzBCp7rAErP1zWH0LW7Ip6bxIA= -github.com/triplewz/poseidon v0.0.1/go.mod h1:QYG1d0B4YZD7TgF6qZndTTu4rxUGFCCZAQRDanDj+9c= +github.com/triplewz/poseidon v0.0.2-0.20240407130934-5265fab9d889 h1:cbYPZOEknyV/Gyud82ebTPiciOnVSv6tiMCQi5Y+mAs= +github.com/triplewz/poseidon v0.0.2-0.20240407130934-5265fab9d889/go.mod h1:fmoxtMcbtMUjlSJmpuS3Wk/oKSvdJpIp9YWRbsOu3T0= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o= @@ -2328,7 +2334,6 @@ golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201101102859-da207088b7d1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -2344,7 +2349,6 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/go.work b/go.work index abdbb01e4..013a26e98 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.22 +go 1.22.3 use ( . diff --git a/go.work.sum b/go.work.sum index a0111e92f..d36365de6 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1095,6 +1095,8 @@ github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKz github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7 h1:mreN1m/5VJ/Zc3b4pzj9qU6D9SRQ6Vm+3KfI328t3S8= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Joker/jade v1.1.3/go.mod h1:T+2WLyt7VH6Lp0TRxQrUYEs64nRc83wkMQrfeIQKduM= +github.com/KarpelesLab/reflink v1.0.1 h1:d+tdjliwOCqvub9bl0Y02GxahWkNqejNb3TZTTUcQWA= +github.com/KarpelesLab/reflink v1.0.1/go.mod h1:WGkTOKNjd1FsJKBw3mu4JvrPEDJyJJ+JPtxBkbPoCok= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk= github.com/Masterminds/glide v0.13.2 h1:M5MOH04TyRiMBVeWHbifqTpnauxWINIubTCOkhXh+2g= @@ -2312,6 +2314,8 @@ github.com/lyft/protoc-gen-validate v0.0.0-20180911180927-64fcb82c878e/go.mod h1 github.com/lyft/protoc-gen-validate v0.0.13 h1:KNt/RhmQTOLr7Aj8PsJ7mTronaFyx80mRTT9qF261dA= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= +github.com/magik6k/reflink v1.0.2-patch1 h1:NXSgQugcESI8Z/jBtuAI83YsZuRauY9i9WOyOnJ7Vns= +github.com/magik6k/reflink v1.0.2-patch1/go.mod h1:WGkTOKNjd1FsJKBw3mu4JvrPEDJyJJ+JPtxBkbPoCok= github.com/mailgun/raymond/v2 v2.0.48/go.mod h1:lsgvL50kgt1ylcFJYZiULi5fjPBkkhNfj4KA0W54Z18= github.com/marten-seemann/qpack v0.2.1 h1:jvTsT/HpCn2UZJdP+UUB53FfUUgeOyG5K1ns0OJOGVs= github.com/marten-seemann/qpack v0.3.0 h1:UiWstOgT8+znlkDPOg2+3rIuYXJ2CnGDkGUXN6ki6hE= @@ -2663,6 +2667,7 @@ github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52 h1:RnWNS9Hlm8BI github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -3257,6 +3262,7 @@ golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= From b22f37f3e3cec89ba65960e190160398ec297ad1 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 2 Oct 2024 21:40:01 +0400 Subject: [PATCH 02/10] add indexer remove all --- api/api.go | 1 + api/proxy_gen.go | 13 ++++ build/openrpc/boost.json.gz | Bin 3233 -> 3295 bytes cmd/boostd/index.go | 27 ++++++++ documentation/en/api-v1-methods.md | 17 ++++- gql/resolver_ipni.go | 4 ++ gql/schema.graphql | 3 + indexprovider/wrapper.go | 90 +++++++++++++++++++++++-- node/config/def.go | 3 + node/config/doc_gen.go | 14 ++++ node/config/types.go | 6 ++ node/impl/boost.go | 4 ++ node/modules/directdeals.go | 1 + node/modules/storageminer.go | 1 + react/src/Ipni.js | 15 ++++- react/src/gql.js | 7 ++ storagemarket/direct_deals_provider.go | 4 ++ storagemarket/provider.go | 7 ++ 18 files changed, 209 insertions(+), 8 deletions(-) diff --git a/api/api.go b/api/api.go index 91f4fd425..6f7a0b943 100644 --- a/api/api.go +++ b/api/api.go @@ -26,6 +26,7 @@ type Boost interface { Net // MethodGroup: Boost + BoostIndexerRemoveAll(ctx context.Context) ([]cid.Cid, error) //perm:admin BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin BoostIndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) //perm:admin BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 8f3ce38a0..b98bf78bb 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -60,6 +60,8 @@ type BoostStruct struct { BoostIndexerListMultihashes func(p0 context.Context, p1 []byte) ([]multihash.Multihash, error) `perm:"admin"` + BoostIndexerRemoveAll func(p0 context.Context) ([]cid.Cid, error) `perm:"admin"` + BoostLegacyDealByProposalCid func(p0 context.Context, p1 cid.Cid) (legacytypes.MinerDeal, error) `perm:"admin"` BoostOfflineDealWithData func(p0 context.Context, p1 uuid.UUID, p2 string, p3 bool) (*ProviderDealRejectionInfo, error) `perm:"admin"` @@ -355,6 +357,17 @@ func (s *BoostStub) BoostIndexerListMultihashes(p0 context.Context, p1 []byte) ( return *new([]multihash.Multihash), ErrNotSupported } +func (s *BoostStruct) BoostIndexerRemoveAll(p0 context.Context) ([]cid.Cid, error) { + if s.Internal.BoostIndexerRemoveAll == nil { + return *new([]cid.Cid), ErrNotSupported + } + return s.Internal.BoostIndexerRemoveAll(p0) +} + +func (s *BoostStub) BoostIndexerRemoveAll(p0 context.Context) ([]cid.Cid, error) { + return *new([]cid.Cid), ErrNotSupported +} + func (s *BoostStruct) BoostLegacyDealByProposalCid(p0 context.Context, p1 cid.Cid) (legacytypes.MinerDeal, error) { if s.Internal.BoostLegacyDealByProposalCid == nil { return *new(legacytypes.MinerDeal), ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index b3922ced318090f2c0968301c260c4dfa4ced6bd..59679d547c85998cd475955a7ea127dff7f3d293 100644 GIT binary patch literal 3295 zcmV<53?TC#iwFP!00000|Li?$Q`usvJuDK zdn~E60eA5M=bl7ogG1CC^s#|&L3ori+{Nw2UZc4q%od&+*a11_mWR8S_m~k!t^7!B zFgaz71Dl&Sp5y{Pfy4&3z%vDPWeDcmw{O?{D;xRDL|Yzu6Kr@cg>TAjG`DoiZ6cF1 z33sw_82;ZtZVQ>zY8+B)8*N=)4JQ%|1VV7Jjqbm)uab_C*XT0|f)RTj7xex;V$=@j zeU$>FFBp2m;ClpV+dv#{niiG~xouG4)VL)CM-NO9XL3qWD4)cR1N`xkm#U zMqken-)&_G9g%4c6ZOFygG16+9?+Ia)VF|1I-(#lunwt(1h@jc0AqqYutz)LoFQe` z;XN^lh-C@zJg^X*L2IDCL^L?30~Q)i5JA6(Y-J#V4!H#;+WJ1G<`_|CQIkkedpN!= zsj0r*afx{Y)@&C!abX(~W+8%3)vPh`#)@umM;zA%MH=SY_LYIvRNTG9o)nZ_lclwSf~eOF&6U+bgk`=;sAc>q$~a|NdlilqkeB(+P(#ws!LXtW zL9;0-{9El#3jJutvsMAEw?!)8n0Tv-|1t53MSsY-T~79oiKmzQkeVyWeu!c%_s>i2 z70+bo5)BsHb+uCp8?_Dyd){)~{y9gE)#4){S#m9x;doh|{Q5q%8ehKjkG9d)8_!X+ z^cn-fZ>a@BEj*W`z|)Vn(91NVb|r~UA^hOdJUe_jH?sLQ{BK9|SNxxHEvbZP>s8ED z#wuMI*dcfQ5NK=Pzz$`pPw{*r1evTMi}|;o!F%b{VnW_yHUEoGE!-_fR0AIhPzUHq zwWQf>?F4@=n$2$T=WjK|roaN;83^ujkJxJ8xqnkI;jn*%yGoH6;pf0F?zWnpwt;^K zp|q|q1Pr%sQZkRjONna?E=@`KCGu>#v_wha7%S@Q$uk_&PT z13^s~fQY`t%@p)QZrenHP?!W?gorE*Pvh}(dd)~E3itRT;EA?@kBB6=n+<@bQv>(C z-$o;wGDuDeg!yi`CVlt;fKXJ!t@cspSI+H{cu!?I?q)BKka?Crq{ZyOit8azUmG+^fY~s~;*5Mz1r zVA0?oL&}Jld|)QG)DgOPNIclzQ>bPR7YaZz@shQCsGJV9cCFJEF3N0|RZ~?VB-*kJ zw?vw^>L!UTC~(>{cagFhW$;U^MP(Ec}sY@3@TpzsmyHe;VasrS(J z4`sAxS#zOYnfNY0tT_T3B=klSMBh=@5X;UPIQ$mAFb{|g)**K%@i*r;)K%_y?3}OW z+0PxelpCHpeN_h_xr`raC^fX!S+r{Y=2F|4a8Pw~l)Zi(vRP)(&Z^}0t*YF$X{1~g zmQ5oqn_5QXd!`AlOfWnH;)SXtEpt@`Wd>A7*m#S4lld|X@9*-uF(SaV^&xnar>W72N`9$f}d>6GvW==8qx?Rs|HS${2NIS1Xs!mei znwYDWyXL}ruy`fe;xV*H`aE^C@^s~}R(G#i-F22Vl8r?AgnDvD5?G_kX8};YzZAUe z0c~AgIkTIXtt+eb7B*Y@atX~gD=Wb4As@kq`T;LF(7%L1LrD#~`dMZCzk}}z*p1EEh#TwYc5VrKA`O7gnyOB(CPep`%Q zqVjBU`kNAx6WA_o*Xb8zt2Y$_K2gctU-oQz;&2~K)@w(X1F1MJv4e$SW#B1gVOe0I zeCKSpM9j7OR2TXDbJp?7({-eET2jjp8A9dFDpRmpSCML~NR#5SRC9!$WqtIS<)wz= zI*nu`xPa9wN@wI;aeY{<+}&$O*B6zbVgsLoBo>h*89BF!H(u!Cpew&T1Vu9sbGK{3a5?L7mZk?+BaZc_EFy<0+UGPL0%7a4Tt z>t2IWD$3af4J0b3S~W)Za7-8jyC0PF^cj_uNWR8(o`L6bX05{!>L{IA@lNRL2c@!6 z$BL^6OOf1#i!@0(G9bR%sbhwiv5K3EYA-R>daGPMRB=MRx z+u2r1En)IWWPd^MhV(SvPNZsm&WYKj$=WI3?l+2j3OM7-cwxe117dqK%<~)Tsunv4`TS4i^M=oYL zpGp#t1ElcR@?2@i0l2PU++|kso|ei d8nf0XCwrZ@csjkB{x1Lk|NpT4DYCwk002umeHH)! literal 3233 zcmV;S3|{jeiwFP!00000|Li?$Q`4yMq$u4yB0^J*Bo+|bU z(St2{Wtoy?^5374FR>HL@g+c^c*;0XgNChx-?Im=Q;f{6K9m zIc2SVo1533>`A$xhg3Oxabq zPfQ|WSpqx{EJPR3T4*Q{4c6&^g~l^P(C;Bz8Hk_*Zh?umzD=n)MU+|8Bofpdj&Dn9 zs&98(VqSx_m_^Q9*hYj|h@cbIYf8MSq8r>2$F)I`hWWOAX<*eA_b;#~1!Y(Ga&BM& z|M*mZh5J(YpqA?w;BX1ZrtBIU*cUeLAC4|1X$wcdVpzcWm?Vu0uQtVwbpd*A@JB_MaXjy z%~G+vQc%N-6$53mv{q0M^_rnwTOCAL=BtE8#-E~$QB>faLE4mQ0 z=#s*})9t0uk9s_770`KIqymnJS6BRxh*vE7W6tezvVTN8z1)Y?tR?#)ijCYqFS%Dd zlc7u0SZvqTOet*CI3VnK%W?NjjvTAOhd{FAS}()#usr$oeQLEne;yugqpeq-qiE^1 zMuOi^3xpbYCP{&(A5Wo|X-4fz5}iW$!KHbz_;PM!`={{Vj^5Sy4Vz0z&j(sUG5QE&AaqB1rrX2hq$j4nGt>t{NjG6-Rm0ocMwXu zT7jTN)C3Sf8D^*pVIyKX5fj>zwDcT+#c$XYnLm(wckm9T*tKUg~+0R3NjEN;!BPV{AqtC!8-*L=#7%$ zw+8+|JgIt7xJfv5z!>+L3Fmw``ZNq%m8cWT3e;jK7$787Vi=h`H`R2P0dW_pmxXa% zrJ6>^MI)lm^JT(R>puuaC?AI?8d^lcj-(DGqaqh%zQVm)ytVqF0&z5gR$+9Em;)9K z{uon6#OysYxuurS$7ABbn>~eUW^tha6caBQ%g4&;P-8b*ZQ-EIbXhf36+)sd%WzAi zd8c6Dai_y8fYz z4lHXa)GHI;<%czgV1tCgM1mMPY8ztNIR%H`z-Q(GvB5gv?kxW1?3%jD9gm&!dY=8# zVoSNN?NZDA!5cC*RJ-TTx@G<|SfI<$FGF*8I6so;7R6q8cc9QE7h3%Qo7IDv=69QO$g7 zZbOI*z$%BQ`ei+MpxRUg{Rkm!Qx()y1vOPcO>(12ZZye_Cz;%6s)CxTpr$J5$IB=s z$2po~9`|(2Ts>V`Y*gM0R(ZX4 zue{1TRz$3NWos@RZH~RJatlE|f?iwTzOg!M84)f7MNO<-7Q9^}RV6pLA=D;g8$NK6L3g=&E-1*LoSoA^qH=29 zVssCtgfXy(L1Dm#8*i zIa`rm?WQlAd7gD_%gj~ho`SL^9VH5BR-~~iHfeU!?9f69XunIV7F~x>Ddyc-kbk2= zQzzKe2|o2Y!4+Sj@}8tzi0TDnlZv8I|BjG~yt8bzOO2>i|G4aElqefpwWOt3E9EJb zW5vJ=t5U1CfjG5r-7?cJ@uf0QurP$ z1G!xkZi0c_9#Sf3v)f2CSlr*eB>E+)C8>p|6*O*b7K5QJu%*UlXEfD2@%&IKY|>1*VVLhdZfNu?NvIx7QSmB@-zqD^*WWC z27NDTb_Lf5sZLwF+N`V;^e!bX-gV%ZDz`n#A;ms;y6uFrOg{32+-BRN{M#b+sA6+Z z630!_jkhjUNX5xU_~#Rk5=&K&tX34_aH$nW#Hg0>@Rq4jYQz#vOOB`)4pOs5f+X6R zQf_pz%VoN+a9B0Ov@yl=N?rY6cH!ron?!zN$|c<==Cxn3t};C`_p-pKhBEikkTj4@ zF9}&eDE~>`!(9xr_M*VYb~Kw?N{)MPb8aaaS^GY&PbCS+0aEz1JXacW z0B$N6`FK7x8EGDl-|NHi7bfj{?VOW~BP+URqN;pH!>V0`wHUU!+$QrsNGS8t_@k3^ z%xh$Yhcpj?*s+4F$|_k11$H2nyj;snS-_j~l@{4J5!sYKJxvY&w9=T3J~`g&?&10T Ta{j*n00960!o!XpGmroPdagv) diff --git a/cmd/boostd/index.go b/cmd/boostd/index.go index 0e4b0fd69..35dc919dd 100644 --- a/cmd/boostd/index.go +++ b/cmd/boostd/index.go @@ -22,6 +22,7 @@ var indexProvCmd = &cli.Command{ indexProvAnnounceLatestHttp, indexProvAnnounceDealRemovalAd, indexProvAnnounceDeal, + indexProvRemoveAllCmd, }, } @@ -303,3 +304,29 @@ var indexProvAnnounceDeal = &cli.Command{ return nil }, } + +var indexProvRemoveAllCmd = &cli.Command{ + Name: "remove-all", + Usage: "Announce all removal ad for all contextIDs", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + // get boost api + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + // announce markets and boost deals + cids, err := napi.BoostIndexerRemoveAll(ctx) + if err != nil { + return err + } + for _, c := range cids { + fmt.Println("Published the removal ad with CID", c.String()) + } + return nil + }, +} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 859bfc871..5dfd09678 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -20,6 +20,7 @@ * [BoostIndexerAnnounceLatestHttp](#boostindexerannouncelatesthttp) * [BoostIndexerAnnounceLegacyDeal](#boostindexerannouncelegacydeal) * [BoostIndexerListMultihashes](#boostindexerlistmultihashes) + * [BoostIndexerRemoveAll](#boostindexerremoveall) * [BoostLegacyDealByProposalCid](#boostlegacydealbyproposalcid) * [BoostOfflineDealWithData](#boostofflinedealwithdata) * [I](#i) @@ -374,7 +375,7 @@ Response: ``` ### BoostIndexerAnnounceAllDeals -There are not yet any comments for this method. + Perms: admin @@ -513,6 +514,20 @@ Response: ] ``` +### BoostIndexerRemoveAll +There are not yet any comments for this method. + +Perms: admin + +Inputs: `null` + +Response: +```json +[ + null +] +``` + ### BoostLegacyDealByProposalCid diff --git a/gql/resolver_ipni.go b/gql/resolver_ipni.go index 1b23892b4..7391b288a 100644 --- a/gql/resolver_ipni.go +++ b/gql/resolver_ipni.go @@ -229,3 +229,7 @@ func (r *resolver) IpniDistanceFromLatestAd(ctx context.Context, args struct { return count, nil } + +func (r *resolver) IpniRemovedAllAdsStatus(ctx context.Context) (bool, error) { + return r.idxProvWrapper.RemoveAllStatus(ctx), nil +} diff --git a/gql/schema.graphql b/gql/schema.graphql index 5e02f01f3..b9301f7ce 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -652,6 +652,9 @@ type RootQuery { """Get the latest IPNI advertisemen""" ipniDistanceFromLatestAd(LatestAdcid: String!, Adcid: String!): Int! + + """Get the IPNI Remove All Status""" + ipniRemovedAllAdsStatus: Boolean! } type RootMutation { diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go index 0976ad1a2..ea2d230dd 100644 --- a/indexprovider/wrapper.go +++ b/indexprovider/wrapper.go @@ -7,6 +7,7 @@ import ( "fmt" "net/url" "os" + "time" "github.com/filecoin-project/boost/lib/legacy" "github.com/filecoin-project/boost/storagemarket/types/legacytypes" @@ -18,11 +19,11 @@ import ( "github.com/filecoin-project/lotus/chain/actors/builtin/miner" chainTypes "github.com/filecoin-project/lotus/chain/types" "github.com/google/uuid" - cbor "github.com/ipfs/go-ipld-cbor" - "go.uber.org/fx" - "github.com/ipfs/go-datastore" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/ipld/go-ipld-prime" + "github.com/ipni/go-libipni/ingest/schema" + "go.uber.org/fx" "github.com/filecoin-project/boost/db" bdtypes "github.com/filecoin-project/boost/extern/boostd-data/svc/types" @@ -71,6 +72,7 @@ type Wrapper struct { bitswapEnabled bool httpEnabled bool stop context.CancelFunc + removeAllAds bool } func NewWrapper(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB, @@ -127,7 +129,11 @@ func (w *Wrapper) Start(_ context.Context) { log.Info("starting index provider") - go w.checkForUpdates(runCtx) + if w.cfg.CurioMigration.Enable { + go w.tryAnnounceRemoveAll(runCtx) + } else { + go w.checkForUpdates(runCtx) + } } func (w *Wrapper) checkForUpdates(ctx context.Context) { @@ -867,3 +873,79 @@ func (w *Wrapper) AnnounceBoostDirectDealRemoved(ctx context.Context, dealUUID u } return annCid, err } + +func (w *Wrapper) AnnounceRemoveAll(ctx context.Context) ([]cid.Cid, error) { + var allAds []*schema.Advertisement + _, ad, err := w.prov.GetLatestAdv(ctx) + if err != nil { + return nil, err + } + allAds = append(allAds, ad) + + prev, err := cid.Parse(ad.PreviousID.String()) + if err != nil { + return nil, err + } + + for prev != cid.Undef { + ad, err := w.prov.GetAdv(ctx, prev) + if err != nil { + return nil, err + } + + prev, err = cid.Parse(ad.PreviousID.String()) + if err != nil { + return nil, err + } + } + + var entryAds []*schema.Advertisement + + for _, ad := range allAds { + if !ad.IsRm { + entryAds = append(entryAds, ad) + } + } + + var newAds []cid.Cid + + for _, ad := range entryAds { + a, err := w.prov.NotifyRemove(ctx, w.h.ID(), ad.ContextID) + if err != nil { + if !errors.Is(err, provider.ErrContextIDNotFound) { + return nil, fmt.Errorf("failed to publish the removal ad: %w", err) + } + } + newAds = append(newAds, a) + } + + return newAds, nil + +} + +func (w *Wrapper) tryAnnounceRemoveAll(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + + for { + select { + case <-ticker.C: + out, err := w.AnnounceRemoveAll(ctx) + if err != nil { + log.Errorw("error while announcing remove all", "err", err) + continue + } + if len(out) > 0 { + continue + } + log.Debugw("Cleaned up all the IPNI ads") + w.removeAllAds = true + return + case <-ctx.Done(): + return + } + } +} + +func (w *Wrapper) RemoveAllStatus(ctx context.Context) bool { + return w.removeAllAds +} diff --git a/node/config/def.go b/node/config/def.go index 7348db15c..ec39b0827 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -170,6 +170,9 @@ func DefaultBoost() *Boost { MaxDealsPerPublishMsg: 8, MaxPublishDealsFee: types.MustParseFIL("0.05"), }, + CurioMigration: CurioMigration{ + Enable: false, + }, } return cfg } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 2f0745bea..50ca25f3e 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -145,6 +145,12 @@ a file containing the booster-bitswap peer id's private key. Can be left blank w Name: "IndexProvider", Type: "IndexProviderConfig", + Comment: ``, + }, + { + Name: "CurioMigration", + Type: "CurioMigration", + Comment: ``, }, }, @@ -194,6 +200,14 @@ a file containing the booster-bitswap peer id's private key. Can be left blank w Comment: `From address for eth_ state call`, }, }, + "CurioMigration": []DocField{ + { + Name: "Enable", + Type: "bool", + + Comment: `Enable limits the Boost functionality to prepare for the migration`, + }, + }, "DealPublishConfig": []DocField{ { Name: "ManualDealPublish", diff --git a/node/config/types.go b/node/config/types.go index 06e17159a..5b4b5acc3 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -51,6 +51,7 @@ type Boost struct { HttpDownload HttpDownloadConfig Retrievals RetrievalConfig IndexProvider IndexProviderConfig + CurioMigration CurioMigration } type WalletsConfig struct { @@ -399,3 +400,8 @@ type DealPublishConfig struct { // The maximum fee to pay when sending the PublishStorageDeals message MaxPublishDealsFee types.FIL } + +type CurioMigration struct { + // Enable limits the Boost functionality to prepare for the migration + Enable bool +} diff --git a/node/impl/boost.go b/node/impl/boost.go index 645fb0e49..c27be73ff 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -231,3 +231,7 @@ func (sm *BoostAPI) PdCleanup(ctx context.Context) error { func (sm *BoostAPI) MarketGetAsk(ctx context.Context) (*legacytypes.SignedStorageAsk, error) { return sm.StorageProvider.GetAsk(), nil } + +func (sm *BoostAPI) BoostIndexerRemoveAll(ctx context.Context) ([]cid.Cid, error) { + return sm.IndexProvider.AnnounceRemoveAll(ctx) +} diff --git a/node/modules/directdeals.go b/node/modules/directdeals.go index 2094a555d..227f85f8a 100644 --- a/node/modules/directdeals.go +++ b/node/modules/directdeals.go @@ -34,6 +34,7 @@ func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc ddpCfg := storagemarket.DDPConfig{ StartEpochSealingBuffer: abi.ChainEpoch(cfg.Dealmaking.StartEpochSealingBuffer), RemoteCommp: cfg.Dealmaking.RemoteCommp, + CurioMigration: cfg.CurioMigration.Enable, } prov := storagemarket.NewDirectDealsProvider(ddpCfg, provAddr, fullnodeApi, secb, commpc, commpt, sps, directDealsDB, dl, piecedirectory, ip) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 80a1f606e..84cf5d922 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -486,6 +486,7 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func( DealLogDurationDays: cfg.Dealmaking.DealLogDurationDays, StorageFilter: cfg.Dealmaking.Filter, SealingPipelineCacheTimeout: time.Duration(cfg.Dealmaking.SealingPipelineCacheTimeout), + CurioMigration: cfg.CurioMigration.Enable, } dl := logs.NewDealLogger(logsDB) tspt := httptransport.New(h, dl, httptransport.NChunksOpt(cfg.HttpDownload.NChunks), httptransport.AllowPrivateIPsOpt(cfg.HttpDownload.AllowPrivateIPs)) diff --git a/react/src/Ipni.js b/react/src/Ipni.js index cd7bc782a..793fdcb26 100644 --- a/react/src/Ipni.js +++ b/react/src/Ipni.js @@ -6,6 +6,7 @@ import { IpniProviderInfoQuery, IpniLatestAdQuery, IpniDistanceFromLatestAdQuery, + IpniRemovedAllAdsStatus, } from "./gql"; import moment from "moment"; import React, {useEffect, useState} from "react"; @@ -89,6 +90,7 @@ function ProviderIpniInfoRender(props){ adcid: adCid } }) + const publishedRemoveAll = useQuery(IpniRemovedAllAdsStatus) return

Provider Indexer Info

@@ -111,13 +113,14 @@ function ProviderIpniInfoRender(props){   ({moment(data.LastAdvertisementTime).fromNow()} ago)   - {distance.data ? ({distance.data.ipniDistanceFromLatestAd} behind): ''} + {distance.data ? + ({distance.data.ipniDistanceFromLatestAd} behind) : ''} Latest Advertisement on Boost - {lad ? {lad}: ''} + {lad ? {lad} : ''} @@ -128,6 +131,12 @@ function ProviderIpniInfoRender(props){ ({moment(data.LastErrorTime).fromNow()} ago) + + Published Removal Ads for Curio Migration + + {publishedRemoveAll.data ? publishedRemoveAll.data: ''} + +
@@ -137,7 +146,7 @@ function ProviderConfig({configJson}) { const cfg = JSON.parse(configJson) return

Index Provider Config

- +
} diff --git a/react/src/gql.js b/react/src/gql.js index 2aa0a7a12..519136620 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -880,6 +880,12 @@ const IpniDistanceFromLatestAdQuery = gql` } `; +const IpniRemovedAllAdsStatus = gql` + query AppIpniRemovedAllAdsStatusQuery{ + ipniRemovedAllAdsStatus + } +`; + export { gqlClient, EpochQuery, @@ -931,4 +937,5 @@ export { StorageAskQuery, PublishPendingDealsMutation, PiecePayloadCidsQuery, + IpniRemovedAllAdsStatus, } diff --git a/storagemarket/direct_deals_provider.go b/storagemarket/direct_deals_provider.go index bff8f5077..efd13400a 100644 --- a/storagemarket/direct_deals_provider.go +++ b/storagemarket/direct_deals_provider.go @@ -44,6 +44,7 @@ type DDPConfig struct { // Minimum start epoch buffer to give time for sealing of sector with deal StartEpochSealingBuffer abi.ChainEpoch Curio bool + CurioMigration bool } type DirectDealsProvider struct { @@ -197,6 +198,9 @@ func (ddp *DirectDealsProvider) Accept(ctx context.Context, entry *types.DirectD } func (ddp *DirectDealsProvider) Import(ctx context.Context, params smtypes.DirectDealParams) (*api.ProviderDealRejectionInfo, error) { + if ddp.config.CurioMigration { + return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil + } piececid := params.PieceCid.String() clientAddr := params.ClientAddr.String() log.Infow("received direct data import", "piececid", piececid, "filepath", params.FilePath, "clientAddr", clientAddr, "allocationId", params.AllocationID) diff --git a/storagemarket/provider.go b/storagemarket/provider.go index 5e09fee1c..b6af08047 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -84,6 +84,7 @@ type Config struct { SealingPipelineCacheTimeout time.Duration StorageFilter string Curio bool + CurioMigration bool } var log = logging.Logger("boost-provider") @@ -250,6 +251,9 @@ func (p *Provider) GetAsk() *legacytypes.SignedStorageAsk { // an offline deal (the deal must already have been proposed by the client) func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (pi *api.ProviderDealRejectionInfo, err error) { p.dealLogger.Infow(dealUuid, "import data for offline deal", "filepath", filePath, "delete after import", delAfterImport) + if p.config.CurioMigration { + return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil + } // db should already have a deal with this uuid as the deal proposal should have been made beforehand ds, err := p.dealsDB.ByID(p.ctx, dealUuid) @@ -293,6 +297,9 @@ func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID // ExecuteDeal is called when the Storage Provider receives a deal proposal // from the network func (p *Provider) ExecuteDeal(ctx context.Context, dp *types.DealParams, clientPeer peer.ID) (*api.ProviderDealRejectionInfo, error) { + if p.config.CurioMigration { + return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil + } ctx, span := tracing.Tracer.Start(ctx, "Provider.ExecuteLibp2pDeal") defer span.End() From d00b91b32289ce7d053cf68cd0e538601b2b3d88 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 4 Oct 2024 17:59:03 +0400 Subject: [PATCH 03/10] migrate config to v7 --- node/config/migrate.go | 3 ++- node/config/v6_to_v7.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 node/config/v6_to_v7.go diff --git a/node/config/migrate.go b/node/config/migrate.go index 849cef3b5..3de616475 100644 --- a/node/config/migrate.go +++ b/node/config/migrate.go @@ -15,7 +15,7 @@ var log = logging.Logger("cfg") // CurrentVersion is the config version expected by Boost. // We need to migrate the config file to this version. -const CurrentVersion = 6 +const CurrentVersion = 7 type migrateUpFn = func(cfgPath string) (string, error) @@ -26,6 +26,7 @@ var migrations = []migrateUpFn{ v3Tov4, // index 3 => version 4 v4Tov5, // index 4 => version 5 v5Tov6, // index 5 => version 6 + v6Tov7, // index 6 => version 7 } // This struct is used to get the config file version diff --git a/node/config/v6_to_v7.go b/node/config/v6_to_v7.go new file mode 100644 index 000000000..a3861e627 --- /dev/null +++ b/node/config/v6_to_v7.go @@ -0,0 +1,28 @@ +package config + +import ( + "fmt" +) + +// Migrate from config version 6 to version 7 +func v6Tov7(cfgPath string) (string, error) { + cfg, err := FromFile(cfgPath, DefaultBoost()) + if err != nil { + return "", fmt.Errorf("parsing config file %s: %w", cfgPath, err) + } + + boostCfg, ok := cfg.(*Boost) + if !ok { + return "", fmt.Errorf("unexpected config type %T: expected *config.Boost", cfg) + } + + // Update the Boost config version + boostCfg.ConfigVersion = 7 + + bz, err := ConfigUpdate(boostCfg, DefaultBoost(), true, false) + if err != nil { + return "", fmt.Errorf("applying configuration: %w", err) + } + + return string(bz), nil +} From a5fffa115a6d11f831b65a3843eb184c0ec4f579 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Fri, 4 Oct 2024 21:10:18 +0400 Subject: [PATCH 04/10] migrate libp2p key --- cmd/migrate-curio/migrate.go | 42 ++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index d9fcbcdc5..2dc2f4b48 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -33,6 +33,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" cbor "github.com/ipfs/go-ipld-cbor" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/net/context" @@ -74,6 +75,21 @@ func migrate(cctx *cli.Context, repoDir string) error { return err } + keyStore, err := lr.KeyStore() + if err != nil { + return fmt.Errorf("failed to open Boost keystore") + } + + key, err := keyStore.Get("libp2p-host") + if err != nil { + return fmt.Errorf("failed to get key from keystore: %w", err) + } + + pkey, err := crypto.UnmarshalPrivateKey(key.PrivateKey) + if err != nil { + return fmt.Errorf("failed to unmarshal private key: %w", err) + } + mds, err := lr.Datastore(ctx, "/metadata") if err != nil { return err @@ -160,6 +176,12 @@ func migrate(cctx *cli.Context, repoDir string) error { if err := migrateDDODeals(ctx, full, activeSectors, maddr, hdb, sqldb, mdb); err != nil { return xerrors.Errorf("failed to migrate DDO deals: %w", err) } + + // Migrate libp2p key + if err := migrateKeys(ctx, maddr, pkey, hdb); err != nil { + return xerrors.Errorf("failed to migrate libp2p key: %w", err) + } + return nil } @@ -555,3 +577,23 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit return nil } + +func migrateKeys(ctx context.Context, maddr address.Address, priv crypto.PrivKey, hdb *harmonydb.DB) error { + + pkey, err := priv.Raw() + if err != nil { + return err + } + + mid, err := address.IDFromAddress(maddr) + if err != nil { + return err + } + + _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, pkey) + if err != nil { + return fmt.Errorf("inserting private key into libp2p table: %w", err) + } + + return nil +} From 5fe99f88292e3d8a9cd7589553d6a5ddcf794982 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Sat, 5 Oct 2024 15:38:19 +0400 Subject: [PATCH 05/10] generate libp2p key --- cmd/migrate-curio/migrate.go | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 2dc2f4b48..9e547ab50 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -1,6 +1,7 @@ package main import ( + "crypto/rand" "database/sql" "encoding/json" "errors" @@ -75,21 +76,6 @@ func migrate(cctx *cli.Context, repoDir string) error { return err } - keyStore, err := lr.KeyStore() - if err != nil { - return fmt.Errorf("failed to open Boost keystore") - } - - key, err := keyStore.Get("libp2p-host") - if err != nil { - return fmt.Errorf("failed to get key from keystore: %w", err) - } - - pkey, err := crypto.UnmarshalPrivateKey(key.PrivateKey) - if err != nil { - return fmt.Errorf("failed to unmarshal private key: %w", err) - } - mds, err := lr.Datastore(ctx, "/metadata") if err != nil { return err @@ -178,7 +164,7 @@ func migrate(cctx *cli.Context, repoDir string) error { } // Migrate libp2p key - if err := migrateKeys(ctx, maddr, pkey, hdb); err != nil { + if err := generateNewKeys(ctx, maddr, hdb); err != nil { return xerrors.Errorf("failed to migrate libp2p key: %w", err) } @@ -578,16 +564,21 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit return nil } -func migrateKeys(ctx context.Context, maddr address.Address, priv crypto.PrivKey, hdb *harmonydb.DB) error { +func generateNewKeys(ctx context.Context, maddr address.Address, hdb *harmonydb.DB) error { - pkey, err := priv.Raw() + mid, err := address.IDFromAddress(maddr) if err != nil { return err } - mid, err := address.IDFromAddress(maddr) + pk, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { - return err + return fmt.Errorf("generating private key: %w", err) + } + + pkey, err := pk.Raw() + if err != nil { + return fmt.Errorf("converting private key: %w", err) } _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, pkey) From a84e4ff36810695ae64c2c0b8311c940f7a1511f Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Sat, 5 Oct 2024 15:51:51 +0400 Subject: [PATCH 06/10] undo config --- gql/resolver_ipni.go | 4 --- gql/schema.graphql | 3 --- indexprovider/wrapper.go | 35 +------------------------- node/config/def.go | 3 --- node/config/doc_gen.go | 14 ----------- node/config/migrate.go | 3 +-- node/config/types.go | 6 ----- node/config/v6_to_v7.go | 28 --------------------- node/modules/directdeals.go | 1 - node/modules/storageminer.go | 1 - react/src/Ipni.js | 8 ------ react/src/gql.js | 7 ------ storagemarket/direct_deals_provider.go | 4 --- storagemarket/provider.go | 7 ------ 14 files changed, 2 insertions(+), 122 deletions(-) delete mode 100644 node/config/v6_to_v7.go diff --git a/gql/resolver_ipni.go b/gql/resolver_ipni.go index 7391b288a..1b23892b4 100644 --- a/gql/resolver_ipni.go +++ b/gql/resolver_ipni.go @@ -229,7 +229,3 @@ func (r *resolver) IpniDistanceFromLatestAd(ctx context.Context, args struct { return count, nil } - -func (r *resolver) IpniRemovedAllAdsStatus(ctx context.Context) (bool, error) { - return r.idxProvWrapper.RemoveAllStatus(ctx), nil -} diff --git a/gql/schema.graphql b/gql/schema.graphql index b9301f7ce..5e02f01f3 100644 --- a/gql/schema.graphql +++ b/gql/schema.graphql @@ -652,9 +652,6 @@ type RootQuery { """Get the latest IPNI advertisemen""" ipniDistanceFromLatestAd(LatestAdcid: String!, Adcid: String!): Int! - - """Get the IPNI Remove All Status""" - ipniRemovedAllAdsStatus: Boolean! } type RootMutation { diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go index ea2d230dd..0b564bee4 100644 --- a/indexprovider/wrapper.go +++ b/indexprovider/wrapper.go @@ -7,7 +7,6 @@ import ( "fmt" "net/url" "os" - "time" "github.com/filecoin-project/boost/lib/legacy" "github.com/filecoin-project/boost/storagemarket/types/legacytypes" @@ -72,7 +71,6 @@ type Wrapper struct { bitswapEnabled bool httpEnabled bool stop context.CancelFunc - removeAllAds bool } func NewWrapper(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, directDealsDB *db.DirectDealsDB, dealsDB *db.DealsDB, @@ -129,11 +127,7 @@ func (w *Wrapper) Start(_ context.Context) { log.Info("starting index provider") - if w.cfg.CurioMigration.Enable { - go w.tryAnnounceRemoveAll(runCtx) - } else { - go w.checkForUpdates(runCtx) - } + go w.checkForUpdates(runCtx) } func (w *Wrapper) checkForUpdates(ctx context.Context) { @@ -922,30 +916,3 @@ func (w *Wrapper) AnnounceRemoveAll(ctx context.Context) ([]cid.Cid, error) { return newAds, nil } - -func (w *Wrapper) tryAnnounceRemoveAll(ctx context.Context) { - ticker := time.NewTicker(time.Minute) - - for { - select { - case <-ticker.C: - out, err := w.AnnounceRemoveAll(ctx) - if err != nil { - log.Errorw("error while announcing remove all", "err", err) - continue - } - if len(out) > 0 { - continue - } - log.Debugw("Cleaned up all the IPNI ads") - w.removeAllAds = true - return - case <-ctx.Done(): - return - } - } -} - -func (w *Wrapper) RemoveAllStatus(ctx context.Context) bool { - return w.removeAllAds -} diff --git a/node/config/def.go b/node/config/def.go index ec39b0827..7348db15c 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -170,9 +170,6 @@ func DefaultBoost() *Boost { MaxDealsPerPublishMsg: 8, MaxPublishDealsFee: types.MustParseFIL("0.05"), }, - CurioMigration: CurioMigration{ - Enable: false, - }, } return cfg } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 50ca25f3e..2f0745bea 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -145,12 +145,6 @@ a file containing the booster-bitswap peer id's private key. Can be left blank w Name: "IndexProvider", Type: "IndexProviderConfig", - Comment: ``, - }, - { - Name: "CurioMigration", - Type: "CurioMigration", - Comment: ``, }, }, @@ -200,14 +194,6 @@ a file containing the booster-bitswap peer id's private key. Can be left blank w Comment: `From address for eth_ state call`, }, }, - "CurioMigration": []DocField{ - { - Name: "Enable", - Type: "bool", - - Comment: `Enable limits the Boost functionality to prepare for the migration`, - }, - }, "DealPublishConfig": []DocField{ { Name: "ManualDealPublish", diff --git a/node/config/migrate.go b/node/config/migrate.go index 3de616475..849cef3b5 100644 --- a/node/config/migrate.go +++ b/node/config/migrate.go @@ -15,7 +15,7 @@ var log = logging.Logger("cfg") // CurrentVersion is the config version expected by Boost. // We need to migrate the config file to this version. -const CurrentVersion = 7 +const CurrentVersion = 6 type migrateUpFn = func(cfgPath string) (string, error) @@ -26,7 +26,6 @@ var migrations = []migrateUpFn{ v3Tov4, // index 3 => version 4 v4Tov5, // index 4 => version 5 v5Tov6, // index 5 => version 6 - v6Tov7, // index 6 => version 7 } // This struct is used to get the config file version diff --git a/node/config/types.go b/node/config/types.go index 5b4b5acc3..06e17159a 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -51,7 +51,6 @@ type Boost struct { HttpDownload HttpDownloadConfig Retrievals RetrievalConfig IndexProvider IndexProviderConfig - CurioMigration CurioMigration } type WalletsConfig struct { @@ -400,8 +399,3 @@ type DealPublishConfig struct { // The maximum fee to pay when sending the PublishStorageDeals message MaxPublishDealsFee types.FIL } - -type CurioMigration struct { - // Enable limits the Boost functionality to prepare for the migration - Enable bool -} diff --git a/node/config/v6_to_v7.go b/node/config/v6_to_v7.go deleted file mode 100644 index a3861e627..000000000 --- a/node/config/v6_to_v7.go +++ /dev/null @@ -1,28 +0,0 @@ -package config - -import ( - "fmt" -) - -// Migrate from config version 6 to version 7 -func v6Tov7(cfgPath string) (string, error) { - cfg, err := FromFile(cfgPath, DefaultBoost()) - if err != nil { - return "", fmt.Errorf("parsing config file %s: %w", cfgPath, err) - } - - boostCfg, ok := cfg.(*Boost) - if !ok { - return "", fmt.Errorf("unexpected config type %T: expected *config.Boost", cfg) - } - - // Update the Boost config version - boostCfg.ConfigVersion = 7 - - bz, err := ConfigUpdate(boostCfg, DefaultBoost(), true, false) - if err != nil { - return "", fmt.Errorf("applying configuration: %w", err) - } - - return string(bz), nil -} diff --git a/node/modules/directdeals.go b/node/modules/directdeals.go index 227f85f8a..2094a555d 100644 --- a/node/modules/directdeals.go +++ b/node/modules/directdeals.go @@ -34,7 +34,6 @@ func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc ddpCfg := storagemarket.DDPConfig{ StartEpochSealingBuffer: abi.ChainEpoch(cfg.Dealmaking.StartEpochSealingBuffer), RemoteCommp: cfg.Dealmaking.RemoteCommp, - CurioMigration: cfg.CurioMigration.Enable, } prov := storagemarket.NewDirectDealsProvider(ddpCfg, provAddr, fullnodeApi, secb, commpc, commpt, sps, directDealsDB, dl, piecedirectory, ip) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 84cf5d922..80a1f606e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -486,7 +486,6 @@ func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func( DealLogDurationDays: cfg.Dealmaking.DealLogDurationDays, StorageFilter: cfg.Dealmaking.Filter, SealingPipelineCacheTimeout: time.Duration(cfg.Dealmaking.SealingPipelineCacheTimeout), - CurioMigration: cfg.CurioMigration.Enable, } dl := logs.NewDealLogger(logsDB) tspt := httptransport.New(h, dl, httptransport.NChunksOpt(cfg.HttpDownload.NChunks), httptransport.AllowPrivateIPsOpt(cfg.HttpDownload.AllowPrivateIPs)) diff --git a/react/src/Ipni.js b/react/src/Ipni.js index 793fdcb26..d2b5b610b 100644 --- a/react/src/Ipni.js +++ b/react/src/Ipni.js @@ -6,7 +6,6 @@ import { IpniProviderInfoQuery, IpniLatestAdQuery, IpniDistanceFromLatestAdQuery, - IpniRemovedAllAdsStatus, } from "./gql"; import moment from "moment"; import React, {useEffect, useState} from "react"; @@ -90,7 +89,6 @@ function ProviderIpniInfoRender(props){ adcid: adCid } }) - const publishedRemoveAll = useQuery(IpniRemovedAllAdsStatus) return

Provider Indexer Info

@@ -131,12 +129,6 @@ function ProviderIpniInfoRender(props){ ({moment(data.LastErrorTime).fromNow()} ago) - - Published Removal Ads for Curio Migration - - {publishedRemoveAll.data ? publishedRemoveAll.data: ''} - -
diff --git a/react/src/gql.js b/react/src/gql.js index 519136620..2aa0a7a12 100644 --- a/react/src/gql.js +++ b/react/src/gql.js @@ -880,12 +880,6 @@ const IpniDistanceFromLatestAdQuery = gql` } `; -const IpniRemovedAllAdsStatus = gql` - query AppIpniRemovedAllAdsStatusQuery{ - ipniRemovedAllAdsStatus - } -`; - export { gqlClient, EpochQuery, @@ -937,5 +931,4 @@ export { StorageAskQuery, PublishPendingDealsMutation, PiecePayloadCidsQuery, - IpniRemovedAllAdsStatus, } diff --git a/storagemarket/direct_deals_provider.go b/storagemarket/direct_deals_provider.go index efd13400a..bff8f5077 100644 --- a/storagemarket/direct_deals_provider.go +++ b/storagemarket/direct_deals_provider.go @@ -44,7 +44,6 @@ type DDPConfig struct { // Minimum start epoch buffer to give time for sealing of sector with deal StartEpochSealingBuffer abi.ChainEpoch Curio bool - CurioMigration bool } type DirectDealsProvider struct { @@ -198,9 +197,6 @@ func (ddp *DirectDealsProvider) Accept(ctx context.Context, entry *types.DirectD } func (ddp *DirectDealsProvider) Import(ctx context.Context, params smtypes.DirectDealParams) (*api.ProviderDealRejectionInfo, error) { - if ddp.config.CurioMigration { - return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil - } piececid := params.PieceCid.String() clientAddr := params.ClientAddr.String() log.Infow("received direct data import", "piececid", piececid, "filepath", params.FilePath, "clientAddr", clientAddr, "allocationId", params.AllocationID) diff --git a/storagemarket/provider.go b/storagemarket/provider.go index b6af08047..5e09fee1c 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -84,7 +84,6 @@ type Config struct { SealingPipelineCacheTimeout time.Duration StorageFilter string Curio bool - CurioMigration bool } var log = logging.Logger("boost-provider") @@ -251,9 +250,6 @@ func (p *Provider) GetAsk() *legacytypes.SignedStorageAsk { // an offline deal (the deal must already have been proposed by the client) func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (pi *api.ProviderDealRejectionInfo, err error) { p.dealLogger.Infow(dealUuid, "import data for offline deal", "filepath", filePath, "delete after import", delAfterImport) - if p.config.CurioMigration { - return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil - } // db should already have a deal with this uuid as the deal proposal should have been made beforehand ds, err := p.dealsDB.ByID(p.ctx, dealUuid) @@ -297,9 +293,6 @@ func (p *Provider) ImportOfflineDealData(ctx context.Context, dealUuid uuid.UUID // ExecuteDeal is called when the Storage Provider receives a deal proposal // from the network func (p *Provider) ExecuteDeal(ctx context.Context, dp *types.DealParams, clientPeer peer.ID) (*api.ProviderDealRejectionInfo, error) { - if p.config.CurioMigration { - return &api.ProviderDealRejectionInfo{Reason: "Boost is migrating to Curio"}, nil - } ctx, span := tracing.Tracer.Start(ctx, "Provider.ExecuteLibp2pDeal") defer span.End() From ce3cf50ed0276f798178cad2901a163f6764770c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 13 Oct 2024 11:39:01 +0200 Subject: [PATCH 07/10] migrate-curio: Fix migrator schema --- cmd/migrate-curio/migrate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 9e547ab50..9ac5d5a16 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -118,9 +118,9 @@ func migrate(cctx *cli.Context, repoDir string) error { _, err = mdb.Exec(`CREATE TABLE IF NOT EXISTS Deals ( ID TEXT UNIQUE, - DB BOOL, - LID BOOL, - Pipeline BOOL + DB BOOL NOT NULL DEFAULT FALSE, + LID BOOL NOT NULL DEFAULT FALSE, + Pipeline BOOL NOT NULL DEFAULT FALSE );`) if err != nil { return fmt.Errorf("failed to create migration table: %w", err) From 997c6a8ddbc8989575cd4dc21e780c425c7d37cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 13 Oct 2024 12:12:49 +0200 Subject: [PATCH 08/10] migrate-curio: Correctly marshal the libp2p key --- cmd/migrate-curio/migrate.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 9ac5d5a16..0ac5d61be 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -576,12 +576,12 @@ func generateNewKeys(ctx context.Context, maddr address.Address, hdb *harmonydb. return fmt.Errorf("generating private key: %w", err) } - pkey, err := pk.Raw() + kbytes, err := crypto.MarshalPrivateKey(pk) if err != nil { - return fmt.Errorf("converting private key: %w", err) + return fmt.Errorf("marshaling private key: %w", err) } - _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, pkey) + _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes) if err != nil { return fmt.Errorf("inserting private key into libp2p table: %w", err) } From d0ea1d739075fa98e7b93db8b4daf85c7280dbca Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 16 Oct 2024 17:25:41 +0400 Subject: [PATCH 09/10] remove lip2p key, use transactions --- cmd/migrate-curio/migrate.go | 227 +++++++++++++++++------------------ 1 file changed, 109 insertions(+), 118 deletions(-) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 0ac5d61be..36b5864d7 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -1,7 +1,6 @@ package main import ( - "crypto/rand" "database/sql" "encoding/json" "errors" @@ -34,7 +33,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" cbor "github.com/ipfs/go-ipld-cbor" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "golang.org/x/net/context" @@ -163,11 +161,6 @@ func migrate(cctx *cli.Context, repoDir string) error { return xerrors.Errorf("failed to migrate DDO deals: %w", err) } - // Migrate libp2p key - if err := generateNewKeys(ctx, maddr, hdb); err != nil { - return xerrors.Errorf("failed to migrate libp2p key: %w", err) - } - return nil } @@ -191,7 +184,11 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad deals := append(aDeals, cDeals...) - for _, deal := range deals { + for i, deal := range deals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating Boost Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100) + } + llog := log.With("Boost Deal", deal.DealUuid.String()) // Skip deals which are before add piece if deal.Checkpoint < dealcheckpoints.AddedPiece { @@ -269,62 +266,69 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad return fmt.Errorf("deal: %s: failed to marshal headers: %s", deal.DealUuid.String(), err) } - // Add deal to HarmonyDB - if !a { - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid, + _, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + // Add deal to HarmonyDB + if !a { + _, err = tx.Exec(`INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid, proposal_signature, proposal, piece_cid, piece_size, offline, verified, start_epoch, end_epoch, client_peer_id, fast_retrieval, announce_to_ipni, url, url_headers, chain_deal_id, publish_cid, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19) ON CONFLICT (uuid) DO NOTHING`, - deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(), - prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(), - deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt) - - if err != nil { - return fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err) - } - - // Mark deal added to harmonyDB - _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err) - } - } - - if !b { - // Add LID details to pieceDeal in HarmonyDB - _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false) - if err != nil { - return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err) + deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, prop.PieceCID.String(), + prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(), + deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt) + + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add the deal to harmonyDB: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.DealUuid.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal migrated: %w", deal.DealUuid.String(), err) + } } - // Mark deal added to pieceDeal in HarmonyDB - _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err) + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.DealUuid.String(), prop.PieceCID.String(), true, mid, deal.SectorID, deal.Offset, prop.PieceSize, deal.NBytesReceived, false) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal: %w", deal.DealUuid.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.DealUuid.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.DealUuid.String(), err) + } } - } - if !c { - var proof abi.RegisteredSealProof - err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) - if err != nil { - return fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) - } + if !c { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) + } - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, sealed, should_index, indexing_created_at, announce) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + } } + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return err } + } return nil @@ -366,7 +370,10 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors return err } - for _, deal := range legacyDeals { + for i, deal := range legacyDeals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating Legacy Deals: %d / %d (%0.2f%%)\n", i, len(legacyDeals), float64(i)/float64(len(legacyDeals))*100) + } llog := log.With("Boost Deal", deal.ProposalCid.String()) // Skip deals which do not have chain deal ID if deal.DealID == 0 { @@ -422,7 +429,7 @@ func migrateLegacyDeals(ctx context.Context, full v1api.FullNode, activeSectors prop := deal.ClientDealProposal.Proposal - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deals (signed_proposal_cid, sp_id, client_peer_id, + _, err = hdb.Exec(ctx, `INSERT INTO signed_proposal_cid (signed_proposal_cid, sp_id, client_peer_id, proposal_signature, proposal, piece_cid, piece_size, verified, start_epoch, end_epoch, publish_cid, chain_deal_id, fast_retrieval, created_at, sector_num) @@ -459,7 +466,10 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit return fmt.Errorf("failed to get all DDO deals: %w", err) } - for _, deal := range deals { + for i, deal := range deals { + if i > 0 && i%100 == 0 { + fmt.Printf("Migrating DDO Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100) + } llog := log.With("Boost Deal", deal.ID.String()) if deal.Err != "" && deal.Retry == types.DealRetryFatal { llog.Infow("Skipping as deal retry is fatal") @@ -505,86 +515,67 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit continue } - if !a { - // Add DDO deal to harmonyDB - _, err = hdb.Exec(ctx, `INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified, + _, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) { + if !a { + // Add DDO deal to harmonyDB + _, err = tx.Exec(`INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified, start_epoch, end_epoch, allocation_id, piece_cid, piece_size, fast_retrieval, announce_to_ipni) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (uuid) DO NOTHING`, - deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID, - deal.PieceCID.String(), deal.PieceSize, true, true) - - if err != nil { - return fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err) + deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID, + deal.PieceCID.String(), deal.PieceSize, true, true) + + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err) + } + + // Mark deal added to harmonyDB + _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err) + } } - // Mark deal added to harmonyDB - _, err = mdb.Exec(`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING`, deal.ID.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark DDO deal migrated: %w", deal.ID.String(), err) - } - } - - if !b { - // Add LID details to pieceDeal in HarmonyDB - _, err = hdb.Exec(ctx, `SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, - deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false) - if err != nil { - return fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err) + if !b { + // Add LID details to pieceDeal in HarmonyDB + _, err = tx.Exec(`SELECT process_piece_deal($1, $2, $3, $4, $5, $6, $7, $8, $9)`, + deal.ID.String(), deal.PieceCID.String(), false, mid, deal.SectorID, deal.Offset, deal.PieceSize, deal.InboundFileSize, false) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to update piece metadata and piece deal for DDO deal %s: %w", deal.ID.String(), deal.ID.String(), err) + } + + // Mark deal added to pieceDeal in HarmonyDB + _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String()) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err) + } } - // Mark deal added to pieceDeal in HarmonyDB - _, err = mdb.Exec(`UPDATE Deals SET LID = TRUE WHERE ID = ?`, deal.ID.String()) - if err != nil { - return fmt.Errorf("deal: %s: failed to mark deal LID migrated: %w", deal.ID.String(), err) - } - } + // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals + if !c { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) + } - // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals - if !c { - var proof abi.RegisteredSealProof - err = hdb.QueryRow(ctx, `SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) - if err != nil { - return fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) - } - - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = hdb.Exec(ctx, `INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, sealed, should_index, indexing_created_at, announce) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, + true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + } } + return true, nil + }, harmonydb.OptionRetry()) + if err != nil { + return err } } return nil } - -func generateNewKeys(ctx context.Context, maddr address.Address, hdb *harmonydb.DB) error { - - mid, err := address.IDFromAddress(maddr) - if err != nil { - return err - } - - pk, _, err := crypto.GenerateEd25519Key(rand.Reader) - if err != nil { - return fmt.Errorf("generating private key: %w", err) - } - - kbytes, err := crypto.MarshalPrivateKey(pk) - if err != nil { - return fmt.Errorf("marshaling private key: %w", err) - } - - _, err = hdb.Exec(ctx, `INSERT INTO libp2p (sp_id, priv_key) VALUES ($1, $2) ON CONFLICT(sp_id) DO NOTHING`, mid, kbytes) - if err != nil { - return fmt.Errorf("inserting private key into libp2p table: %w", err) - } - - return nil -} From ad00bf4392ee02dc9bc54a0961a1304b4c7dbb3b Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 17 Oct 2024 19:04:56 +0400 Subject: [PATCH 10/10] use new migration indexing table --- cmd/migrate-curio/migrate.go | 74 ++++++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 25 deletions(-) diff --git a/cmd/migrate-curio/migrate.go b/cmd/migrate-curio/migrate.go index 36b5864d7..426aa8529 100644 --- a/cmd/migrate-curio/migrate.go +++ b/cmd/migrate-curio/migrate.go @@ -306,21 +306,33 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad } if !c { - var proof abi.RegisteredSealProof - err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + // Check if we can index and announce i.e. we have unsealed copy + var exists bool + err = tx.QueryRow(`SELECT EXISTS (SELECT 1 FROM sector_location WHERE miner_id = $1 + AND sector_num = $2 + AND sector_filetype = 1);`, mid, deal.SectorID).Scan(&exists) if err != nil { - return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) + return false, fmt.Errorf("seal: %s: failed to check if sector is unsealed: %w", deal.DealUuid.String(), err) } - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, - after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, - sealed, should_index, indexing_created_at, announce) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.DealUuid.String(), mid, true, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, deal.IsOffline, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + if exists { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("seal: %s: failed to get sector proof: %w", deal.DealUuid.String(), err) + } + + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline_migration ( + uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`, + deal.DealUuid.String(), mid, prop.PieceCID.String(), prop.PieceSize, deal.NBytesReceived, + deal.SectorID, proof, deal.Offset, deal.AnnounceToIPNI) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add deal to pipeline for indexing and announcing: %w", deal.DealUuid.String(), err) + } + } else { + llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID) } } return true, nil @@ -470,7 +482,7 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit if i > 0 && i%100 == 0 { fmt.Printf("Migrating DDO Deals: %d / %d (%0.2f%%)\n", i, len(deals), float64(i)/float64(len(deals))*100) } - llog := log.With("Boost Deal", deal.ID.String()) + llog := log.With("DDO Deal", deal.ID.String()) if deal.Err != "" && deal.Retry == types.DealRetryFatal { llog.Infow("Skipping as deal retry is fatal") continue @@ -553,21 +565,33 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals if !c { - var proof abi.RegisteredSealProof - err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + // Check if we can index and announce i.e. we have unsealed copy + var exists bool + err = tx.QueryRow(`SELECT EXISTS (SELECT 1 FROM sector_location WHERE miner_id = $1 + AND sector_num = $2 + AND sector_filetype = 1);`, mid, deal.SectorID).Scan(&exists) if err != nil { - return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) + return false, fmt.Errorf("seal: %s: failed to check if sector is unsealed: %w", deal.ID.String(), err) } - // Add deal to mk12 pipeline in Curio for indexing and announcement - _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline (uuid, sp_id, started, piece_cid, piece_size, raw_size, offline, - after_commp, after_psd, after_find_deal, sector, reg_seal_proof, sector_offset, - sealed, should_index, indexing_created_at, announce) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT (uuid) DO NOTHING`, - deal.ID.String(), mid, true, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, - true, true, true, deal.SectorID, proof, deal.Offset, true, true, time.Now(), true) - if err != nil { - return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + if exists { + var proof abi.RegisteredSealProof + err = tx.QueryRow(`SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = $1 AND sector_num = $2`, mid, deal.SectorID).Scan(&proof) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to get sector proof: %w", deal.ID.String(), err) + } + + // Add deal to mk12 pipeline in Curio for indexing and announcement + _, err = tx.Exec(`INSERT INTO market_mk12_deal_pipeline_migration ( + uuid, sp_id, piece_cid, piece_size, raw_size, sector, reg_seal_proof, sector_offset, should_announce + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (uuid) DO NOTHING`, + deal.ID.String(), mid, deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, + deal.SectorID, proof, deal.Offset, true) + if err != nil { + return false, fmt.Errorf("deal: %s: failed to add DDO deal to pipeline for indexing and announcing: %w", deal.ID.String(), err) + } + } else { + llog.Infof("Skipping indexing as sector %d is not unsealed", deal.SectorID) } } return true, nil