From 28affbc4bb59f99cf03ff33a9789c82c0c567cea Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Mon, 16 Oct 2023 22:07:02 +0400 Subject: [PATCH] Transport Client now carries Workflow contexts instead of just info. It is required to resolve promises in the right context. --- src/Internal/Support/Facade.php | 1 + src/Internal/Transport/Client.php | 16 ++++++++++------ src/Internal/Transport/ClientInterface.php | 6 +++--- src/Internal/Workflow/Process/Scope.php | 4 ++-- src/Internal/Workflow/WorkflowContext.php | 2 +- tests/Feature/Testing/CapturedClient.php | 4 ++-- tests/Feature/Testing/TestingClient.php | 5 +++-- tests/Unit/Framework/ClientMock.php | 4 ++-- 8 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/Internal/Support/Facade.php b/src/Internal/Support/Facade.php index 810366d8..724054dd 100644 --- a/src/Internal/Support/Facade.php +++ b/src/Internal/Support/Facade.php @@ -53,6 +53,7 @@ public static function __callStatic(string $name, array $arguments) /** * @param object|null $ctx + * @internal */ public static function setCurrentContext(?object $ctx): void { diff --git a/src/Internal/Transport/Client.php b/src/Internal/Transport/Client.php index a589d1a6..7da111a8 100644 --- a/src/Internal/Transport/Client.php +++ b/src/Internal/Transport/Client.php @@ -21,7 +21,8 @@ use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Worker\Transport\Command\ResponseInterface; use Temporal\Worker\Transport\Command\SuccessResponseInterface; -use Temporal\Workflow\WorkflowInfo; +use Temporal\Workflow; +use Temporal\Workflow\WorkflowContextInterface; /** * @internal Client is an internal library class, please do not use it in your code. @@ -38,7 +39,7 @@ final class Client implements ClientInterface 'a request with that identifier was not sent'; /** - * @var array + * @var array */ private array $requests = []; @@ -64,14 +65,17 @@ public function dispatch(ResponseInterface $response): void return; } - [$deferred, $info] = $this->requests[$id]; + [$deferred, $context] = $this->requests[$id]; unset($this->requests[$id]); + $info = $context->getInfo(); if ($info !== null && $response->getHistoryLength() > $info->historyLength) { /** @psalm-suppress InaccessibleProperty */ $info->historyLength = $response->getHistoryLength(); } + // Bind workflow context for promise resolution + Workflow::setCurrentContext($context); if ($response instanceof FailureResponseInterface) { $deferred->reject($response->getFailure()); } else { @@ -81,11 +85,11 @@ public function dispatch(ResponseInterface $response): void /** * @param RequestInterface $request - * @param null|WorkflowInfo $workflowInfo + * @param null|WorkflowContextInterface $context * * @return PromiseInterface */ - public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface + public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface { $this->queue->push($request); @@ -96,7 +100,7 @@ public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = } $deferred = new Deferred(); - $this->requests[$id] = [$deferred, $workflowInfo]; + $this->requests[$id] = [$deferred, $context]; return $deferred->promise(); } diff --git a/src/Internal/Transport/ClientInterface.php b/src/Internal/Transport/ClientInterface.php index adb2e95c..56020d0b 100644 --- a/src/Internal/Transport/ClientInterface.php +++ b/src/Internal/Transport/ClientInterface.php @@ -14,17 +14,17 @@ use React\Promise\PromiseInterface; use Temporal\Worker\Transport\Command\CommandInterface; use Temporal\Worker\Transport\Command\RequestInterface; -use Temporal\Workflow\WorkflowInfo; +use Temporal\Workflow\WorkflowContextInterface; interface ClientInterface { /** * @param RequestInterface $request - * @param null|WorkflowInfo $workflowInfo + * @param null|WorkflowContextInterface $context * * @return PromiseInterface */ - public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface; + public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface; /** * @param CommandInterface $command diff --git a/src/Internal/Workflow/Process/Scope.php b/src/Internal/Workflow/Process/Scope.php index 0cca4b64..b394c5e3 100644 --- a/src/Internal/Workflow/Process/Scope.php +++ b/src/Internal/Workflow/Process/Scope.php @@ -422,7 +422,7 @@ protected function onRequest(RequestInterface $request, PromiseInterface $promis return; } - $this->context->getClient()->request(new Cancel($request->getID())); + $this->context->getClient()->request(new Cancel($request->getID()), $this->scopeContext); }; $cancelID = $this->cancelID; @@ -471,7 +471,7 @@ protected function next(): void break; case $current instanceof RequestInterface: - $this->nextPromise($this->context->getClient()->request($current, $this->scopeContext->getInfo())); + $this->nextPromise($this->context->getClient()->request($current, $this->scopeContext)); break; case $current instanceof \Generator: diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index a66bcd1e..cac298e2 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -507,7 +507,7 @@ public function request(RequestInterface $request, bool $cancellable = true): Pr // Intercept workflow outbound calls return $this->requestInterceptor->with( function (RequestInterface $request): PromiseInterface { - return $this->client->request($request, $this->getInfo()); + return $this->client->request($request, $this); }, /** @see WorkflowOutboundRequestInterceptor::handleOutboundRequest() */ 'handleOutboundRequest', diff --git a/tests/Feature/Testing/CapturedClient.php b/tests/Feature/Testing/CapturedClient.php index cc63efe8..86f26992 100644 --- a/tests/Feature/Testing/CapturedClient.php +++ b/tests/Feature/Testing/CapturedClient.php @@ -15,7 +15,7 @@ use Temporal\Internal\Transport\ClientInterface; use Temporal\Worker\Transport\Command\CommandInterface; use Temporal\Worker\Transport\Command\RequestInterface; -use Temporal\Workflow\WorkflowInfo; +use Temporal\Workflow\WorkflowContextInterface; class CapturedClient implements ClientInterface { @@ -41,7 +41,7 @@ public function __construct(ClientInterface $parent) * @param RequestInterface $request * @return PromiseInterface */ - public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface + public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface { return $this->requests[$request->getID()] = $this->parent->request($request) ->then($this->onFulfilled($request), $this->onRejected($request)); diff --git a/tests/Feature/Testing/TestingClient.php b/tests/Feature/Testing/TestingClient.php index 2d7cbb52..2f158dd9 100644 --- a/tests/Feature/Testing/TestingClient.php +++ b/tests/Feature/Testing/TestingClient.php @@ -19,6 +19,7 @@ use Temporal\Worker\Transport\Command\FailureResponse; use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Worker\Transport\Command\SuccessResponse; +use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowInfo; class TestingClient extends CapturedClient @@ -70,12 +71,12 @@ public function error(RequestInterface $request, \Throwable $error): TestingFail /** * {@inheritDoc} */ - public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface + public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface { if (!$request instanceof TestingRequest) { $request = new TestingRequest($request); } - return parent::request($request, $workflowInfo); + return parent::request($request, $context); } } diff --git a/tests/Unit/Framework/ClientMock.php b/tests/Unit/Framework/ClientMock.php index 55ae24a3..e557302a 100644 --- a/tests/Unit/Framework/ClientMock.php +++ b/tests/Unit/Framework/ClientMock.php @@ -15,7 +15,7 @@ use Temporal\Worker\Transport\Command\RequestInterface; use Temporal\Worker\Transport\Command\ResponseInterface; use Temporal\Worker\Transport\Command\SuccessResponseInterface; -use Temporal\Workflow\WorkflowInfo; +use Temporal\Workflow\WorkflowContextInterface; /** * @internal @@ -60,7 +60,7 @@ public function dispatch(ResponseInterface $response): void } } - public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface + public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface { $this->queue->push($request);