From 31b70636c3e895ab9d31cf5dc933a41f4567da1f Mon Sep 17 00:00:00 2001 From: Dhruv Batheja Date: Sat, 11 Feb 2023 08:48:55 +0100 Subject: [PATCH] Make function not access outer-state (#265) Co-authored-by: Filipe Regadas --- pkg/flink/handler.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pkg/flink/handler.go b/pkg/flink/handler.go index 8cdec36..4b467c6 100644 --- a/pkg/flink/handler.go +++ b/pkg/flink/handler.go @@ -158,10 +158,14 @@ func (h flinkResourceHandler) OnAbort(ctx context.Context, tCtx pluginsCore.Task return abortBehavior, nil } -func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClusterNamespace string) ([]*core.TaskLog, error) { +type FlinkTaskLogsInput struct { + ClusterName string + Namespace string +} + +func FlinkClusterTaskLogs(ctx context.Context, config *Config, fi FlinkTaskLogsInput) ([]*core.TaskLog, error) { var taskLogs []*core.TaskLog - config := GetFlinkConfig() p, err := logs.InitializeLogPlugins(&config.LogConfig) if err != nil { return nil, err @@ -172,8 +176,8 @@ func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClu } jobLog, err := p.GetTaskLogs(tasklog.Input{ - PodName: flinkClusterName, - Namespace: flinkClusterNamespace, + PodName: fi.ClusterName, + Namespace: fi.Namespace, LogName: "(Job)", }) if err != nil { @@ -187,7 +191,10 @@ func FlinkClusterTaskLogs(ctx context.Context, flinkClusterName string, flinkClu func flinkClusterTaskInfo(ctx context.Context, flinkCluster *flinkOp.FlinkCluster) (*pluginsCore.TaskInfo, error) { var taskLogs []*core.TaskLog - tl, err := FlinkClusterTaskLogs(ctx, flinkCluster.Name, flinkCluster.Namespace) + tl, err := FlinkClusterTaskLogs(ctx, GetFlinkConfig(), FlinkTaskLogsInput{ + ClusterName: flinkCluster.Name, + Namespace: flinkCluster.Namespace, + }) if err != nil { return nil, err }