Skip to content

Commit

Permalink
Fixed an issue where the timeline update job stops working if a network exception occurs (#5022)
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisNikulin5 authored Oct 31, 2024
1 parent f129ebf commit 31298ce
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
7 changes: 6 additions & 1 deletion src/Agent.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,12 @@ private async Task ShutdownQueue(bool throwOnFailure)
}
catch (AggregateException ex)
{
ExceptionsUtil.HandleAggregateException((AggregateException)ex, Trace.Error);
ExceptionsUtil.HandleAggregateException(ex, Trace.Error);

if (throwOnFailure)
{
throw;
}
}
catch (Exception ex) when (!throwOnFailure)
{
Expand Down
50 changes: 23 additions & 27 deletions src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,13 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
bool shouldDrain = ForceDrainTimelineQueue;

List<PendingTimelineRecord> pendingUpdates = new List<PendingTimelineRecord>();
var pendingUpdates = new List<PendingTimelineRecord>();
foreach (var timeline in _allTimelines)
{
ConcurrentQueue<TimelineRecord> recordQueue;
if (_timelineUpdateQueue.TryGetValue(timeline, out recordQueue))
if (_timelineUpdateQueue.TryGetValue(timeline, out ConcurrentQueue<TimelineRecord> recordQueue))
{
List<TimelineRecord> records = new List<TimelineRecord>();
TimelineRecord record;
while (recordQueue.TryDequeue(out record))
var records = new List<TimelineRecord>();
while (recordQueue.TryDequeue(out TimelineRecord record))
{
records.Add(record);
// process at most 25 timeline records update for each timeline.
Expand All @@ -470,8 +468,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
foreach (var update in pendingUpdates)
{
List<TimelineRecord> bufferedRecords;
if (_bufferedRetryRecords.TryGetValue(update.TimelineId, out bufferedRecords))
if (_bufferedRetryRecords.TryGetValue(update.TimelineId, out List<TimelineRecord> bufferedRecords))
{
update.PendingRecords.InsertRange(0, bufferedRecords);
}
Expand All @@ -484,7 +481,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
try
{
Timeline newTimeline = await _jobServer.CreateTimelineAsync(_scopeIdentifier, _hubName, _planId, detailTimeline.Details.Id, default(CancellationToken));
Timeline newTimeline = await _jobServer.CreateTimelineAsync(_scopeIdentifier, _hubName, _planId, detailTimeline.Details.Id, CancellationToken.None);
_allTimelines.Add(newTimeline.Id);
pendingSubtimelineUpdate = true;
}
Expand All @@ -502,7 +499,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)

try
{
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, default(CancellationToken));
await _jobServer.UpdateTimelineRecordsAsync(_scopeIdentifier, _hubName, _planId, update.TimelineId, update.PendingRecords, CancellationToken.None);
if (_bufferedRetryRecords.Remove(update.TimelineId))
{
Trace.Verbose("Cleanup buffered timeline record for timeline: {0}.", update.TimelineId);
Expand All @@ -512,7 +509,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
Trace.Info("Catch exception during update timeline records, try to update these timeline records next time.");
Trace.Error(ex);
_bufferedRetryRecords[update.TimelineId] = update.PendingRecords.ToList();
_bufferedRetryRecords[update.TimelineId] = update.PendingRecords;
if (update.TimelineId == _jobTimelineId)
{
mainTimelineRecordsUpdateErrors.Add(ex);
Expand All @@ -532,26 +529,25 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
}
else
{
if (mainTimelineRecordsUpdateErrors.Count > 0 &&
if (ForceDrainTimelineQueue)
{
ForceDrainTimelineQueue = false;
}
}
}

if (runOnce)
{
if (mainTimelineRecordsUpdateErrors.Count > 0 &&
_bufferedRetryRecords.ContainsKey(_jobTimelineId) &&
_bufferedRetryRecords[_jobTimelineId] != null &&
_bufferedRetryRecords[_jobTimelineId].Any(r => r.Variables.Count > 0))
{
Trace.Info("Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs.");
throw new AggregateException(StringUtil.Loc("OutputVariablePublishFailed"), mainTimelineRecordsUpdateErrors);
}
else
{
if (ForceDrainTimelineQueue)
{
ForceDrainTimelineQueue = false;
}
if (runOnce)
{
break;
}
}
{
Trace.Info("Fail to update timeline records with output variables. Throw exception to fail the job since output variables are critical to downstream jobs.");
throw new AggregateException(StringUtil.Loc("OutputVariablePublishFailed"), mainTimelineRecordsUpdateErrors);
}

break;
}

await Task.Delay(_delayForTimelineUpdateDequeue);
Expand Down

0 comments on commit 31298ce

Please sign in to comment.