-
Notifications
You must be signed in to change notification settings - Fork 9
/
testTypeHttp.py
105 lines (80 loc) · 2.86 KB
/
testTypeHttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import shlex
import time
from dataclasses import dataclass
from ktoolbox import common
from ktoolbox import host
import task
import tftbase
from task import ClientTask
from task import ServerTask
from task import TaskOperation
from testSettings import TestSettings
from testType import TestTypeHandler
from tftbase import BaseOutput
from tftbase import FlowTestOutput
from tftbase import TestType
@dataclass(frozen=True)
class TestTypeHandlerHttp(TestTypeHandler):
def __init__(self) -> None:
super().__init__(TestType.HTTP)
def _create_server_client(self, ts: TestSettings) -> tuple[ServerTask, ClientTask]:
s = HttpServer(ts=ts)
c = HttpClient(ts=ts, server=s)
return (s, c)
TestTypeHandler.register_test_type(TestTypeHandlerHttp())
class HttpServer(task.ServerTask):
def cmd_line_args(self) -> list[str]:
return [
"python3",
"-m",
"http.server",
"-d",
"/etc/kubernetes-traffic-flow-tests",
f"{self.port}",
]
def get_template_args(self) -> dict[str, str | list[str]]:
extra_args: dict[str, str | list[str]] = {}
if self.exec_persistent:
extra_args["args"] = self.cmd_line_args()
return {
**super().get_template_args(),
**extra_args,
}
def _create_setup_operation_get_thread_action_cmd(self) -> str:
return shlex.join(self.cmd_line_args())
def _create_setup_operation_get_cancel_action_cmd(self) -> str:
return "killall python3"
class HttpClient(task.ClientTask):
def _create_task_operation(self) -> TaskOperation:
server_ip = self.get_target_ip()
cmd = f"curl --fail -s http://{server_ip}:{self.port}/data"
def _thread_action() -> BaseOutput:
self.ts.clmo_barrier.wait()
def _check_success(r: host.Result) -> bool:
return r.success and r.match(
out="kubernetes-traffic-flow-tests\n",
err="",
)
sleep_time = 0.2
end_timestamp = time.monotonic() + self.get_duration() - sleep_time
while True:
r = self.run_oc_exec(cmd)
if not _check_success(r):
break
if time.monotonic() >= end_timestamp:
break
time.sleep(sleep_time)
self.ts.event_client_finished.set()
return FlowTestOutput(
success=_check_success(r),
tft_metadata=self.ts.get_test_metadata(),
command=cmd,
result={
"result": common.dataclass_to_dict(r),
},
bitrate_gbps=tftbase.Bitrate.NA,
)
return TaskOperation(
log_name=self.log_name,
thread_action=_thread_action,
)