Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MongoDB Shard support for oplog-push and oplog-replay #25

Merged
merged 4 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ build_client:
cd cmd/daemonclient && \
go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"


update: mongo_build
docker build --tag walg:2.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
docker tag walg:2.0 sayedppqq/walg:2.0
docker push sayedppqq/walg:2.0
#update: mongo_build
# docker build --tag walg:1.0 /home/sayed/go/src/kubedb.dev/wal-g/main/mongo
# docker tag walg:1.0 sayedppqq/walg:1.0
# docker push sayedppqq/walg:1.0
2 changes: 1 addition & 1 deletion cmd/mongo/oplog_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var oplogFetchCmd = &cobra.Command{
tracelog.ErrorLogger.FatalOnError(err)

// setup storage fetcher
oplogFetcher := stages.NewStorageFetcher(downloader, path)
oplogFetcher := stages.NewStorageFetcher(downloader, path, "")

// run worker cycle
err = mongo.HandleOplogReplay(ctx, since, until, oplogFetcher, oplogApplier)
Expand Down
8 changes: 8 additions & 0 deletions cmd/mongo/oplog_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
uploader := archive.NewStorageUploader(uplProvider)
uploader.SetKubeClient(pushArgs.kubeClient)
uploader.SetSnapshot(snapshotName, snapshotNamespace)
uploader.SetDBNode(pushArgs.dbNode)

// set up mongodb client and oplog fetcher
mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
Expand Down Expand Up @@ -111,6 +112,8 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
if err != nil {
return err
}
downloader.SetNodeSpecificDownloader(uploader.GetDBNode())

since, err := discovery.ResolveStartingTS(ctx, downloader, mongoClient)
if err != nil {
return err
Expand Down Expand Up @@ -148,6 +151,7 @@ type oplogPushRunArgs struct {
archiveAfterSize int
archiveTimeout time.Duration
mongodbURL string
dbNode string
primaryWait bool
primaryWaitTimeout time.Duration
lwUpdate time.Duration
Expand All @@ -171,6 +175,10 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
if err != nil {
return
}
args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
if err != nil {
return
}

args.dbProvider = internal.GetNonRequiredSetting(internal.MongoDBProvider)

Expand Down
20 changes: 14 additions & 6 deletions cmd/mongo/oplog_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package mongo
import (
"context"
"encoding/json"
"github.com/wal-g/wal-g/internal/databases/mongo/shake"
"os"
"syscall"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
Expand All @@ -13,8 +17,6 @@ import (
"github.com/wal-g/wal-g/internal/databases/mongo/oplog"
"github.com/wal-g/wal-g/internal/databases/mongo/stages"
"github.com/wal-g/wal-g/utility"
"os"
"syscall"
)

const LatestBackupString = "LATEST_BACKUP"
Expand Down Expand Up @@ -47,6 +49,7 @@ type oplogReplayRunArgs struct {

ignoreErrCodes map[string][]int32
mongodbURL string
dbNode string

oplogAlwaysUpsert *bool
oplogApplicationMode *string
Expand All @@ -73,7 +76,10 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
if err != nil {
return
}

args.dbNode, err = internal.GetRequiredSetting(internal.MongoDBNode)
if err != nil {
return
}
oplogAlwaysUpsert, hasOplogAlwaysUpsert, err := internal.GetBoolSetting(internal.OplogReplayOplogAlwaysUpsert)
if err != nil {
return
Expand Down Expand Up @@ -132,30 +138,32 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
return err
}

dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes)
filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
oplogApplier := stages.NewGenericApplier(dbApplier)

// set up storage downloader client
downloader, err := archive.NewStorageDownloader(archive.NewDefaultStorageSettings())
if err != nil {
return err
}
downloader.SetNodeSpecificDownloader(replayArgs.dbNode)
// discover archive sequence to replay
archives, err := downloader.ListOplogArchives()
if err != nil {
return err
}

// update since and until. since = matched archive start ts , until = matched archiver end ts
replayArgs.since, replayArgs.until = archive.GetUpdatedBackupTimes(archives, replayArgs.since, replayArgs.until)
dbApplier.SetUntilTime(replayArgs.until)

path, err := archive.SequenceBetweenTS(archives, replayArgs.since, replayArgs.until)
if err != nil {
return err
}

// setup storage fetcher
oplogFetcher := stages.NewStorageFetcher(downloader, path)
oplogFetcher := stages.NewStorageFetcher(downloader, path, replayArgs.dbNode)

// run worker cycle
return mongo.HandleOplogReplay(ctx, replayArgs.since, replayArgs.until, oplogFetcher, oplogApplier)
Expand Down
1 change: 1 addition & 0 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ const (

MongoDBProvider = "MONGODB_PROVIDER"
MongoDBPath = "MONGODB_PATH"
MongoDBNode = "MONGODB_NODE"
MongoDBUriSetting = "MONGODB_URI"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
Expand Down
1 change: 0 additions & 1 deletion internal/databases/mongo/archive/layout.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func GetUpdatedBackupTimes(archives []models.Archive, since, until models.Timest
updatedSince = arch.Start
}
}

return updatedSince, until
}

Expand Down
52 changes: 42 additions & 10 deletions internal/databases/mongo/archive/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type StorageDownloader struct {
rootFolder storage.Folder
oplogsFolder storage.Folder
backupsFolder storage.Folder
dbNode string
}

// NewStorageDownloader builds mongodb downloader.
Expand All @@ -87,6 +88,13 @@ func NewStorageDownloader(opts StorageSettings) (*StorageDownloader, error) {
nil
}

// SetNodeSpecificDownloader configures the database node (replica set / shard
// member) name this downloader is scoped to. The node name is used to build
// and filter node-specific oplog archive file names (see DownloadOplogArchive
// and ListOplogArchives).
func (sd *StorageDownloader) SetNodeSpecificDownloader(node string) {
sd.dbNode = node
}
// GetNodeSpecificDownloader returns the configured database node name
// (empty string when no node has been set).
// NOTE(review): Go convention would name these accessors NodeSpecificDownloader /
// SetNodeSpecificDownloader without the "Get" prefix; kept as-is for callers.
func (sd *StorageDownloader) GetNodeSpecificDownloader() string {
return sd.dbNode
}

// BackupMeta downloads sentinel contents.
func (sd *StorageDownloader) BackupMeta(name string) (*models.Backup, error) {
return common.DownloadSentinel(sd.backupsFolder, name)
Expand Down Expand Up @@ -124,7 +132,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {

// DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
// The archive is stored under its node-specific name
// ("<type>_<node>_<startTS>_<endTS>.<ext>"), so the lookup key must be
// built with the same node this downloader was configured for.
return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
}

// ListOplogArchives fetches all oplog archives existed in storage.
Expand All @@ -137,7 +146,10 @@ func (sd *StorageDownloader) ListOplogArchives() ([]models.Archive, error) {
archives := make([]models.Archive, 0, len(objects))
for _, key := range objects {
archName := key.GetName()
arch, err := models.ArchFromFilename(archName)
if !isOplogForSpecificNode(archName, sd.dbNode) {
continue
}
arch, err := models.ArchFromFilename(archName, sd.dbNode)
if err != nil {
return nil, fmt.Errorf("can not convert retrieve timestamps since oplog archive Ext '%s': %w", archName, err)
}
Expand All @@ -155,7 +167,10 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
}
for _, key := range keys {
filename := key.GetName()
arch, err := models.ArchFromFilename(filename)
if !isOplogForSpecificNode(filename, sd.dbNode) {
continue
}
arch, err := models.ArchFromFilename(filename, sd.dbNode)
if err != nil {
return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
}
Expand Down Expand Up @@ -209,6 +224,7 @@ type StorageUploader struct {
kubeClient controllerruntime.Client
snapshotName string
snapshotNamespace string
dbNode string
}

// NewStorageUploader builds mongodb uploader.
Expand All @@ -225,6 +241,12 @@ func (su *StorageUploader) SetSnapshot(name, namespace string) {
su.snapshotName = name
su.snapshotNamespace = namespace
}
// SetDBNode configures the database node (replica set / shard member) name
// this uploader is scoped to. The node name is embedded into uploaded oplog
// archive file names and into the snapshot component key ("wal-<node>").
func (su *StorageUploader) SetDBNode(node string) {
su.dbNode = node
}
// GetDBNode returns the configured database node name
// (empty string when no node has been set).
// NOTE(review): Go convention would drop the "Get" prefix; kept for callers.
func (su *StorageUploader) GetDBNode() string {
return su.dbNode
}

func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
var snapshot storageapi.Snapshot
Expand All @@ -236,6 +258,9 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
return err
}

compName := "wal"
compName = compName + "-" + su.GetDBNode()

_, err = kmc.PatchStatus(
context.TODO(),
su.kubeClient,
Expand All @@ -244,17 +269,17 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
in := obj.(*storageapi.Snapshot)
if len(in.Status.Components) == 0 {
in.Status.Components = make(map[string]storageapi.Component)

}
if _, ok := in.Status.Components[compName]; !ok {
walSegments := make([]storageapi.WalSegment, 1)
walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
in.Status.Components["wal"] = storageapi.Component{
in.Status.Components[compName] = storageapi.Component{
WalSegments: walSegments,
}
}

component := in.Status.Components["wal"]
component := in.Status.Components[compName]
component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
in.Status.Components["wal"] = component
in.Status.Components[compName] = component

return in
},
Expand All @@ -281,8 +306,10 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
return err
}

fileName := arch.DBNodeSpecificFileName(su.dbNode)

// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
return su.Upload(ctx, arch.Filename(), bytes.NewReader(su.buf.Bytes()))
return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
}

// UploadGap uploads mark indicating archiving gap.
Expand All @@ -296,7 +323,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
return fmt.Errorf("can not build archive: %w", err)
}

if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.Filename()); err != nil {
if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
return fmt.Errorf("error while uploading stream: %w", err)
}
return nil
Expand Down Expand Up @@ -366,3 +394,7 @@ func (sp *StoragePurger) DeleteOplogArchives(archives []models.Archive) error {
tracelog.DebugLogger.Printf("Oplog keys will be deleted: %+v\n", oplogKeys)
return sp.oplogsFolder.DeleteObjects(oplogKeys)
}

// isOplogForSpecificNode reports whether the oplog archive file name belongs
// to the given database node. Archive names embed the node as a delimited
// segment, "<type>_<node>_<startTS>_<endTS>.<ext>" (see
// Archive.DBNodeSpecificFileName), so we match the "_<node>_" segment rather
// than a bare substring: a plain Contains(fileName, node) wrongly accepts
// nodes whose name is a prefix of another (e.g. node "shard0" would match a
// "shard01" archive, and the subsequent ArchFromFilename parse would then
// fail and abort the whole listing).
func isOplogForSpecificNode(fileName, node string) bool {
	if node == "" {
		// No node configured: legacy non-sharded layout, accept every archive
		// (preserves the previous Contains(fileName, "") == true behavior).
		return true
	}
	return strings.Contains(fileName, "_"+node+"_")
}
14 changes: 10 additions & 4 deletions internal/databases/mongo/models/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ const (
ArchiveTypeGap = "gap"
)

var (
ArchRegexp = regexp.MustCompile(`^(oplog|gap)_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
)
//var (
// ArchRegexp = regexp.MustCompile(`^(oplog|gap)_shard0_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
//)

// Archive defines oplog archive representation.
type Archive struct {
Expand Down Expand Up @@ -49,6 +49,9 @@ func (a Archive) In(ts Timestamp) bool {
// Filename builds the legacy, node-agnostic archive file name:
// "<type>_<startTS><delim><endTS>.<ext>". The exact layout is a storage
// contract — it must stay parseable by ArchFromFilename.
func (a Archive) Filename() string {
return fmt.Sprintf("%s_%v%s%v.%s", a.Type, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
}
// DBNodeSpecificFileName builds the archive file name with the database node
// embedded after the type: "<type>_<node>_<startTS><delim><endTS>.<ext>".
// Used for per-node (shard/replica) oplog archives; the node segment is what
// isOplogForSpecificNode and the node-aware ArchFromFilename key on.
func (a Archive) DBNodeSpecificFileName(node string) string {
return fmt.Sprintf("%s_%s_%v%s%v.%s", a.Type, node, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
}

// Extension returns extension of archive file name.
func (a Archive) Extension() string {
Expand All @@ -57,7 +60,10 @@ func (a Archive) Extension() string {

// ArchFromFilename builds Arch from given path.
// TODO: support empty extension
func ArchFromFilename(path string) (Archive, error) {
func ArchFromFilename(path string, node string) (Archive, error) {
format := fmt.Sprintf(`^(oplog|gap)_%v_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`, node)
ArchRegexp := regexp.MustCompile(format)

res := ArchRegexp.FindAllStringSubmatch(path, -1)
if len(res) != 1 {
return Archive{}, fmt.Errorf("can not parse oplog path: %s", path)
Expand Down
2 changes: 1 addition & 1 deletion internal/databases/mongo/models/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestArchFromFilename(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ArchFromFilename(tt.args.path)
got, err := ArchFromFilename(tt.args.path, "shard0")
if (err != nil) != tt.wantErr {
t.Errorf("ArchFromFilename() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading
Loading