diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go
index 94d7c690..202c282b 100644
--- a/pkg/driver/trace_driver.go
+++ b/pkg/driver/trace_driver.go
@@ -85,8 +85,8 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
 	return func() {
 		signalReady.Done()
 
-		clusterUsageRecords := make(chan interface{}, 100)
 		knStatRecords := make(chan interface{}, 100)
+		scaleRecords := make(chan interface{}, 100)
 		writerDone := sync.WaitGroup{}
 
 		clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage"))
@@ -96,6 +96,9 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
 		writerDone.Add(1)
 		go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)
 
+		writerDone.Add(1)
+		go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)
+
 		for {
 			select {
 			case <-timer.C:
@@ -111,12 +114,19 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
 				_, err = clusterUsageFile.WriteString("\n")
 				common.Check(err)
 
+				recScale := mc.ScrapeDeploymentScales()
+				timestamp := time.Now().UnixMicro()
+				for _, rec := range recScale {
+					rec.Timestamp = timestamp
+					scaleRecords <- rec
+				}
+
 				recKnative := mc.ScrapeKnStats()
 				recKnative.Timestamp = time.Now().UnixMicro()
 				knStatRecords <- recKnative
 			case <-finishCh:
-				close(clusterUsageRecords)
 				close(knStatRecords)
+				close(scaleRecords)
 
 				writerDone.Wait()
 				allRecordsWritten.Done()
diff --git a/pkg/metric/collect.go b/pkg/metric/collect.go
index 567e366d..7d89ce57 100644
--- a/pkg/metric/collect.go
+++ b/pkg/metric/collect.go
@@ -3,16 +3,12 @@ package metric
 import (
 	"encoding/json"
 	"os/exec"
-	"time"
 
 	log "github.com/sirupsen/logrus"
 )
 
-func ScrapeDeploymentScales() []ScaleRecord {
-	cmd := exec.Command(
-		"python3",
-		"pkg/metric/scrape_scales.py",
-	)
+func ScrapeDeploymentScales() []DeploymentScale {
+	cmd := exec.Command("python3", "pkg/metric/scrape_scales.py")
 	out, err := cmd.CombinedOutput()
 	if err != nil {
 		log.Warn("Fail to scrape deployment scales: ", err)
@@ -24,17 +20,7 @@ func ScrapeDeploymentScales() []ScaleRecord {
 		log.Warn("Fail to parse deployment scales: ", string(out[:]), err)
 	}
 
-	timestamp := time.Now().UnixMicro()
-	records := []ScaleRecord{}
-	for _, result := range results {
-		records = append(records, ScaleRecord{
-			Timestamp:    timestamp,
-			Deployment:   result.Deployment,
-			DesiredScale: result.DesiredScale,
-			ActualScale:  result.ActualScale,
-		})
-	}
-	return records
+	return results
 }
 
 func ScrapeKnStats() KnStats {
diff --git a/pkg/metric/record.go b/pkg/metric/record.go
index d38c318f..145e1123 100644
--- a/pkg/metric/record.go
+++ b/pkg/metric/record.go
@@ -86,18 +86,15 @@ type ExecutionRecord struct {
 	ColdStartCount int `csv:"coldstart_count"`*/
 }
 
-type ScaleRecord struct {
-	Timestamp    int64  `csv:"timestamp"`
-	Deployment   string `csv:"deployment"`
-	DesiredScale int    `csv:"desired_scale"`
-	ActualScale  int    `csv:"actual_scale"`
-}
-
 type DeploymentScale struct {
-	Timestamp    int64  `csv:"timestamp"`
-	Deployment   string `csv:"deployment"`
-	DesiredScale int    `csv:"desired_scale"`
-	ActualScale  int    `csv:"actual_scale"`
+	Timestamp       int64   `csv:"timestamp" json:"timestamp"`
+	Function        string  `csv:"function" json:"function"`
+	DesiredPods     int     `csv:"desired_pods" json:"desired_pods"`
+	RunningPods     int     `csv:"running_pods" json:"running_pods"`
+	UnreadyPods     int     `csv:"unready_pods" json:"unready_pods"`
+	PendingPods     int     `csv:"pending_pods" json:"pending_pods"`
+	TerminatingPods int     `csv:"terminating_pods" json:"terminating_pods"`
+	ActivatorQueue  float64 `csv:"activator_queue" json:"activator_queue"`
 }
 
 type KnStats struct {
diff --git a/pkg/metric/scale_registry.go b/pkg/metric/scale_registry.go
deleted file mode 100644
index febfff60..00000000
--- a/pkg/metric/scale_registry.go
+++ /dev/null
@@ -1,38 +0,0 @@
-package metric
-
-type ScaleRegistry struct {
-	scaleGauge map[string]int
-}
-
-func (r *ScaleRegistry) Init(records []ScaleRecord) {
-	r.scaleGauge = map[string]int{}
-	for _, record := range records {
-		r.scaleGauge[record.Deployment] = record.ActualScale
-	}
-}
-
-//! Since all functions are deployed once, we assume no duplications.
-func (r *ScaleRegistry) UpdateAndGetColdStartCount(records []ScaleRecord) int {
-	coldStarts := 0
-	for _, record := range records {
-		prevScale := r.scaleGauge[record.Deployment]
-		currScale := record.ActualScale
-
-		//* Check if it's scaling from 0.
-		if prevScale == 0 && currScale > 0 {
-			coldStarts++
-		}
-		//* Update registry.
-		r.scaleGauge[record.Deployment] = currScale
-	}
-	return coldStarts
-}
-
-func (r *ScaleRegistry) GetOneColdFunctionName() string {
-	for f, scale := range r.scaleGauge {
-		if scale == 0 {
-			return f
-		}
-	}
-	return "None"
-}
diff --git a/pkg/metric/scrape_scales.py b/pkg/metric/scrape_scales.py
index 9e5c7fa2..4e457d0c 100644
--- a/pkg/metric/scrape_scales.py
+++ b/pkg/metric/scrape_scales.py
@@ -1,18 +1,38 @@
 import json
 import os
 
+prometheus_ip = os.popen("kubectl get svc -n monitoring | grep prometheus-kube-prometheus-prometheus | awk '{print $3}'").read().strip().split('\n')[0]
+
+def get_promql_query(query):
+    def promql_query():
+        return "tools/bin/promql --no-headers --host 'http://" + prometheus_ip + ":9090' '" + query + "' | grep . | awk '{print $1\" \"$2}'"
+    return promql_query
+
 if __name__ == "__main__":
-    cmd = "kubectl get podautoscalers | awk '{print $1\" \"$2\" \"$3}'"
-    out = os.popen(cmd).read().strip().split('\n')
+    query_desired_pods = 'max(autoscaler_desired_pods) by(configuration_name)'
+    query_running_pods = 'max(autoscaler_actual_pods) by(configuration_name)'
+    query_unready_pods = 'max(autoscaler_not_ready_pods) by(configuration_name)'
+    query_pending_pods = 'max(autoscaler_pending_pods) by(configuration_name)'
+    query_terminating_pods = 'max(autoscaler_terminating_pods) by(configuration_name)'
+    query_activator_queue = 'max(activator_request_concurrency) by(configuration_name)'
+
+    desired_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_desired_pods)()).read().strip().split('\n')}
+    running_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_running_pods)()).read().strip().split('\n')}
+    unready_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_unready_pods)()).read().strip().split('\n')}
+    pending_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_pending_pods)()).read().strip().split('\n')}
+    terminating_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_terminating_pods)()).read().strip().split('\n')}
+    queue_size = {x.split()[0]: float(x.split()[1]) for x in os.popen(get_promql_query(query_activator_queue)()).read().strip().split('\n')}
 
     results = []
-    for line in out[1:]:
-        deployment, desired_scale, actual_scale = line.split()
+    for func in desired_pods_count.keys():
         results.append({
-            # Cut of the deployment suffix as each function is only deployed once.
-            "deployment": '-'.join(deployment.split('-')[:-1]),
-            "desired_scale": int(desired_scale),
-            "actual_scale": int(actual_scale),
+            'function': func,
+            'desired_pods': desired_pods_count[func],
+            'running_pods': running_pods_count[func],
+            'unready_pods': unready_pods_count[func],
+            'pending_pods': pending_pods_count[func],
+            'terminating_pods': terminating_pods_count[func],
+            'activator_queue': queue_size.get(func, 0)
         })
 
     print(json.dumps(results))
diff --git a/scripts/setup/create_multinode.sh b/scripts/setup/create_multinode.sh
index 96b71a53..40a38841 100755
--- a/scripts/setup/create_multinode.sh
+++ b/scripts/setup/create_multinode.sh
@@ -249,6 +249,13 @@ function clone_loader_on_workers() {
 
     # Notify the master that all nodes have joined the cluster
     server_exec $MASTER_NODE 'tmux send -t master "y" ENTER'
+
+    namespace_info=$(server_exec $MASTER_NODE "kubectl get namespaces")
+    while [[ ${namespace_info} != *'knative-serving'* ]]; do
+        sleep 60
+        namespace_info=$(server_exec $MASTER_NODE "kubectl get namespaces")
+    done
+
     echo "Master node $MASTER_NODE finalised."
 
     # Copy API server certificates from master to each worker node