Skip to content

Commit

Permalink
Merge pull request #2252 from phw/pipe-reintroduce-thread-pool
Browse files Browse the repository at this point in the history
Pipe refactor thread pool
  • Loading branch information
phw authored Jul 6, 2023
2 parents bf9ec7a + 5b61c2d commit d3b60e0
Showing 1 changed file with 34 additions and 38 deletions.
72 changes: 34 additions & 38 deletions picard/util/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class PipeErrorNoDestination(PipeError):
class AbstractPipe(metaclass=ABCMeta):
NO_RESPONSE_MESSAGE: str = "No response from FIFO"
MESSAGE_TO_IGNORE: str = '\0'
TIMEOUT_SECS: float = 1.5
TIMEOUT_SECS_WRITE: float = 1.5

@classmethod
@property
Expand Down Expand Up @@ -160,6 +160,8 @@ def __init__(self, app_name: str, app_version: str, args: Optional[Iterable[str]

self.unexpected_removal = False

self.__thread_pool = concurrent.futures.ThreadPoolExecutor()

for path in self._paths:
self.path = path
for arg in self._args:
Expand Down Expand Up @@ -223,32 +225,23 @@ def _sender(self, message: str) -> bool:
"""
raise NotImplementedError()

def read_from_pipe(self, timeout_secs: Optional[float] = None) -> List[str]:
def read_from_pipe(self) -> List[str]:
"""
Common interface for the custom _reader implementations
:param timeout_secs: (Optional[float]) Timeout for the function, by default it fallbacks to self.TIMEOUT_SECS
:return: List of messages or {self.NO_RESPONSE_MESSAGE} (if no messages received)
:rtype: List[str]
"""
if timeout_secs is None:
timeout_secs = self.TIMEOUT_SECS

with concurrent.futures.ThreadPoolExecutor() as executor:
reader = executor.submit(self._reader)
try:
res = reader.result(timeout=timeout_secs)
if res:
out = [r for r in res.split(self.MESSAGE_TO_IGNORE) if r]
if out:
return out
except concurrent.futures._base.TimeoutError:
# hacky way to kill the file-opening loop
self.send_to_pipe(self.MESSAGE_TO_IGNORE)
except Exception as e:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result
# If the call raised an exception, this method will raise the same exception.
log.error("pipe reader exception: %s", e)
try:
res = self._reader()
if res:
out = [r for r in res.split(self.MESSAGE_TO_IGNORE) if r]
if out:
return out
except Exception as e:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result
# If the call raised an exception, this method will raise the same exception.
log.error("pipe reader exception: %s", e)

return [self.NO_RESPONSE_MESSAGE]

Expand All @@ -262,31 +255,34 @@ def send_to_pipe(self, message: str, timeout_secs: Optional[float] = None) -> bo
:rtype: bool
"""
if timeout_secs is None:
timeout_secs = self.TIMEOUT_SECS
timeout_secs = self.TIMEOUT_SECS_WRITE

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# we're sending only filepaths, so we have to create some kind of separator
# to avoid any potential conflicts and mixing the data
sender = executor.submit(self._sender, message + self.MESSAGE_TO_IGNORE)
try:
if sender.result(timeout=timeout_secs):
return True
except concurrent.futures._base.TimeoutError:
if self.pipe_running:
log.warning("Couldn't send: %r", message)
# hacky way to kill the sender
self.read_from_pipe()
except Exception as e:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result
# If the call raised an exception, this method will raise the same exception.
log.error("pipe sender exception: %s", e)
# we're sending only filepaths, so we have to create some kind of separator
# to avoid any potential conflicts and mixing the data
try:
sender = self.__thread_pool.submit(self._sender, message + self.MESSAGE_TO_IGNORE)
if sender.result(timeout=timeout_secs):
return True
except concurrent.futures._base.TimeoutError:
if self.pipe_running:
log.warning("Couldn't send: %r", message)
# hacky way to kill the sender
self.read_from_pipe()
except Exception as e:
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.result
# If the call raised an exception, this method will raise the same exception.
log.error("pipe sender exception: %s", e)

return False

def stop(self):
log.debug("Stopping pipe")
self.pipe_running = False
self.send_to_pipe(self.MESSAGE_TO_IGNORE)
try:
self.__thread_pool.shutdown(wait=True, cancel_futures=True)
except TypeError: # cancel_futures is not supported on Python < 3.9
self.__thread_pool.shutdown(wait=True)


class UnixPipe(AbstractPipe):
Expand Down

0 comments on commit d3b60e0

Please sign in to comment.