From dfadd606f16b1c61e130a48056992ff5610a8896 Mon Sep 17 00:00:00 2001
From: sayedppqq
Date: Mon, 8 Apr 2024 15:09:55 +0600
Subject: [PATCH 1/4] add mongoshake filtering

Signed-off-by: sayedppqq
---
 Makefile                                    |  6 --
 cmd/mongo/oplog_fetch.go                    |  2 +-
 cmd/mongo/oplog_push.go                     |  8 ++
 cmd/mongo/oplog_replay.go                   | 20 ++--
 internal/config.go                          |  1 +
 internal/databases/mongo/archive/layout.go  | 15 ++-
 internal/databases/mongo/archive/loader.go  | 51 ++++++++--
 internal/databases/mongo/client/client.go   |  3 +
 internal/databases/mongo/models/archive.go  | 14 ++-
 .../databases/mongo/models/archive_test.go  |  2 +-
 internal/databases/mongo/oplog/applier.go   | 28 ++++--
 internal/databases/mongo/shake/filter.go    | 99 +++++++++++++++++++
 internal/databases/mongo/shake/oplog.go     | 53 ++++++++++
 internal/databases/mongo/stages/applier.go  |  1 +
 internal/databases/mongo/stages/fetcher.go  | 11 ++-
 main/mongo/Dockerfile                       | 13 +++
 16 files changed, 283 insertions(+), 44 deletions(-)
 create mode 100644 internal/databases/mongo/shake/filter.go
 create mode 100644 internal/databases/mongo/shake/oplog.go
 create mode 100644 main/mongo/Dockerfile

diff --git a/Makefile b/Makefile
index a3cf3f922..eb2a4ffb6 100644
--- a/Makefile
+++ b/Makefile
@@ -285,9 +285,3 @@ unlink_libsodium:
 build_client:
 	cd cmd/daemonclient && \
 	go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"
-
-
-update: mongo_build
-	docker build --tag walg:2.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
-	docker tag walg:2.0 sayedppqq/walg:2.0
-	docker push sayedppqq/walg:2.0
\ No newline at end of file
diff --git a/cmd/mongo/oplog_fetch.go b/cmd/mongo/oplog_fetch.go
index 9d81c63b5..f6cded29e 100644
--- a/cmd/mongo/oplog_fetch.go
+++ b/cmd/mongo/oplog_fetch.go
@@ -50,7 +50,7 @@ var oplogFetchCmd = &cobra.Command{
 		tracelog.ErrorLogger.FatalOnError(err)
 
 		// setup storage fetcher
-		oplogFetcher := stages.NewStorageFetcher(downloader, path)
+		oplogFetcher := stages.NewStorageFetcher(downloader, path, "")
 
 		// run worker cycle
 		err = mongo.HandleOplogReplay(ctx, since, until, oplogFetcher, oplogApplier)
diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go
index fe8a7205d..bab3e74ca 100644
--- a/cmd/mongo/oplog_push.go
+++ b/cmd/mongo/oplog_push.go
@@ -83,6 +83,7 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
 	uploader.SetSnapshot(snapshotName, snapshotNamespace)
+	uploader.SetDBNode(pushArgs.dbNode)
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -111,6 +112,8 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(uploader.GetDBNode())
+
 	since, err := discovery.ResolveStartingTS(ctx, downloader, mongoClient)
 	if err != nil {
 		return err
@@ -148,6 +151,7 @@ type oplogPushRunArgs struct {
 	archiveAfterSize   int
 	archiveTimeout     time.Duration
 	mongodbURL         string
+	dbNode             string
 	primaryWait        bool
 	primaryWaitTimeout time.Duration
 	lwUpdate           time.Duration
@@ -171,6 +175,10 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
 	if err != nil {
 		return
 	}
+	args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
+	if err != nil {
+		return
+	}
 
 	args.dbProvider = internal.GetNonRequiredSetting(internal.MongoDBProvider)
diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go
index 8784fddc5..03edf041a 100644
--- a/cmd/mongo/oplog_replay.go
+++ b/cmd/mongo/oplog_replay.go
@@ -3,6 +3,10 @@ package mongo
 import (
 	"context"
 	"encoding/json"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
+	"os"
+	"syscall"
+
 	"github.com/spf13/cobra"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal"
@@ -13,8 +17,6 @@ import (
 	"github.com/wal-g/wal-g/internal/databases/mongo/oplog"
 	"github.com/wal-g/wal-g/internal/databases/mongo/stages"
 	"github.com/wal-g/wal-g/utility"
-	"os"
-	"syscall"
 )
 
 const LatestBackupString = "LATEST_BACKUP"
@@ -47,6 +49,7 @@ type oplogReplayRunArgs struct {
 	ignoreErrCodes map[string][]int32
 
 	mongodbURL string
+	dbNode     string
 
 	oplogAlwaysUpsert    *bool
 	oplogApplicationMode *string
@@ -73,7 +76,10 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
 	if err != nil {
 		return
 	}
-
+	args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
+	if err != nil {
+		return
+	}
 	oplogAlwaysUpsert, hasOplogAlwaysUpsert, err := internal.GetBoolSetting(internal.OplogReplayOplogAlwaysUpsert)
 	if err != nil {
 		return
@@ -132,7 +138,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 		return err
 	}
 
-	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes)
+	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter), new(shake.DDLFilter)}
+	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
 	oplogApplier := stages.NewGenericApplier(dbApplier)
 
 	// set up storage downloader client
@@ -140,22 +147,23 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(replayArgs.dbNode)
 
 	// discover archive sequence to replay
 	archives, err := downloader.ListOplogArchives()
 	if err != nil {
 		return err
 	}
-	// update since and until. since = matched archive start ts , until = matched archiver end ts
 	replayArgs.since, replayArgs.until = archive.GetUpdatedBackupTimes(archives, replayArgs.since, replayArgs.until)
 	dbApplier.SetUntilTime(replayArgs.until)
+
 	path, err := archive.SequenceBetweenTS(archives, replayArgs.since, replayArgs.until)
 	if err != nil {
 		return err
 	}
 
 	// setup storage fetcher
-	oplogFetcher := stages.NewStorageFetcher(downloader, path)
+	oplogFetcher := stages.NewStorageFetcher(downloader, path, replayArgs.dbNode)
 
 	// run worker cycle
 	return mongo.HandleOplogReplay(ctx, replayArgs.since, replayArgs.until, oplogFetcher, oplogApplier)
diff --git a/internal/config.go b/internal/config.go
index f2dc7ac48..c4ce88e4a 100644
--- a/internal/config.go
+++ b/internal/config.go
@@ -111,6 +111,7 @@ const (
 
 	MongoDBProvider                  = "MONGODB_PROVIDER"
 	MongoDBPath                      = "MONGODB_PATH"
+	MongoDBNode                      = "MONGODB_NODE"
 	MongoDBUriSetting                = "MONGODB_URI"
 	MongoDBLastWriteUpdateInterval   = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
 	MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
diff --git a/internal/databases/mongo/archive/layout.go b/internal/databases/mongo/archive/layout.go
index b8d32d966..1ba2603ab 100644
--- a/internal/databases/mongo/archive/layout.go
+++ b/internal/databases/mongo/archive/layout.go
@@ -75,6 +75,7 @@ func SequenceBetweenTS(archives []models.Archive, since, until models.Timestamp)
 func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timestamp) (models.Timestamp, models.Timestamp) {
 	var updatedSince models.Timestamp
+	//var minimum []models.Timestamp
 	for i := range archives {
 		arch := archives[i]
 		if arch.Type != models.ArchiveTypeOplog {
@@ -84,7 +85,19 @@ func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timest
 		if arch.In(since) {
 			updatedSince = arch.Start
 		}
-	}
+		//minimum = append(minimum, arch.Start)
+		//fmt.Printf("ar---- %v %v\n", arch.Start, arch.End)
+	}
+	//sort.Slice(minimum, func(i, j int) bool {
+	//	if minimum[i].TS == minimum[j].TS {
+	//		return minimum[i].Inc < minimum[j].Inc
+	//	}
+	//	return minimum[i].TS < minimum[j].TS
+	//})
+	//fmt.Println("------", minimum[0], updatedSince)
+	//if structs.IsZero(updatedSince) {
+	//	updatedSince = minimum[0]
+	//}
 	return updatedSince, until
 }
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index aac6204ec..2f5cd4dee 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -73,6 +73,7 @@ type StorageDownloader struct {
 	rootFolder    storage.Folder
 	oplogsFolder  storage.Folder
 	backupsFolder storage.Folder
+	dbNode        string
 }
 
 // NewStorageDownloader builds mongodb downloader.
@@ -87,6 +88,13 @@ func NewStorageDownloader(opts StorageSettings) (*StorageDownloader, error) {
 		nil
 }
 
+func (sd *StorageDownloader) SetNodeSpecificDownloader(node string) {
+	sd.dbNode = node
+}
+func (sd *StorageDownloader) GetNodeSpecificDownloader() string {
+	return sd.dbNode
+}
+
 // BackupMeta downloads sentinel contents.
 func (sd *StorageDownloader) BackupMeta(name string) (*models.Backup, error) {
 	return common.DownloadSentinel(sd.backupsFolder, name)
@@ -124,7 +132,7 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {
 
 // DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
 func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
-	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.Filename(), arch.Extension(), writeCloser)
+	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
 }
 
 // ListOplogArchives fetches all oplog archives existed in storage.
@@ -137,7 +145,10 @@ func (sd *StorageDownloader) ListOplogArchives() ([]models.Archive, error) {
 	archives := make([]models.Archive, 0, len(objects))
 	for _, key := range objects {
 		archName := key.GetName()
-		arch, err := models.ArchFromFilename(archName)
+		if !isOplogForSpecificNode(archName, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(archName, sd.dbNode)
 		if err != nil {
 			return nil, fmt.Errorf("can not convert retrieve timestamps since oplog archive Ext '%s': %w", archName, err)
 		}
@@ -155,7 +166,11 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
 	}
 	for _, key := range keys {
 		filename := key.GetName()
-		arch, err := models.ArchFromFilename(filename)
+		if !isOplogForSpecificNode(filename, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(filename, sd.dbNode)
+		tracelog.InfoLogger.Printf("key: %v fileName: %v arch: %v", key, filename, arch)
 		if err != nil {
 			return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
 		}
@@ -209,6 +224,7 @@ type StorageUploader struct {
 	kubeClient        controllerruntime.Client
 	snapshotName      string
 	snapshotNamespace string
+	dbNode            string
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -225,6 +241,12 @@ func (su *StorageUploader) SetSnapshot(name, namespace string) {
 	su.snapshotName = name
 	su.snapshotNamespace = namespace
 }
+func (su *StorageUploader) SetDBNode(node string) {
+	su.dbNode = node
+}
+func (su *StorageUploader) GetDBNode() string {
+	return su.dbNode
+}
 
 func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 	var snapshot storageapi.Snapshot
@@ -236,6 +258,9 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 		return err
 	}
 
+	compName := "wal"
+	compName = compName + "-" + su.GetDBNode()
+
 	_, err = kmc.PatchStatus(
 		context.TODO(),
 		su.kubeClient,
@@ -244,17 +269,17 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 			in := obj.(*storageapi.Snapshot)
 			if len(in.Status.Components) == 0 {
 				in.Status.Components = make(map[string]storageapi.Component)
-
+			}
+			if _, ok := in.Status.Components[compName]; !ok {
 				walSegments := make([]storageapi.WalSegment, 1)
 				walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
-				in.Status.Components["wal"] = storageapi.Component{
+				in.Status.Components[compName] = storageapi.Component{
 					WalSegments: walSegments,
 				}
 			}
-
-			component := in.Status.Components["wal"]
+			component := in.Status.Components[compName]
 			component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
-			in.Status.Components["wal"] = component
+			in.Status.Components[compName] = component
 
 			return in
 		},
@@ -281,8 +306,10 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
 		return err
 	}
 
+	fileName := arch.DBNodeSpecificFileName(su.dbNode)
+
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, arch.Filename(), bytes.NewReader(su.buf.Bytes()))
+	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
 }
 
 // UploadGap uploads mark indicating archiving gap.
@@ -296,7 +323,7 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
 		return fmt.Errorf("can not build archive: %w", err)
 	}
 
-	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.Filename()); err != nil {
+	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
 		return fmt.Errorf("error while uploading stream: %w", err)
 	}
 	return nil
@@ -366,3 +393,7 @@ func (sp *StoragePurger) DeleteOplogArchives(archives []models.Archive) error {
 	tracelog.DebugLogger.Printf("Oplog keys will be deleted: %+v\n", oplogKeys)
 	return sp.oplogsFolder.DeleteObjects(oplogKeys)
 }
+
+func isOplogForSpecificNode(fileName, node string) bool {
+	return strings.Contains(fileName, node)
+}
diff --git a/internal/databases/mongo/client/client.go b/internal/databases/mongo/client/client.go
index 9ffc589fc..4c9cd05ad 100644
--- a/internal/databases/mongo/client/client.go
+++ b/internal/databases/mongo/client/client.go
@@ -350,6 +350,9 @@ func (mc *MongoClient) ApplyOp(ctx context.Context, dbop db.Oplog) error {
 	cmd[0] = bson.E{Key: "applyOps", Value: []interface{}{op}}
 	apply := mc.c.Database("admin").RunCommand(ctx, cmd)
 	if err := apply.Err(); err != nil {
+		//if mongo.IsDuplicateKeyError(err) {
+		//	return nil
+		//}
 		return err
 	}
 	resp := CmdResponse{}
diff --git a/internal/databases/mongo/models/archive.go b/internal/databases/mongo/models/archive.go
index f0c24195c..238c52618 100644
--- a/internal/databases/mongo/models/archive.go
+++ b/internal/databases/mongo/models/archive.go
@@ -15,9 +15,9 @@ const (
 	ArchiveTypeGap = "gap"
 )
 
-var (
-	ArchRegexp = regexp.MustCompile(`^(oplog|gap)_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
-)
+//var (
+//	ArchRegexp = regexp.MustCompile(`^(oplog|gap)_shard0_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
+//)
 
 // Archive defines oplog archive representation.
 type Archive struct {
@@ -49,6 +49,9 @@ func (a Archive) In(ts Timestamp) bool {
 func (a Archive) Filename() string {
 	return fmt.Sprintf("%s_%v%s%v.%s", a.Type, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
 }
+func (a Archive) DBNodeSpecificFileName(node string) string {
+	return fmt.Sprintf("%s_%s_%v%s%v.%s", a.Type, node, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
+}
 
 // Extension returns extension of archive file name.
 func (a Archive) Extension() string {
@@ -57,7 +60,10 @@ func (a Archive) Extension() string {
 
 // ArchFromFilename builds Arch from given path.
 // TODO: support empty extension
-func ArchFromFilename(path string) (Archive, error) {
+func ArchFromFilename(path string, node string) (Archive, error) {
+	format := fmt.Sprintf(`^(oplog|gap)_%v_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`, node)
+	ArchRegexp := regexp.MustCompile(format)
+
 	res := ArchRegexp.FindAllStringSubmatch(path, -1)
 	if len(res) != 1 {
 		return Archive{}, fmt.Errorf("can not parse oplog path: %s", path)
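A note on the naming scheme above: DBNodeSpecificFileName simply inserts the node name between the archive type and the timestamps, and ArchFromFilename now compiles a per-node regexp in place of the old package-level ArchRegexp. A minimal, self-contained sketch of the round trip (the node name and timestamps are invented; the named groups follow the original wal-g pattern):

    package main

    import (
    	"fmt"
    	"regexp"
    )

    func main() {
    	node := "shard0"

    	// mirrors Archive.DBNodeSpecificFileName: <type>_<node>_<startTS>_<endTS>.<ext>
    	name := fmt.Sprintf("%s_%s_%v_%v.%s", "oplog", node, "1712545200.1", "1712545260.4", "lz4")
    	fmt.Println(name) // oplog_shard0_1712545200.1_1712545260.4.lz4

    	// mirrors the per-node regexp built inside ArchFromFilename
    	re := regexp.MustCompile(fmt.Sprintf(
    		`^(oplog|gap)_%v_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`, node))

    	res := re.FindAllStringSubmatch(name, -1)
    	if len(res) == 1 {
    		fmt.Println(res[0][2], res[0][3], res[0][4]) // 1712545200.1 1712545260.4 lz4
    	}
    }

Recompiling the regexp on every ArchFromFilename call is slightly wasteful, but it keeps the node name out of package-level state.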
diff --git a/internal/databases/mongo/models/archive_test.go b/internal/databases/mongo/models/archive_test.go
index efb21b1ad..f01d7c8fa 100644
--- a/internal/databases/mongo/models/archive_test.go
+++ b/internal/databases/mongo/models/archive_test.go
@@ -280,7 +280,7 @@ func TestArchFromFilename(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := ArchFromFilename(tt.args.path)
+			got, err := ArchFromFilename(tt.args.path, "shard0")
 			if (err != nil) != tt.wantErr {
 				t.Errorf("ArchFromFilename() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 2b140f8aa..55c1c7e34 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -3,17 +3,17 @@ package oplog
 import (
 	"context"
 	"fmt"
-	"io"
-	"strings"
-
 	"github.com/mongodb/mongo-tools-common/db"
 	"github.com/mongodb/mongo-tools-common/txn"
 	"github.com/mongodb/mongo-tools-common/util"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal/databases/mongo/client"
 	"github.com/wal-g/wal-g/internal/databases/mongo/models"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
+	"io"
+	"strings"
 )
 
 type TypeAssertionError struct {
@@ -78,11 +78,13 @@ type DBApplier struct {
 	preserveUUID          bool
 	applyIgnoreErrorCodes map[string][]int32
 	until                 models.Timestamp
+	dbNode                string
+	filterList            shake.OplogFilterChain
 }
 
 // NewDBApplier builds DBApplier with given args.
-func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32) *DBApplier {
-	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes}
+func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32, node string, filterList shake.OplogFilterChain) *DBApplier {
+	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
 }
 
 func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
@@ -97,11 +99,17 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
 		return nil
 	}
 
-	if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
-		tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
-		return nil
+	if strings.HasPrefix(ap.dbNode, "shard") {
+		if ap.filterList.IterateFilter(&op) {
+			return nil
+		}
 	}
 
+	//if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
+	//	tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
+	//	return nil
+	//}
+
 	meta, err := txn.NewMeta(op)
 	if err != nil {
 		return fmt.Errorf("can not extract op metadata: %w", err)
@@ -140,7 +148,7 @@ func (ap *DBApplier) shouldSkip(op, ns string) error {
 	}
 
 	// sharded clusters are not supported yet
-	if strings.HasPrefix(ns, "config.") {
+	if (strings.HasPrefix(ns, "config.") || strings.HasPrefix(ns, "admin.")) && ap.dbNode != "configsvr" {
 		return fmt.Errorf("config database op")
 	}
 
@@ -201,7 +209,7 @@ func (ap *DBApplier) handleNonTxnOp(ctx context.Context, op db.Oplog) error {
 		return ap.db.DropIndexes(ctx, dbName, op.Object)
 	}
 
-	//tracelog.DebugLogger.Printf("applying op: %+v", op)
+	tracelog.InfoLogger.Printf("applying op: %+v", op)
 	if err := ap.db.ApplyOp(ctx, op); err != nil {
 		// we ignore some errors (for example 'duplicate key error')
 		// TODO: check after TOOLS-2041
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
new file mode 100644
index 000000000..575ab7f0b
--- /dev/null
+++ b/internal/databases/mongo/shake/filter.go
@@ -0,0 +1,99 @@
+package shake
+
+import (
+	"fmt"
+	"github.com/mongodb/mongo-tools-common/db"
+	"github.com/wal-g/tracelog"
+	"reflect"
+	"strings"
+)
+
+// OplogFilter: AutologousFilter, NoopFilter, DDLFilter
+type OplogFilter interface {
+	Filter(log *db.Oplog) bool
+}
+type OplogFilterChain []OplogFilter
+
+func (chain OplogFilterChain) IterateFilter(log *db.Oplog) bool {
+	for _, filter := range chain {
+		if filter.Filter(log) {
+			tracelog.InfoLogger.Printf("%v filter oplog[%v]", reflect.TypeOf(filter), log)
+			return true
+		}
+	}
+	return false
+}
+
+type AutologousFilter struct {
+}
+
+func (filter *AutologousFilter) Filter(log *db.Oplog) bool {
+
+	// Filter out unnecessary commands
+	if operation, found := ExtraCommandName(log.Object); found {
+		fmt.Printf("unnecessary commands. operation: %v found: %v\n", operation)
+		if IsNeedFilterCommand(operation) {
+			return true
+		}
+	}
+
+	// for namespace. we filter noop operation and collection name
+	// that are admin, local, mongoshake, mongoshake_conflict
+	return filter.FilterNs(log.Namespace)
+}
+
+// namespace should be filtered.
+// key: ns, value: true means prefix, false means contain
+var NsShouldBeIgnore = map[string]bool{
+	"admin.":       true,
+	"local.":       true,
+	"config.":      true,
+	"system.views": false,
+}
+
+// namespace should not be filtered.
+// NsShouldNotBeIgnore has a higher priority than NsShouldBeIgnore
+// key: ns, value: true means prefix, false means contain
+var NsShouldNotBeIgnore = map[string]bool{
+	"admin.$cmd": true,
+}
+
+func (filter *AutologousFilter) FilterNs(namespace string) bool {
+	// for namespace. we filter noop operation and collection name
+	// that are admin, local, config, mongoshake, mongoshake_conflict
+
+	// v2.4.13, don't filter admin.$cmd which may include transaction
+	for key, val := range NsShouldNotBeIgnore {
+		if val == true && strings.HasPrefix(namespace, key) {
+			return false
+		}
+		if val == false && strings.Contains(namespace, key) {
+			return false
+		}
+	}
+
+	for key, val := range NsShouldBeIgnore {
+		if val == true && strings.HasPrefix(namespace, key) {
+			return true
+		}
+		if val == false && strings.Contains(namespace, key) {
+			return true
+		}
+	}
+	return false
+}
+
+type NoopFilter struct {
+}
+
+func (filter *NoopFilter) Filter(log *db.Oplog) bool {
+	return log.Operation == "n"
+}
+
+type DDLFilter struct {
+}
+
+func (filter *DDLFilter) Filter(log *db.Oplog) bool {
+	operation, _ := ExtraCommandName(log.Object)
+	return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
+}
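The two maps above encode an ordering worth spelling out: NsShouldNotBeIgnore is consulted first and exhaustively, so admin.$cmd survives even though the admin. prefix is otherwise dropped. A standalone sketch of the same two-pass lookup (maps copied from the patch; namespaces invented):

    package main

    import (
    	"fmt"
    	"strings"
    )

    // true means prefix match, false means substring match, as in shake/filter.go
    var nsKeep = map[string]bool{"admin.$cmd": true}
    var nsDrop = map[string]bool{"admin.": true, "local.": true, "config.": true, "system.views": false}

    func filterNs(ns string) bool {
    	for k, prefix := range nsKeep { // higher priority: never filter these
    		if (prefix && strings.HasPrefix(ns, k)) || (!prefix && strings.Contains(ns, k)) {
    			return false
    		}
    	}
    	for k, prefix := range nsDrop {
    		if (prefix && strings.HasPrefix(ns, k)) || (!prefix && strings.Contains(ns, k)) {
    			return true
    		}
    	}
    	return false
    }

    func main() {
    	for _, ns := range []string{"admin.$cmd", "admin.foo", "mydb.users", "mydb.system.views"} {
    		fmt.Printf("%-18s filtered=%v\n", ns, filterNs(ns))
    	}
    	// admin.$cmd filtered=false, admin.foo filtered=true,
    	// mydb.users filtered=false, mydb.system.views filtered=true
    }

Because the keep-list loop runs to completion before the drop-list is examined, Go's randomized map iteration order has no effect on the result.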
diff --git a/internal/databases/mongo/shake/oplog.go b/internal/databases/mongo/shake/oplog.go
new file mode 100644
index 000000000..9deaf8dcf
--- /dev/null
+++ b/internal/databases/mongo/shake/oplog.go
@@ -0,0 +1,53 @@
+package shake
+
+import (
+	"go.mongodb.org/mongo-driver/bson"
+	"strings"
+)
+
+const (
+	PrimaryKey = "_id"
+)
+
+type CommandOperation struct {
+	concernSyncData bool
+	runOnAdmin      bool // some commands like `renameCollection` need run on admin database
+	needFilter      bool // should be ignored in shake
+}
+
+var opsMap = map[string]*CommandOperation{
+	"create":           {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"createIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"collMod":          {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropDatabase":     {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"drop":             {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndex":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndex":        {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndexes":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"renameCollection": {concernSyncData: false, runOnAdmin: true, needFilter: false},
+	"convertToCapped":  {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"emptycapped":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"applyOps":         {concernSyncData: true, runOnAdmin: false, needFilter: false},
+	"startIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+	"commitIndexBuild": {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"abortIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+}
+
+func ExtraCommandName(o bson.D) (string, bool) {
+	// command name must be at the first position
+	if len(o) > 0 {
+		if _, exist := opsMap[o[0].Key]; exist {
+			return o[0].Key, true
+		}
+	}
+
+	return "", false
+}
+
+func IsNeedFilterCommand(operation string) bool {
+	if op, ok := opsMap[strings.TrimSpace(operation)]; ok {
+		return op.needFilter
+	}
+	return false
+}
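ExtraCommandName only inspects the first key of the command document, which works because bson.D preserves field order (bson.M would not). A small sketch of the behaviour (documents invented; opsMap trimmed to the two entries used here):

    package main

    import (
    	"fmt"

    	"go.mongodb.org/mongo-driver/bson"
    )

    // trimmed copy of opsMap: only the needFilter flag matters for this demo
    var needFilter = map[string]bool{"dropDatabase": false, "startIndexBuild": true}

    func extraCommandName(o bson.D) (string, bool) {
    	if len(o) > 0 {
    		if _, ok := needFilter[o[0].Key]; ok {
    			return o[0].Key, true
    		}
    	}
    	return "", false
    }

    func main() {
    	drop := bson.D{{Key: "dropDatabase", Value: 1}}
    	idx := bson.D{{Key: "startIndexBuild", Value: "coll"}, {Key: "indexes", Value: bson.A{}}}

    	op, _ := extraCommandName(drop)
    	fmt.Println(op, needFilter[op]) // dropDatabase false -> would be replayed
    	op, _ = extraCommandName(idx)
    	fmt.Println(op, needFilter[op]) // startIndexBuild true -> would be filtered out
    }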
diff --git a/internal/databases/mongo/stages/applier.go b/internal/databases/mongo/stages/applier.go
index b1883b5d8..8085be2b4 100644
--- a/internal/databases/mongo/stages/applier.go
+++ b/internal/databases/mongo/stages/applier.go
@@ -34,6 +34,7 @@ func NewGenericApplier(applier oplog.Applier) *GenericApplier {
 
 // Apply runs working cycle that applies oplog records.
 func (dba *GenericApplier) Apply(ctx context.Context, ch chan *models.Oplog) (chan error, error) {
+
 	errc := make(chan error)
 	go func() {
 		defer close(errc)
diff --git a/internal/databases/mongo/stages/fetcher.go b/internal/databases/mongo/stages/fetcher.go
index 487477cff..8830c4fc0 100644
--- a/internal/databases/mongo/stages/fetcher.go
+++ b/internal/databases/mongo/stages/fetcher.go
@@ -142,11 +142,12 @@ func (cb *CloserBuffer) Close() error {
 type StorageFetcher struct {
 	downloader archive.Downloader
 	path       archive.Sequence
+	dbNode     string
 }
 
 // NewStorageFetcher builds StorageFetcher instance
-func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence) *StorageFetcher {
-	return &StorageFetcher{downloader: downloader, path: path}
+func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence, node string) *StorageFetcher {
+	return &StorageFetcher{downloader: downloader, path: path, dbNode: node}
 }
 
 // FetchBetween returns channel of oplog records, channel is filled in background.
@@ -168,11 +169,11 @@ func (sf *StorageFetcher) FetchBetween(ctx context.Context,
 	firstFound := false
 	for _, arch := range path {
-		tracelog.DebugLogger.Printf("Fetching archive %s", arch.Filename())
+		tracelog.DebugLogger.Printf("Fetching archive %s", arch.DBNodeSpecificFileName(sf.dbNode))
 
 		err := sf.downloader.DownloadOplogArchive(arch, buf)
 		if err != nil {
-			errc <- fmt.Errorf("failed to download archive %s: %w", arch.Filename(), err)
+			errc <- fmt.Errorf("failed to download archive %s: %w", arch.DBNodeSpecificFileName(sf.dbNode), err)
 			return
 		}
@@ -215,7 +216,7 @@ func (sf *StorageFetcher) FetchBetween(ctx context.Context,
 		}
 		buf.Reset()
 		if !firstFound { // TODO: do we need this check, add skip flag
-			errc <- fmt.Errorf("'from' timestamp '%s' was not found in first archive: %s", from, arch.Filename())
+			errc <- fmt.Errorf("'from' timestamp '%s' was not found in first archive: %s", from, arch.DBNodeSpecificFileName(sf.dbNode))
 			return
 		}
 	}
diff --git a/main/mongo/Dockerfile b/main/mongo/Dockerfile
new file mode 100644
index 000000000..8e5854d6a
--- /dev/null
+++ b/main/mongo/Dockerfile
@@ -0,0 +1,13 @@
+FROM debian:bookworm
+
+RUN set -x \
+	&& apt-get update \
+	&& apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl;
+
+#RUN curl -LO https://fastdl.mongodb.org/tools/db/mongodb-database-tools-ubuntu2004-x86_64-100.5.4.tgz \
+#	&& tar -zxvf mongodb-database-tools-ubuntu2004-x86_64-100.5.4.tgz \
+#	&& cp -r ./mongodb-database-tools-ubuntu2004-x86_64-100.5.4/bin/* /bin/ ;
+
+COPY wal-g /bin/wal-g
+
+ENTRYPOINT [ "/bin/wal-g" ]
\ No newline at end of file
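ListOplogArchives and LastKnownArchiveTS in this patch pre-filter storage keys with a plain substring check before attempting the per-node parse; a tiny sketch of that check (file names invented):

    package main

    import (
    	"fmt"
    	"strings"
    )

    // verbatim copy of isOplogForSpecificNode from loader.go
    func isOplogForSpecificNode(fileName, node string) bool {
    	return strings.Contains(fileName, node)
    }

    func main() {
    	keys := []string{
    		"oplog_shard0_1712545200.1_1712545260.4.lz4",
    		"oplog_shard1_1712545200.1_1712545260.4.lz4",
    		"oplog_configsvr_1712545200.1_1712545260.4.lz4",
    	}
    	for _, k := range keys {
    		if isOplogForSpecificNode(k, "shard0") {
    			fmt.Println("keep:", k) // only the shard0 archive survives
    		}
    	}
    }

Note that a bare substring check would also let node "shard1" match shard10-named files; the stricter per-node regexp inside ArchFromFilename is what refuses such names afterwards.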
From 1cce874e946db81e378a4d97aa2fec3c364b3aa8 Mon Sep 17 00:00:00 2001
From: sayedppqq
Date: Wed, 17 Apr 2024 19:13:32 +0600
Subject: [PATCH 2/4] wip

---
 internal/databases/mongo/client/client.go | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/internal/databases/mongo/client/client.go b/internal/databases/mongo/client/client.go
index 4c9cd05ad..9bec7774e 100644
--- a/internal/databases/mongo/client/client.go
+++ b/internal/databases/mongo/client/client.go
@@ -350,9 +350,10 @@ func (mc *MongoClient) ApplyOp(ctx context.Context, dbop db.Oplog) error {
 	cmd[0] = bson.E{Key: "applyOps", Value: []interface{}{op}}
 	apply := mc.c.Database("admin").RunCommand(ctx, cmd)
 	if err := apply.Err(); err != nil {
-		//if mongo.IsDuplicateKeyError(err) {
-		//	return nil
-		//}
+		fmt.Println("-------------------------------------", err)
+		if mongo.IsDuplicateKeyError(err) {
+			return nil
+		}
 		return err
 	}
 	resp := CmdResponse{}
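For context on the hunk above: ApplyOp wraps each oplog entry in an applyOps command against the admin database, so a duplicate-key failure surfaces as the command error inspected here. A rough, self-contained sketch of that invocation shape (the URI and the sample entry are placeholders; this is not the wal-g client code itself):

    package main

    import (
    	"context"
    	"fmt"

    	"go.mongodb.org/mongo-driver/bson"
    	"go.mongodb.org/mongo-driver/mongo"
    	"go.mongodb.org/mongo-driver/mongo/options"
    )

    func main() {
    	ctx := context.Background()
    	cli, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    	if err != nil {
    		panic(err)
    	}
    	defer cli.Disconnect(ctx)

    	// one insert-style oplog entry, replayed the way MongoClient.ApplyOp does it
    	op := bson.D{
    		{Key: "op", Value: "i"},
    		{Key: "ns", Value: "mydb.mycoll"},
    		{Key: "o", Value: bson.D{{Key: "_id", Value: 1}, {Key: "x", Value: 42}}},
    	}
    	cmd := bson.D{{Key: "applyOps", Value: []interface{}{op}}}

    	res := cli.Database("admin").RunCommand(ctx, cmd)
    	if err := res.Err(); err != nil {
    		if mongo.IsDuplicateKeyError(err) {
    			fmt.Println("duplicate key, skipping") // what this patch tries; patch 3 moves it into shouldIgnore
    			return
    		}
    		panic(err)
    	}
    	fmt.Println("applied")
    }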
From 9026ab2b97eeb37ad9787b1473bc884ca071a9f4 Mon Sep 17 00:00:00 2001
From: sayedppqq
Date: Thu, 25 Apr 2024 11:25:42 +0600
Subject: [PATCH 3/4] Add: shard support for MongoDB

Signed-off-by: sayedppqq
---
 cmd/mongo/oplog_replay.go                  |  4 +--
 internal/databases/mongo/archive/layout.go | 16 +--------
 internal/databases/mongo/archive/loader.go |  7 ++--
 internal/databases/mongo/client/client.go  |  4 ---
 internal/databases/mongo/oplog/applier.go  | 35 ++++++------------
 internal/databases/mongo/shake/filter.go   | 42 ++++++++++------------
 internal/databases/mongo/stages/applier.go |  1 -
 7 files changed, 36 insertions(+), 73 deletions(-)

diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go
index 03edf041a..dafe5a248 100644
--- a/cmd/mongo/oplog_replay.go
+++ b/cmd/mongo/oplog_replay.go
@@ -138,8 +138,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 		return err
 	}
 
-	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter), new(shake.DDLFilter)}
-	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
+	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
+	dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
 	oplogApplier := stages.NewGenericApplier(dbApplier)
 
 	// set up storage downloader client
diff --git a/internal/databases/mongo/archive/layout.go b/internal/databases/mongo/archive/layout.go
index 1ba2603ab..19a8d8941 100644
--- a/internal/databases/mongo/archive/layout.go
+++ b/internal/databases/mongo/archive/layout.go
@@ -75,7 +75,6 @@ func SequenceBetweenTS(archives []models.Archive, since, until models.Timestamp)
 func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timestamp) (models.Timestamp, models.Timestamp) {
 	var updatedSince models.Timestamp
-	//var minimum []models.Timestamp
 	for i := range archives {
 		arch := archives[i]
 		if arch.Type != models.ArchiveTypeOplog {
@@ -85,20 +84,7 @@ func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timest
 		if arch.In(since) {
 			updatedSince = arch.Start
 		}
-		//minimum = append(minimum, arch.Start)
-		//fmt.Printf("ar---- %v %v\n", arch.Start, arch.End)
-	}
-	//sort.Slice(minimum, func(i, j int) bool {
-	//	if minimum[i].TS == minimum[j].TS {
-	//		return minimum[i].Inc < minimum[j].Inc
-	//	}
-	//	return minimum[i].TS < minimum[j].TS
-	//})
-	//fmt.Println("------", minimum[0], updatedSince)
-	//if structs.IsZero(updatedSince) {
-	//	updatedSince = minimum[0]
-	//}
-
+	}
 	return updatedSince, until
 }
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index 2f5cd4dee..0aed04da3 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -132,7 +132,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {
 
 // DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
 func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
-	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
+	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
+		arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
 }
 
 // ListOplogArchives fetches all oplog archives existed in storage.
@@ -170,7 +171,6 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
 			continue
 		}
 		arch, err := models.ArchFromFilename(filename, sd.dbNode)
-		tracelog.InfoLogger.Printf("key: %v fileName: %v arch: %v", key, filename, arch)
 		if err != nil {
 			return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
 		}
@@ -323,7 +323,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
 		return fmt.Errorf("can not build archive: %w", err)
 	}
 
-	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
+	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
+		arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
 		return fmt.Errorf("error while uploading stream: %w", err)
 	}
 	return nil
diff --git a/internal/databases/mongo/client/client.go b/internal/databases/mongo/client/client.go
index 9bec7774e..9ffc589fc 100644
--- a/internal/databases/mongo/client/client.go
+++ b/internal/databases/mongo/client/client.go
@@ -350,10 +350,6 @@ func (mc *MongoClient) ApplyOp(ctx context.Context, dbop db.Oplog) error {
 	cmd[0] = bson.E{Key: "applyOps", Value: []interface{}{op}}
 	apply := mc.c.Database("admin").RunCommand(ctx, cmd)
 	if err := apply.Err(); err != nil {
-		fmt.Println("-------------------------------------", err)
-		if mongo.IsDuplicateKeyError(err) {
-			return nil
-		}
 		return err
 	}
 	resp := CmdResponse{}
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 55c1c7e34..1f78d7cfc 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -13,7 +13,6 @@ import (
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"io"
-	"strings"
 )
 
 type TypeAssertionError struct {
@@ -83,8 +82,10 @@ type DBApplier struct {
 }
 
 // NewDBApplier builds DBApplier with given args.
-func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32, node string, filterList shake.OplogFilterChain) *DBApplier {
-	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
+func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32,
+	node string, filterList shake.OplogFilterChain) *DBApplier {
+	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID,
+		applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
 }
 
 func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
@@ -99,17 +100,12 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
 		return nil
 	}
 
-	if strings.HasPrefix(ap.dbNode, "shard") {
+	if ap.dbNode != "configsvr" {
 		if ap.filterList.IterateFilter(&op) {
 			return nil
 		}
 	}
 
-	//if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
-	//	tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
-	//	return nil
-	//}
-
 	meta, err := txn.NewMeta(op)
 	if err != nil {
 		return fmt.Errorf("can not extract op metadata: %w", err)
@@ -142,26 +138,15 @@ func (ap *DBApplier) Close(ctx context.Context) error {
 	return nil
 }
 
-func (ap *DBApplier) shouldSkip(op, ns string) error {
-	if op == "n" {
-		return fmt.Errorf("noop op")
-	}
-
-	// sharded clusters are not supported yet
-	if (strings.HasPrefix(ns, "config.") || strings.HasPrefix(ns, "admin.")) && ap.dbNode != "configsvr" {
-		return fmt.Errorf("config database op")
-	}
-
-	return nil
-}
-
 // shouldIgnore checks if error should be ignored
 func (ap *DBApplier) shouldIgnore(op string, err error) bool {
 	ce, ok := err.(mongo.CommandError)
 	if !ok {
 		return false
 	}
-
+	if mongo.IsDuplicateKeyError(err) {
+		return true
+	}
 	ignoreErrorCodes, ok := ap.applyIgnoreErrorCodes[op]
 	if !ok {
 		return false
@@ -273,8 +258,8 @@ func indexSpecFromCommitIndexBuilds(op db.Oplog) (string, []client.IndexDocument
 		if !ok {
 			return "", nil, NewTypeAssertionError("bson.D", fmt.Sprintf("indexes[%d]", i), elemE.Value)
 		}
-		for i := range elements {
-			elemE = elements[i]
+		for j := range elements {
+			elemE = elements[j]
 			if elemE.Key == "key" {
 				if indexSpecs[i].Key, ok = elemE.Value.(bson.D); !ok {
 					return "", nil, NewTypeAssertionError("bson.D", "key", elemE.Value)
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
index 575ab7f0b..7da4a8c60 100644
--- a/internal/databases/mongo/shake/filter.go
+++ b/internal/databases/mongo/shake/filter.go
@@ -1,14 +1,13 @@
 package shake
 
 import (
-	"fmt"
 	"github.com/mongodb/mongo-tools-common/db"
 	"github.com/wal-g/tracelog"
 	"reflect"
 	"strings"
 )
 
-// OplogFilter: AutologousFilter, NoopFilter, DDLFilter
+// OplogFilter: AutologousFilter, NoopFilter
 type OplogFilter interface {
 	Filter(log *db.Oplog) bool
 }
@@ -28,21 +27,16 @@ type AutologousFilter struct {
 }
 
 func (filter *AutologousFilter) Filter(log *db.Oplog) bool {
-
 	// Filter out unnecessary commands
 	if operation, found := ExtraCommandName(log.Object); found {
-		fmt.Printf("unnecessary commands. operation: %v found: %v\n", operation)
 		if IsNeedFilterCommand(operation) {
 			return true
 		}
 	}
-
-	// for namespace. we filter noop operation and collection name
-	// that are admin, local, mongoshake, mongoshake_conflict
 	return filter.FilterNs(log.Namespace)
 }
 
-// namespace should be filtered.
+// NsShouldBeIgnore for namespaces should be filtered.
 // key: ns, value: true means prefix, false means contain
 var NsShouldBeIgnore = map[string]bool{
 	"admin.":       true,
@@ -51,32 +45,33 @@ var NsShouldBeIgnore = map[string]bool{
 	"system.views": false,
 }
 
-// namespace should not be filtered.
 // NsShouldNotBeIgnore has a higher priority than NsShouldBeIgnore
 // key: ns, value: true means prefix, false means contain
 var NsShouldNotBeIgnore = map[string]bool{
-	"admin.$cmd": true,
+	"admin.$cmd":         true,
+	"admin.system.users": false,
+	"admin.system.roles": false,
 }
 
 func (filter *AutologousFilter) FilterNs(namespace string) bool {
 	// for namespace. we filter noop operation and collection name
-	// that are admin, local, config, mongoshake, mongoshake_conflict
+	// that are admin, local, config
 
 	// v2.4.13, don't filter admin.$cmd which may include transaction
+	// we don't filter admin.system.users and admin.system.roles to retrieve roles and users
 	for key, val := range NsShouldNotBeIgnore {
-		if val == true && strings.HasPrefix(namespace, key) {
+		if val && strings.HasPrefix(namespace, key) {
 			return false
 		}
-		if val == false && strings.Contains(namespace, key) {
+		if !val && strings.Contains(namespace, key) {
 			return false
 		}
 	}
-
 	for key, val := range NsShouldBeIgnore {
-		if val == true && strings.HasPrefix(namespace, key) {
+		if val && strings.HasPrefix(namespace, key) {
 			return true
 		}
-		if val == false && strings.Contains(namespace, key) {
+		if !val && strings.Contains(namespace, key) {
 			return true
 		}
 	}
@@ -90,10 +85,11 @@ func (filter *NoopFilter) Filter(log *db.Oplog) bool {
 	return log.Operation == "n"
 }
 
-type DDLFilter struct {
-}
-
-func (filter *DDLFilter) Filter(log *db.Oplog) bool {
-	operation, _ := ExtraCommandName(log.Object)
-	return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
-}
+//type DDLFilter struct {
+//}
+//
+//func (filter *DDLFilter) Filter(log *db.Oplog) bool {
+//	//operation, _ := ExtraCommandName(log.Object)
+//	//return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
+//	return false
+//}
diff --git a/internal/databases/mongo/stages/applier.go b/internal/databases/mongo/stages/applier.go
index 8085be2b4..b1883b5d8 100644
--- a/internal/databases/mongo/stages/applier.go
+++ b/internal/databases/mongo/stages/applier.go
@@ -34,7 +34,6 @@ func NewGenericApplier(applier oplog.Applier) *GenericApplier {
 
 // Apply runs working cycle that applies oplog records.
 func (dba *GenericApplier) Apply(ctx context.Context, ch chan *models.Oplog) (chan error, error) {
-
 	errc := make(chan error)
 	go func() {
 		defer close(errc)
From ae2a275776511e9c709358eaa46769ab05806269 Mon Sep 17 00:00:00 2001
From: sayedppqq
Date: Fri, 31 May 2024 19:05:42 +0600
Subject: [PATCH 4/4] nts

Signed-off-by: sayedppqq
---
 Makefile                                  | 5 +++++
 internal/databases/mongo/oplog/applier.go | 2 +-
 internal/databases/mongo/shake/filter.go  | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index eb2a4ffb6..5fca7b11b 100644
--- a/Makefile
+++ b/Makefile
@@ -285,3 +285,8 @@ unlink_libsodium:
 build_client:
 	cd cmd/daemonclient && \
 	go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"
+
+#update: mongo_build
+#	docker build --tag walg:1.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
+#	docker tag walg:1.0 sayedppqq/walg:1.0
+#	docker push sayedppqq/walg:1.0
\ No newline at end of file
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 1f78d7cfc..b23675b09 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -194,7 +194,7 @@ func (ap *DBApplier) handleNonTxnOp(ctx context.Context, op db.Oplog) error {
 		return ap.db.DropIndexes(ctx, dbName, op.Object)
 	}
 
-	tracelog.InfoLogger.Printf("applying op: %+v", op)
+	//tracelog.DebugLogger.Printf("applying op: %+v", op)
 	if err := ap.db.ApplyOp(ctx, op); err != nil {
 		// we ignore some errors (for example 'duplicate key error')
 		// TODO: check after TOOLS-2041
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
index 7da4a8c60..a1ccbf512 100644
--- a/internal/databases/mongo/shake/filter.go
+++ b/internal/databases/mongo/shake/filter.go
@@ -16,7 +16,7 @@ type OplogFilterChain []OplogFilter
 func (chain OplogFilterChain) IterateFilter(log *db.Oplog) bool {
 	for _, filter := range chain {
 		if filter.Filter(log) {
-			tracelog.InfoLogger.Printf("%v filter oplog[%v]", reflect.TypeOf(filter), log)
+			tracelog.DebugLogger.Printf("%v filter oplog[%v]", reflect.TypeOf(filter), log)
 			return true
 		}
 	}
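To see how the pieces compose after the series: the replay path builds an OplogFilterChain of AutologousFilter and NoopFilter, and DBApplier consults it for every node except the configsvr. A condensed, self-contained re-statement of the chain semantics (the filters below are simplified stand-ins, not the shake package itself):

    package main

    import (
    	"fmt"
    	"strings"

    	"github.com/mongodb/mongo-tools-common/db"
    )

    type OplogFilter interface{ Filter(log *db.Oplog) bool }
    type OplogFilterChain []OplogFilter

    func (c OplogFilterChain) IterateFilter(log *db.Oplog) bool {
    	for _, f := range c {
    		if f.Filter(log) {
    			return true // first filter that claims the op wins
    		}
    	}
    	return false
    }

    type noopFilter struct{}

    func (noopFilter) Filter(log *db.Oplog) bool { return log.Operation == "n" }

    type nsFilter struct{} // stand-in for AutologousFilter: prefix check only

    func (nsFilter) Filter(log *db.Oplog) bool {
    	return strings.HasPrefix(log.Namespace, "local.")
    }

    func main() {
    	chain := OplogFilterChain{nsFilter{}, noopFilter{}}
    	ops := []db.Oplog{
    		{Operation: "i", Namespace: "mydb.coll"},
    		{Operation: "n", Namespace: ""},
    		{Operation: "u", Namespace: "local.oplog.rs"},
    	}
    	for i := range ops {
    		fmt.Printf("%s %-16q skipped=%v\n", ops[i].Operation, ops[i].Namespace, chain.IterateFilter(&ops[i]))
    	}
    	// i "mydb.coll" skipped=false; n "" skipped=true; u "local.oplog.rs" skipped=true
    }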