Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix to the panic due to the send on a closed channel #2824

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
33 changes: 27 additions & 6 deletions ste/jobStatusManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,28 @@ func (jm *jobMgr) statusMgrClosed() bool {

/* These functions should not fail */
func (jm *jobMgr) SendJobPartCreatedMsg(msg JobPartCreatedMsg) {
jm.jstm.partCreated <- msg
if msg.IsFinalPart {
// Inform statusManager that this is all parts we've
close(jm.jstm.partCreated)
defer func() {
gapra-msft marked this conversation as resolved.
Show resolved Hide resolved
gapra-msft marked this conversation as resolved.
Show resolved Hide resolved
if recErr := recover(); recErr != nil {
jm.Log(common.LogError, "Cannot send message on closed channel")
}
}()
if jm.jstm.partCreated != nil { // Sends not allowed if channel is closed
jm.jstm.partCreated <- msg

if msg.IsFinalPart {
// Inform statusManager that this is all parts we've
close(jm.jstm.partCreated)
jm.jstm.partCreated = nil
}
}
}

func (jm *jobMgr) SendXferDoneMsg(msg xferDoneMsg) {
defer func() {
if recErr := recover(); recErr != nil {
jm.Log(common.LogError, "Cannot send message on channel")
}
}()
jm.jstm.xferDone <- msg
}

Expand Down Expand Up @@ -155,8 +169,14 @@ func (jm *jobMgr) handleStatusUpdateMessage() {
case <-jstm.listReq:
/* Display stats */
js.Timestamp = time.Now().UTC()
jstm.respChan <- *js

if jstm.respChan != nil {
jstm.respChan <- *js // Send on the channel
defer func() { // Exit gracefully if panic
if recErr := recover(); recErr != nil {
jm.Log(common.LogError, "Cannot send message on respChan")
}
}()
}
// Reset the lists so that they don't keep accumulating and take up excessive memory
// There is no need to keep sending the same items over and over again
js.FailedTransfers = []common.TransferDetail{}
Expand All @@ -168,6 +188,7 @@ func (jm *jobMgr) handleStatusUpdateMessage() {
close(jstm.listReq)
jstm.listReq = nil
jstm.respChan = nil
jstm.statusMgrDone = nil
return
}
}
Expand Down
7 changes: 4 additions & 3 deletions ste/mgr-JobMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ type AddJobPartArgs struct {

// These clients are valid if this fits the FromTo. i.e if
// we're uploading
SrcClient *common.ServiceClient
DstClient *common.ServiceClient
SrcClient *common.ServiceClient
DstClient *common.ServiceClient
SrcIsOAuth bool // true if source is authenticated via token

ScheduleTransfers bool
Expand All @@ -446,7 +446,7 @@ func (jm *jobMgr) AddJobPart2(args *AddJobPartArgs) IJobPartMgr {
cacheLimiter: jm.cacheLimiter,
fileCountLimiter: jm.fileCountLimiter,
closeOnCompletion: args.CompletionChan,
srcIsOAuth: args.SrcIsOAuth,
srcIsOAuth: args.SrcIsOAuth,
}
// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
if args.ExistingPlanMMF == nil {
Expand Down Expand Up @@ -716,6 +716,7 @@ func (jm *jobMgr) reportJobPartDoneHandler() {
if shouldComplete {
// Inform StatusManager that all parts are done.
close(jm.jstm.xferDone)

// Wait for all XferDone messages to be processed by statusManager. Front end
// depends on JobStatus to determine if we've to quit job. Setting it here without
// draining XferDone will make it report incorrect statistics.
Expand Down
Loading