diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go
index 1b261582e280c..18c5319707324 100644
--- a/server/channels/app/app_iface.go
+++ b/server/channels/app/app_iface.go
@@ -471,6 +471,7 @@ type AppIface interface {
 	Cluster() einterfaces.ClusterInterface
 	CompareAndDeletePluginKey(pluginID string, key string, oldValue []byte) (bool, *model.AppError)
 	CompareAndSetPluginKey(pluginID string, key string, oldValue, newValue []byte) (bool, *model.AppError)
+	CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError)
 	CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError)
 	CompleteOnboarding(c request.CTX, request *model.CompleteOnboardingRequest) *model.AppError
 	CompleteSwitchWithOAuth(c request.CTX, service string, userData io.Reader, email string, tokenUser *model.User) (*model.User, *model.AppError)
@@ -1011,6 +1012,7 @@ type AppIface interface {
 	SaveBrandImage(rctx request.CTX, imageData *multipart.FileHeader) *model.AppError
 	SaveComplianceReport(rctx request.CTX, job *model.Compliance) (*model.Compliance, *model.AppError)
 	SaveReactionForPost(c request.CTX, reaction *model.Reaction) (*model.Reaction, *model.AppError)
+	SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError
 	SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error)
 	SaveSharedChannelRemote(remote *model.SharedChannelRemote) (*model.SharedChannelRemote, error)
 	SaveUserTermsOfService(userID, termsOfServiceId string, accepted bool) *model.AppError
@@ -1047,6 +1049,7 @@ type AppIface interface {
 	SendPasswordReset(email string, siteURL string) (bool, *model.AppError)
 	SendPaymentFailedEmail(failedPayment *model.FailedPayment) *model.AppError
 	SendPersistentNotifications() error
+	SendReportToUser(userID string, filename string) *model.AppError
 	SendTestPushNotification(deviceID string) string
 	SendUpgradeConfirmationEmail(isYearly bool) *model.AppError
 	ServeInterPluginRequest(w http.ResponseWriter, r *http.Request, sourcePluginId, destinationPluginId string)
diff --git a/server/channels/app/opentracing/opentracing_layer.go b/server/channels/app/opentracing/opentracing_layer.go
index 1dcec6f0e617b..42bad4811cd7f 100644
--- a/server/channels/app/opentracing/opentracing_layer.go
+++ b/server/channels/app/opentracing/opentracing_layer.go
@@ -1669,6 +1669,28 @@ func (a *OpenTracingAppLayer) CompareAndSetPluginKey(pluginID string, key string
 	return resultVar0, resultVar1
 }
 
+func (a *OpenTracingAppLayer) CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) {
+	origCtx := a.ctx
+	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompileReportChunks")
+
+	a.ctx = newCtx
+	a.app.Srv().Store().SetContext(newCtx)
+	defer func() {
+		a.app.Srv().Store().SetContext(origCtx)
+		a.ctx = origCtx
+	}()
+
+	defer span.Finish()
+	resultVar0, resultVar1 := a.app.CompileReportChunks(format, prefix, numberOfChunks, headers)
+
+	if resultVar1 != nil {
+		span.LogFields(spanlog.Error(resultVar1))
+		ext.Error.Set(span, true)
+	}
+
+	return resultVar0, resultVar1
+}
+
 func (a *OpenTracingAppLayer) CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError) {
 	origCtx := a.ctx
 	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompleteOAuth")
@@ -14826,6 +14848,28 @@ func (a *OpenTracingAppLayer) SaveReactionForPost(c request.CTX, reaction *model
 	return resultVar0, resultVar1
 }
 
+func (a *OpenTracingAppLayer) SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError {
+	origCtx := a.ctx
+	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveReportChunk")
+
+	a.ctx = newCtx
+	a.app.Srv().Store().SetContext(newCtx)
+	defer func() {
+		a.app.Srv().Store().SetContext(origCtx)
+		a.ctx = origCtx
+	}()
+
+	defer span.Finish()
+	resultVar0 := a.app.SaveReportChunk(format, prefix, count, reportData)
+
+	if resultVar0 != nil {
+		span.LogFields(spanlog.Error(resultVar0))
+		ext.Error.Set(span, true)
+	}
+
+	return resultVar0
+}
+
 func (a *OpenTracingAppLayer) SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error) {
 	origCtx := a.ctx
 	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveSharedChannel")
@@ -15669,6 +15713,28 @@ func (a *OpenTracingAppLayer) SendPersistentNotifications() error {
 	return resultVar0
 }
 
+func (a *OpenTracingAppLayer) SendReportToUser(userID string, filename string) *model.AppError {
+	origCtx := a.ctx
+	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SendReportToUser")
+
+	a.ctx = newCtx
+	a.app.Srv().Store().SetContext(newCtx)
+	defer func() {
+		a.app.Srv().Store().SetContext(origCtx)
+		a.ctx = origCtx
+	}()
+
+	defer span.Finish()
+	resultVar0 := a.app.SendReportToUser(userID, filename)
+
+	if resultVar0 != nil {
+		span.LogFields(spanlog.Error(resultVar0))
+		ext.Error.Set(span, true)
+	}
+
+	return resultVar0
+}
+
 func (a *OpenTracingAppLayer) SendSubscriptionHistoryEvent(userID string) (*model.SubscriptionHistory, error) {
 	origCtx := a.ctx
 	span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SendSubscriptionHistoryEvent")
diff --git a/server/channels/app/report.go b/server/channels/app/report.go
new file mode 100644
index 0000000000000..a63811eee65bb
--- /dev/null
+++ b/server/channels/app/report.go
@@ -0,0 +1,89 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package app
+
+import (
+	"bytes"
+	"encoding/csv"
+	"fmt"
+	"net/http"
+
+	"github.com/mattermost/mattermost/server/public/model"
+)
+
+func (a *App) SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError {
+	switch format {
+	case "csv":
+		return a.saveCSVChunk(prefix, count, reportData)
+	}
+	return model.NewAppError("SaveReportChunk", "", nil, "unsupported report format", http.StatusInternalServerError)
+}
+
+func (a *App) saveCSVChunk(prefix string, count int, reportData []model.ReportableObject) *model.AppError {
+	var buf bytes.Buffer
+	w := csv.NewWriter(&buf)
+
+	for _, report := range reportData {
+		err := w.Write(report.ToReport())
+		if err != nil {
+			return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError)
+		}
+	}
+
+	w.Flush()
+	_, appErr := a.WriteFile(&buf, makeFilename(prefix, count, "csv"))
+	if appErr != nil {
+		return appErr
+	}
+
+	return nil
+}
+
+func (a *App) CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) {
+	switch format {
+	case "csv":
+		return a.compileCSVChunks(prefix, numberOfChunks, headers)
+	}
+	return "", model.NewAppError("CompileReportChunks", "", nil, "unsupported report format", http.StatusInternalServerError)
+}
+
+func (a *App) compileCSVChunks(prefix string, numberOfChunks int, headers []string) (string, *model.AppError) {
+	filename := fmt.Sprintf("batch_report_%s.csv", prefix)
+
+	var headerBuf bytes.Buffer
+	w := csv.NewWriter(&headerBuf)
+	err := w.Write(headers)
+	if err != nil {
+		return "", model.NewAppError("compileCSVChunks", "", nil, "failed to write headers", http.StatusInternalServerError)
+	}
+	w.Flush()
+	_, appErr := a.WriteFile(&headerBuf, filename)
+	if appErr != nil {
+		return "", appErr
+	}
+
+	for i := 0; i < numberOfChunks; i++ {
+		chunkFilename := makeFilename(prefix, i, "csv")
+		chunk, err := a.ReadFile(chunkFilename)
+		if err != nil {
+			return "", err
+		}
+		if _, err = a.AppendFile(bytes.NewReader(chunk), filename); err != nil {
+			return "", err
+		}
+		if err = a.RemoveFile(chunkFilename); err != nil {
+			return "", err
+		}
+	}
+
+	return filename, nil
+}
+
+func (a *App) SendReportToUser(userID string, filename string) *model.AppError {
+	return nil
+}
+
+func makeFilename(prefix string, count int, extension string) string {
+	return fmt.Sprintf("batch_report_%s__%d.%s", prefix, count, extension)
+}
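
For reference, report.go implements a chunk-then-concatenate scheme: each batch is written as its own headerless CSV file named batch_report_<prefix>__<n>.csv, and CompileReportChunks later writes the header row into batch_report_<prefix>.csv, appends the chunks in index order, and removes them. The sketch below is a minimal, self-contained mirror of that flow; the in-memory fileStore map and helper names are illustrative stand-ins for the App's WriteFile/ReadFile/AppendFile/RemoveFile backend and are not part of this change.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
)

// fileStore stands in for the server's file backend (WriteFile/ReadFile/AppendFile).
var fileStore = map[string][]byte{}

// writeChunk mirrors saveCSVChunk: one headerless CSV file per batch.
func writeChunk(prefix string, count int, rows [][]string) error {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.WriteAll(rows); err != nil { // WriteAll flushes internally
		return err
	}
	fileStore[fmt.Sprintf("batch_report_%s__%d.csv", prefix, count)] = buf.Bytes()
	return nil
}

// compileChunks mirrors compileCSVChunks: header row first, then chunks in index order.
func compileChunks(prefix string, numberOfChunks int, headers []string) (string, error) {
	filename := fmt.Sprintf("batch_report_%s.csv", prefix)
	var out bytes.Buffer
	w := csv.NewWriter(&out)
	if err := w.Write(headers); err != nil {
		return "", err
	}
	w.Flush()
	for i := 0; i < numberOfChunks; i++ {
		out.Write(fileStore[fmt.Sprintf("batch_report_%s__%d.csv", prefix, i)])
	}
	fileStore[filename] = out.Bytes()
	return filename, nil
}

func main() {
	_ = writeChunk("jobid", 0, [][]string{{"id1", "alice"}})
	_ = writeChunk("jobid", 1, [][]string{{"id2", "bob"}})
	name, _ := compileChunks("jobid", 2, []string{"Id", "Username"})
	fmt.Print(name + "\n" + string(fileStore[name]))
}
```
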
diff --git a/server/channels/app/server.go b/server/channels/app/server.go
index 4412c328b9c9a..8335b61ed8244 100644
--- a/server/channels/app/server.go
+++ b/server/channels/app/server.go
@@ -44,6 +44,7 @@ import (
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/expirynotify"
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/export_delete"
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/export_process"
+	"github.com/mattermost/mattermost/server/v8/channels/jobs/export_users_to_csv"
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/extract_content"
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/hosted_purchase_screening"
 	"github.com/mattermost/mattermost/server/v8/channels/jobs/import_delete"
@@ -1674,6 +1675,12 @@ func (s *Server) initJobs() {
 		refresh_post_stats.MakeScheduler(s.Jobs, *s.platform.Config().SqlSettings.DriverName),
 	)
 
+	s.Jobs.RegisterJobType(
+		model.JobTypeExportUsersToCSV,
+		export_users_to_csv.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))),
+		nil,
+	)
+
 	s.platform.Jobs = s.Jobs
 }
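
RegisterJobType hands the jobs framework a model.Worker for the new export_users_to_csv job type. For orientation, the worker contract it has to satisfy is paraphrased below from the four methods the new BatchWorker (later in this diff) implements; see the public model package for the authoritative definition.

```go
// Paraphrase of the model.Worker method set, as satisfied by BatchWorker below.
// Not a new definition; shown only for reference.
type Worker interface {
	Run()                       // processes jobs received on the job channel until stopped
	Stop()                      // interrupts Run
	JobChannel() chan<- Job     // where the jobs framework hands over pending jobs
	IsEnabled(cfg *Config) bool // whether the worker is active under the given config
}
```
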
diff --git a/server/channels/jobs/batch_migration_worker.go b/server/channels/jobs/batch_migration_worker.go
index 072c8e3174bf7..efe28c91ce086 100644
--- a/server/channels/jobs/batch_migration_worker.go
+++ b/server/channels/jobs/batch_migration_worker.go
@@ -5,7 +5,6 @@ package jobs
 
 import (
 	"net/http"
-	"sync/atomic"
 	"time"
 
 	"github.com/mattermost/mattermost/server/public/model"
@@ -29,83 +28,66 @@ type BatchMigrationWorkerAppIFace interface {
 // server in order to retry a failed migration job. Refactoring the job infrastructure is left as
 // a future exercise.
 type BatchMigrationWorker struct {
-	jobServer *JobServer
-	logger    mlog.LoggerIFace
-	store     store.Store
-	app       BatchMigrationWorkerAppIFace
-
-	stop    chan struct{}
-	stopped chan bool
-	closed  atomic.Bool
-	jobs    chan model.Job
-
-	migrationKey       string
-	timeBetweenBatches time.Duration
-	doMigrationBatch   func(data model.StringMap, store store.Store) (model.StringMap, bool, error)
+	BatchWorker
+	app              BatchMigrationWorkerAppIFace
+	migrationKey     string
+	doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)
 }
 
 // MakeBatchMigrationWorker creates a worker to process the given migration batch function.
-func MakeBatchMigrationWorker(jobServer *JobServer, store store.Store, app BatchMigrationWorkerAppIFace, migrationKey string, timeBetweenBatches time.Duration, doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)) model.Worker {
+func MakeBatchMigrationWorker(
+	jobServer *JobServer,
+	store store.Store,
+	app BatchMigrationWorkerAppIFace,
+	migrationKey string,
+	timeBetweenBatches time.Duration,
+	doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error),
+) model.Worker {
 	worker := &BatchMigrationWorker{
+		app:              app,
+		migrationKey:     migrationKey,
+		doMigrationBatch: doMigrationBatch,
+	}
+	worker.BatchWorker = BatchWorker{
 		jobServer:          jobServer,
 		logger:             jobServer.Logger().With(mlog.String("worker_name", migrationKey)),
 		store:              store,
-		app:                app,
 		stop:               make(chan struct{}),
 		stopped:            make(chan bool, 1),
 		jobs:               make(chan model.Job),
-		migrationKey:       migrationKey,
 		timeBetweenBatches: timeBetweenBatches,
-		doMigrationBatch:   doMigrationBatch,
+		doBatch:            worker.doBatch,
 	}
 	return worker
 }
 
-// Run starts the worker dedicated to the unique migration batch job it will be given to process.
-func (worker *BatchMigrationWorker) Run() {
-	worker.logger.Debug("Worker started")
-	// We have to re-assign the stop channel again, because
-	// it might happen that the job was restarted due to a config change.
-	if worker.closed.CompareAndSwap(true, false) {
-		worker.stop = make(chan struct{})
-	}
-
-	defer func() {
-		worker.logger.Debug("Worker finished")
-		worker.stopped <- true
-	}()
-
-	for {
-		select {
-		case <-worker.stop:
-			worker.logger.Debug("Worker received stop signal")
-			return
-		case job := <-worker.jobs:
-			worker.DoJob(&job)
-		}
+func (worker *BatchMigrationWorker) doBatch(rctx *request.Context, job *model.Job) bool {
+	// Ensure the cluster remains in sync, otherwise we restart the job to
+	// ensure a complete migration. Technically, the cluster could go out of
+	// sync briefly within a batch, but we accept that risk.
+	if !worker.checkIsClusterInSync(rctx) {
+		worker.logger.Warn("Worker: Resetting job")
+		worker.resetJob(worker.logger, job)
+		return true
 	}
-}
 
-// Stop interrupts the worker even if the migration has not yet completed.
-func (worker *BatchMigrationWorker) Stop() {
-	// Set to close, and if already closed before, then return.
-	if !worker.closed.CompareAndSwap(false, true) {
-		return
+	nextData, done, err := worker.doMigrationBatch(job.Data, worker.store)
+	if err != nil {
+		worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err))
+		worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err))
+		return true
+	} else if done {
+		worker.logger.Info("Worker: Job is complete")
+		worker.setJobSuccess(worker.logger, job)
+		worker.markAsComplete()
+		return true
 	}
 
-	worker.logger.Debug("Worker stopping")
-	close(worker.stop)
-	<-worker.stopped
-}
-
-// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute.
-func (worker *BatchMigrationWorker) JobChannel() chan<- model.Job {
-	return worker.jobs
-}
+	job.Data = nextData
 
-// IsEnabled is always true for batch migrations.
-func (worker *BatchMigrationWorker) IsEnabled(_ *model.Config) bool {
-	return true
+	// Migrations currently don't support reporting meaningful progress.
+	worker.jobServer.SetJobProgress(job, 0)
+	return false
 }
 
 // checkIsClusterInSync returns true if all nodes in the cluster are running the same version,
@@ -128,108 +110,6 @@ func (worker *BatchMigrationWorker) checkIsClusterInSync(rctx request.CTX) bool
 	return true
 }
 
-// DoJob executes the job picked up through the job channel.
-//
-// Note that this is a lot of distracting machinery here to claim the job, then double check the
-// status, and keep the status up to date in line with job infrastrcuture semantics. Unless an
-// error occurs, this worker should hold onto the job until its completed.
-func (worker *BatchMigrationWorker) DoJob(job *model.Job) {
-	logger := worker.logger.With(mlog.Any("job", job))
-	logger.Debug("Worker received a new candidate job.")
-	defer worker.jobServer.HandleJobPanic(logger, job)
-
-	if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
-		logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err))
-		return
-	} else if !claimed {
-		return
-	}
-
-	c := request.EmptyContext(logger)
-	var appErr *model.AppError
-
-	// We get the job again because ClaimJob changes the job status.
-	job, appErr = worker.jobServer.GetJob(c, job.Id)
-	if appErr != nil {
-		worker.logger.Error("Worker: job execution error", mlog.Err(appErr))
-		worker.setJobError(logger, job, appErr)
-		return
-	}
-
-	if job.Data == nil {
-		job.Data = make(model.StringMap)
-	}
-
-	for {
-		select {
-		case <-worker.stop:
-			logger.Info("Worker: Migration has been canceled via Worker Stop. Setting the job back to pending.")
-			if err := worker.jobServer.SetJobPending(job); err != nil {
-				worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err))
-			}
-			return
-		case <-time.After(worker.timeBetweenBatches):
-			// Ensure the cluster remains in sync, otherwise we restart the job to
-			// ensure a complete migration. Technically, the cluster could go out of
-			// sync briefly within a batch, but we accept that risk.
-			if !worker.checkIsClusterInSync(c) {
-				worker.logger.Warn("Worker: Resetting job")
-				worker.resetJob(logger, job)
-				return
-			}
-
-			nextData, done, err := worker.doMigrationBatch(job.Data, worker.store)
-			if err != nil {
-				worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err))
Exiting", mlog.Err(err)) - worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) - return - } else if done { - logger.Info("Worker: Job is complete") - worker.setJobSuccess(logger, job) - worker.markAsComplete() - return - } - - job.Data = nextData - - // Migrations currently don't support reporting meaningful progress. - worker.jobServer.SetJobProgress(job, 0) - } - } -} - -// resetJob erases the data tracking the next batch to execute and returns the job status to -// pending to allow the job infrastructure to requeue it. -func (worker *BatchMigrationWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) { - job.Data = nil - job.Progress = 0 - job.Status = model.JobStatusPending - - if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { - worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err)) - } -} - -// setJobSuccess records the job as successful. -func (worker *BatchMigrationWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { - if err := worker.jobServer.SetJobProgress(job, 100); err != nil { - logger.Error("Worker: Failed to update progress for job", mlog.Err(err)) - worker.setJobError(logger, job, err) - } - - if err := worker.jobServer.SetJobSuccess(job); err != nil { - logger.Error("Worker: Failed to set success for job", mlog.Err(err)) - worker.setJobError(logger, job, err) - } -} - -// setJobError puts the job into an error state, preventing the job from running again. -func (worker *BatchMigrationWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { - if err := worker.jobServer.SetJobError(job, appError); err != nil { - logger.Error("Worker: Failed to set job error", mlog.Err(err)) - } -} - // markAsComplete records a discrete migration key to prevent this job from ever running again. func (worker *BatchMigrationWorker) markAsComplete() { system := model.System{ diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go new file mode 100644 index 0000000000000..c528ae299bf9d --- /dev/null +++ b/server/channels/jobs/batch_report_worker.go @@ -0,0 +1,147 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. 
diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go
new file mode 100644
index 0000000000000..c528ae299bf9d
--- /dev/null
+++ b/server/channels/jobs/batch_report_worker.go
@@ -0,0 +1,147 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package jobs
+
+import (
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/mattermost/mattermost/server/public/model"
+	"github.com/mattermost/mattermost/server/public/shared/mlog"
+	"github.com/mattermost/mattermost/server/public/shared/request"
+	"github.com/mattermost/mattermost/server/v8/channels/store"
+	"github.com/pkg/errors"
+)
+
+type BatchReportWorkerAppIFace interface {
+	SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError
+	CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError)
+	SendReportToUser(userID string, filename string) *model.AppError
+}
+
+type BatchReportWorker[T BatchReportWorkerAppIFace] struct {
+	BatchWorker
+	app          T
+	reportFormat string
+	headers      []string
+	getData      func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error)
+}
+
+func MakeBatchReportWorker[T BatchReportWorkerAppIFace](
+	jobServer *JobServer,
+	store store.Store,
+	app T,
+	timeBetweenBatches time.Duration,
+	reportFormat string,
+	headers []string,
+	getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error),
+) model.Worker {
+	worker := &BatchReportWorker[T]{
+		app:          app,
+		reportFormat: reportFormat,
+		headers:      headers,
+		getData:      getData,
+	}
+	worker.BatchWorker = BatchWorker{
+		jobServer:          jobServer,
+		logger:             jobServer.Logger(),
+		store:              store,
+		stop:               make(chan struct{}),
+		stopped:            make(chan bool, 1),
+		jobs:               make(chan model.Job),
+		timeBetweenBatches: timeBetweenBatches,
+		doBatch:            worker.doBatch,
+	}
+	return worker
+}
+
+func (worker *BatchReportWorker[T]) doBatch(rctx *request.Context, job *model.Job) bool {
+	reportData, nextData, done, err := worker.getData(job.Data, worker.app)
+	if err != nil {
+		// TODO getData error
+		worker.logger.Error("Worker: Failed to do report batch. Exiting", mlog.Err(err))
+		worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err))
+		return true
+	} else if done {
+		if err = worker.complete(job); err != nil {
+			// TODO complete error
+			worker.logger.Error("Worker: Failed to do report batch. Exiting", mlog.Err(err))
+			worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err))
+		} else {
+			worker.logger.Info("Worker: Job is complete")
+			worker.setJobSuccess(worker.logger, job)
+		}
+
+		return true
+	}
+
+	err = worker.saveData(job, reportData)
+	if err != nil {
+		// TODO saveData error
+		worker.logger.Error("Worker: Failed to do report batch. Exiting", mlog.Err(err))
+		worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err))
+		return true
+	}
+
+	for k, v := range nextData {
+		job.Data[k] = v
+	}
+
+	// TODO add progress?
+	worker.jobServer.SetJobProgress(job, 0)
+	return false
+}
+
+func getFileCount(jobData model.StringMap) (int, error) {
+	if jobData["file_count"] != "" {
+		parsedFileCount, parseErr := strconv.Atoi(jobData["file_count"])
+		if parseErr != nil {
+			return 0, errors.Wrap(parseErr, "failed to parse file_count")
+		}
+		return parsedFileCount, nil
+	}
+
+	// Assume it hasn't been set
+	return 0, nil
+}
+
+func (worker *BatchReportWorker[T]) saveData(job *model.Job, reportData []model.ReportableObject) error {
+	fileCount, err := getFileCount(job.Data)
+	if err != nil {
+		return err
+	}
+
+	appErr := worker.app.SaveReportChunk(worker.reportFormat, job.Id, fileCount, reportData)
+	if appErr != nil {
+		return appErr
+	}
+
+	fileCount++
+	job.Data["file_count"] = strconv.Itoa(fileCount)
+
+	return nil
+}
+
+func (worker *BatchReportWorker[T]) complete(job *model.Job) error {
+	requestingUserId := job.Data["requesting_user_id"]
+	if requestingUserId == "" {
+		return errors.New("No user to send the report to")
+	}
+	fileCount, err := getFileCount(job.Data)
+	if err != nil {
+		return err
+	}
+
+	compiledFilename, appErr := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount, worker.headers)
+	if appErr != nil {
+		return appErr
+	}
+
+	if appErr = worker.app.SendReportToUser(requestingUserId, compiledFilename); appErr != nil {
+		return appErr
+	}
+
+	return nil
+}
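
With the generic plumbing factored out, a concrete report job only supplies a getData callback: given the job's opaque StringMap it returns one page of ReportableObjects, the cursor data for the next page, and done once the source is exhausted; doBatch then saves each page via SaveReportChunk and merges the cursor back into job.Data. A self-contained toy of that paging contract follows, using plain string slices and a hypothetical "offset" key instead of the real types.

```go
package main

import (
	"fmt"
	"strconv"
)

var users = []string{"alice", "bob", "carol"}

// getData has the same shape as the worker's callback:
// (jobData) -> (one page of rows, cursor for the next page, done, error).
func getData(jobData map[string]string) ([][]string, map[string]string, bool, error) {
	offset, _ := strconv.Atoi(jobData["offset"]) // zero when unset, like file_count above
	if offset >= len(users) {
		return nil, nil, true, nil // done => CompileReportChunks + SendReportToUser
	}
	page := [][]string{{strconv.Itoa(offset), users[offset]}}
	next := map[string]string{"offset": strconv.Itoa(offset + 1)}
	return page, next, false, nil
}

func main() {
	jobData := map[string]string{}
	for {
		page, next, done, _ := getData(jobData)
		if done {
			break
		}
		fmt.Println("chunk:", page)
		for k, v := range next {
			jobData[k] = v // doBatch merges nextData into job.Data the same way
		}
	}
}
```
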
diff --git a/server/channels/jobs/batch_worker.go b/server/channels/jobs/batch_worker.go
new file mode 100644
index 0000000000000..f30031c7e13e7
--- /dev/null
+++ b/server/channels/jobs/batch_worker.go
@@ -0,0 +1,176 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package jobs
+
+import (
+	"sync/atomic"
+	"time"
+
+	"github.com/mattermost/mattermost/server/public/model"
+	"github.com/mattermost/mattermost/server/public/shared/mlog"
+	"github.com/mattermost/mattermost/server/public/shared/request"
+	"github.com/mattermost/mattermost/server/v8/channels/store"
+)
+
+type BatchWorker struct {
+	jobServer *JobServer
+	logger    mlog.LoggerIFace
+	store     store.Store
+
+	stop    chan struct{}
+	stopped chan bool
+	closed  atomic.Bool
+	jobs    chan model.Job
+
+	timeBetweenBatches time.Duration
+	doBatch            func(rctx *request.Context, job *model.Job) bool
+}
+
+// MakeBatchWorker creates a worker to process the given batch function.
+func MakeBatchWorker(
+	jobServer *JobServer,
+	store store.Store,
+	timeBetweenBatches time.Duration,
+	doBatch func(rctx *request.Context, job *model.Job) bool,
+	onComplete func(),
+) model.Worker {
+	worker := &BatchWorker{
+		jobServer:          jobServer,
+		logger:             jobServer.Logger(),
+		store:              store,
+		stop:               make(chan struct{}),
+		stopped:            make(chan bool, 1),
+		jobs:               make(chan model.Job),
+		timeBetweenBatches: timeBetweenBatches,
+		doBatch:            doBatch,
+	}
+	return worker
+}
+
+// Run starts the worker dedicated to the unique batch job it will be given to process.
+func (worker *BatchWorker) Run() {
+	worker.logger.Debug("Worker started")
+	// We have to re-assign the stop channel again, because
+	// it might happen that the job was restarted due to a config change.
+	if worker.closed.CompareAndSwap(true, false) {
+		worker.stop = make(chan struct{})
+	}
+
+	defer func() {
+		worker.logger.Debug("Worker finished")
+		worker.stopped <- true
+	}()
+
+	for {
+		select {
+		case <-worker.stop:
+			worker.logger.Debug("Worker received stop signal")
+			return
+		case job := <-worker.jobs:
+			worker.DoJob(&job)
+		}
+	}
+}
+
+// Stop interrupts the worker even if the batch job has not yet completed.
+func (worker *BatchWorker) Stop() {
+	// Set to close, and if already closed before, then return.
+	if !worker.closed.CompareAndSwap(false, true) {
+		return
+	}
+
+	worker.logger.Debug("Worker stopping")
+	close(worker.stop)
+	<-worker.stopped
+}
+
+// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute.
+func (worker *BatchWorker) JobChannel() chan<- model.Job {
+	return worker.jobs
+}
+
+// IsEnabled is always true for batches.
+func (worker *BatchWorker) IsEnabled(_ *model.Config) bool {
+	return true
+}
+
+// DoJob executes the job picked up through the job channel.
+//
+// Note that this is a lot of distracting machinery here to claim the job, then double check the
+// status, and keep the status up to date in line with job infrastructure semantics. Unless an
+// error occurs, this worker should hold onto the job until it's completed.
+func (worker *BatchWorker) DoJob(job *model.Job) {
+	logger := worker.logger.With(mlog.Any("job", job))
+	logger.Debug("Worker received a new candidate job.")
+	defer worker.jobServer.HandleJobPanic(logger, job)
+
+	if claimed, err := worker.jobServer.ClaimJob(job); err != nil {
+		logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err))
+		return
+	} else if !claimed {
+		return
+	}
+
+	c := request.EmptyContext(logger)
+	var appErr *model.AppError
+
+	// We get the job again because ClaimJob changes the job status.
+	job, appErr = worker.jobServer.GetJob(c, job.Id)
+	if appErr != nil {
+		worker.logger.Error("Worker: job execution error", mlog.Err(appErr))
+		worker.setJobError(logger, job, appErr)
+		return
+	}
+
+	if job.Data == nil {
+		job.Data = make(model.StringMap)
+	}
+
+	for {
+		select {
+		case <-worker.stop:
+			logger.Info("Worker: Batch has been canceled via Worker Stop. Setting the job back to pending.")
+			if err := worker.jobServer.SetJobPending(job); err != nil {
+				worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err))
+			}
+			return
+		case <-time.After(worker.timeBetweenBatches):
+			if stop := worker.doBatch(c, job); stop {
+				return
+			}
+		}
+	}
+}
+
+// resetJob erases the data tracking the next batch to execute and returns the job status to
+// pending to allow the job infrastructure to requeue it.
+func (worker *BatchWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) {
+	job.Data = nil
+	job.Progress = 0
+	job.Status = model.JobStatusPending
+
+	if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil {
+		worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err))
+	}
+}
+
+// setJobSuccess records the job as successful.
+func (worker *BatchWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) {
+	if err := worker.jobServer.SetJobProgress(job, 100); err != nil {
+		logger.Error("Worker: Failed to update progress for job", mlog.Err(err))
+		worker.setJobError(logger, job, err)
+	}
+
+	if err := worker.jobServer.SetJobSuccess(job); err != nil {
+		logger.Error("Worker: Failed to set success for job", mlog.Err(err))
+		worker.setJobError(logger, job, err)
+	}
+}
+
+// setJobError puts the job into an error state, preventing the job from running again.
+func (worker *BatchWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) {
+	if err := worker.jobServer.SetJobError(job, appError); err != nil {
+		logger.Error("Worker: Failed to set job error", mlog.Err(err))
+	}
+}
diff --git a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go
new file mode 100644
index 0000000000000..307a43ac1f600
--- /dev/null
+++ b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go
@@ -0,0 +1,84 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package export_users_to_csv
+
+import (
+	"time"
+
+	"github.com/mattermost/mattermost/server/public/model"
+	"github.com/mattermost/mattermost/server/v8/channels/jobs"
+	"github.com/mattermost/mattermost/server/v8/channels/store"
+	"github.com/pkg/errors"
+)
+
+const (
+	timeBetweenBatches = 1 * time.Second
+)
+
+type ExportUsersToCSVAppIFace interface {
+	jobs.BatchReportWorkerAppIFace
+	GetUsersForReporting(filter *model.UserReportOptions) ([]*model.UserReport, *model.AppError)
+}
+
+// MakeWorker creates a batch report worker to export users to a CSV file.
+func MakeWorker(jobServer *jobs.JobServer, store store.Store, app ExportUsersToCSVAppIFace) model.Worker {
+	return jobs.MakeBatchReportWorker(
+		jobServer,
+		store,
+		app,
+		timeBetweenBatches,
+		"csv",
+		model.UserReport.GetHeaders(model.UserReport{}), // TODO no?
+		getData,
+	)
+}
+
+// parseJobMetadata parses the opaque job metadata to return the information needed to decide which
+// batch to process next.
+func parseJobMetadata(data model.StringMap) (*model.UserReportOptions, error) {
+	options := model.UserReportOptionsAPI{
+		UserReportOptionsWithoutDateRange: model.UserReportOptionsWithoutDateRange{
+			SortColumn:          "Username",
+			PageSize:            100,
+			LastSortColumnValue: data["last_column_value"],
+			LastUserId:          data["last_user_id"],
+		},
+		DateRange: data["date_range"],
+	}
+
+	return options.ToBaseOptions(time.Now()), nil
+}
+
+// makeJobMetadata encodes the information needed to decide which batch to process next back into
+// the opaque job metadata.
+func makeJobMetadata(lastColumnValue string, userID string) model.StringMap {
+	data := make(model.StringMap)
+	data["last_column_value"] = lastColumnValue
+	data["last_user_id"] = userID
+
+	return data
+}
+
+func getData(jobData model.StringMap, app ExportUsersToCSVAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) {
+	filter, err := parseJobMetadata(jobData)
+	if err != nil {
+		return nil, nil, false, errors.Wrap(err, "failed to parse job metadata")
+	}
+
+	users, appErr := app.GetUsersForReporting(filter)
+	if appErr != nil {
+		return nil, nil, false, errors.Wrapf(appErr, "failed to get the next batch (column_value=%v, user_id=%v)", filter.LastSortColumnValue, filter.LastUserId)
+	}
+
+	if len(users) == 0 {
+		return nil, nil, true, nil
+	}
+
+	reportableObjects := []model.ReportableObject{}
+	for i := 0; i < len(users); i++ {
+		reportableObjects = append(reportableObjects, users[i])
+	}
+
+	return reportableObjects, makeJobMetadata(users[len(users)-1].Username, users[len(users)-1].Id), false, nil
+}
diff --git a/server/public/model/job.go b/server/public/model/job.go
index a592e7b61dae3..9c91cd8d80880 100644
--- a/server/public/model/job.go
+++ b/server/public/model/job.go
@@ -38,6 +38,7 @@ const (
 	JobTypeCleanupDesktopTokens       = "cleanup_desktop_tokens"
 	JobTypeDeleteEmptyDraftsMigration = "delete_empty_drafts_migration"
 	JobTypeRefreshPostStats           = "refresh_post_stats"
+	JobTypeExportUsersToCSV           = "export_users_to_csv"
 
 	JobStatusPending    = "pending"
 	JobStatusInProgress = "in_progress"
diff --git a/server/public/model/report.go b/server/public/model/report.go
new file mode 100644
index 0000000000000..1e57bc610863a
--- /dev/null
+++ b/server/public/model/report.go
@@ -0,0 +1,9 @@
+// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
+// See LICENSE.txt for license information.
+
+package model
+
+type ReportableObject interface {
+	GetHeaders() []string
+	ToReport() []string
+}
diff --git a/server/public/model/user.go b/server/public/model/user.go
index 956914abae9e1..bc260a28960d2 100644
--- a/server/public/model/user.go
+++ b/server/public/model/user.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"regexp"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 	"unicode/utf8"
@@ -1039,6 +1040,59 @@ type UserReport struct {
 	UserPostStats
 }
 
+func (u UserReport) GetHeaders() []string {
+	return []string{
+		"Id",
+		"Username",
+		"Email",
+		"CreateAt",
+		"Name",
+		"Roles",
+		"LastLogin",
+		"LastStatusAt",
+		"LastPostDate",
+		"DaysActive",
+		"TotalPosts",
+	}
+}
+
+func (u *UserReport) ToReport() []string {
+	lastStatusAt := ""
+	if u.LastStatusAt != nil {
+		lastStatusAt = time.UnixMilli(*u.LastStatusAt).String()
+	}
+	lastPostDate := ""
+	if u.LastPostDate != nil {
+		lastPostDate = time.UnixMilli(*u.LastPostDate).String()
+	}
+	daysActive := ""
+	if u.DaysActive != nil {
+		daysActive = strconv.Itoa(*u.DaysActive)
+	}
+	totalPosts := ""
+	if u.TotalPosts != nil {
+		totalPosts = strconv.Itoa(*u.TotalPosts)
+	}
+	lastLogin := ""
+	if u.LastLogin > 0 {
+		lastLogin = time.UnixMilli(u.LastLogin).String()
+	}
+
+	return []string{
+		u.Id,
+		u.Username,
+		u.Email,
+		time.UnixMilli(u.CreateAt).String(),
+		u.DisplayName,
+		u.Roles,
+		lastLogin,
+		lastStatusAt,
+		lastPostDate,
+		daysActive,
+		totalPosts,
+	}
+}
+
 type UserReportOptionsWithoutDateRange struct {
 	SortColumn string
 	SortDesc   bool