diff --git a/src/Internal/Workflow/WorkflowContext.php b/src/Internal/Workflow/WorkflowContext.php index 1d734b5e..b2843639 100644 --- a/src/Internal/Workflow/WorkflowContext.php +++ b/src/Internal/Workflow/WorkflowContext.php @@ -63,6 +63,7 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; +use Temporal\Workflow\MutexInterface; use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowExecution; use Temporal\Workflow\WorkflowInfo; @@ -309,7 +310,7 @@ public function panic(\Throwable $failure = null): PromiseInterface public function continueAsNew( string $type, array $args = [], - ContinueAsNewOptions $options = null + ContinueAsNewOptions $options = null, ): PromiseInterface { return $this->callsInterceptor->with( function (ContinueAsNewInput $input): PromiseInterface { @@ -632,6 +633,21 @@ public function uuid7(?DateTimeInterface $dateTime = null): PromiseInterface return $this->sideEffect(static fn(): UuidInterface => \Ramsey\Uuid\Uuid::uuid7($dateTime)); } + public function mutex(string $name): MutexInterface + { + // todo + } + + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface + { + // todo + } + + public function runLocked(string|MutexInterface $name, callable $callable): PromiseInterface + { + // todo + } + /** * @internal */ diff --git a/src/Workflow.php b/src/Workflow.php index 012fa339..2c5ceee4 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -28,8 +28,10 @@ use Temporal\Workflow\ChildWorkflowStubInterface; use Temporal\Workflow\ContinueAsNewOptions; use Temporal\Workflow\ExternalWorkflowStubInterface; +use Temporal\Workflow\MutexInterface; use Temporal\Workflow\ScopedContextInterface; use Temporal\Workflow\UpdateContext; +use Temporal\Workflow\WorkflowContextInterface; use Temporal\Workflow\WorkflowExecution; use Temporal\Workflow\WorkflowInfo; use Temporal\Internal\Support\DateInterval; @@ -970,4 +972,87 @@ public static function uuid7(?DateTimeInterface $dateTime = null): PromiseInterf return $context->uuid7($dateTime); } + + /** + * Get a mutex by name or create a new one. + * + * If a mutex is yielded without calling `lock()`, the Workflow will continue + * only when the lock is released. + * + * ```php + * yield Workflow::mutex('my-mutex'); + * ``` + * + * Now to continue only when the lock is acquired: + * + * ```php + * yield Workflow::mutex('my-mutex')->lock(); + * ``` + * + * Note: in this case, if the lock is already acquired, the Workflow will be blocked until it's released + * + * @param non-empty-string $name The name of the mutex. + */ + public static function mutex(string $name): MutexInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->mutex($name); + } + + /** + * Create a conditional mutex that is locked when any of the conditions are met. + * + * @param non-empty-string $name + * + * Example: + * + * Monitor when the number of threads exceeds 10: + * + * ```php + * #[WorkflowMethod] + * function start() { + * // Register a conditional mutex that will be locked when the number of threads exceeds 10 + * Workflow::conditionalMutex( + * 'limit', + * fn() => count($this->threads) > 10, + * ); + * // ... + * } + * + * #[SignalMethod] + * function addTask(Task $task) { + * yield Workflow::runLocked('limit', function() { + * $key = array_key_last($this->threads) + 1; + * yield $this->threads[$key] = Workflow::executeChildWorkflow(...); + * unset($this->threads[$key]); + * }); + * } + * ``` + */ + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->conditionalMutex($name, ...$lockConditions); + } + + /** + * Run a function when the mutex is released. + * The mutex is locked for the duration of the function if it's not a conditional mutex. + * Conditional mutexes are locked only when all conditions are met. + * + * @see Workflow::conditionalMutex() + * + * @param non-empty-string|MutexInterface $name Mutex name or instance. + */ + public function runLocked(string|MutexInterface $name, callable $callable): PromiseInterface + { + /** @var WorkflowContextInterface $context */ + $context = self::getCurrentContext(); + + return $context->runLocked($name, $callable); + } } diff --git a/src/Workflow/MutexInterface.php b/src/Workflow/MutexInterface.php new file mode 100644 index 00000000..ad65f190 --- /dev/null +++ b/src/Workflow/MutexInterface.php @@ -0,0 +1,21 @@ + */ public function uuid7(?DateTimeInterface $dateTime = null): PromiseInterface; + + /** + * Get a mutex by name or create a new one. + * + * @param non-empty-string $name The name of the mutex. + */ + public function mutex(string $name): MutexInterface; + + /** + * Create a conditional mutex. + * + * @param non-empty-string $name + */ + public function conditionalMutex(string $name, PromiseInterface|callable ...$lockConditions): MutexInterface; + + /** + * Run a function when the mutex is released. + * The mutex is locked for the duration of the function if it's not a conditional mutex. + * Conditional mutexes are locked only when all conditions are met. + * + * @param non-empty-string|MutexInterface $name Mutex name or instance. + */ + public function runLocked(string|MutexInterface $name, callable $callable): PromiseInterface; }