Skip to content

Commit

Permalink
Fixes Future.done() behavior (issue #24).
Browse files Browse the repository at this point in the history
Signed-off-by: rafa-be <[email protected]>
  • Loading branch information
rafa-be committed Oct 3, 2024
1 parent f15fb60 commit db256d5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
2 changes: 1 addition & 1 deletion scaler/about.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.8.5"
__version__ = "1.8.6"
69 changes: 61 additions & 8 deletions scaler/client/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def __init__(self, task: Task, is_delayed: bool, group_task_id: Optional[bytes],
self._result_object_id: Optional[bytes] = None
self._result_ready_event = threading.Event()
self._result_request_sent = False
self._result_received = False

self._profiling_info: Optional[ProfileResult] = None

Expand All @@ -42,6 +43,8 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[
if self.done():
raise InvalidStateError(f"invalid future state: {self._state}")

self._state = "FINISHED"

if object_id is not None:
self._result_object_id = object_id

Expand All @@ -54,25 +57,74 @@ def set_result_ready(self, object_id: Optional[bytes], profile_result: Optional[

self._result_ready_event.set()

def set_exception(self, exception: Optional[BaseException], profile_result: Optional[ProfileResult] = None) -> None:
def _set_result_or_exception(
self,
result: Optional[Any] = None,
exception: Optional[BaseException] = None,
profiling_info: Optional[ProfileResult] = None
) -> None:
with self._condition: # type: ignore[attr-defined]
if profile_result is not None:
self._profiling_info = profile_result
if self.cancelled():
raise InvalidStateError(f"invalid future state: {self._state}")

if self._result_received:
raise InvalidStateError("future already received object data.")

if profiling_info is not None:
if self._profiling_info is not None:
raise InvalidStateError("cannot set profiling info twice.")

self._profiling_info = profiling_info

self._state = "FINISHED"
self._result_received = True

if exception is not None:
assert result is None
self._exception = exception
for waiter in self._waiters:
waiter.add_exception(self)
else:
self._result = result
for waiter in self._waiters:
waiter.add_result(self)

self._result_ready_event.set()
self._condition.notify_all()

return super().set_exception(exception)
self._invoke_callbacks() # type: ignore[attr-defined]

def result(self, timeout=None):
def set_result(self, result: Any, profiling_info: Optional[ProfileResult] = None) -> None:
self._set_result_or_exception(result=result, profiling_info=profiling_info)

def set_exception(self, exception: Optional[BaseException], profiling_info: Optional[ProfileResult] = None) -> None:
self._set_result_or_exception(exception=exception, profiling_info=profiling_info)

def result(self, timeout: Optional[float] = None) -> Any:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.result() get called
# if it's delayed future, get the result when future.result() gets called
if self._is_delayed:
self._request_result_object()

# wait for
return super().result(timeout)
if not self._result_received:
self._condition.wait(timeout)

return super().result()

def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
self._result_ready_event.wait(timeout)

with self._condition: # type: ignore[attr-defined]
# if it's delayed future, get the result when future.exception() gets called
if self._is_delayed:
self._request_result_object()

if not self._result_received:
self._condition.wait(timeout)

return super().exception()

def cancel(self) -> bool:
with self._condition: # type: ignore[attr-defined]
Expand All @@ -88,6 +140,7 @@ def cancel(self) -> bool:
self._connector.send(TaskCancel.new_msg(self._task_id))

self._state = "CANCELLED"
self._result_received = True

self._result_ready_event.set()
self._condition.notify_all() # type: ignore[attr-defined]
Expand Down
13 changes: 12 additions & 1 deletion tests/test_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_callback(self):
done_called_event = Event()

def on_done_callback(fut):
self.assertTrue(fut.done())
self.assertAlmostEqual(fut.result(), 4.0)
done_called_event.set()

Expand Down Expand Up @@ -57,20 +58,30 @@ def test_state(self):
def test_cancel(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, 100.0)
fut.cancel()
self.assertTrue(fut.cancel())

self.assertTrue(fut.cancelled())
self.assertTrue(fut.done())

with self.assertRaises(CancelledError):
fut.result()

fut = client.submit(math.sqrt, 16)
fut.result()

# cancel() should fail on a completed future.
self.assertFalse(fut.cancel())
self.assertFalse(fut.cancelled())

def test_exception(self):
with Client(address=self.address) as client:
fut = client.submit(math.sqrt, "16")

with self.assertRaises(TypeError):
fut.result()

self.assertTrue(fut.done())

self.assertIsInstance(fut.exception(), TypeError)

def test_client_disconnected(self):
Expand Down

0 comments on commit db256d5

Please sign in to comment.