Skip to content

Commit

Permalink
Merge pull request #819 from Project-MONAI/AC-2253
Browse files Browse the repository at this point in the history
AC-2253 Changes to fix execution stats
  • Loading branch information
woodheadio authored Jun 5, 2023
2 parents 9439e8c + 7575af4 commit d819b68
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public async Task<ActionResult<WorkflowTemplate>> CreateArgoTemplate()

if (string.IsNullOrWhiteSpace(value2))
{
return BadRequest("No file recieved");
return BadRequest("No file received");
}
WorkflowTemplate? workflowTemplate = null;
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Contracts.Models;
Expand All @@ -27,54 +28,60 @@ public interface ITaskExecutionStatsRepository
/// <summary>
/// Creates a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to create.</param>
/// <param name="TaskExecutionInfo"></param>
/// <param name="workflowId">workflow id.</param>
/// <param name="correlationId">task id.</param>
/// <returns></returns>
Task CreateAsync(TaskExecution TaskExecutionInfo, string workflowId, string correlationId);

/// <summary>
/// Updates status of a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
/// <param name="taskUpdateEvent"></param>
/// <param name="workflowId">workflow id.</param>
/// <param name="status">optional task execution status.</param>
/// <returns></returns>
Task UpdateExecutionStatsAsync(TaskExecution taskUpdateEvent, string workflowId, TaskExecutionStatus? status = null);

/// <summary>
/// Updates the status of a task now that it has been canceled.
/// </summary>
/// <param name="TaskCanceledException">A TaskCanceledException to update.</param>
/// <returns></returns
/// <param name="taskCanceledEvent">A TaskCancellationEvent to update.</param>
/// <param name="workflowId">workflow id.</param>
/// <param name="correlationId">task id.</param>
/// <returns></returns>
Task UpdateExecutionStatsAsync(TaskCancellationEvent taskCanceledEvent, string workflowId, string correlationId);

/// <summary>
/// Returns paged entries between the two given dates.
/// Returns paged entries between the two given dates
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="PageSize"></param>
/// <param name="PageNumber"></param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>a collection of stats</returns>
Task<IEnumerable<ExecutionStats>> GetStatsAsync(DateTime startTime, DateTime endTime, int PageSize = 10, int PageNumber = 1, string workflowId = "", string taskId = "");

/// <summary>
/// Return the total number of stats between the dates
/// Return the count of the entries with this status, or all if no status given.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>The count of all records in range</returns>
//Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");

/// <summary>
/// Return the count of the entries with this status, or all if no status given
/// </summary>
/// <param name="start">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="status">the status to get count of, or string.empty</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowId = "", string taskId = "");
Task<long> GetStatsStatusCountAsync(DateTime startTime, DateTime endTime, string status = "", string workflowId = "", string taskId = "");

/// <summary>
/// Returns all stats in Succeeded status.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>All stats that succeeded</returns>
Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");

Expand All @@ -83,16 +90,43 @@ public interface ITaskExecutionStatsRepository
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>All stats that failed or partially failed</returns>
Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");

/// <summary>
/// Calculates the average exection time for the given range
/// Returns the total number of executions that have run to completion (i.e. not dispatched, created or accepted).
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count of all executions in range that are not in a dispatched, created or accepted status</returns>
Task<long> GetStatsTotalCompleteExecutionsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");


/// <summary>
/// Calculates the average execution time for the given range
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <returns>the average exection times in the time range</returns>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>the average execution times in the time range</returns>
Task<(double avgTotalExecution, double avgArgoExecution)> GetAverageStats(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "");


/// <summary>
/// Return the total number of stats between the dates with optional status filter.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
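/// <param name="statusFilter">optional predicate restricting which entries are counted; when null, entries of every status are counted.</param>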
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count of all records in range</returns>
Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, Expression<Func<ExecutionStats, bool>>? statusFilter = null, string workflowId = "", string taskId = "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -159,14 +160,7 @@ public async Task<IEnumerable<ExecutionStats>> GetStatsAsync(DateTime startTime,
T.StartedUTC >= startTime &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowNull || T.WorkflowId == workflowId) &&
(taskIdNull || T.TaskId == taskId)
//&&
//(
// T.Status == TaskExecutionStatus.Succeeded.ToString()
// || T.Status == TaskExecutionStatus.Failed.ToString()
// || T.Status == TaskExecutionStatus.PartialFail.ToString()
// )
)
(taskIdNull || T.TaskId == taskId))
.Limit(PageSize)
.Skip((PageNumber - 1) * PageSize)
.ToListAsync();
Expand All @@ -187,66 +181,86 @@ private static ExecutionStats ExposeExecutionStats(ExecutionStats taskExecutionS
var statKeys = taskUpdateEvent.ExecutionStats.Keys.Where(v => v.StartsWith("podStartTime") || v.StartsWith("podFinishTime"));
if (statKeys.Any())
{
var start = DateTime.Now;
var end = new DateTime();
foreach (var statKey in statKeys)
{
if (statKey.Contains("StartTime") && DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var startTime))
{
start = (startTime < start ? startTime : start);
}
else if (DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var endTime))
{
end = (endTime > end ? endTime : start);
}
}
taskExecutionStats.ExecutionTimeSeconds = (end - start).TotalMilliseconds / 1000;
CalculatePodExecutionTime(taskExecutionStats, taskUpdateEvent, statKeys);
}
}
return taskExecutionStats;
}

public async Task<long> GetStatsStatusCountAsync(DateTime start, DateTime endTime, string status = "", string workflowId = "", string taskId = "")
/// <summary>
/// Calculates the pod execution time from the timestamp entries of the task's execution stats
/// and stores it, in seconds, in <c>ExecutionTimeSeconds</c> of <paramref name="taskExecutionStats"/>.
/// </summary>
/// <param name="taskExecutionStats">The stats record whose <c>ExecutionTimeSeconds</c> is set.</param>
/// <param name="taskUpdateEvent">The task execution whose <c>ExecutionStats</c> entries hold the timestamps.</param>
/// <param name="statKeys">
/// Keys of the <c>ExecutionStats</c> entries to inspect. Keys containing "StartTime" are treated
/// as start times; every other key is treated as a finish time.
/// </param>
private static void CalculatePodExecutionTime(ExecutionStats taskExecutionStats, TaskExecution taskUpdateEvent, IEnumerable<string> statKeys)
{
    // Seeds: the current time is the initial upper bound for the earliest start,
    // default(DateTime) the initial lower bound for the latest finish.
    var start = DateTime.Now;
    var end = new DateTime();
    foreach (var statKey in statKeys)
    {
        // Values that cannot be parsed as a date are ignored (parsed once per entry).
        if (!DateTime.TryParse(taskUpdateEvent.ExecutionStats[statKey], out var timestamp))
        {
            continue;
        }

        if (statKey.Contains("StartTime"))
        {
            // Keep the earliest start seen so far.
            start = timestamp < start ? timestamp : start;
        }
        else
        {
            // Keep the latest finish seen so far. The fallback must be the current `end`;
            // falling back to `start` would discard a later finish time already recorded.
            end = timestamp > end ? timestamp : end;
        }
    }
    taskExecutionStats.ExecutionTimeSeconds = (end - start).TotalMilliseconds / 1000;
}

return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
T.StartedUTC >= start.ToUniversalTime() &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowNull || T.WorkflowId == workflowId) &&
(taskIdNull || T.TaskId == taskId) &&
(statusNull || T.Status == status));
/// <summary>
/// Counts the stats entries in the given time range, optionally restricted to one status.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="status">the status to count; when null, empty or whitespace no status filter is applied.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count returned by <c>GetStatsCountAsync</c> for these arguments.</returns>
public async Task<long> GetStatsStatusCountAsync(DateTime startTime, DateTime endTime, string status = "", string workflowId = "", string taskId = "")
{
    // No usable status: count without any status filter.
    if (string.IsNullOrWhiteSpace(status))
    {
        return await GetStatsCountAsync(startTime, endTime, null, workflowId, taskId);
    }

    Expression<Func<ExecutionStats, bool>> matchesStatus = t => t.Status == status;
    return await GetStatsCountAsync(startTime, endTime, matchesStatus, workflowId, taskId);
}

public async Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
public async Task<long> GetStatsCountAsync(DateTime startTime, DateTime endTime, Expression<Func<ExecutionStats, bool>>? statusFilter = null, string workflowId = "", string taskId = "")
{
var workflowNull = string.IsNullOrWhiteSpace(workflowId);
var taskIdNull = string.IsNullOrWhiteSpace(taskId);

return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
T.StartedUTC >= startTime.ToUniversalTime() &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowNull || T.WorkflowId == workflowId) &&
(taskIdNull || T.TaskId == taskId) &&
T.Status == TaskExecutionStatus.Succeeded.ToString());
var builder = Builders<ExecutionStats>.Filter;
var filter = builder.Empty;

filter &= builder.Where(t => t.StartedUTC >= startTime.ToUniversalTime());
filter &= builder.Where(t => t.StartedUTC <= endTime.ToUniversalTime());
filter &= builder.Where(t => workflowNull || t.WorkflowId == workflowId);
filter &= builder.Where(t => taskIdNull || t.TaskId == taskId);
if (statusFilter is not null)
{
filter &= builder.Where(statusFilter);
}

return await _taskExecutionStatsCollection.CountDocumentsAsync(filter);
}

public async Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
public async Task<long> GetStatsTotalCompleteExecutionsCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
{
var workflowNull = string.IsNullOrWhiteSpace(workflowId);
var taskIdNull = string.IsNullOrWhiteSpace(taskId);
var dispatched = TaskExecutionStatus.Dispatched.ToString();
var created = TaskExecutionStatus.Created.ToString();
var accepted = TaskExecutionStatus.Accepted.ToString();
Expression<Func<ExecutionStats, bool>> statusFilter = t => t.Status != dispatched && t.Status != created && t.Status != accepted;

return await GetStatsCountAsync(startTime, endTime, statusFilter, workflowId, taskId);
}

return await _taskExecutionStatsCollection.CountDocumentsAsync(T =>
T.StartedUTC >= startTime.ToUniversalTime() &&
T.StartedUTC <= endTime.ToUniversalTime() &&
(workflowNull || T.WorkflowId == workflowId) &&
(taskIdNull || T.TaskId == taskId) &&
(
T.Status == TaskExecutionStatus.Failed.ToString() ||
T.Status == TaskExecutionStatus.PartialFail.ToString()
));
/// <summary>
/// Counts the stats entries in the given time range whose status is Succeeded.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count returned by <c>GetStatsCountAsync</c> with a Succeeded status filter.</returns>
public async Task<long> GetStatsStatusSucceededCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
{
    // Resolve the status name up front so the filter compares against a plain string.
    var succeeded = TaskExecutionStatus.Succeeded.ToString();
    Expression<Func<ExecutionStats, bool>> isSucceeded = stat => stat.Status == succeeded;

    return await GetStatsCountAsync(startTime, endTime, isSucceeded, workflowId, taskId);
}

/// <summary>
/// Counts the stats entries in the given time range whose status is Failed or PartialFail.
/// </summary>
/// <param name="startTime">start of the range.</param>
/// <param name="endTime">end of the range.</param>
/// <param name="workflowId">optional workflow id.</param>
/// <param name="taskId">optional task id.</param>
/// <returns>The count returned by <c>GetStatsCountAsync</c> with a Failed/PartialFail status filter.</returns>
public async Task<long> GetStatsStatusFailedCountAsync(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
{
    // Resolve the status names up front so the filter compares against plain strings.
    var failed = TaskExecutionStatus.Failed.ToString();
    var partialFail = TaskExecutionStatus.PartialFail.ToString();
    Expression<Func<ExecutionStats, bool>> isFailure = stat => stat.Status == failed || stat.Status == partialFail;

    return await GetStatsCountAsync(startTime, endTime, isFailure, workflowId, taskId);
}

public async Task<(double avgTotalExecution, double avgArgoExecution)> GetAverageStats(DateTime startTime, DateTime endTime, string workflowId = "", string taskId = "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st
{
// both not empty but one is !
_logger.LogDebug($"{nameof(GetStatsAsync)} - Failed to validate WorkflowId or TaskId");
return Problem($"Failed to validate ids, not a valid guid", $"tasks/stats/", BadRequest);
return Problem("Failed to validate ids, not a valid guid", "tasks/stats/", BadRequest);
}

if (filter.EndTime == default)
Expand All @@ -150,12 +150,19 @@ public async Task<IActionResult> GetStatsAsync([FromQuery] TimeFilter filter, st

try
{
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId ?? string.Empty, taskId ?? string.Empty);
var successes = _repository.GetStatsStatusSucceededCountAsync(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
var rangeCount = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, string.Empty, workflowId ?? string.Empty, taskId ?? string.Empty);
var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId ?? string.Empty, taskId ?? string.Empty);
var running = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, TaskExecutionStatus.Accepted.ToString());
workflowId ??= string.Empty;
taskId ??= string.Empty;
var allStats = _repository.GetStatsAsync(filter.StartTime, filter.EndTime, pageSize, filter.PageNumber, workflowId, taskId);

var successes = _repository.GetStatsStatusSucceededCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);

var fails = _repository.GetStatsStatusFailedCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);

var rangeCount = _repository.GetStatsTotalCompleteExecutionsCountAsync(filter.StartTime, filter.EndTime, workflowId, taskId);

var stats = _repository.GetAverageStats(filter.StartTime, filter.EndTime, workflowId, taskId);

var running = _repository.GetStatsStatusCountAsync(filter.StartTime, filter.EndTime, TaskExecutionStatus.Accepted.ToString(), workflowId, taskId);

await Task.WhenAll(allStats, fails, rangeCount, stats, running);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="fo-dicom" Version="5.0.3" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
Expand Down
Loading

0 comments on commit d819b68

Please sign in to comment.