Skip to content

Commit

Permalink
Merge remote-tracking branch 'richtja/status_server_socket_error'
Browse files Browse the repository at this point in the history
Signed-off-by: Cleber Rosa <[email protected]>
  • Loading branch information
clebergnu committed Jan 31, 2024
2 parents e2bed32 + 2b23b11 commit 66e0980
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 19 deletions.
64 changes: 46 additions & 18 deletions avocado/core/nrunner/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,46 @@ class TaskStatusService:

def __init__(self, uri):
self.uri = uri
self.connection = None
self._connection = None

def post(self, status):
@property
def connection(self):
if not self._connection:
self._create_connection()
return self._connection

def _create_connection(self):
    """
    Creates the connection to the status service at `self.uri`

    A URI containing ":" is treated as "host:port" and reached with
    `socket.create_connection`; anything else is treated as the path of
    a UNIX domain socket.  A refused TCP connection is retried once per
    second, up to 600 times; one last attempt is then made and its
    error, if any, is left to reach the caller.

    The result is stored in `self._connection`.  If that replaces a
    socket created earlier, the earlier socket is closed.
    """
    previous = self._connection
    try:
        if ":" in self.uri:
            # Split on the last colon only, so that a host part which
            # itself contains ":" does not break the unpacking.
            host, port = self.uri.rsplit(":", 1)
            port = int(port)
            for _ in range(600):
                try:
                    self._connection = socket.create_connection((host, port))
                    break
                except ConnectionRefusedError as error:
                    LOG.warning(error)
                    time.sleep(1)
            else:
                # Every retry was refused: try one last time and let the
                # error, if any, propagate.
                self._connection = socket.create_connection((host, port))
        else:
            self._connection = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._connection.connect(self.uri)
    finally:
        if previous is not None and previous is not self._connection:
            # The old socket is no longer reachable through this object,
            # so release its file descriptor now.
            try:
                previous.close()
            except OSError:
                # Best effort only: the old socket was being discarded
                # anyway, and a close error must not hide the outcome
                # of the connection attempt above.
                pass

def post(self, status):
    """
    Sends one status message to the status service as a line of JSON.

    When the connection turns out to be broken, it is re-created once
    and the message is sent again.

    :param status: the status message; it is serialised with `json_dumps`
    :returns: True when the message was sent, False when the connection
              was lost and could not be re-established
    :rtype: bool
    """
    payload = json_dumps(status).encode("ascii") + b"\n"
    try:
        # sendall() keeps writing until the whole line is out, whereas
        # send() may stop after a partial write.
        self.connection.sendall(payload)
    except (BrokenPipeError, ConnectionResetError):
        try:
            self._create_connection()
            self.connection.sendall(payload)
        except (ConnectionError, FileNotFoundError):
            # FileNotFoundError covers a UNIX socket whose file is gone;
            # ConnectionError covers refused, reset and broken-pipe cases.
            LOG.warning(f"Connection with {self.uri} has been lost.")
            return False
    return True

def close(self):
if self.connection is not None:
Expand Down Expand Up @@ -203,12 +220,23 @@ def run(self):
self.setup_output_dir()
runner_klass = self.runnable.pick_runner_class()
runner = runner_klass()
running_status_services = self.status_services
damaged_status_services = []
for status in runner.run(self.runnable):
if status["status"] == "started":
status.update({"output_dir": self.runnable.output_dir})
status.update({"id": self.identifier})
if self.job_id is not None:
status.update({"job_id": self.job_id})
for status_service in self.status_services:
status_service.post(status)
for status_service in running_status_services:
if not status_service.post(status):
damaged_status_services.append(status_service)
if damaged_status_services:
running_status_services = list(
filter(
lambda s: s not in damaged_status_services,
running_status_services,
)
)
damaged_status_services.clear()
yield status
2 changes: 1 addition & 1 deletion selftests/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"nrunner-requirement": 16,
"unit": 667,
"jobs": 11,
"functional-parallel": 299,
"functional-parallel": 300,
"functional-serial": 4,
"optional-plugins": 0,
"optional-plugins-golang": 2,
Expand Down
20 changes: 20 additions & 0 deletions selftests/functional/nrunner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import time
import unittest

from avocado.core.job import Job
Expand Down Expand Up @@ -265,6 +266,25 @@ def test_recipe_exec_test_3(self):
self.assertEqual(res.exit_status, 0)


class TaskRunStatusService(TestCaseTmpDir):
    """
    Checks how the ``task-run`` command deals with its status service.
    """

    @skipUnlessPathExists("/bin/sleep")
    @skipUnlessPathExists("/bin/nc")
    def test_task_status_service_lost(self):
        """
        A task must log a warning when its status service goes away.
        """
        nc_path = os.path.join(self.tmpdir.name, "socket")
        # `nc -lU` plays the role of the status service on a UNIX socket
        # inside the test's temporary directory.
        nc_proc = process.SubProcess(f"nc -lU {nc_path}")
        nc_proc.start()
        task_proc = process.SubProcess(
            f"avocado-runner-exec-test task-run -i 1 -u /bin/sleep -a 3 -s {nc_path}"
        )
        task_proc.start()
        # Give the task time to connect before taking the service away.
        time.sleep(1)
        nc_proc.kill()
        # Wait for the task itself rather than sleeping a fixed amount:
        # its stderr is then complete and the subprocess is not left
        # running after the test.  The timeout only bounds a hung task.
        task_proc.wait(timeout=30)
        self.assertIn(
            f"Connection with {nc_path} has been lost.".encode(), task_proc.get_stderr()
        )


class ResolveSerializeRun(TestCaseTmpDir):
@skipUnlessPathExists("/bin/true")
def test(self):
Expand Down

0 comments on commit 66e0980

Please sign in to comment.