Skip to content

Commit

Permalink
Transport Client now carries Workflow contexts instead of just info.
Browse files Browse the repository at this point in the history
It is required to resolve promises in the right context.
  • Loading branch information
roxblnfk committed Oct 16, 2023
1 parent f3c0a33 commit 28affbc
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/Internal/Support/Facade.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static function __callStatic(string $name, array $arguments)

/**
* @param object<T>|null $ctx
* @internal
*/
public static function setCurrentContext(?object $ctx): void
{
Expand Down
16 changes: 10 additions & 6 deletions src/Internal/Transport/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Worker\Transport\Command\ResponseInterface;
use Temporal\Worker\Transport\Command\SuccessResponseInterface;
use Temporal\Workflow\WorkflowInfo;
use Temporal\Workflow;
use Temporal\Workflow\WorkflowContextInterface;

/**
* @internal Client is an internal library class, please do not use it in your code.
Expand All @@ -38,7 +39,7 @@ final class Client implements ClientInterface
'a request with that identifier was not sent';

/**
* @var array<int, array{Deferred, WorkflowInfo|null}>
* @var array<int, array{Deferred, WorkflowContextInterface|null}>
*/
private array $requests = [];

Expand All @@ -64,14 +65,17 @@ public function dispatch(ResponseInterface $response): void
return;
}

[$deferred, $info] = $this->requests[$id];
[$deferred, $context] = $this->requests[$id];
unset($this->requests[$id]);
$info = $context->getInfo();

if ($info !== null && $response->getHistoryLength() > $info->historyLength) {
/** @psalm-suppress InaccessibleProperty */
$info->historyLength = $response->getHistoryLength();
}

// Bind workflow context for promise resolution
Workflow::setCurrentContext($context);
if ($response instanceof FailureResponseInterface) {
$deferred->reject($response->getFailure());
} else {
Expand All @@ -81,11 +85,11 @@ public function dispatch(ResponseInterface $response): void

/**
* @param RequestInterface $request
* @param null|WorkflowInfo $workflowInfo
* @param null|WorkflowContextInterface $context
*
* @return PromiseInterface
*/
public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface
{
$this->queue->push($request);

Expand All @@ -96,7 +100,7 @@ public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo =
}

$deferred = new Deferred();
$this->requests[$id] = [$deferred, $workflowInfo];
$this->requests[$id] = [$deferred, $context];

return $deferred->promise();
}
Expand Down
6 changes: 3 additions & 3 deletions src/Internal/Transport/ClientInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
use React\Promise\PromiseInterface;
use Temporal\Worker\Transport\Command\CommandInterface;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\WorkflowInfo;
use Temporal\Workflow\WorkflowContextInterface;

interface ClientInterface
{
/**
* @param RequestInterface $request
* @param null|WorkflowInfo $workflowInfo
* @param null|WorkflowContextInterface $context
*
* @return PromiseInterface
*/
public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface;
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface;

/**
* @param CommandInterface $command
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/Workflow/Process/Scope.php
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ protected function onRequest(RequestInterface $request, PromiseInterface $promis
return;
}

$this->context->getClient()->request(new Cancel($request->getID()));
$this->context->getClient()->request(new Cancel($request->getID()), $this->scopeContext);
};

$cancelID = $this->cancelID;
Expand Down Expand Up @@ -471,7 +471,7 @@ protected function next(): void
break;

case $current instanceof RequestInterface:
$this->nextPromise($this->context->getClient()->request($current, $this->scopeContext->getInfo()));
$this->nextPromise($this->context->getClient()->request($current, $this->scopeContext));
break;

case $current instanceof \Generator:
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Workflow/WorkflowContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ public function request(RequestInterface $request, bool $cancellable = true): Pr
// Intercept workflow outbound calls
return $this->requestInterceptor->with(
function (RequestInterface $request): PromiseInterface {
return $this->client->request($request, $this->getInfo());
return $this->client->request($request, $this);
},
/** @see WorkflowOutboundRequestInterceptor::handleOutboundRequest() */
'handleOutboundRequest',
Expand Down
4 changes: 2 additions & 2 deletions tests/Feature/Testing/CapturedClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use Temporal\Internal\Transport\ClientInterface;
use Temporal\Worker\Transport\Command\CommandInterface;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Workflow\WorkflowInfo;
use Temporal\Workflow\WorkflowContextInterface;

class CapturedClient implements ClientInterface
{
Expand All @@ -41,7 +41,7 @@ public function __construct(ClientInterface $parent)
* @param RequestInterface $request
* @return PromiseInterface
*/
public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface
{
return $this->requests[$request->getID()] = $this->parent->request($request)
->then($this->onFulfilled($request), $this->onRejected($request));
Expand Down
5 changes: 3 additions & 2 deletions tests/Feature/Testing/TestingClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Temporal\Worker\Transport\Command\FailureResponse;
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Worker\Transport\Command\SuccessResponse;
use Temporal\Workflow\WorkflowContextInterface;
use Temporal\Workflow\WorkflowInfo;

class TestingClient extends CapturedClient
Expand Down Expand Up @@ -70,12 +71,12 @@ public function error(RequestInterface $request, \Throwable $error): TestingFail
/**
* {@inheritDoc}
*/
public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface
{
if (!$request instanceof TestingRequest) {
$request = new TestingRequest($request);
}

return parent::request($request, $workflowInfo);
return parent::request($request, $context);
}
}
4 changes: 2 additions & 2 deletions tests/Unit/Framework/ClientMock.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use Temporal\Worker\Transport\Command\RequestInterface;
use Temporal\Worker\Transport\Command\ResponseInterface;
use Temporal\Worker\Transport\Command\SuccessResponseInterface;
use Temporal\Workflow\WorkflowInfo;
use Temporal\Workflow\WorkflowContextInterface;

/**
* @internal
Expand Down Expand Up @@ -60,7 +60,7 @@ public function dispatch(ResponseInterface $response): void
}
}

public function request(RequestInterface $request, ?WorkflowInfo $workflowInfo = null): PromiseInterface
public function request(RequestInterface $request, ?WorkflowContextInterface $context = null): PromiseInterface
{
$this->queue->push($request);

Expand Down

0 comments on commit 28affbc

Please sign in to comment.