From 161a50c013c728f3fa23c7cb1e3142ed04849e73 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Thu, 30 Nov 2023 14:21:58 -0500 Subject: [PATCH 1/8] Split out migration logic and create generic BatchWorker --- .../channels/jobs/batch_migration_worker.go | 197 ++++-------------- server/channels/jobs/batch_worker.go | 179 ++++++++++++++++ 2 files changed, 217 insertions(+), 159 deletions(-) create mode 100644 server/channels/jobs/batch_worker.go diff --git a/server/channels/jobs/batch_migration_worker.go b/server/channels/jobs/batch_migration_worker.go index 072c8e3174bf7..be930bc1001da 100644 --- a/server/channels/jobs/batch_migration_worker.go +++ b/server/channels/jobs/batch_migration_worker.go @@ -5,7 +5,6 @@ package jobs import ( "net/http" - "sync/atomic" "time" "github.com/mattermost/mattermost/server/public/model" @@ -29,24 +28,25 @@ type BatchMigrationWorkerAppIFace interface { // server in order to retry a failed migration job. Refactoring the job infrastructure is left as // a future exercise. type BatchMigrationWorker struct { - jobServer *JobServer - logger mlog.LoggerIFace - store store.Store - app BatchMigrationWorkerAppIFace - - stop chan struct{} - stopped chan bool - closed atomic.Bool - jobs chan model.Job - - migrationKey string - timeBetweenBatches time.Duration - doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error) + BatchWorker[BatchMigrationWorkerAppIFace] + migrationKey string + doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error) } // MakeBatchMigrationWorker creates a worker to process the given migration batch function. 
-func MakeBatchMigrationWorker(jobServer *JobServer, store store.Store, app BatchMigrationWorkerAppIFace, migrationKey string, timeBetweenBatches time.Duration, doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)) model.Worker { +func MakeBatchMigrationWorker( + jobServer *JobServer, + store store.Store, + app BatchMigrationWorkerAppIFace, + migrationKey string, + timeBetweenBatches time.Duration, + doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error), +) model.Worker { worker := &BatchMigrationWorker{ + migrationKey: migrationKey, + doMigrationBatch: doMigrationBatch, + } + worker.BatchWorker = BatchWorker[BatchMigrationWorkerAppIFace]{ jobServer: jobServer, logger: jobServer.Logger().With(mlog.String("worker_name", migrationKey)), store: store, @@ -54,58 +54,39 @@ func MakeBatchMigrationWorker(jobServer *JobServer, store store.Store, app Batch stop: make(chan struct{}), stopped: make(chan bool, 1), jobs: make(chan model.Job), - migrationKey: migrationKey, timeBetweenBatches: timeBetweenBatches, - doMigrationBatch: doMigrationBatch, + doBatch: worker.doBatch, } return worker } -// Run starts the worker dedicated to the unique migration batch job it will be given to process. -func (worker *BatchMigrationWorker) Run() { - worker.logger.Debug("Worker started") - // We have to re-assign the stop channel again, because - // it might happen that the job was restarted due to a config change. 
- if worker.closed.CompareAndSwap(true, false) { - worker.stop = make(chan struct{}) - } - - defer func() { - worker.logger.Debug("Worker finished") - worker.stopped <- true - }() - - for { - select { - case <-worker.stop: - worker.logger.Debug("Worker received stop signal") - return - case job := <-worker.jobs: - worker.DoJob(&job) - } +func (worker *BatchMigrationWorker) doBatch(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app BatchMigrationWorkerAppIFace) bool { + // Ensure the cluster remains in sync, otherwise we restart the job to + // ensure a complete migration. Technically, the cluster could go out of + // sync briefly within a batch, but we accept that risk. + if !worker.checkIsClusterInSync(rctx) { + worker.logger.Warn("Worker: Resetting job") + worker.resetJob(logger, job) + return true } -} -// Stop interrupts the worker even if the migration has not yet completed. -func (worker *BatchMigrationWorker) Stop() { - // Set to close, and if already closed before, then return. - if !worker.closed.CompareAndSwap(false, true) { - return + nextData, done, err := worker.doMigrationBatch(job.Data, worker.store) + if err != nil { + worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) + worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + return true + } else if done { + logger.Info("Worker: Job is complete") + worker.setJobSuccess(logger, job) + worker.markAsComplete() + return true } - worker.logger.Debug("Worker stopping") - close(worker.stop) - <-worker.stopped -} - -// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute. -func (worker *BatchMigrationWorker) JobChannel() chan<- model.Job { - return worker.jobs -} + job.Data = nextData -// IsEnabled is always true for batch migrations. 
-func (worker *BatchMigrationWorker) IsEnabled(_ *model.Config) bool { - return true + // Migrations currently don't support reporting meaningful progress. + worker.jobServer.SetJobProgress(job, 0) + return false } // checkIsClusterInSync returns true if all nodes in the cluster are running the same version, @@ -128,108 +109,6 @@ func (worker *BatchMigrationWorker) checkIsClusterInSync(rctx request.CTX) bool return true } -// DoJob executes the job picked up through the job channel. -// -// Note that this is a lot of distracting machinery here to claim the job, then double check the -// status, and keep the status up to date in line with job infrastrcuture semantics. Unless an -// error occurs, this worker should hold onto the job until its completed. -func (worker *BatchMigrationWorker) DoJob(job *model.Job) { - logger := worker.logger.With(mlog.Any("job", job)) - logger.Debug("Worker received a new candidate job.") - defer worker.jobServer.HandleJobPanic(logger, job) - - if claimed, err := worker.jobServer.ClaimJob(job); err != nil { - logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err)) - return - } else if !claimed { - return - } - - c := request.EmptyContext(logger) - var appErr *model.AppError - - // We get the job again because ClaimJob changes the job status. - job, appErr = worker.jobServer.GetJob(c, job.Id) - if appErr != nil { - worker.logger.Error("Worker: job execution error", mlog.Err(appErr)) - worker.setJobError(logger, job, appErr) - return - } - - if job.Data == nil { - job.Data = make(model.StringMap) - } - - for { - select { - case <-worker.stop: - logger.Info("Worker: Migration has been canceled via Worker Stop. 
Setting the job back to pending.") - if err := worker.jobServer.SetJobPending(job); err != nil { - worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err)) - } - return - case <-time.After(worker.timeBetweenBatches): - // Ensure the cluster remains in sync, otherwise we restart the job to - // ensure a complete migration. Technically, the cluster could go out of - // sync briefly within a batch, but we accept that risk. - if !worker.checkIsClusterInSync(c) { - worker.logger.Warn("Worker: Resetting job") - worker.resetJob(logger, job) - return - } - - nextData, done, err := worker.doMigrationBatch(job.Data, worker.store) - if err != nil { - worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) - worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) - return - } else if done { - logger.Info("Worker: Job is complete") - worker.setJobSuccess(logger, job) - worker.markAsComplete() - return - } - - job.Data = nextData - - // Migrations currently don't support reporting meaningful progress. - worker.jobServer.SetJobProgress(job, 0) - } - } -} - -// resetJob erases the data tracking the next batch to execute and returns the job status to -// pending to allow the job infrastructure to requeue it. -func (worker *BatchMigrationWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) { - job.Data = nil - job.Progress = 0 - job.Status = model.JobStatusPending - - if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { - worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err)) - } -} - -// setJobSuccess records the job as successful. 
-func (worker *BatchMigrationWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { - if err := worker.jobServer.SetJobProgress(job, 100); err != nil { - logger.Error("Worker: Failed to update progress for job", mlog.Err(err)) - worker.setJobError(logger, job, err) - } - - if err := worker.jobServer.SetJobSuccess(job); err != nil { - logger.Error("Worker: Failed to set success for job", mlog.Err(err)) - worker.setJobError(logger, job, err) - } -} - -// setJobError puts the job into an error state, preventing the job from running again. -func (worker *BatchMigrationWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { - if err := worker.jobServer.SetJobError(job, appError); err != nil { - logger.Error("Worker: Failed to set job error", mlog.Err(err)) - } -} - // markAsComplete records a discrete migration key to prevent this job from ever running again. func (worker *BatchMigrationWorker) markAsComplete() { system := model.System{ diff --git a/server/channels/jobs/batch_worker.go b/server/channels/jobs/batch_worker.go new file mode 100644 index 0000000000000..14e5cad97965d --- /dev/null +++ b/server/channels/jobs/batch_worker.go @@ -0,0 +1,179 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. 
+ +package jobs + +import ( + "sync/atomic" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/store" +) + +type BatchWorker[T interface{}] struct { + jobServer *JobServer + logger mlog.LoggerIFace + store store.Store + app T + + stop chan struct{} + stopped chan bool + closed atomic.Bool + jobs chan model.Job + + timeBetweenBatches time.Duration + doBatch func(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app T) bool +} + +// MakeBatchWorker creates a worker to process the given batch function. +func MakeBatchWorker[T interface{}]( + jobServer *JobServer, + store store.Store, + app T, + timeBetweenBatches time.Duration, + doBatch func(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app T) bool, + onComplete func(), +) model.Worker { + worker := &BatchWorker[T]{ + jobServer: jobServer, + logger: jobServer.Logger(), + store: store, + app: app, + stop: make(chan struct{}), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + timeBetweenBatches: timeBetweenBatches, + doBatch: doBatch, + } + return worker +} + +// Run starts the worker dedicated to the unique migration batch job it will be given to process. +func (worker *BatchWorker[T]) Run() { + worker.logger.Debug("Worker started") + // We have to re-assign the stop channel again, because + // it might happen that the job was restarted due to a config change. 
+ if worker.closed.CompareAndSwap(true, false) { + worker.stop = make(chan struct{}) + } + + defer func() { + worker.logger.Debug("Worker finished") + worker.stopped <- true + }() + + for { + select { + case <-worker.stop: + worker.logger.Debug("Worker received stop signal") + return + case job := <-worker.jobs: + worker.DoJob(&job) + } + } +} + +// Stop interrupts the worker even if the migration has not yet completed. +func (worker *BatchWorker[T]) Stop() { + // Set to close, and if already closed before, then return. + if !worker.closed.CompareAndSwap(false, true) { + return + } + + worker.logger.Debug("Worker stopping") + close(worker.stop) + <-worker.stopped +} + +// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute. +func (worker *BatchWorker[T]) JobChannel() chan<- model.Job { + return worker.jobs +} + +// IsEnabled is always true for batches. +func (worker *BatchWorker[T]) IsEnabled(_ *model.Config) bool { + return true +} + +// DoJob executes the job picked up through the job channel. +// +// Note that this is a lot of distracting machinery here to claim the job, then double check the +// status, and keep the status up to date in line with job infrastructure semantics. Unless an +// error occurs, this worker should hold onto the job until it's completed. +func (worker *BatchWorker[T]) DoJob(job *model.Job) { + logger := worker.logger.With(mlog.Any("job", job)) + logger.Debug("Worker received a new candidate job.") + defer worker.jobServer.HandleJobPanic(logger, job) + + if claimed, err := worker.jobServer.ClaimJob(job); err != nil { + logger.Warn("Worker experienced an error while trying to claim job", mlog.Err(err)) + return + } else if !claimed { + return + } + + c := request.EmptyContext(logger) + var appErr *model.AppError + + // We get the job again because ClaimJob changes the job status. 
+ job, appErr = worker.jobServer.GetJob(c, job.Id) + if appErr != nil { + worker.logger.Error("Worker: job execution error", mlog.Err(appErr)) + worker.setJobError(logger, job, appErr) + return + } + + if job.Data == nil { + job.Data = make(model.StringMap) + } + + for { + select { + case <-worker.stop: + logger.Info("Worker: Batch has been canceled via Worker Stop. Setting the job back to pending.") + if err := worker.jobServer.SetJobPending(job); err != nil { + worker.logger.Error("Worker: Failed to mark job as pending", mlog.Err(err)) + } + return + case <-time.After(worker.timeBetweenBatches): + if stop := worker.doBatch(c, job, logger, worker.store, worker.app); stop { + return + } + } + } +} + +// resetJob erases the data tracking the next batch to execute and returns the job status to +// pending to allow the job infrastructure to requeue it. +func (worker *BatchWorker[T]) resetJob(logger mlog.LoggerIFace, job *model.Job) { + job.Data = nil + job.Progress = 0 + job.Status = model.JobStatusPending + + if _, err := worker.store.Job().UpdateOptimistically(job, model.JobStatusInProgress); err != nil { + worker.logger.Error("Worker: Failed to reset job data. May resume instead of restarting.", mlog.Err(err)) + } +} + +// setJobSuccess records the job as successful. +func (worker *BatchWorker[T]) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { + if err := worker.jobServer.SetJobProgress(job, 100); err != nil { + logger.Error("Worker: Failed to update progress for job", mlog.Err(err)) + worker.setJobError(logger, job, err) + } + + if err := worker.jobServer.SetJobSuccess(job); err != nil { + logger.Error("Worker: Failed to set success for job", mlog.Err(err)) + worker.setJobError(logger, job, err) + } +} + +// setJobError puts the job into an error state, preventing the job from running again. 
+func (worker *BatchWorker[T]) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { + if err := worker.jobServer.SetJobError(job, appError); err != nil { + logger.Error("Worker: Failed to set job error", mlog.Err(err)) + } +} From 8cb535fae5583b51a3fab9633214df84e2c94dc4 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Tue, 5 Dec 2023 09:33:50 -0500 Subject: [PATCH 2/8] WIP --- server/channels/app/app_iface.go | 3 + .../app/opentracing/opentracing_layer.go | 66 ++++++++ server/channels/app/report.go | 56 +++++++ .../channels/jobs/batch_migration_worker.go | 12 +- server/channels/jobs/batch_report_worker.go | 149 ++++++++++++++++++ server/channels/jobs/batch_worker.go | 6 +- .../export_users_to_csv.go | 64 ++++++++ 7 files changed, 347 insertions(+), 9 deletions(-) create mode 100644 server/channels/app/report.go create mode 100644 server/channels/jobs/batch_report_worker.go create mode 100644 server/channels/jobs/export_users_to_csv/export_users_to_csv.go diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go index 7f105f732255f..0920a9d635f7e 100644 --- a/server/channels/app/app_iface.go +++ b/server/channels/app/app_iface.go @@ -471,6 +471,7 @@ type AppIface interface { Cluster() einterfaces.ClusterInterface CompareAndDeletePluginKey(pluginID string, key string, oldValue []byte) (bool, *model.AppError) CompareAndSetPluginKey(pluginID string, key string, oldValue, newValue []byte) (bool, *model.AppError) + CompileReportChunks(format string, filenames []string) (string, error) CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError) CompleteOnboarding(c request.CTX, request *model.CompleteOnboardingRequest) *model.AppError CompleteSwitchWithOAuth(c request.CTX, service string, userData io.Reader, email string, tokenUser *model.User) (*model.User, *model.AppError) @@ -1010,6 +1011,7 @@ type AppIface interface { 
SaveBrandImage(rctx request.CTX, imageData *multipart.FileHeader) *model.AppError SaveComplianceReport(rctx request.CTX, job *model.Compliance) (*model.Compliance, *model.AppError) SaveReactionForPost(c request.CTX, reaction *model.Reaction) (*model.Reaction, *model.AppError) + SaveReportChunk(format string, filename string, reportData []interface{}) error SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error) SaveSharedChannelRemote(remote *model.SharedChannelRemote) (*model.SharedChannelRemote, error) SaveUserTermsOfService(userID, termsOfServiceId string, accepted bool) *model.AppError @@ -1046,6 +1048,7 @@ type AppIface interface { SendPasswordReset(email string, siteURL string) (bool, *model.AppError) SendPaymentFailedEmail(failedPayment *model.FailedPayment) *model.AppError SendPersistentNotifications() error + SendReportToUser(userID string, filename string) error SendTestPushNotification(deviceID string) string SendUpgradeConfirmationEmail(isYearly bool) *model.AppError ServeInterPluginRequest(w http.ResponseWriter, r *http.Request, sourcePluginId, destinationPluginId string) diff --git a/server/channels/app/opentracing/opentracing_layer.go b/server/channels/app/opentracing/opentracing_layer.go index 44908ad5bb275..2079e697d1bbc 100644 --- a/server/channels/app/opentracing/opentracing_layer.go +++ b/server/channels/app/opentracing/opentracing_layer.go @@ -1669,6 +1669,28 @@ func (a *OpenTracingAppLayer) CompareAndSetPluginKey(pluginID string, key string return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) CompileReportChunks(format string, filenames []string) (string, error) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompileReportChunks") + + a.ctx = newCtx + a.app.Srv().Store().SetContext(newCtx) + defer func() { + a.app.Srv().Store().SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0, resultVar1 := a.app.CompileReportChunks(format, 
filenames) + + if resultVar1 != nil { + span.LogFields(spanlog.Error(resultVar1)) + ext.Error.Set(span, true) + } + + return resultVar0, resultVar1 +} + func (a *OpenTracingAppLayer) CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompleteOAuth") @@ -14804,6 +14826,28 @@ func (a *OpenTracingAppLayer) SaveReactionForPost(c request.CTX, reaction *model return resultVar0, resultVar1 } +func (a *OpenTracingAppLayer) SaveReportChunk(format string, filename string, reportData []interface{}) error { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveReportChunk") + + a.ctx = newCtx + a.app.Srv().Store().SetContext(newCtx) + defer func() { + a.app.Srv().Store().SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0 := a.app.SaveReportChunk(format, filename, reportData) + + if resultVar0 != nil { + span.LogFields(spanlog.Error(resultVar0)) + ext.Error.Set(span, true) + } + + return resultVar0 +} + func (a *OpenTracingAppLayer) SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveSharedChannel") @@ -15647,6 +15691,28 @@ func (a *OpenTracingAppLayer) SendPersistentNotifications() error { return resultVar0 } +func (a *OpenTracingAppLayer) SendReportToUser(userID string, filename string) error { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SendReportToUser") + + a.ctx = newCtx + a.app.Srv().Store().SetContext(newCtx) + defer func() { + a.app.Srv().Store().SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0 := a.app.SendReportToUser(userID, filename) + + if resultVar0 != nil { + span.LogFields(spanlog.Error(resultVar0)) + 
ext.Error.Set(span, true) + } + + return resultVar0 +} + func (a *OpenTracingAppLayer) SendSubscriptionHistoryEvent(userID string) (*model.SubscriptionHistory, error) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SendSubscriptionHistoryEvent") diff --git a/server/channels/app/report.go b/server/channels/app/report.go new file mode 100644 index 0000000000000..84cd6ad40825b --- /dev/null +++ b/server/channels/app/report.go @@ -0,0 +1,56 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package app + +import ( + "bytes" + "encoding/csv" + "errors" + "net/http" + + "github.com/mattermost/mattermost/server/public/model" +) + +func (a *App) SaveReportChunk(format string, filename string, reportData []interface{}) *model.AppError { + switch format { + case "csv": + return a.saveCSVChunk(filename, reportData) + } + return model.NewAppError("SaveReportChunk", "", nil, "unsupported report format", http.StatusInternalServerError) +} + +func (a *App) saveCSVChunk(filename string, reportData []interface{}) *model.AppError { + var buf bytes.Buffer + w := csv.NewWriter(&buf) + + // TODO: Fill this array with report data + records := [][]string{} + + err := w.WriteAll(records) + if err != nil { + return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError) + } + _, appErr := a.WriteFile(&buf, filename) + if appErr != nil { + return appErr + } + + return nil +} + +func (a *App) CompileReportChunks(format string, filenames []string) (string, error) { + switch format { + case "csv": + return a.compileCSVChunks(filenames) + } + return "", errors.New("unsupported report format") +} + +func (a *App) compileCSVChunks(filenames []string) (string, error) { + return "", nil +} + +func (a *App) SendReportToUser(userID string, filename string) error { + return nil +} diff --git a/server/channels/jobs/batch_migration_worker.go 
b/server/channels/jobs/batch_migration_worker.go index be930bc1001da..4146f4dd99d2a 100644 --- a/server/channels/jobs/batch_migration_worker.go +++ b/server/channels/jobs/batch_migration_worker.go @@ -24,7 +24,7 @@ type BatchMigrationWorkerAppIFace interface { // resets the migration if the cluster version diverges after starting. // // In principle, the job infrastructure is overkill for this kind of work, as there's a worker -// created per migration. There's also complication with edge cases, like having to restart the +// created per migration. There's also complication with edge cases, like having to restart the // server in order to retry a failed migration job. Refactoring the job infrastructure is left as // a future exercise. type BatchMigrationWorker struct { @@ -60,24 +60,24 @@ func MakeBatchMigrationWorker( return worker } -func (worker *BatchMigrationWorker) doBatch(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app BatchMigrationWorkerAppIFace) bool { +func (worker *BatchMigrationWorker) doBatch(rctx *request.Context, job *model.Job) bool { // Ensure the cluster remains in sync, otherwise we restart the job to // ensure a complete migration. Technically, the cluster could go out of // sync briefly within a batch, but we accept that risk. if !worker.checkIsClusterInSync(rctx) { worker.logger.Warn("Worker: Resetting job") - worker.resetJob(logger, job) + worker.resetJob(worker.logger, job) return true } nextData, done, err := worker.doMigrationBatch(job.Data, worker.store) if err != nil { worker.logger.Error("Worker: Failed to do migration batch. 
Exiting", mlog.Err(err)) - worker.setJobError(logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) return true } else if done { - logger.Info("Worker: Job is complete") - worker.setJobSuccess(logger, job) + worker.logger.Info("Worker: Job is complete") + worker.setJobSuccess(worker.logger, job) worker.markAsComplete() return true } diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go new file mode 100644 index 0000000000000..503e6903056c1 --- /dev/null +++ b/server/channels/jobs/batch_report_worker.go @@ -0,0 +1,149 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package jobs + +import ( + "fmt" + "net/http" + "strconv" + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/public/shared/mlog" + "github.com/mattermost/mattermost/server/public/shared/request" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/pkg/errors" +) + +type BatchReportWorkerAppIFace interface { + SaveReportChunk(format string, filename string, reportData []interface{}) error + CompileReportChunks(format string, filenames []string) (string, error) + SendReportToUser(userID string, filename string) error +} + +type BatchReportWorker struct { + BatchWorker[BatchReportWorkerAppIFace] + reportFormat string + getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]interface{}, model.StringMap, bool, error) +} + +func MakeBatchReportWorker( + jobServer *JobServer, + store store.Store, + app BatchReportWorkerAppIFace, + timeBetweenBatches time.Duration, + reportFormat string, + getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]interface{}, 
model.StringMap, bool, error), +) model.Worker { + worker := &BatchReportWorker{ + reportFormat: reportFormat, + getData: getData, + } + worker.BatchWorker = BatchWorker[BatchReportWorkerAppIFace]{ + jobServer: jobServer, + logger: jobServer.Logger(), + store: store, + app: app, + stop: make(chan struct{}), + stopped: make(chan bool, 1), + jobs: make(chan model.Job), + timeBetweenBatches: timeBetweenBatches, + doBatch: worker.doBatch, + } + return worker +} + +func (worker *BatchReportWorker) doBatch(rctx *request.Context, job *model.Job) bool { + reportData, nextData, done, err := worker.getData(job.Data, worker.app) + if err != nil { + // TODO getData error + worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + return true + } else if done { + if err = worker.complete(job); err != nil { + // TODO complete error + worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + } else { + worker.logger.Info("Worker: Job is complete") + worker.setJobSuccess(worker.logger, job) + } + + return true + } + + err = worker.saveData(job, reportData) + if err != nil { + // TODO saveData error + worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + return true + } + + for k, v := range nextData { + job.Data[k] = v + } + + // TODO add progress? 
+ worker.jobServer.SetJobProgress(job, 0) + return false +} + +func makeFilename(jobId string, fileCounter int) string { + return fmt.Sprintf("%s__%d", jobId, fileCounter) +} + +func getFileCount(jobData model.StringMap) (int, error) { + if jobData["file_count"] != "" { + parsedFileCount, parseErr := strconv.Atoi(jobData["file_count"]) + if parseErr != nil { + return 0, errors.Wrap(parseErr, "failed to parse file_count") + } + return parsedFileCount, nil + } + + // Assume it hasn't been set + return 0, nil +} + +func (worker *BatchReportWorker) saveData(job *model.Job, reportData []interface{}) error { + fileCount, err := getFileCount(job.Data) + if err != nil { + return err + } + + err = worker.app.SaveReportChunk(worker.reportFormat, makeFilename(job.Id, fileCount), reportData) + if err != nil { + return err + } + + fileCount++ + job.Data["file_count"] = strconv.Itoa(fileCount) + + return nil +} + +func (worker *BatchReportWorker) complete(job *model.Job) error { + requestingUserId := job.Data["requesting_user_id"] + if requestingUserId == "" { + return errors.New("No user to send the report to") + } + fileCount, err := getFileCount(job.Data) + if err != nil { + return err + } + + filenames := []string{} + for i := 0; i < fileCount; i++ { + filenames = append(filenames, makeFilename(job.Id, i)) + } + + compiledFilename, err := worker.app.CompileReportChunks(worker.reportFormat, filenames) + if err != nil { + return err + } + + return worker.app.SendReportToUser(requestingUserId, compiledFilename) +} diff --git a/server/channels/jobs/batch_worker.go b/server/channels/jobs/batch_worker.go index 14e5cad97965d..5ab75516cecea 100644 --- a/server/channels/jobs/batch_worker.go +++ b/server/channels/jobs/batch_worker.go @@ -25,7 +25,7 @@ type BatchWorker[T interface{}] struct { jobs chan model.Job timeBetweenBatches time.Duration - doBatch func(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app T) bool + doBatch func(rctx 
*request.Context, job *model.Job) bool } // MakeBatchWorker creates a worker to process the given batch function. @@ -34,7 +34,7 @@ func MakeBatchWorker[T interface{}]( store store.Store, app T, timeBetweenBatches time.Duration, - doBatch func(rctx *request.Context, job *model.Job, logger mlog.LoggerIFace, store store.Store, app T) bool, + doBatch func(rctx *request.Context, job *model.Job) bool, onComplete func(), ) model.Worker { worker := &BatchWorker[T]{ @@ -139,7 +139,7 @@ func (worker *BatchWorker[T]) DoJob(job *model.Job) { } return case <-time.After(worker.timeBetweenBatches): - if stop := worker.doBatch(c, job, logger, worker.store, worker.app); stop { + if stop := worker.doBatch(c, job); stop { return } } diff --git a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go new file mode 100644 index 0000000000000..4f7c5957160d4 --- /dev/null +++ b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go @@ -0,0 +1,64 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package export_users_to_csv + +import ( + "time" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/v8/channels/jobs" + "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/pkg/errors" +) + +const ( + timeBetweenBatches = 1 * time.Second +) + +// MakeWorker creates a batch report worker to export users to CSV. +func MakeWorker(jobServer *jobs.JobServer, store store.Store, app jobs.BatchReportWorkerAppIFace) model.Worker { + return jobs.MakeBatchReportWorker( + jobServer, + store, + app, + timeBetweenBatches, + "csv", + getData, + ) +} + +// parseJobMetadata parses the opaque job metadata to return the information needed to decide which +// batch to process next. 
+func parseJobMetadata(data model.StringMap) (interface{}, error) { + return struct { + LastSortColumnValue string + LastUserId string + }{ + LastSortColumnValue: data["last_column_value"], + LastUserId: data["last_user_id"], + }, nil +} + +// makeJobMetadata encodes the information needed to decide which batch to process next back into +// the opaque job metadata. +func makeJobMetadata(lastColumnValue string, userID string) model.StringMap { + data := make(model.StringMap) + data["last_column_value"] = lastColumnValue + data["last_user_id"] = userID + + return data +} + +func getData(jobData model.StringMap, app jobs.BatchReportWorkerAppIFace) ([]interface{}, model.StringMap, bool, error) { + filter, err := parseJobMetadata(jobData) + if err != nil { + return nil, nil, false, errors.Wrap(err, "failed to parse job metadata") + } + + users := []model.User{model.User{Id: "test"}} + + // Actually get the data + + return []interface{}{users}, makeJobMetadata("todo", "me"), false, nil +} From 098761d1c91569394c6f1fc3b20f7ef71da9aa81 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Wed, 6 Dec 2023 09:32:44 -0500 Subject: [PATCH 3/8] WIP --- server/channels/app/app_iface.go | 6 +-- .../app/opentracing/opentracing_layer.go | 10 ++--- server/channels/app/report.go | 40 ++++++++++++++----- server/channels/app/server.go | 7 ++++ server/channels/jobs/batch_report_worker.go | 20 +++------- server/public/model/job.go | 1 + 6 files changed, 50 insertions(+), 34 deletions(-) diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go index 0920a9d635f7e..33863a63905b0 100644 --- a/server/channels/app/app_iface.go +++ b/server/channels/app/app_iface.go @@ -471,7 +471,7 @@ type AppIface interface { Cluster() einterfaces.ClusterInterface CompareAndDeletePluginKey(pluginID string, key string, oldValue []byte) (bool, *model.AppError) CompareAndSetPluginKey(pluginID string, key string, oldValue, newValue []byte) (bool, *model.AppError) - CompileReportChunks(format 
string, filenames []string) (string, error) + CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError) CompleteOnboarding(c request.CTX, request *model.CompleteOnboardingRequest) *model.AppError CompleteSwitchWithOAuth(c request.CTX, service string, userData io.Reader, email string, tokenUser *model.User) (*model.User, *model.AppError) @@ -1011,7 +1011,7 @@ type AppIface interface { SaveBrandImage(rctx request.CTX, imageData *multipart.FileHeader) *model.AppError SaveComplianceReport(rctx request.CTX, job *model.Compliance) (*model.Compliance, *model.AppError) SaveReactionForPost(c request.CTX, reaction *model.Reaction) (*model.Reaction, *model.AppError) - SaveReportChunk(format string, filename string, reportData []interface{}) error + SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error) SaveSharedChannelRemote(remote *model.SharedChannelRemote) (*model.SharedChannelRemote, error) SaveUserTermsOfService(userID, termsOfServiceId string, accepted bool) *model.AppError @@ -1048,7 +1048,7 @@ type AppIface interface { SendPasswordReset(email string, siteURL string) (bool, *model.AppError) SendPaymentFailedEmail(failedPayment *model.FailedPayment) *model.AppError SendPersistentNotifications() error - SendReportToUser(userID string, filename string) error + SendReportToUser(userID string, filename string) *model.AppError SendTestPushNotification(deviceID string) string SendUpgradeConfirmationEmail(isYearly bool) *model.AppError ServeInterPluginRequest(w http.ResponseWriter, r *http.Request, sourcePluginId, destinationPluginId string) diff --git a/server/channels/app/opentracing/opentracing_layer.go 
b/server/channels/app/opentracing/opentracing_layer.go index 2079e697d1bbc..8872f0830239c 100644 --- a/server/channels/app/opentracing/opentracing_layer.go +++ b/server/channels/app/opentracing/opentracing_layer.go @@ -1669,7 +1669,7 @@ func (a *OpenTracingAppLayer) CompareAndSetPluginKey(pluginID string, key string return resultVar0, resultVar1 } -func (a *OpenTracingAppLayer) CompileReportChunks(format string, filenames []string) (string, error) { +func (a *OpenTracingAppLayer) CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompileReportChunks") @@ -1681,7 +1681,7 @@ func (a *OpenTracingAppLayer) CompileReportChunks(format string, filenames []str }() defer span.Finish() - resultVar0, resultVar1 := a.app.CompileReportChunks(format, filenames) + resultVar0, resultVar1 := a.app.CompileReportChunks(format, prefix, numberOfChunks) if resultVar1 != nil { span.LogFields(spanlog.Error(resultVar1)) @@ -14826,7 +14826,7 @@ func (a *OpenTracingAppLayer) SaveReactionForPost(c request.CTX, reaction *model return resultVar0, resultVar1 } -func (a *OpenTracingAppLayer) SaveReportChunk(format string, filename string, reportData []interface{}) error { +func (a *OpenTracingAppLayer) SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveReportChunk") @@ -14838,7 +14838,7 @@ func (a *OpenTracingAppLayer) SaveReportChunk(format string, filename string, re }() defer span.Finish() - resultVar0 := a.app.SaveReportChunk(format, filename, reportData) + resultVar0 := a.app.SaveReportChunk(format, prefix, count, reportData) if resultVar0 != nil { span.LogFields(spanlog.Error(resultVar0)) @@ -15691,7 +15691,7 @@ func (a *OpenTracingAppLayer) SendPersistentNotifications() error { return resultVar0 } -func (a *OpenTracingAppLayer) 
SendReportToUser(userID string, filename string) error { +func (a *OpenTracingAppLayer) SendReportToUser(userID string, filename string) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SendReportToUser") diff --git a/server/channels/app/report.go b/server/channels/app/report.go index 84cd6ad40825b..0e75a19f7d32a 100644 --- a/server/channels/app/report.go +++ b/server/channels/app/report.go @@ -6,21 +6,21 @@ package app import ( "bytes" "encoding/csv" - "errors" + "fmt" "net/http" "github.com/mattermost/mattermost/server/public/model" ) -func (a *App) SaveReportChunk(format string, filename string, reportData []interface{}) *model.AppError { +func (a *App) SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError { switch format { case "csv": - return a.saveCSVChunk(filename, reportData) + return a.saveCSVChunk(prefix, count, reportData) } return model.NewAppError("SaveReportChunk", "", nil, "unsupported report format", http.StatusInternalServerError) } -func (a *App) saveCSVChunk(filename string, reportData []interface{}) *model.AppError { +func (a *App) saveCSVChunk(prefix string, count int, reportData []interface{}) *model.AppError { var buf bytes.Buffer w := csv.NewWriter(&buf) @@ -31,7 +31,7 @@ func (a *App) saveCSVChunk(filename string, reportData []interface{}) *model.App if err != nil { return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError) } - _, appErr := a.WriteFile(&buf, filename) + _, appErr := a.WriteFile(&buf, makeFilename(prefix, count)) if appErr != nil { return appErr } @@ -39,18 +39,36 @@ func (a *App) saveCSVChunk(filename string, reportData []interface{}) *model.App return nil } -func (a *App) CompileReportChunks(format string, filenames []string) (string, error) { +func (a *App) CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) { switch format 
{ case "csv": - return a.compileCSVChunks(filenames) + return a.compileCSVChunks(prefix, numberOfChunks) } - return "", errors.New("unsupported report format") + return "", model.NewAppError("CompileReportChunks", "", nil, "unsupported report format", http.StatusInternalServerError) } -func (a *App) compileCSVChunks(filenames []string) (string, error) { - return "", nil +func (a *App) compileCSVChunks(prefix string, numberOfChunks int) (string, *model.AppError) { + for i := 0; i < numberOfChunks; i++ { + var buf bytes.Buffer + chunk, err := a.ReadFile(makeFilename(prefix, i)) + if err != nil { + return "", err + } + if _, bufErr := buf.Read(chunk); bufErr != nil { + return "", model.NewAppError("compileCSVChunks", "", nil, bufErr.Error(), http.StatusInternalServerError) + } + if _, err = a.AppendFile(&buf, prefix); err != nil { + return "", err + } + } + + return prefix, nil } -func (a *App) SendReportToUser(userID string, filename string) error { +func (a *App) SendReportToUser(userID string, filename string) *model.AppError { return nil } + +func makeFilename(prefix string, count int) string { + return fmt.Sprintf("%s__%d", prefix, count) +} diff --git a/server/channels/app/server.go b/server/channels/app/server.go index 4412c328b9c9a..8335b61ed8244 100644 --- a/server/channels/app/server.go +++ b/server/channels/app/server.go @@ -44,6 +44,7 @@ import ( "github.com/mattermost/mattermost/server/v8/channels/jobs/expirynotify" "github.com/mattermost/mattermost/server/v8/channels/jobs/export_delete" "github.com/mattermost/mattermost/server/v8/channels/jobs/export_process" + "github.com/mattermost/mattermost/server/v8/channels/jobs/export_users_to_csv" "github.com/mattermost/mattermost/server/v8/channels/jobs/extract_content" "github.com/mattermost/mattermost/server/v8/channels/jobs/hosted_purchase_screening" "github.com/mattermost/mattermost/server/v8/channels/jobs/import_delete" @@ -1674,6 +1675,12 @@ func (s *Server) initJobs() { 
refresh_post_stats.MakeScheduler(s.Jobs, *s.platform.Config().SqlSettings.DriverName), ) + s.Jobs.RegisterJobType( + model.JobTypeExportUsersToCSV, + export_users_to_csv.MakeWorker(s.Jobs, s.Store(), New(ServerConnector(s.Channels()))), + nil, + ) + s.platform.Jobs = s.Jobs } diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go index 503e6903056c1..b40205a74c8ff 100644 --- a/server/channels/jobs/batch_report_worker.go +++ b/server/channels/jobs/batch_report_worker.go @@ -4,7 +4,6 @@ package jobs import ( - "fmt" "net/http" "strconv" "time" @@ -17,9 +16,9 @@ import ( ) type BatchReportWorkerAppIFace interface { - SaveReportChunk(format string, filename string, reportData []interface{}) error - CompileReportChunks(format string, filenames []string) (string, error) - SendReportToUser(userID string, filename string) error + SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError + CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) + SendReportToUser(userID string, filename string) *model.AppError } type BatchReportWorker struct { @@ -91,10 +90,6 @@ func (worker *BatchReportWorker) doBatch(rctx *request.Context, job *model.Job) return false } -func makeFilename(jobId string, fileCounter int) string { - return fmt.Sprintf("%s__%d", jobId, fileCounter) -} - func getFileCount(jobData model.StringMap) (int, error) { if jobData["file_count"] != "" { parsedFileCount, parseErr := strconv.Atoi(jobData["file_count"]) @@ -114,7 +109,7 @@ func (worker *BatchReportWorker) saveData(job *model.Job, reportData []interface return err } - err = worker.app.SaveReportChunk(worker.reportFormat, makeFilename(job.Id, fileCount), reportData) + err = worker.app.SaveReportChunk(worker.reportFormat, job.Id, fileCount, reportData) if err != nil { return err } @@ -135,12 +130,7 @@ func (worker *BatchReportWorker) complete(job *model.Job) error { return err } - 
filenames := []string{} - for i := 0; i < fileCount; i++ { - filenames = append(filenames, makeFilename(job.Id, i)) - } - - compiledFilename, err := worker.app.CompileReportChunks(worker.reportFormat, filenames) + compiledFilename, err := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount) if err != nil { return err } diff --git a/server/public/model/job.go b/server/public/model/job.go index a592e7b61dae3..9c91cd8d80880 100644 --- a/server/public/model/job.go +++ b/server/public/model/job.go @@ -38,6 +38,7 @@ const ( JobTypeCleanupDesktopTokens = "cleanup_desktop_tokens" JobTypeDeleteEmptyDraftsMigration = "delete_empty_drafts_migration" JobTypeRefreshPostStats = "refresh_post_stats" + JobTypeExportUsersToCSV = "export_users_to_csv" JobStatusPending = "pending" JobStatusInProgress = "in_progress" From e42b778fb89f1ef0ec22b533424e4d2383bbb559 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Thu, 7 Dec 2023 10:03:59 -0500 Subject: [PATCH 4/8] POC batch reporting --- server/channels/app/report.go | 17 +++++++++++------ server/channels/jobs/batch_report_worker.go | 16 ++++++++-------- .../export_users_to_csv/export_users_to_csv.go | 4 ++-- server/public/model/report.go | 9 +++++++++ 4 files changed, 30 insertions(+), 16 deletions(-) create mode 100644 server/public/model/report.go diff --git a/server/channels/app/report.go b/server/channels/app/report.go index 0e75a19f7d32a..0e646e96ebf76 100644 --- a/server/channels/app/report.go +++ b/server/channels/app/report.go @@ -12,7 +12,7 @@ import ( "github.com/mattermost/mattermost/server/public/model" ) -func (a *App) SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError { +func (a *App) SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError { switch format { case "csv": return a.saveCSVChunk(prefix, count, reportData) @@ -20,17 +20,22 @@ func (a *App) SaveReportChunk(format string, prefix string, count 
int, reportDat return model.NewAppError("SaveReportChunk", "", nil, "unsupported report format", http.StatusInternalServerError) } -func (a *App) saveCSVChunk(prefix string, count int, reportData []interface{}) *model.AppError { +func (a *App) saveCSVChunk(prefix string, count int, reportData []model.ReportableObject) *model.AppError { var buf bytes.Buffer w := csv.NewWriter(&buf) - // TODO: Fill this array with report data - records := [][]string{} - - err := w.WriteAll(records) + err := w.Write(reportData[0].GetHeaders()) if err != nil { return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError) } + + for _, report := range reportData { + err := w.Write(report.ToReport()) + if err != nil { + return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError) + } + } + _, appErr := a.WriteFile(&buf, makeFilename(prefix, count)) if appErr != nil { return appErr diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go index b40205a74c8ff..7b12fb29e9e47 100644 --- a/server/channels/jobs/batch_report_worker.go +++ b/server/channels/jobs/batch_report_worker.go @@ -16,7 +16,7 @@ import ( ) type BatchReportWorkerAppIFace interface { - SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError + SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) SendReportToUser(userID string, filename string) *model.AppError } @@ -24,7 +24,7 @@ type BatchReportWorkerAppIFace interface { type BatchReportWorker struct { BatchWorker[BatchReportWorkerAppIFace] reportFormat string - getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]interface{}, model.StringMap, bool, error) + getData func(jobData model.StringMap, app 
BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) } func MakeBatchReportWorker( @@ -33,7 +33,7 @@ func MakeBatchReportWorker( app BatchReportWorkerAppIFace, timeBetweenBatches time.Duration, reportFormat string, - getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]interface{}, model.StringMap, bool, error), + getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error), ) model.Worker { worker := &BatchReportWorker{ reportFormat: reportFormat, @@ -103,14 +103,14 @@ func getFileCount(jobData model.StringMap) (int, error) { return 0, nil } -func (worker *BatchReportWorker) saveData(job *model.Job, reportData []interface{}) error { +func (worker *BatchReportWorker) saveData(job *model.Job, reportData []model.ReportableObject) error { fileCount, err := getFileCount(job.Data) if err != nil { return err } - err = worker.app.SaveReportChunk(worker.reportFormat, job.Id, fileCount, reportData) - if err != nil { + appErr := worker.app.SaveReportChunk(worker.reportFormat, job.Id, fileCount, reportData) + if appErr != nil { return err } @@ -130,8 +130,8 @@ func (worker *BatchReportWorker) complete(job *model.Job) error { return err } - compiledFilename, err := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount) - if err != nil { + compiledFilename, appErr := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount) + if appErr != nil { return err } diff --git a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go index 4f7c5957160d4..4849667fd5a68 100644 --- a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go +++ b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go @@ -50,7 +50,7 @@ func makeJobMetadata(lastColumnValue string, userID string) model.StringMap { return data } -func getData(jobData model.StringMap, app 
jobs.BatchReportWorkerAppIFace) ([]interface{}, model.StringMap, bool, error) { +func getData(jobData model.StringMap, app jobs.BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) { filter, err := parseJobMetadata(jobData) if err != nil { return nil, nil, false, errors.Wrap(err, "failed to parse job metadata") @@ -60,5 +60,5 @@ func getData(jobData model.StringMap, app jobs.BatchReportWorkerAppIFace) ([]int // Actually get the data - return []interface{}{users}, makeJobMetadata("todo", "me"), false, nil + return []model.ReportableObject{users}, makeJobMetadata("todo", "me"), false, nil } diff --git a/server/public/model/report.go b/server/public/model/report.go new file mode 100644 index 0000000000000..1e57bc610863a --- /dev/null +++ b/server/public/model/report.go @@ -0,0 +1,9 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package model + +type ReportableObject interface { + GetHeaders() []string + ToReport() []string +} From 568bd8890e042a4c18367a78eed1541181640993 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Thu, 7 Dec 2023 10:10:54 -0500 Subject: [PATCH 5/8] Oops --- server/channels/jobs/batch_migration_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/channels/jobs/batch_migration_worker.go b/server/channels/jobs/batch_migration_worker.go index 4146f4dd99d2a..37b0a5cd03d02 100644 --- a/server/channels/jobs/batch_migration_worker.go +++ b/server/channels/jobs/batch_migration_worker.go @@ -24,7 +24,7 @@ type BatchMigrationWorkerAppIFace interface { // resets the migration if the cluster version diverges after starting. // // In principle, the job infrastructure is overkill for this kind of work, as there's a worker -// created per migration. There's alsaswfvsdo complication with edge cases, like having to restart the +// created per migration. 
There's also complication with edge cases, like having to restart the // server in order to retry a failed migration job. Refactoring the job infrastructure is left as // a future exercise. type BatchMigrationWorker struct { From 0a5e1f955e686b6735986c502021699077bce868 Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Fri, 8 Dec 2023 11:41:46 -0500 Subject: [PATCH 6/8] Job hookup --- server/channels/app/app_iface.go | 2 +- .../app/opentracing/opentracing_layer.go | 2 +- server/channels/jobs/batch_report_worker.go | 34 ++++++------- .../export_users_to_csv.go | 45 ++++++++++++----- server/public/model/user.go | 50 +++++++++++++++++++ 5 files changed, 101 insertions(+), 32 deletions(-) diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go index a16af144c899a..b49ada4ca6409 100644 --- a/server/channels/app/app_iface.go +++ b/server/channels/app/app_iface.go @@ -1012,7 +1012,7 @@ type AppIface interface { SaveBrandImage(rctx request.CTX, imageData *multipart.FileHeader) *model.AppError SaveComplianceReport(rctx request.CTX, job *model.Compliance) (*model.Compliance, *model.AppError) SaveReactionForPost(c request.CTX, reaction *model.Reaction) (*model.Reaction, *model.AppError) - SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError + SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError SaveSharedChannel(c request.CTX, sc *model.SharedChannel) (*model.SharedChannel, error) SaveSharedChannelRemote(remote *model.SharedChannelRemote) (*model.SharedChannelRemote, error) SaveUserTermsOfService(userID, termsOfServiceId string, accepted bool) *model.AppError diff --git a/server/channels/app/opentracing/opentracing_layer.go b/server/channels/app/opentracing/opentracing_layer.go index 80e9af2387e2f..acf4b8db37489 100644 --- a/server/channels/app/opentracing/opentracing_layer.go +++ b/server/channels/app/opentracing/opentracing_layer.go @@ -14848,7 
+14848,7 @@ func (a *OpenTracingAppLayer) SaveReactionForPost(c request.CTX, reaction *model return resultVar0, resultVar1 } -func (a *OpenTracingAppLayer) SaveReportChunk(format string, prefix string, count int, reportData []interface{}) *model.AppError { +func (a *OpenTracingAppLayer) SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SaveReportChunk") diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go index 7b12fb29e9e47..fc3c442805296 100644 --- a/server/channels/jobs/batch_report_worker.go +++ b/server/channels/jobs/batch_report_worker.go @@ -21,25 +21,25 @@ type BatchReportWorkerAppIFace interface { SendReportToUser(userID string, filename string) *model.AppError } -type BatchReportWorker struct { - BatchWorker[BatchReportWorkerAppIFace] +type BatchReportWorker[T BatchReportWorkerAppIFace] struct { + BatchWorker[T] reportFormat string - getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) + getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error) } -func MakeBatchReportWorker( +func MakeBatchReportWorker[T BatchReportWorkerAppIFace]( jobServer *JobServer, store store.Store, - app BatchReportWorkerAppIFace, + app T, timeBetweenBatches time.Duration, reportFormat string, - getData func(jobData model.StringMap, app BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error), + getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error), ) model.Worker { - worker := &BatchReportWorker{ + worker := &BatchReportWorker[T]{ reportFormat: reportFormat, getData: getData, } - worker.BatchWorker = BatchWorker[BatchReportWorkerAppIFace]{ + worker.BatchWorker = BatchWorker[T]{ jobServer: jobServer, 
logger: jobServer.Logger(), store: store, @@ -53,18 +53,18 @@ func MakeBatchReportWorker( return worker } -func (worker *BatchReportWorker) doBatch(rctx *request.Context, job *model.Job) bool { +func (worker *BatchReportWorker[T]) doBatch(rctx *request.Context, job *model.Job) bool { reportData, nextData, done, err := worker.getData(job.Data, worker.app) if err != nil { // TODO getData error - worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) - worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + worker.logger.Error("Worker: Failed to do report batch. Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) return true } else if done { if err = worker.complete(job); err != nil { // TODO complete error - worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) - worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + worker.logger.Error("Worker: Failed to do report batch. Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) } else { worker.logger.Info("Worker: Job is complete") worker.setJobSuccess(worker.logger, job) @@ -76,8 +76,8 @@ func (worker *BatchReportWorker) doBatch(rctx *request.Context, job *model.Job) err = worker.saveData(job, reportData) if err != nil { // TODO saveData error - worker.logger.Error("Worker: Failed to do migration batch. Exiting", mlog.Err(err)) - worker.setJobError(worker.logger, job, model.NewAppError("doMigrationBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) + worker.logger.Error("Worker: Failed to do report batch. 
Exiting", mlog.Err(err)) + worker.setJobError(worker.logger, job, model.NewAppError("doBatch", model.NoTranslation, nil, "", http.StatusInternalServerError).Wrap(err)) return true } @@ -103,7 +103,7 @@ func getFileCount(jobData model.StringMap) (int, error) { return 0, nil } -func (worker *BatchReportWorker) saveData(job *model.Job, reportData []model.ReportableObject) error { +func (worker *BatchReportWorker[T]) saveData(job *model.Job, reportData []model.ReportableObject) error { fileCount, err := getFileCount(job.Data) if err != nil { return err @@ -120,7 +120,7 @@ func (worker *BatchReportWorker) saveData(job *model.Job, reportData []model.Rep return nil } -func (worker *BatchReportWorker) complete(job *model.Job) error { +func (worker *BatchReportWorker[T]) complete(job *model.Job) error { requestingUserId := job.Data["requesting_user_id"] if requestingUserId == "" { return errors.New("No user to send the report to") diff --git a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go index 4849667fd5a68..a9a5941fe0510 100644 --- a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go +++ b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go @@ -16,8 +16,13 @@ const ( timeBetweenBatches = 1 * time.Second ) +type ExportUsersToCSVAppIFace interface { + jobs.BatchReportWorkerAppIFace + GetUsersForReporting(filter *model.UserReportOptions) ([]*model.UserReport, *model.AppError) +} + // MakeWorker creates a batch migration worker to delete empty drafts. 
-func MakeWorker(jobServer *jobs.JobServer, store store.Store, app jobs.BatchReportWorkerAppIFace) model.Worker { +func MakeWorker(jobServer *jobs.JobServer, store store.Store, app ExportUsersToCSVAppIFace) model.Worker { return jobs.MakeBatchReportWorker( jobServer, store, @@ -30,14 +35,18 @@ func MakeWorker(jobServer *jobs.JobServer, store store.Store, app jobs.BatchRepo // parseJobMetadata parses the opaque job metadata to return the information needed to decide which // batch to process next. -func parseJobMetadata(data model.StringMap) (interface{}, error) { - return struct { - LastSortColumnValue string - LastUserId string - }{ - LastSortColumnValue: data["last_column_value"], - LastUserId: data["last_user_id"], - }, nil +func parseJobMetadata(data model.StringMap) (*model.UserReportOptions, error) { + options := model.UserReportOptionsAPI{ + UserReportOptionsWithoutDateRange: model.UserReportOptionsWithoutDateRange{ + SortColumn: "Username", + PageSize: 100, + LastSortColumnValue: data["last_column_value"], + LastUserId: data["last_user_id"], + }, + DateRange: data["date_range"], + } + + return options.ToBaseOptions(time.Now()), nil } // makeJobMetadata encodes the information needed to decide which batch to process next back into @@ -50,15 +59,25 @@ func makeJobMetadata(lastColumnValue string, userID string) model.StringMap { return data } -func getData(jobData model.StringMap, app jobs.BatchReportWorkerAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) { +func getData(jobData model.StringMap, app ExportUsersToCSVAppIFace) ([]model.ReportableObject, model.StringMap, bool, error) { filter, err := parseJobMetadata(jobData) if err != nil { return nil, nil, false, errors.Wrap(err, "failed to parse job metadata") } - users := []model.User{model.User{Id: "test"}} + users, appErr := app.GetUsersForReporting(filter) + if appErr != nil { + return nil, nil, false, errors.Wrapf(err, "failed to get the next batch (column_value=%v, user_id=%v)", 
filter.LastSortColumnValue, filter.LastUserId) + } + + if len(users) == 0 { + return nil, nil, true, nil + } - // Actually get the data + reportableObjects := []model.ReportableObject{} + for i := 0; i < len(users); i++ { + reportableObjects = append(reportableObjects, users[i]) + } - return []model.ReportableObject{users}, makeJobMetadata("todo", "me"), false, nil + return reportableObjects, makeJobMetadata(users[len(users)-1].Username, users[len(users)-1].Id), false, nil } diff --git a/server/public/model/user.go b/server/public/model/user.go index 956914abae9e1..7f0068832d79b 100644 --- a/server/public/model/user.go +++ b/server/public/model/user.go @@ -10,6 +10,7 @@ import ( "net/http" "regexp" "sort" + "strconv" "strings" "time" "unicode/utf8" @@ -1039,6 +1040,55 @@ type UserReport struct { UserPostStats } +func (u *UserReport) GetHeaders() []string { + return []string{ + "Id", + "Username", + "Email", + "CreateAt", + "Name", + "Roles", + "LastLogin", + "LastStatusAt", + "LastPostDate", + "DaysActive", + "TotalPosts", + } +} + +func (u *UserReport) ToReport() []string { + lastStatusAt := "" + if u.LastStatusAt != nil { + lastStatusAt = time.UnixMilli(*u.LastStatusAt).String() + } + lastPostDate := "" + if u.LastPostDate != nil { + lastStatusAt = time.UnixMilli(*u.LastPostDate).String() + } + daysActive := "" + if u.DaysActive != nil { + daysActive = strconv.Itoa(*u.DaysActive) + } + totalPosts := "" + if u.TotalPosts != nil { + totalPosts = strconv.Itoa(*u.TotalPosts) + } + + return []string{ + u.Id, + u.Username, + u.Email, + time.UnixMilli(u.CreateAt).String(), + u.DisplayName, + u.Roles, + time.UnixMilli(u.LastLogin).String(), + lastStatusAt, + lastPostDate, + daysActive, + totalPosts, + } +} + type UserReportOptionsWithoutDateRange struct { SortColumn string SortDesc bool From ee26263656a53adfa53cb7c301d1b83d1d1ac5ed Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Fri, 8 Dec 2023 14:10:23 -0500 Subject: [PATCH 7/8] Working export to file --- 
server/channels/app/app_iface.go | 2 +- .../app/opentracing/opentracing_layer.go | 4 +- server/channels/app/report.go | 44 ++++++++++++------- server/channels/jobs/batch_report_worker.go | 15 +++++-- .../export_users_to_csv.go | 1 + server/public/model/user.go | 8 +++- 6 files changed, 48 insertions(+), 26 deletions(-) diff --git a/server/channels/app/app_iface.go b/server/channels/app/app_iface.go index b49ada4ca6409..18c5319707324 100644 --- a/server/channels/app/app_iface.go +++ b/server/channels/app/app_iface.go @@ -471,7 +471,7 @@ type AppIface interface { Cluster() einterfaces.ClusterInterface CompareAndDeletePluginKey(pluginID string, key string, oldValue []byte) (bool, *model.AppError) CompareAndSetPluginKey(pluginID string, key string, oldValue, newValue []byte) (bool, *model.AppError) - CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) + CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) CompleteOAuth(c request.CTX, service string, body io.ReadCloser, teamID string, props map[string]string, tokenUser *model.User) (*model.User, *model.AppError) CompleteOnboarding(c request.CTX, request *model.CompleteOnboardingRequest) *model.AppError CompleteSwitchWithOAuth(c request.CTX, service string, userData io.Reader, email string, tokenUser *model.User) (*model.User, *model.AppError) diff --git a/server/channels/app/opentracing/opentracing_layer.go b/server/channels/app/opentracing/opentracing_layer.go index acf4b8db37489..42bad4811cd7f 100644 --- a/server/channels/app/opentracing/opentracing_layer.go +++ b/server/channels/app/opentracing/opentracing_layer.go @@ -1669,7 +1669,7 @@ func (a *OpenTracingAppLayer) CompareAndSetPluginKey(pluginID string, key string return resultVar0, resultVar1 } -func (a *OpenTracingAppLayer) CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) { +func (a *OpenTracingAppLayer) 
CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.CompileReportChunks") @@ -1681,7 +1681,7 @@ func (a *OpenTracingAppLayer) CompileReportChunks(format string, prefix string, }() defer span.Finish() - resultVar0, resultVar1 := a.app.CompileReportChunks(format, prefix, numberOfChunks) + resultVar0, resultVar1 := a.app.CompileReportChunks(format, prefix, numberOfChunks, headers) if resultVar1 != nil { span.LogFields(spanlog.Error(resultVar1)) diff --git a/server/channels/app/report.go b/server/channels/app/report.go index 0e646e96ebf76..a63811eee65bb 100644 --- a/server/channels/app/report.go +++ b/server/channels/app/report.go @@ -24,11 +24,6 @@ func (a *App) saveCSVChunk(prefix string, count int, reportData []model.Reportab var buf bytes.Buffer w := csv.NewWriter(&buf) - err := w.Write(reportData[0].GetHeaders()) - if err != nil { - return model.NewAppError("saveCSVChunk", "", nil, "failed to write report data to CSV", http.StatusInternalServerError) - } - for _, report := range reportData { err := w.Write(report.ToReport()) if err != nil { @@ -36,7 +31,8 @@ func (a *App) saveCSVChunk(prefix string, count int, reportData []model.Reportab } } - _, appErr := a.WriteFile(&buf, makeFilename(prefix, count)) + w.Flush() + _, appErr := a.WriteFile(&buf, makeFilename(prefix, count, "csv")) if appErr != nil { return appErr } @@ -44,36 +40,50 @@ func (a *App) saveCSVChunk(prefix string, count int, reportData []model.Reportab return nil } -func (a *App) CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) { +func (a *App) CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) { switch format { case "csv": - return a.compileCSVChunks(prefix, numberOfChunks) + return a.compileCSVChunks(prefix, numberOfChunks, headers) } return "", 
model.NewAppError("CompileReportChunks", "", nil, "unsupported report format", http.StatusInternalServerError) } -func (a *App) compileCSVChunks(prefix string, numberOfChunks int) (string, *model.AppError) { +func (a *App) compileCSVChunks(prefix string, numberOfChunks int, headers []string) (string, *model.AppError) { + filename := fmt.Sprintf("batch_report_%s.csv", prefix) + + var headerBuf bytes.Buffer + w := csv.NewWriter(&headerBuf) + err := w.Write(headers) + if err != nil { + return "", model.NewAppError("compileCSVChunks", "", nil, "failed to write headers", http.StatusInternalServerError) + } + w.Flush() + _, appErr := a.WriteFile(&headerBuf, filename) + if appErr != nil { + return "", appErr + } + for i := 0; i < numberOfChunks; i++ { - var buf bytes.Buffer - chunk, err := a.ReadFile(makeFilename(prefix, i)) + chunkFilename := makeFilename(prefix, i, "csv") + chunk, err := a.ReadFile(chunkFilename) if err != nil { return "", err } - if _, bufErr := buf.Read(chunk); bufErr != nil { - return "", model.NewAppError("compileCSVChunks", "", nil, bufErr.Error(), http.StatusInternalServerError) + if _, err = a.AppendFile(bytes.NewReader(chunk), filename); err != nil { + return "", err } - if _, err = a.AppendFile(&buf, prefix); err != nil { + if err = a.RemoveFile(chunkFilename); err != nil { return "", err } } - return prefix, nil + return filename, nil } func (a *App) SendReportToUser(userID string, filename string) *model.AppError { return nil } -func makeFilename(prefix string, count int) string { - return fmt.Sprintf("%s__%d", prefix, count) +func makeFilename(prefix string, count int, extension string) string { + return fmt.Sprintf("batch_report_%s__%d.%s", prefix, count, extension) } diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go index fc3c442805296..687146c112503 100644 --- a/server/channels/jobs/batch_report_worker.go +++ b/server/channels/jobs/batch_report_worker.go @@ -17,13 +17,14 @@ import ( 
type BatchReportWorkerAppIFace interface { SaveReportChunk(format string, prefix string, count int, reportData []model.ReportableObject) *model.AppError - CompileReportChunks(format string, prefix string, numberOfChunks int) (string, *model.AppError) + CompileReportChunks(format string, prefix string, numberOfChunks int, headers []string) (string, *model.AppError) SendReportToUser(userID string, filename string) *model.AppError } type BatchReportWorker[T BatchReportWorkerAppIFace] struct { BatchWorker[T] reportFormat string + headers []string getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error) } @@ -33,10 +34,12 @@ func MakeBatchReportWorker[T BatchReportWorkerAppIFace]( app T, timeBetweenBatches time.Duration, reportFormat string, + headers []string, getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error), ) model.Worker { worker := &BatchReportWorker[T]{ reportFormat: reportFormat, + headers: headers, getData: getData, } worker.BatchWorker = BatchWorker[T]{ @@ -130,10 +133,14 @@ func (worker *BatchReportWorker[T]) complete(job *model.Job) error { return err } - compiledFilename, appErr := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount) + compiledFilename, appErr := worker.app.CompileReportChunks(worker.reportFormat, job.Id, fileCount, worker.headers) if appErr != nil { - return err + return appErr + } + + if appErr = worker.app.SendReportToUser(requestingUserId, compiledFilename); appErr != nil { + return appErr } - return worker.app.SendReportToUser(requestingUserId, compiledFilename) + return nil } diff --git a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go index a9a5941fe0510..307a43ac1f600 100644 --- a/server/channels/jobs/export_users_to_csv/export_users_to_csv.go +++ b/server/channels/jobs/export_users_to_csv/export_users_to_csv.go @@ -29,6 +29,7 @@ func 
MakeWorker(jobServer *jobs.JobServer, store store.Store, app ExportUsersToC app, timeBetweenBatches, "csv", + model.UserReport.GetHeaders(model.UserReport{}), // TODO no? getData, ) } diff --git a/server/public/model/user.go b/server/public/model/user.go index 7f0068832d79b..bc260a28960d2 100644 --- a/server/public/model/user.go +++ b/server/public/model/user.go @@ -1040,7 +1040,7 @@ type UserReport struct { UserPostStats } -func (u *UserReport) GetHeaders() []string { +func (u UserReport) GetHeaders() []string { return []string{ "Id", "Username", @@ -1073,6 +1073,10 @@ func (u *UserReport) ToReport() []string { if u.TotalPosts != nil { totalPosts = strconv.Itoa(*u.TotalPosts) } + lastLogin := "" + if u.LastLogin > 0 { + lastLogin = time.UnixMilli(u.LastLogin).String() + } return []string{ u.Id, @@ -1081,7 +1085,7 @@ func (u *UserReport) ToReport() []string { time.UnixMilli(u.CreateAt).String(), u.DisplayName, u.Roles, - time.UnixMilli(u.LastLogin).String(), + lastLogin, lastStatusAt, lastPostDate, daysActive, From 3e521fe17db12b568040460b3702e5106f9af9eb Mon Sep 17 00:00:00 2001 From: Devin Binnie Date: Tue, 12 Dec 2023 10:35:27 -0500 Subject: [PATCH 8/8] PR feedback --- .../channels/jobs/batch_migration_worker.go | 7 +++--- server/channels/jobs/batch_report_worker.go | 7 +++--- server/channels/jobs/batch_worker.go | 25 ++++++++----------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/server/channels/jobs/batch_migration_worker.go b/server/channels/jobs/batch_migration_worker.go index 37b0a5cd03d02..efe28c91ce086 100644 --- a/server/channels/jobs/batch_migration_worker.go +++ b/server/channels/jobs/batch_migration_worker.go @@ -28,7 +28,8 @@ type BatchMigrationWorkerAppIFace interface { // server in order to retry a failed migration job. Refactoring the job infrastructure is left as // a future exercise. 
type BatchMigrationWorker struct { - BatchWorker[BatchMigrationWorkerAppIFace] + BatchWorker + app BatchMigrationWorkerAppIFace migrationKey string doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error) } @@ -43,14 +44,14 @@ func MakeBatchMigrationWorker( doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error), ) model.Worker { worker := &BatchMigrationWorker{ + app: app, migrationKey: migrationKey, doMigrationBatch: doMigrationBatch, } - worker.BatchWorker = BatchWorker[BatchMigrationWorkerAppIFace]{ + worker.BatchWorker = BatchWorker{ jobServer: jobServer, logger: jobServer.Logger().With(mlog.String("worker_name", migrationKey)), store: store, - app: app, stop: make(chan struct{}), stopped: make(chan bool, 1), jobs: make(chan model.Job), diff --git a/server/channels/jobs/batch_report_worker.go b/server/channels/jobs/batch_report_worker.go index 687146c112503..c528ae299bf9d 100644 --- a/server/channels/jobs/batch_report_worker.go +++ b/server/channels/jobs/batch_report_worker.go @@ -22,7 +22,8 @@ type BatchReportWorkerAppIFace interface { } type BatchReportWorker[T BatchReportWorkerAppIFace] struct { - BatchWorker[T] + BatchWorker + app T reportFormat string headers []string getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error) @@ -38,15 +39,15 @@ func MakeBatchReportWorker[T BatchReportWorkerAppIFace]( getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error), ) model.Worker { worker := &BatchReportWorker[T]{ + app: app, reportFormat: reportFormat, headers: headers, getData: getData, } - worker.BatchWorker = BatchWorker[T]{ + worker.BatchWorker = BatchWorker{ jobServer: jobServer, logger: jobServer.Logger(), store: store, - app: app, stop: make(chan struct{}), stopped: make(chan bool, 1), jobs: make(chan model.Job), diff --git a/server/channels/jobs/batch_worker.go 
b/server/channels/jobs/batch_worker.go index 5ab75516cecea..f30031c7e13e7 100644 --- a/server/channels/jobs/batch_worker.go +++ b/server/channels/jobs/batch_worker.go @@ -13,11 +13,10 @@ import ( "github.com/mattermost/mattermost/server/v8/channels/store" ) -type BatchWorker[T interface{}] struct { +type BatchWorker struct { jobServer *JobServer logger mlog.LoggerIFace store store.Store - app T stop chan struct{} stopped chan bool @@ -29,19 +28,17 @@ type BatchWorker[T interface{}] struct { } // MakeBatchWorker creates a worker to process the given batch function. -func MakeBatchWorker[T interface{}]( +func MakeBatchWorker( jobServer *JobServer, store store.Store, - app T, timeBetweenBatches time.Duration, doBatch func(rctx *request.Context, job *model.Job) bool, onComplete func(), ) model.Worker { - worker := &BatchWorker[T]{ + worker := &BatchWorker{ jobServer: jobServer, logger: jobServer.Logger(), store: store, - app: app, stop: make(chan struct{}), stopped: make(chan bool, 1), jobs: make(chan model.Job), @@ -52,7 +49,7 @@ func MakeBatchWorker[T interface{}]( } // Run starts the worker dedicated to the unique migration batch job it will be given to process. -func (worker *BatchWorker[T]) Run() { +func (worker *BatchWorker) Run() { worker.logger.Debug("Worker started") // We have to re-assign the stop channel again, because // it might happen that the job was restarted due to a config change. @@ -77,7 +74,7 @@ func (worker *BatchWorker[T]) Run() { } // Stop interrupts the worker even if the migration has not yet completed. -func (worker *BatchWorker[T]) Stop() { +func (worker *BatchWorker) Stop() { // Set to close, and if already closed before, then return. if !worker.closed.CompareAndSwap(false, true) { return @@ -89,12 +86,12 @@ func (worker *BatchWorker[T]) Stop() { } // JobChannel is the means by which the jobs infrastructure provides the worker the job to execute. 
-func (worker *BatchWorker[T]) JobChannel() chan<- model.Job { +func (worker *BatchWorker) JobChannel() chan<- model.Job { return worker.jobs } // IsEnabled is always true for batches. -func (worker *BatchWorker[T]) IsEnabled(_ *model.Config) bool { +func (worker *BatchWorker) IsEnabled(_ *model.Config) bool { return true } @@ -103,7 +100,7 @@ func (worker *BatchWorker[T]) IsEnabled(_ *model.Config) bool { // Note that this is a lot of distracting machinery here to claim the job, then double check the // status, and keep the status up to date in line with job infrastrcuture semantics. Unless an // error occurs, this worker should hold onto the job until its completed. -func (worker *BatchWorker[T]) DoJob(job *model.Job) { +func (worker *BatchWorker) DoJob(job *model.Job) { logger := worker.logger.With(mlog.Any("job", job)) logger.Debug("Worker received a new candidate job.") defer worker.jobServer.HandleJobPanic(logger, job) @@ -148,7 +145,7 @@ func (worker *BatchWorker[T]) DoJob(job *model.Job) { // resetJob erases the data tracking the next batch to execute and returns the job status to // pending to allow the job infrastructure to requeue it. -func (worker *BatchWorker[T]) resetJob(logger mlog.LoggerIFace, job *model.Job) { +func (worker *BatchWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) { job.Data = nil job.Progress = 0 job.Status = model.JobStatusPending @@ -159,7 +156,7 @@ func (worker *BatchWorker[T]) resetJob(logger mlog.LoggerIFace, job *model.Job) } // setJobSuccess records the job as successful. -func (worker *BatchWorker[T]) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { +func (worker *BatchWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) { if err := worker.jobServer.SetJobProgress(job, 100); err != nil { logger.Error("Worker: Failed to update progress for job", mlog.Err(err)) worker.setJobError(logger, job, err) @@ -172,7 +169,7 @@ func (worker *BatchWorker[T]) setJobSuccess(logger mlog.LoggerIFace, job *model. 
} // setJobError puts the job into an error state, preventing the job from running again. -func (worker *BatchWorker[T]) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { +func (worker *BatchWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) { if err := worker.jobServer.SetJobError(job, appError); err != nil { logger.Error("Worker: Failed to set job error", mlog.Err(err)) }