Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
devinbinnie committed Dec 12, 2023
1 parent ee26263 commit 3e521fe
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
7 changes: 4 additions & 3 deletions server/channels/jobs/batch_migration_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type BatchMigrationWorkerAppIFace interface {
// server in order to retry a failed migration job. Refactoring the job infrastructure is left as
// a future exercise.
type BatchMigrationWorker struct {
BatchWorker[BatchMigrationWorkerAppIFace]
BatchWorker
app BatchMigrationWorkerAppIFace
migrationKey string
doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error)
}
Expand All @@ -43,14 +44,14 @@ func MakeBatchMigrationWorker(
doMigrationBatch func(data model.StringMap, store store.Store) (model.StringMap, bool, error),
) model.Worker {
worker := &BatchMigrationWorker{
app: app,
migrationKey: migrationKey,
doMigrationBatch: doMigrationBatch,
}
worker.BatchWorker = BatchWorker[BatchMigrationWorkerAppIFace]{
worker.BatchWorker = BatchWorker{
jobServer: jobServer,
logger: jobServer.Logger().With(mlog.String("worker_name", migrationKey)),
store: store,
app: app,
stop: make(chan struct{}),
stopped: make(chan bool, 1),
jobs: make(chan model.Job),
Expand Down
7 changes: 4 additions & 3 deletions server/channels/jobs/batch_report_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type BatchReportWorkerAppIFace interface {
}

type BatchReportWorker[T BatchReportWorkerAppIFace] struct {
BatchWorker[T]
BatchWorker
app T
reportFormat string
headers []string
getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error)
Expand All @@ -38,15 +39,15 @@ func MakeBatchReportWorker[T BatchReportWorkerAppIFace](
getData func(jobData model.StringMap, app T) ([]model.ReportableObject, model.StringMap, bool, error),
) model.Worker {
worker := &BatchReportWorker[T]{
app: app,
reportFormat: reportFormat,
headers: headers,
getData: getData,
}
worker.BatchWorker = BatchWorker[T]{
worker.BatchWorker = BatchWorker{
jobServer: jobServer,
logger: jobServer.Logger(),
store: store,
app: app,
stop: make(chan struct{}),
stopped: make(chan bool, 1),
jobs: make(chan model.Job),
Expand Down
25 changes: 11 additions & 14 deletions server/channels/jobs/batch_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"github.com/mattermost/mattermost/server/v8/channels/store"
)

type BatchWorker[T interface{}] struct {
type BatchWorker struct {
jobServer *JobServer
logger mlog.LoggerIFace
store store.Store
app T

stop chan struct{}
stopped chan bool
Expand All @@ -29,19 +28,17 @@ type BatchWorker[T interface{}] struct {
}

// MakeBatchWorker creates a worker to process the given batch function.
func MakeBatchWorker[T interface{}](
func MakeBatchWorker(
jobServer *JobServer,
store store.Store,
app T,
timeBetweenBatches time.Duration,
doBatch func(rctx *request.Context, job *model.Job) bool,
onComplete func(),
) model.Worker {
worker := &BatchWorker[T]{
worker := &BatchWorker{
jobServer: jobServer,
logger: jobServer.Logger(),
store: store,
app: app,
stop: make(chan struct{}),
stopped: make(chan bool, 1),
jobs: make(chan model.Job),
Expand All @@ -52,7 +49,7 @@ func MakeBatchWorker[T interface{}](
}

// Run starts the worker dedicated to the unique migration batch job it will be given to process.
func (worker *BatchWorker[T]) Run() {
func (worker *BatchWorker) Run() {
worker.logger.Debug("Worker started")
// We have to re-assign the stop channel again, because
// it might happen that the job was restarted due to a config change.
Expand All @@ -77,7 +74,7 @@ func (worker *BatchWorker[T]) Run() {
}

// Stop interrupts the worker even if the migration has not yet completed.
func (worker *BatchWorker[T]) Stop() {
func (worker *BatchWorker) Stop() {
// Set to close, and if already closed before, then return.
if !worker.closed.CompareAndSwap(false, true) {
return
Expand All @@ -89,12 +86,12 @@ func (worker *BatchWorker[T]) Stop() {
}

// JobChannel is the means by which the jobs infrastructure provides the worker the job to execute.
func (worker *BatchWorker[T]) JobChannel() chan<- model.Job {
func (worker *BatchWorker) JobChannel() chan<- model.Job {
return worker.jobs
}

// IsEnabled is always true for batches.
func (worker *BatchWorker[T]) IsEnabled(_ *model.Config) bool {
func (worker *BatchWorker) IsEnabled(_ *model.Config) bool {
return true
}

Expand All @@ -103,7 +100,7 @@ func (worker *BatchWorker[T]) IsEnabled(_ *model.Config) bool {
// Note that this is a lot of distracting machinery here to claim the job, then double check the
// status, and keep the status up to date in line with job infrastrcuture semantics. Unless an
// error occurs, this worker should hold onto the job until its completed.
func (worker *BatchWorker[T]) DoJob(job *model.Job) {
func (worker *BatchWorker) DoJob(job *model.Job) {
logger := worker.logger.With(mlog.Any("job", job))
logger.Debug("Worker received a new candidate job.")
defer worker.jobServer.HandleJobPanic(logger, job)
Expand Down Expand Up @@ -148,7 +145,7 @@ func (worker *BatchWorker[T]) DoJob(job *model.Job) {

// resetJob erases the data tracking the next batch to execute and returns the job status to
// pending to allow the job infrastructure to requeue it.
func (worker *BatchWorker[T]) resetJob(logger mlog.LoggerIFace, job *model.Job) {
func (worker *BatchWorker) resetJob(logger mlog.LoggerIFace, job *model.Job) {
job.Data = nil
job.Progress = 0
job.Status = model.JobStatusPending
Expand All @@ -159,7 +156,7 @@ func (worker *BatchWorker[T]) resetJob(logger mlog.LoggerIFace, job *model.Job)
}

// setJobSuccess records the job as successful.
func (worker *BatchWorker[T]) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) {
func (worker *BatchWorker) setJobSuccess(logger mlog.LoggerIFace, job *model.Job) {
if err := worker.jobServer.SetJobProgress(job, 100); err != nil {
logger.Error("Worker: Failed to update progress for job", mlog.Err(err))
worker.setJobError(logger, job, err)
Expand All @@ -172,7 +169,7 @@ func (worker *BatchWorker[T]) setJobSuccess(logger mlog.LoggerIFace, job *model.
}

// setJobError puts the job into an error state, preventing the job from running again.
func (worker *BatchWorker[T]) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) {
func (worker *BatchWorker) setJobError(logger mlog.LoggerIFace, job *model.Job, appError *model.AppError) {
if err := worker.jobServer.SetJobError(job, appError); err != nil {
logger.Error("Worker: Failed to set job error", mlog.Err(err))
}
Expand Down

0 comments on commit 3e521fe

Please sign in to comment.