diff --git a/Makefile b/Makefile
index a3cf3f922..5fca7b11b 100644
--- a/Makefile
+++ b/Makefile
@@ -286,8 +286,7 @@ build_client:
 	cd cmd/daemonclient && \
 	go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"
-
-update: mongo_build
-	docker build --tag walg:2.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
-	docker tag walg:2.0 sayedppqq/walg:2.0
-	docker push sayedppqq/walg:2.0
\ No newline at end of file
+#update: mongo_build
+#	docker build --tag walg:1.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
+#	docker tag walg:1.0 sayedppqq/walg:1.0
+#	docker push sayedppqq/walg:1.0
\ No newline at end of file
diff --git a/cmd/mongo/oplog_fetch.go b/cmd/mongo/oplog_fetch.go
index 9d81c63b5..f6cded29e 100644
--- a/cmd/mongo/oplog_fetch.go
+++ b/cmd/mongo/oplog_fetch.go
@@ -50,7 +50,8 @@ var oplogFetchCmd = &cobra.Command{
 		tracelog.ErrorLogger.FatalOnError(err)
 
 		// setup storage fetcher
-		oplogFetcher := stages.NewStorageFetcher(downloader, path)
+		// (oplog-fetch is not node-aware yet, hence the empty node name)
+		oplogFetcher := stages.NewStorageFetcher(downloader, path, "")
 
 		// run worker cycle
 		err = mongo.HandleOplogReplay(ctx, since, until, oplogFetcher, oplogApplier)
diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go
index fe8a7205d..bab3e74ca 100644
--- a/cmd/mongo/oplog_push.go
+++ b/cmd/mongo/oplog_push.go
@@ -83,6 +83,7 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
 	uploader.SetSnapshot(snapshotName, snapshotNamespace)
+	uploader.SetDBNode(pushArgs.dbNode)
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -111,6 +112,8 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(uploader.GetDBNode())
+
 	since, err := discovery.ResolveStartingTS(ctx, downloader, mongoClient)
 	if err != nil {
 		return err
 	}
@@ -148,6 +151,7 @@ type oplogPushRunArgs struct {
 	archiveAfterSize   int
 	archiveTimeout     time.Duration
 	mongodbURL         string
+	dbNode             string
 	primaryWait        bool
 	primaryWaitTimeout time.Duration
 	lwUpdate           time.Duration
@@ -171,6 +175,10 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
 	if err != nil {
 		return
 	}
+	args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
+	if err != nil {
+		return
+	}
 
 	args.dbProvider = internal.GetNonRequiredSetting(internal.MongoDBProvider)
diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go
index 8784fddc5..dafe5a248 100644
--- a/cmd/mongo/oplog_replay.go
+++ b/cmd/mongo/oplog_replay.go
@@ -3,6 +3,9 @@ package mongo
 import (
 	"context"
 	"encoding/json"
+	"os"
+	"syscall"
+
 	"github.com/spf13/cobra"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal"
@@ -13,8 +16,7 @@
 	"github.com/wal-g/wal-g/internal/databases/mongo/oplog"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
 	"github.com/wal-g/wal-g/internal/databases/mongo/stages"
 	"github.com/wal-g/wal-g/utility"
-	"os"
-	"syscall"
 )
 
 const LatestBackupString = "LATEST_BACKUP"
@@ -47,6 +49,7 @@ type oplogReplayRunArgs struct {
 	ignoreErrCodes map[string][]int32
 
 	mongodbURL string
+	dbNode     string
 
 	oplogAlwaysUpsert    *bool
 	oplogApplicationMode *string
@@ -73,7 +76,12 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
 	if err != nil {
 		return
 	}
-
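+	// MONGODB_NODE names the replica-set/shard member whose oplog archives
+	// are replayed (the tests use "shard0"); it is a new required setting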
+	args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
+	if err != nil {
+		return
+	}
 	oplogAlwaysUpsert, hasOplogAlwaysUpsert, err := internal.GetBoolSetting(internal.OplogReplayOplogAlwaysUpsert)
 	if err != nil {
 		return
 	}
@@ -132,7 +140,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 		return err
 	}
 
-	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes)
+	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
+	dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
 	oplogApplier := stages.NewGenericApplier(dbApplier)
 
 	// set up storage downloader client
@@ -140,22 +149,23 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(replayArgs.dbNode)
 
 	// discover archive sequence to replay
 	archives, err := downloader.ListOplogArchives()
 	if err != nil {
 		return err
 	}
-	// update since and until. since = matched archive start ts , until = matched archiver end ts
 	replayArgs.since, replayArgs.until = archive.GetUpdatedBackupTimes(archives, replayArgs.since, replayArgs.until)
 	dbApplier.SetUntilTime(replayArgs.until)
+
 	path, err := archive.SequenceBetweenTS(archives, replayArgs.since, replayArgs.until)
 	if err != nil {
 		return err
 	}
 
 	// setup storage fetcher
-	oplogFetcher := stages.NewStorageFetcher(downloader, path)
+	oplogFetcher := stages.NewStorageFetcher(downloader, path, replayArgs.dbNode)
 
 	// run worker cycle
 	return mongo.HandleOplogReplay(ctx, replayArgs.since, replayArgs.until, oplogFetcher, oplogApplier)
diff --git a/internal/config.go b/internal/config.go
index f2dc7ac48..c4ce88e4a 100644
--- a/internal/config.go
+++ b/internal/config.go
@@ -111,6 +111,7 @@ const (
 
 	MongoDBProvider                  = "MONGODB_PROVIDER"
 	MongoDBPath                      = "MONGODB_PATH"
+	MongoDBNode                      = "MONGODB_NODE"
 	MongoDBUriSetting                = "MONGODB_URI"
 	MongoDBLastWriteUpdateInterval   = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
 	MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
diff --git a/internal/databases/mongo/archive/layout.go b/internal/databases/mongo/archive/layout.go
index b8d32d966..19a8d8941 100644
--- a/internal/databases/mongo/archive/layout.go
+++ b/internal/databases/mongo/archive/layout.go
@@ -85,7 +85,6 @@ func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timest
 			updatedSince = arch.Start
 		}
 	}
-
 	return updatedSince, until
 }
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index aac6204ec..0aed04da3 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -73,6 +73,7 @@ type StorageDownloader struct {
 	rootFolder    storage.Folder
 	oplogsFolder  storage.Folder
 	backupsFolder storage.Folder
+	dbNode        string
 }
 
 // NewStorageDownloader builds mongodb downloader.
@@ -87,6 +88,19 @@ func NewStorageDownloader(opts StorageSettings) (*StorageDownloader, error) {
 		nil
 }
 
+// SetNodeSpecificDownloader sets the db node whose oplog archives are read.
+func (sd *StorageDownloader) SetNodeSpecificDownloader(node string) {
+	sd.dbNode = node
+}
+
+// GetNodeSpecificDownloader returns the configured db node name.
+func (sd *StorageDownloader) GetNodeSpecificDownloader() string {
+	return sd.dbNode
+}
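+
+// Note: dbNode is embedded in every oplog archive name, so the listings
+// and downloads below only see this node's archives.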
+
 // BackupMeta downloads sentinel contents.
 func (sd *StorageDownloader) BackupMeta(name string) (*models.Backup, error) {
 	return common.DownloadSentinel(sd.backupsFolder, name)
@@ -124,7 +138,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {
 
 // DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
 func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
-	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder), arch.Filename(), arch.Extension(), writeCloser)
+	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
+		arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
 }
 
 // ListOplogArchives fetches all oplog archives existed in storage.
@@ -137,7 +152,10 @@ func (sd *StorageDownloader) ListOplogArchives() ([]models.Archive, error) {
 	archives := make([]models.Archive, 0, len(objects))
 	for _, key := range objects {
 		archName := key.GetName()
-		arch, err := models.ArchFromFilename(archName)
+		if !isOplogForSpecificNode(archName, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(archName, sd.dbNode)
 		if err != nil {
 			return nil, fmt.Errorf("can not convert retrieve timestamps since oplog archive Ext '%s': %w", archName, err)
 		}
@@ -155,7 +173,10 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
 	}
 	for _, key := range keys {
 		filename := key.GetName()
-		arch, err := models.ArchFromFilename(filename)
+		if !isOplogForSpecificNode(filename, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(filename, sd.dbNode)
 		if err != nil {
 			return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
 		}
@@ -209,6 +230,7 @@ type StorageUploader struct {
 	kubeClient        controllerruntime.Client
 	snapshotName      string
 	snapshotNamespace string
+	dbNode            string
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -225,6 +247,16 @@ func (su *StorageUploader) SetSnapshot(name, namespace string) {
 	su.snapshotName = name
 	su.snapshotNamespace = namespace
 }
+
+// SetDBNode sets the db node name embedded in archive file names.
+func (su *StorageUploader) SetDBNode(node string) {
+	su.dbNode = node
+}
+
+// GetDBNode returns the configured db node name.
+func (su *StorageUploader) GetDBNode() string {
+	return su.dbNode
+}
 
 func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 	var snapshot storageapi.Snapshot
@@ -236,6 +268,9 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 		return err
 	}
 
+	// each node gets its own WAL component in the Snapshot status
+	compName := "wal-" + su.GetDBNode()
+
 	_, err = kmc.PatchStatus(
 		context.TODO(),
 		su.kubeClient,
@@ -244,17 +279,17 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 			in := obj.(*storageapi.Snapshot)
 			if len(in.Status.Components) == 0 {
 				in.Status.Components = make(map[string]storageapi.Component)
-
+			}
+			if _, ok := in.Status.Components[compName]; !ok {
 				walSegments := make([]storageapi.WalSegment, 1)
 				walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
-				in.Status.Components["wal"] = storageapi.Component{
+				in.Status.Components[compName] = storageapi.Component{
 					WalSegments: walSegments,
 				}
 			}
-
-			component := in.Status.Components["wal"]
+			component := in.Status.Components[compName]
 			component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
-			in.Status.Components["wal"] = component
+			in.Status.Components[compName] = component
 
 			return in
 		},
@@ -281,8 +316,10 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
 		return err
 	}
 
+	fileName := arch.DBNodeSpecificFileName(su.dbNode)
+
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, arch.Filename(), bytes.NewReader(su.buf.Bytes()))
+	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
 }
 
 // UploadGap uploads mark indicating archiving gap.
@@ -296,7 +333,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
 		return fmt.Errorf("can not build archive: %w", err)
 	}
 
-	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.Filename()); err != nil {
+	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
+		arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
 		return fmt.Errorf("error while uploading stream: %w", err)
 	}
 	return nil
@@ -366,3 +404,9 @@ func (sp *StoragePurger) DeleteOplogArchives(archives []models.Archive) error {
 	tracelog.DebugLogger.Printf("Oplog keys will be deleted: %+v\n", oplogKeys)
 	return sp.oplogsFolder.DeleteObjects(oplogKeys)
 }
+
+func isOplogForSpecificNode(fileName, node string) bool {
+	// NB: plain substring match; this assumes one node name is never a
+	// substring of another (e.g. "shard1" vs "shard10")
+	return strings.Contains(fileName, node)
+}
diff --git a/internal/databases/mongo/models/archive.go b/internal/databases/mongo/models/archive.go
index f0c24195c..238c52618 100644
--- a/internal/databases/mongo/models/archive.go
+++ b/internal/databases/mongo/models/archive.go
@@ -15,9 +15,12 @@ const (
 	ArchiveTypeGap = "gap"
 )
 
-var (
-	ArchRegexp = regexp.MustCompile(`^(oplog|gap)_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
-)
+//var (
+//	ArchRegexp = regexp.MustCompile(`^(oplog|gap)_shard0_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
+//)
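+// Archive names are now node-specific, e.g. for node "shard0" (as in the
+// tests) an oplog archive is stored as:
+//   oplog_shard0_1579000000.1_1579000010.1.lz4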
 
 // Archive defines oplog archive representation.
 type Archive struct {
@@ -49,6 +52,11 @@ func (a Archive) In(ts Timestamp) bool {
 func (a Archive) Filename() string {
 	return fmt.Sprintf("%s_%v%s%v.%s", a.Type, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
 }
+
+// DBNodeSpecificFileName is Filename with the db node name embedded.
+func (a Archive) DBNodeSpecificFileName(node string) string {
+	return fmt.Sprintf("%s_%s_%v%s%v.%s", a.Type, node, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
+}
 
 // Extension returns extension of archive file name.
 func (a Archive) Extension() string {
@@ -57,7 +65,12 @@ func (a Archive) Extension() string {
 
 // ArchFromFilename builds Arch from given path.
 // TODO: support empty extension
-func ArchFromFilename(path string) (Archive, error) {
+func ArchFromFilename(path string, node string) (Archive, error) {
+	// the pattern is rebuilt per call and assumes node contains no regexp
+	// metacharacters (regexp.QuoteMeta would be needed otherwise)
+	format := fmt.Sprintf(`^(oplog|gap)_%v_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`, node)
+	ArchRegexp := regexp.MustCompile(format)
+
 	res := ArchRegexp.FindAllStringSubmatch(path, -1)
 	if len(res) != 1 {
 		return Archive{}, fmt.Errorf("can not parse oplog path: %s", path)
diff --git a/internal/databases/mongo/models/archive_test.go b/internal/databases/mongo/models/archive_test.go
index efb21b1ad..f01d7c8fa 100644
--- a/internal/databases/mongo/models/archive_test.go
+++ b/internal/databases/mongo/models/archive_test.go
@@ -280,7 +280,7 @@ func TestArchFromFilename(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := ArchFromFilename(tt.args.path)
+			got, err := ArchFromFilename(tt.args.path, "shard0")
 			if (err != nil) != tt.wantErr {
 				t.Errorf("ArchFromFilename() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 2b140f8aa..b23675b09 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -3,17 +3,17 @@ package oplog
 import (
 	"context"
 	"fmt"
 	"io"
-	"strings"
 
 	"github.com/mongodb/mongo-tools-common/db"
 	"github.com/mongodb/mongo-tools-common/txn"
 	"github.com/mongodb/mongo-tools-common/util"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal/databases/mongo/client"
 	"github.com/wal-g/wal-g/internal/databases/mongo/models"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 )
 
 type TypeAssertionError struct {
@@ -78,11 +78,17 @@ type DBApplier struct {
 	preserveUUID          bool
 	applyIgnoreErrorCodes map[string][]int32
 	until                 models.Timestamp
+	dbNode                string
+	filterList            shake.OplogFilterChain
 }
 
 // NewDBApplier builds DBApplier with given args.
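+// The applier is node-aware and, except on the config server, runs every
+// entry through filterList (filters adapted, it appears, from MongoShake).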
-func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32) *DBApplier {
-	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes}
+func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32,
+	node string, filterList shake.OplogFilterChain) *DBApplier {
+	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID,
+		applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
 }
 
 func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
@@ -97,9 +103,12 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
 		return nil
 	}
 
-	if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
-		tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
-		return nil
+	// config server oplog entries are applied as-is; entries from all other
+	// nodes go through the shake filter chain
+	if ap.dbNode != "configsvr" {
+		if ap.filterList.IterateFilter(&op) {
+			return nil
+		}
 	}
 
 	meta, err := txn.NewMeta(op)
@@ -134,26 +143,15 @@ func (ap *DBApplier) Close(ctx context.Context) error {
 	return nil
 }
 
-func (ap *DBApplier) shouldSkip(op, ns string) error {
-	if op == "n" {
-		return fmt.Errorf("noop op")
-	}
-
-	// sharded clusters are not supported yet
-	if strings.HasPrefix(ns, "config.") {
-		return fmt.Errorf("config database op")
-	}
-
-	return nil
-}
-
 // shouldIgnore checks if error should be ignored
 func (ap *DBApplier) shouldIgnore(op string, err error) bool {
 	ce, ok := err.(mongo.CommandError)
 	if !ok {
 		return false
 	}
-
+	if mongo.IsDuplicateKeyError(err) {
+		return true
+	}
 	ignoreErrorCodes, ok := ap.applyIgnoreErrorCodes[op]
 	if !ok {
 		return false
 	}
@@ -265,8 +263,8 @@ func indexSpecFromCommitIndexBuilds(op db.Oplog) (string, []client.IndexDocument
 		if !ok {
 			return "", nil, NewTypeAssertionError("bson.D", fmt.Sprintf("indexes[%d]", i), elemE.Value)
 		}
-		for i := range elements {
-			elemE = elements[i]
+		for j := range elements {
+			elemE = elements[j]
 			if elemE.Key == "key" {
 				if indexSpecs[i].Key, ok = elemE.Value.(bson.D); !ok {
 					return "", nil, NewTypeAssertionError("bson.D", "key", elemE.Value)
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
new file mode 100644
index 000000000..a1ccbf512
--- /dev/null
+++ b/internal/databases/mongo/shake/filter.go
@@ -0,0 +1,100 @@
+package shake
+
+import (
+	"reflect"
+	"strings"
+
+	"github.com/mongodb/mongo-tools-common/db"
+	"github.com/wal-g/tracelog"
+)
+
+// OplogFilter is implemented by AutologousFilter and NoopFilter.
+type OplogFilter interface {
+	Filter(log *db.Oplog) bool
+}
+
+type OplogFilterChain []OplogFilter
+
+func (chain OplogFilterChain) IterateFilter(log *db.Oplog) bool {
+	for _, filter := range chain {
+		if filter.Filter(log) {
+			tracelog.DebugLogger.Printf("%v filter oplog[%v]", reflect.TypeOf(filter), log)
+			return true
+		}
+	}
+	return false
+}
+
+type AutologousFilter struct {
+}
+
+func (filter *AutologousFilter) Filter(log *db.Oplog) bool {
+	// Filter out unnecessary commands
+	if operation, found := ExtraCommandName(log.Object); found {
+		if IsNeedFilterCommand(operation) {
+			return true
+		}
+	}
+	return filter.FilterNs(log.Namespace)
+}
+
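+// Illustration (assumption): a "startIndexBuild" command entry is dropped
+// because opsMap marks it needFilter, while an insert into "mydb.users"
+// passes both checks and is replayed.
+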
+// NsShouldBeIgnore lists namespaces that should be filtered out.
+// key: ns; value: true means prefix match, false means substring match
+var NsShouldBeIgnore = map[string]bool{
+	"admin.":       true,
+	"local.":       true,
+	"config.":      true,
+	"system.views": false,
+}
+
+// NsShouldNotBeIgnore has a higher priority than NsShouldBeIgnore.
+// key: ns; value: true means prefix match, false means substring match
+var NsShouldNotBeIgnore = map[string]bool{
+	"admin.$cmd":         true,
+	"admin.system.users": false,
+	"admin.system.roles": false,
+}
+
+func (filter *AutologousFilter) FilterNs(namespace string) bool {
+	// for namespaces, filter out collections in the admin, local and
+	// config databases
+
+	// since v2.4.13 we don't filter admin.$cmd, which may include transactions,
+	// and we keep admin.system.users and admin.system.roles to retrieve roles and users
+	for key, val := range NsShouldNotBeIgnore {
+		if val && strings.HasPrefix(namespace, key) {
+			return false
+		}
+		if !val && strings.Contains(namespace, key) {
+			return false
+		}
+	}
+	for key, val := range NsShouldBeIgnore {
+		if val && strings.HasPrefix(namespace, key) {
+			return true
+		}
+		if !val && strings.Contains(namespace, key) {
+			return true
+		}
+	}
+	return false
+}
+
+type NoopFilter struct {
+}
+
+func (filter *NoopFilter) Filter(log *db.Oplog) bool {
+	return log.Operation == "n"
+}
+
+//type DDLFilter struct {
+//}
+//
+//func (filter *DDLFilter) Filter(log *db.Oplog) bool {
+//	//operation, _ := ExtraCommandName(log.Object)
+//	//return log.Operation == "c" && operation != "applyOps" && operation != "create" || strings.HasSuffix(log.Namespace, "system.indexes")
+//	return false
+//}
diff --git a/internal/databases/mongo/shake/oplog.go b/internal/databases/mongo/shake/oplog.go
new file mode 100644
index 000000000..9deaf8dcf
--- /dev/null
+++ b/internal/databases/mongo/shake/oplog.go
@@ -0,0 +1,57 @@
+package shake
+
+import (
+	"strings"
+
+	"go.mongodb.org/mongo-driver/bson"
+)
+
+const (
+	PrimaryKey = "_id"
+)
+
+// CommandOperation describes how a command found in the oplog is handled.
+type CommandOperation struct {
+	concernSyncData bool
+	runOnAdmin      bool // some commands like `renameCollection` need to run on the admin database
+	needFilter      bool // should be ignored in shake
+}
+
+var opsMap = map[string]*CommandOperation{
+	"create":           {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"createIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"collMod":          {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropDatabase":     {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"drop":             {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndex":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndex":        {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndexes":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"renameCollection": {concernSyncData: false, runOnAdmin: true, needFilter: false},
+	"convertToCapped":  {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"emptycapped":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"applyOps":         {concernSyncData: true, runOnAdmin: false, needFilter: false},
+	"startIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+	"commitIndexBuild": {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"abortIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+}
+
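+// ExtraCommandName extracts the command name from a command-type oplog
+// entry; in the oplog document the command name is always the first key.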
+func ExtraCommandName(o bson.D) (string, bool) {
+	// command name must be at the first position
+	if len(o) > 0 {
+		if _, exist := opsMap[o[0].Key]; exist {
+			return o[0].Key, true
+		}
+	}
+
+	return "", false
+}
+
+func IsNeedFilterCommand(operation string) bool {
+	if op, ok := opsMap[strings.TrimSpace(operation)]; ok {
+		return op.needFilter
+	}
+	return false
+}
diff --git a/internal/databases/mongo/stages/fetcher.go b/internal/databases/mongo/stages/fetcher.go
index 487477cff..8830c4fc0 100644
--- a/internal/databases/mongo/stages/fetcher.go
+++ b/internal/databases/mongo/stages/fetcher.go
@@ -142,11 +142,12 @@ func (cb *CloserBuffer) Close() error {
 type StorageFetcher struct {
 	downloader archive.Downloader
 	path       archive.Sequence
+	dbNode     string
 }
 
 // NewStorageFetcher builds StorageFetcher instance
-func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence) *StorageFetcher {
-	return &StorageFetcher{downloader: downloader, path: path}
+func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence, node string) *StorageFetcher {
+	return &StorageFetcher{downloader: downloader, path: path, dbNode: node}
 }
 
 // FetchBetween returns channel of oplog records, channel is filled in background.
@@ -168,11 +169,11 @@ func (sf *StorageFetcher) FetchBetween(ctx context.Context,
 	firstFound := false
 	for _, arch := range path {
-		tracelog.DebugLogger.Printf("Fetching archive %s", arch.Filename())
+		tracelog.DebugLogger.Printf("Fetching archive %s", arch.DBNodeSpecificFileName(sf.dbNode))
 
 		err := sf.downloader.DownloadOplogArchive(arch, buf)
 		if err != nil {
-			errc <- fmt.Errorf("failed to download archive %s: %w", arch.Filename(), err)
+			errc <- fmt.Errorf("failed to download archive %s: %w", arch.DBNodeSpecificFileName(sf.dbNode), err)
 			return
 		}
@@ -215,7 +216,7 @@ func (sf *StorageFetcher) FetchBetween(ctx context.Context,
 		}
 		buf.Reset()
 		if !firstFound { // TODO: do we need this check, add skip flag
-			errc <- fmt.Errorf("'from' timestamp '%s' was not found in first archive: %s", from, arch.Filename())
+			errc <- fmt.Errorf("'from' timestamp '%s' was not found in first archive: %s", from, arch.DBNodeSpecificFileName(sf.dbNode))
 			return
 		}
 	}
diff --git a/main/mongo/Dockerfile b/main/mongo/Dockerfile
new file mode 100644
index 000000000..8e5854d6a
--- /dev/null
+++ b/main/mongo/Dockerfile
@@ -0,0 +1,13 @@
+FROM debian:bookworm
+
+RUN set -x \
+    && apt-get update \
+    && apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl;
+
+#RUN curl -LO https://fastdl.mongodb.org/tools/db/mongodb-database-tools-ubuntu2004-x86_64-100.5.4.tgz \
+#    && tar -zxvf mongodb-database-tools-ubuntu2004-x86_64-100.5.4.tgz \
+#    && cp -r ./mongodb-database-tools-ubuntu2004-x86_64-100.5.4/bin/* /bin/ ;
+
+COPY wal-g /bin/wal-g
+
+ENTRYPOINT [ "/bin/wal-g" ]
\ No newline at end of file