Skip to content

Commit

Permalink
feat: add more robust retry policy to Deluge client
Browse files Browse the repository at this point in the history
  • Loading branch information
gmega committed Jan 14, 2025
1 parent 700753c commit 8ef05e3
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 4 deletions.
32 changes: 29 additions & 3 deletions benchmarks/deluge/deluge_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

import pathvalidate
from deluge_client import DelugeRPCClient
from tenacity import retry, wait_exponential
from tenacity import retry, wait_exponential, stop_after_attempt
from tenacity.stop import stop_base
from tenacity.wait import wait_base
from torrentool.torrent import Torrent
from typing_extensions import Generic, TypeVar
from urllib3.util import Url

from benchmarks.core.experiments.experiments import ExperimentComponent
Expand Down Expand Up @@ -129,14 +132,20 @@ def rpc(self) -> DelugeRPCClient:
self.connect()
return self._rpc

@retry(wait=wait_exponential(multiplier=1, min=4, max=16))
@retry(
stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=16)
)
def connect(self) -> Self:
return self._raw_connect()

def _raw_connect(self):
client = DelugeRPCClient(**self.daemon_args)
client.connect()
self._rpc = client
self._rpc = ResilientCallWrapper(
client,
wait_policy=wait_exponential(multiplier=1, min=4, max=16),
stop_policy=stop_after_attempt(5),
)
return self

def is_ready(self) -> bool:
Expand All @@ -159,6 +168,23 @@ def __str__(self):
return f"DelugeNode({self.name}, {self.daemon_args['host']}:{self.daemon_args['port']})"


T = TypeVar("T")


class ResilientCallWrapper(Generic[T]):
def __init__(self, client: T, wait_policy: wait_base, stop_policy: stop_base):
self.client = client
self.wait_policy = wait_policy
self.stop_policy = stop_policy

def __getattr__(self, item):
@retry(wait=self.wait_policy, stop=self.stop_policy)
def _resilient_wrapper(*args, **kwargs):
return getattr(self.client, item)(*args, **kwargs)

return _resilient_wrapper


class DelugeDownloadHandle(DownloadHandle):
def __init__(self, torrent: Torrent, node: DelugeNode) -> None:
self.node = node
Expand Down
34 changes: 33 additions & 1 deletion benchmarks/deluge/tests/test_deluge_node.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from pathlib import Path

import pytest
from tenacity import wait_incrementing, stop_after_attempt, RetryError

from benchmarks.core.utils import megabytes, await_predicate
from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta
from benchmarks.deluge.deluge_node import DelugeNode, DelugeMeta, ResilientCallWrapper
from benchmarks.deluge.tracker import Tracker


Expand Down Expand Up @@ -74,3 +75,34 @@ def test_should_remove_files(

deluge_node1.remove(torrent)
assert not deluge_node1.torrent_info(name="dataset1")


class FlakyClient:
def __init__(self):
self.count = 0

def flaky(self):
self.count += 1
if self.count == 1:
raise IOError("Connection refused")
return 1


def test_should_retry_operations_when_they_fail():
wrapper = ResilientCallWrapper(
FlakyClient(),
wait_policy=wait_incrementing(start=0, increment=0),
stop_policy=stop_after_attempt(2),
)
assert wrapper.flaky() == 1


def test_should_give_up_on_operations_when_stop_policy_is_met():
wrapper = ResilientCallWrapper(
FlakyClient(),
wait_policy=wait_incrementing(start=0, increment=0),
stop_policy=stop_after_attempt(1),
)

with pytest.raises(RetryError):
wrapper.flaky()

0 comments on commit 8ef05e3

Please sign in to comment.