Parcel (or analog) to data exchange between Parent and Child processes #190

Open
hitriyvalenok opened this issue Oct 31, 2023 · 1 comment

hitriyvalenok commented Oct 31, 2023

Hello there! 👋 It's not an issue, rather a question 🤗 I have a worker pool with some tasks running. Workers are using consumables they may request at any time and any number of times. Ideally, I wish to use Parcel for this purpose. Parcel is a very convenient way of communicating with data, but it can't be serialized and thus you can't pass it to Task. There's also Channel, but I don't understand how to use it in more complex cases than the docs described (when you sequentially send value, receive value, return result). How to solve my challenge without crutches? Thanks!

hitriyvalenok changed the title from "Parcel (or analog) to exchange data between Parent and Child processes" to "Parcel (or analog) to data exchange between Parent and Child processes" on Oct 31, 2023

trowski (Member) commented Nov 26, 2023

Hi @hitriyvalenok!

We ship two implementations of Parcel which can be used across processes: SharedMemoryParcel in amphp/sync and RedisParcel in amphp/redis. Neither needs to be serialized to be used in both the parent and the child process.

Use SharedMemoryParcel::create() in the parent and SharedMemoryParcel::use() in each child process. You'll also need a mutex which can be shared across processes, so likely PosixSemaphore wrapped with SemaphoreMutex, all found in amphp/sync.
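A rough sketch of that setup, assuming the amphp/sync v2 signatures as I remember them (`create(Mutex, mixed)`, `use(Mutex, int)`, `getKey()`, `synchronized()`) and that the shared-memory and System V extensions these classes rely on are installed. The variable names and the "remaining consumables" counter are made up for illustration; check the signatures against your installed version.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Amp\Sync\PosixSemaphore;
use Amp\Sync\SemaphoreMutex;
use Amp\Sync\SharedMemoryParcel;

// --- Parent process ---
// A semaphore with a single lock behaves as a cross-process mutex.
$semaphore = PosixSemaphore::create(1);
$parcel = SharedMemoryParcel::create(new SemaphoreMutex($semaphore), 100);

// Only these two integers have to reach the child (for example as
// constructor arguments of your Task); the parcel itself is never serialized.
$semaphoreKey = $semaphore->getKey();
$parcelKey = $parcel->getKey();

// --- Child process (given the two keys) ---
$childMutex = new SemaphoreMutex(PosixSemaphore::use($semaphoreKey));
$childParcel = SharedMemoryParcel::use($childMutex, $parcelKey);

// Take one consumable while holding the lock. The closure receives the
// current value and its return value is stored as the new value.
$remaining = $childParcel->synchronized(
    static fn (int $left): int => \max(0, $left - 1)
);
```

The parent has to keep its `$semaphore` and `$parcel` objects alive for as long as children use them.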

RedisParcel of course requires a Redis server, but it is a bit simpler and does not require additional extensions. Create a RedisParcel using the same key in the parent and in each child.

You could also implement another Parcel using any data store, such as a file. Access to the data must be mutually exclusive, so a mutex will likely be needed. amphp/file provides a file mutex, though no file parcel. If you do implement anything which you think would be useful to others, we always appreciate PRs! 👍
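To illustrate the file idea only: a minimal plain-PHP sketch of a value shared through a file under an exclusive lock. The function `synchronizedFile()` is hypothetical and does not implement the Parcel interface, and it uses the blocking `flock()` from the standard library instead of the amphp/file mutex, so it would stall an event loop while waiting for the lock.

```php
<?php
// Hypothetical helper, not part of amphp: read, update and write back a
// serialized value while holding an exclusive lock on the file.
function synchronizedFile(string $path, callable $update): mixed
{
    $handle = fopen($path, 'c+'); // create if missing, never truncate on open
    if ($handle === false) {
        throw new RuntimeException("Cannot open $path");
    }

    try {
        if (!flock($handle, LOCK_EX)) { // blocks until no other process holds the lock
            throw new RuntimeException("Cannot lock $path");
        }

        $raw = stream_get_contents($handle);
        $value = ($raw === '' || $raw === false) ? null : unserialize($raw);

        // The callback's return value becomes the new stored value.
        $value = $update($value);

        rewind($handle);
        ftruncate($handle, 0);
        fwrite($handle, serialize($value));
        fflush($handle);

        return $value;
    } finally {
        flock($handle, LOCK_UN);
        fclose($handle);
    }
}

// Usage: every process calls this with the same path.
$path = sys_get_temp_dir() . '/consumables.parcel';
$remaining = synchronizedFile($path, fn (?int $n): int => ($n ?? 100) - 1);
```

A real Parcel built on amphp/file would follow the same read, update, write sequence, but acquire the lock without blocking the loop.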

Channels are essentially streams which return PHP values rather than only bytes. You can create a new fiber/coroutine using Amp\async() to read data from a channel in a loop. Values can be sent on a channel at any time; they do not need to be in response to data received on the channel.
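Applied to the worker-pool case from the question, this could look roughly like the sketch below. It assumes amphp/parallel v2 (`Task::run(Channel, Cancellation)`, `Worker\submit()`, `Execution::getChannel()`); `ConsumingTask`, the message strings and the `$stock` array are invented for the example. How the channel behaves once the task has returned is not covered here, so the task signals completion with an explicit `'done'` message.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Amp\Cancellation;
use Amp\Parallel\Worker;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\async;

// Hypothetical task. In a real project this class must live in its own
// autoloadable file so the worker process can load it.
final class ConsumingTask implements Task
{
    public function __construct(private readonly int $count)
    {
    }

    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        $used = [];
        for ($i = 0; $i < $this->count; $i++) {
            $channel->send('need-consumable');          // ask the parent, at any time
            $used[] = $channel->receive($cancellation); // wait for the parent's reply
        }
        $channel->send('done'); // tell the parent's loop to stop

        return $used;
    }
}

// --- Parent process ---
$stock = ['bolt', 'nut', 'washer'];

$execution = Worker\submit(new ConsumingTask(3));
$channel = $execution->getChannel();

// Serve requests in a separate fiber so the parent stays free for other work.
$feeder = async(function () use ($channel, &$stock): void {
    while ($channel->receive() !== 'done') {
        $channel->send(\array_shift($stock));
    }
});

$feeder->await();
$used = $execution->await(); // the task's return value
```

With several workers, one such loop per execution can draw from the same stock, since fibers in the parent only switch at `receive()` and `send()`.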

Below is a snippet from ContextWorker in amphp/cluster which receives messages from the parent process and acts upon them in the child process. Another method, send(), is used to send messages back to the parent and may be invoked at any time. A similar loop in the parent receives messages from the child.

        $cancellation = $this->deferredCancellation->getCancellation();

        try {
            // We get null as last message from the cluster-runner in case it's shutting down cleanly.
            // In that case, join it.
            /** @var ClusterMessage $message */
            while ($message = $this->context->receive($cancellation)) {
                $this->lastActivity = \time();

                /** @psalm-suppress UnhandledMatchCondition False positive. */
                match ($message->type) {
                    ClusterMessageType::Pong => null,

                    ClusterMessageType::Data => $this->handleMessage($message->data),

                    ClusterMessageType::Log => \array_map(
                        static fn (MonologHandler $handler) => $handler->handle($message->data),
                        $this->logger->getHandlers(),
                    ),

                    ClusterMessageType::Ping => throw new \RuntimeException('Unexpected message type received'),
                };
            }

            $this->joinFuture->await(new TimeoutCancellation(Watcher::WORKER_TIMEOUT));
        } finally {
            EventLoop::cancel($watcher);
            $this->close();
        }

Hopefully this was helpful. We'll be working more on docs in the coming months and I'll consider adding an example like the above to the readme. Please reply with any additional questions (and I'll try to respond more promptly).
