Merge branch 'main' into dependabot/pip/pyopenssl-23.2.0
cvetkovic authored Jul 3, 2023
2 parents 9fd93fc + 28d1def commit 8c3e9a8
Showing 6 changed files with 58 additions and 76 deletions.
14 changes: 12 additions & 2 deletions pkg/driver/trace_driver.go
@@ -85,8 +85,8 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,

 	return func() {
 		signalReady.Done()
 		clusterUsageRecords := make(chan interface{}, 100)
 		knStatRecords := make(chan interface{}, 100)
+		scaleRecords := make(chan interface{}, 100)
 		writerDone := sync.WaitGroup{}

 		clusterUsageFile, err := os.Create(d.outputFilename("cluster_usage"))
@@ -96,6 +96,9 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
 		writerDone.Add(1)
 		go d.runCSVWriter(knStatRecords, d.outputFilename("kn_stats"), &writerDone)

+		writerDone.Add(1)
+		go d.runCSVWriter(scaleRecords, d.outputFilename("deployment_scale"), &writerDone)
+
 		for {
 			select {
 			case <-timer.C:
@@ -111,12 +114,19 @@ func (d *Driver) CreateMetricsScrapper(interval time.Duration,
 				_, err = clusterUsageFile.WriteString("\n")
 				common.Check(err)

+				recScale := mc.ScrapeDeploymentScales()
+				timestamp := time.Now().UnixMicro()
+				for _, rec := range recScale {
+					rec.Timestamp = timestamp
+					scaleRecords <- rec
+				}
+
 				recKnative := mc.ScrapeKnStats()
 				recKnative.Timestamp = time.Now().UnixMicro()
 				knStatRecords <- recKnative
 			case <-finishCh:
 				close(clusterUsageRecords)
 				close(knStatRecords)
+				close(scaleRecords)

 				writerDone.Wait()
 				allRecordsWritten.Done()
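
The scraper extends the driver's existing pattern: one buffered channel per output CSV, one writer goroutine per channel, and a shared WaitGroup so that allRecordsWritten is only released after every writer has drained its channel. runCSVWriter itself is not part of this diff; below is a minimal sketch of the shape such a writer plausibly has (the record type and CSV layout are invented for illustration, not the repository's actual implementation):

    package main

    import (
    	"encoding/csv"
    	"fmt"
    	"os"
    	"sync"
    )

    // Stand-in for the metric types sent over the channel; the real
    // driver sends DeploymentScale, KnStats, etc. as interface{}.
    type record struct {
    	Timestamp int64
    	Function  string
    }

    // Drain a channel into a CSV file and release the WaitGroup once
    // the channel has been closed and fully consumed.
    func runCSVWriter(records chan interface{}, filename string, done *sync.WaitGroup) {
    	defer done.Done()

    	f, err := os.Create(filename)
    	if err != nil {
    		panic(err)
    	}
    	defer f.Close()

    	w := csv.NewWriter(f)
    	defer w.Flush()

    	_ = w.Write([]string{"timestamp", "function"}) // header row
    	for r := range records { // loop ends when the channel is closed
    		rec := r.(record)
    		_ = w.Write([]string{fmt.Sprint(rec.Timestamp), rec.Function})
    	}
    }

    func main() {
    	scaleRecords := make(chan interface{}, 100)
    	writerDone := sync.WaitGroup{}

    	writerDone.Add(1)
    	go runCSVWriter(scaleRecords, "deployment_scale.csv", &writerDone)

    	scaleRecords <- record{Timestamp: 1688383800000000, Function: "trace-func-0"}
    	close(scaleRecords) // writer drains the buffer, then returns
    	writerDone.Wait()   // now it is safe to report all records written
    }

The buffered channel (capacity 100) keeps the scrape loop from blocking on disk I/O, and closing each channel on finishCh is what lets the corresponding writer terminate cleanly.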
20 changes: 3 additions & 17 deletions pkg/metric/collect.go
@@ -3,16 +3,12 @@ package metric
 import (
 	"encoding/json"
 	"os/exec"
-	"time"

 	log "github.com/sirupsen/logrus"
 )

-func ScrapeDeploymentScales() []ScaleRecord {
-	cmd := exec.Command(
-		"python3",
-		"pkg/metric/scrape_scales.py",
-	)
+func ScrapeDeploymentScales() []DeploymentScale {
+	cmd := exec.Command("python3", "pkg/metric/scrape_scales.py")
 	out, err := cmd.CombinedOutput()
 	if err != nil {
 		log.Warn("Fail to scrape deployment scales: ", err)
@@ -24,17 +20,7 @@ func ScrapeDeploymentScales() []ScaleRecord {
 		log.Warn("Fail to parse deployment scales: ", string(out[:]), err)
 	}

-	timestamp := time.Now().UnixMicro()
-	records := []ScaleRecord{}
-	for _, result := range results {
-		records = append(records, ScaleRecord{
-			Timestamp:    timestamp,
-			Deployment:   result.Deployment,
-			DesiredScale: result.DesiredScale,
-			ActualScale:  result.ActualScale,
-		})
-	}
-	return records
+	return results
 }

 func ScrapeKnStats() KnStats {
19 changes: 8 additions & 11 deletions pkg/metric/record.go
@@ -86,18 +86,15 @@ type ExecutionRecord struct {
 	ColdStartCount int `csv:"coldstart_count"`*/
 }

-type ScaleRecord struct {
-	Timestamp    int64  `csv:"timestamp"`
-	Deployment   string `csv:"deployment"`
-	DesiredScale int    `csv:"desired_scale"`
-	ActualScale  int    `csv:"actual_scale"`
-}
-
 type DeploymentScale struct {
-	Timestamp    int64  `csv:"timestamp"`
-	Deployment   string `csv:"deployment"`
-	DesiredScale int    `csv:"desired_scale"`
-	ActualScale  int    `csv:"actual_scale"`
+	Timestamp       int64   `csv:"timestamp" json:"timestamp"`
+	Function        string  `csv:"function" json:"function"`
+	DesiredPods     int     `csv:"desired_pods" json:"desired_pods"`
+	RunningPods     int     `csv:"running_pods" json:"running_pods"`
+	UnreadyPods     int     `csv:"unready_pods" json:"unready_pods"`
+	PendingPods     int     `csv:"pending_pods" json:"pending_pods"`
+	TerminatingPods int     `csv:"terminating_pods" json:"terminating_pods"`
+	ActivatorQueue  float64 `csv:"activator_queue" json:"activator_queue"`
 }

 type KnStats struct {
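
Carrying both csv and json tags on DeploymentScale is what lets the separate ScaleRecord type disappear: the same struct is unmarshalled straight from the Python scraper's JSON output and later serialized to CSV. A small round-trip sketch (sample values invented for illustration):

    package main

    import (
    	"encoding/json"
    	"fmt"
    )

    type DeploymentScale struct {
    	Timestamp       int64   `csv:"timestamp" json:"timestamp"`
    	Function        string  `csv:"function" json:"function"`
    	DesiredPods     int     `csv:"desired_pods" json:"desired_pods"`
    	RunningPods     int     `csv:"running_pods" json:"running_pods"`
    	UnreadyPods     int     `csv:"unready_pods" json:"unready_pods"`
    	PendingPods     int     `csv:"pending_pods" json:"pending_pods"`
    	TerminatingPods int     `csv:"terminating_pods" json:"terminating_pods"`
    	ActivatorQueue  float64 `csv:"activator_queue" json:"activator_queue"`
    }

    func main() {
    	// Hypothetical one-element output of scrape_scales.py.
    	raw := `[{"function":"trace-func-0","desired_pods":2,"running_pods":1,"unready_pods":1,"pending_pods":0,"terminating_pods":0,"activator_queue":3.0}]`

    	var scales []DeploymentScale
    	if err := json.Unmarshal([]byte(raw), &scales); err != nil {
    		panic(err)
    	}

    	// Timestamp stays zero here: since this commit the caller in
    	// trace_driver.go stamps it after scraping, so every record from
    	// one sweep shares one timestamp.
    	fmt.Printf("%+v\n", scales[0])
    }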
38 changes: 0 additions & 38 deletions pkg/metric/scale_registry.go

This file was deleted.

36 changes: 28 additions & 8 deletions pkg/metric/scrape_scales.py
@@ -1,18 +1,38 @@
 import json
 import os

 prometheus_ip = os.popen("kubectl get svc -n monitoring | grep prometheus-kube-prometheus-prometheus | awk '{print $3}'").read().strip().split('\n')[0]

 def get_promql_query(query):
+    def promql_query():
+        return "tools/bin/promql --no-headers --host 'http://" + prometheus_ip + ":9090' '" + query + "' | grep . | awk '{print $1\" \"$2}'"
+    return promql_query

 if __name__ == "__main__":
-    cmd = "kubectl get podautoscalers | awk '{print $1\" \"$2\" \"$3}'"
-    out = os.popen(cmd).read().strip().split('\n')
+    query_desired_pods = 'max(autoscaler_desired_pods) by(configuration_name)'
+    query_running_pods = 'max(autoscaler_actual_pods) by(configuration_name)'
+    query_unready_pods = 'max(autoscaler_not_ready_pods) by(configuration_name)'
+    query_pending_pods = 'max(autoscaler_pending_pods) by(configuration_name)'
+    query_terminating_pods = 'max(autoscaler_terminating_pods) by(configuration_name)'
+    query_activator_queue = 'max(activator_request_concurrency) by(configuration_name)'
+
+    desired_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_desired_pods)()).read().strip().split('\n')}
+    running_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_running_pods)()).read().strip().split('\n')}
+    unready_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_unready_pods)()).read().strip().split('\n')}
+    pending_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_pending_pods)()).read().strip().split('\n')}
+    terminating_pods_count = {x.split()[0]: int(x.split()[1]) for x in os.popen(get_promql_query(query_terminating_pods)()).read().strip().split('\n')}
+    queue_size = {x.split()[0]: float(x.split()[1]) for x in os.popen(get_promql_query(query_activator_queue)()).read().strip().split('\n')}

     results = []
-    for line in out[1:]:
-        deployment, desired_scale, actual_scale = line.split()
+    for func in desired_pods_count.keys():
         results.append({
-            # Cut of the deployment suffix as each function is only deployed once.
-            "deployment": '-'.join(deployment.split('-')[:-1]),
-            "desired_scale": int(desired_scale),
-            "actual_scale": int(actual_scale),
+            'function': func,
+            'desired_pods': desired_pods_count[func],
+            'running_pods': running_pods_count[func],
+            'unready_pods': unready_pods_count[func],
+            'pending_pods': pending_pods_count[func],
+            'terminating_pods': terminating_pods_count[func],
+            'activator_queue': queue_size.get(func, 0)
         })

     print(json.dumps(results))
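
The rewritten script derives everything from the Knative autoscaler's Prometheus metrics, keyed by configuration_name, instead of parsing kubectl get podautoscalers, and it prints a JSON array that collect.go unmarshals directly into []DeploymentScale. One record would look roughly like this (values invented for illustration):

    [{"function": "trace-func-0", "desired_pods": 2, "running_pods": 1, "unready_pods": 1, "pending_pods": 0, "terminating_pods": 0, "activator_queue": 3.0}]

Note the queue_size.get(func, 0) fallback: a function with no in-flight requests may be absent from the activator_request_concurrency result, so its queue length defaults to 0.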
7 changes: 7 additions & 0 deletions scripts/setup/create_multinode.sh
@@ -249,6 +249,13 @@ function clone_loader_on_workers() {

     # Notify the master that all nodes have joined the cluster
     server_exec $MASTER_NODE 'tmux send -t master "y" ENTER'
+
+    namespace_info=$(server_exec $MASTER_NODE "kubectl get namespaces")
+    while [[ ${namespace_info} != *'knative-serving'* ]]; do
+        sleep 60
+        namespace_info=$(server_exec $MASTER_NODE "kubectl get namespaces")
+    done
+
     echo "Master node $MASTER_NODE finalised."

     # Copy API server certificates from master to each worker node
