
# Update Workflow docs editing #6571

**Merged** · 12 commits · Oct 2, 2024

**docs/architecture/in-memory-queue.md** (34 changes: 18 additions & 16 deletions)

# In-memory Timer Queue

This queue is similar to the normal persisted timer queue, but it exists only in memory, i.e. it
never gets persisted. It is created by a generic `MemoryScheduledQueueFactory`, but currently serves
only [speculative Workflow Task](./speculative-workflow-task.md) timeouts. Therefore, the only queue
this factory creates is `SpeculativeWorkflowTaskTimeoutQueue`, which uses the same task executor as
the normal timer queue: `TimerQueueActiveTaskExecutor`.

Its implementation uses a `PriorityQueue` sorted by `VisibilityTimestamp`: the task on top is the
task that is executed next.
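
For illustration, a minimal sketch of such an ordering with Go's `container/heap` (the type and
task names here are hypothetical, not the server's actual queue code):

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// timerTask is a stand-in for a queued timeout task.
type timerTask struct {
	name                string
	visibilityTimestamp time.Time
}

// taskHeap keeps the task with the earliest VisibilityTimestamp on top.
type taskHeap []*timerTask

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return h[i].visibilityTimestamp.Before(h[j].visibilityTimestamp) }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(*timerTask)) }
func (h *taskHeap) Pop() any {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

func main() {
	now := time.Now()
	h := &taskHeap{
		{name: "start-to-close", visibilityTimestamp: now.Add(10 * time.Second)},
		{name: "schedule-to-start", visibilityTimestamp: now.Add(5 * time.Second)},
	}
	heap.Init(h)
	// The task executed next is always the one on top.
	fmt.Println(heap.Pop(h).(*timerTask).name) // schedule-to-start
}
```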

The in-memory queue only supports `WorkflowTaskTimeoutTask`, and enforces the
`SCHEDULE_TO_START` and `START_TO_CLOSE` timeouts.

Note that while the in-memory queue's executor of `WorkflowTaskTimeoutTask` is the same as for
the normal timer queue, it does one extra check for speculative Workflow Tasks:
`CheckSpeculativeWorkflowTaskTimeoutTask` checks if a task being executed is still the *same* task
that's stored in mutable state. This is important because the mutable state can lose the current
speculative Workflow Task and create a *new* one, in which case the old timeout task must be ignored.
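
A sketch of that guard (hypothetical, simplified types; the real logic lives in
`CheckSpeculativeWorkflowTaskTimeoutTask`):

```go
package history

// workflowTaskInfo and timeoutTask are illustrative stand-ins for the
// mutable-state Workflow Task record and the queued timeout task.
type workflowTaskInfo struct {
	ScheduledEventID int64
	Attempt          int32
}

type timeoutTask struct {
	ScheduledEventID int64
	Attempt          int32
}

// stillSameWorkflowTask reports whether the Workflow Task the timeout was
// created for is still the one tracked in mutable state. If mutable state
// lost the speculative Workflow Task and created a new one, the ids no
// longer match and the stale timeout task must be skipped.
func stillSameWorkflowTask(current *workflowTaskInfo, task *timeoutTask) bool {
	return current != nil &&
		current.ScheduledEventID == task.ScheduledEventID &&
		current.Attempt == task.Attempt
}
```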

> #### TODO
> Future refactoring is necessary to make the logic (and probably naming) clearer. It is not clear
> yet if the in-memory queue has other applications besides timeouts for speculative Workflow Tasks.

**docs/architecture/message-protocol.md** (90 changes: 46 additions & 44 deletions)

# Message Protocol

## Why it exists
Usually, communication between the server and the worker uses events and commands: events go from
the server to the worker, the worker processes them and generates commands that go back to the server.
The events are attached to the Workflow Task, which the worker receives from the `PollWorkflowTask`
API call, and the worker sends commands back when it completes the Workflow Task with the
`RespondWorkflowTaskCompleted` API.
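
Sketched as a loop (hypothetical helper names; real SDK pollers are more involved):

```go
package main

// Hypothetical types and helpers, for illustration only.
type HistoryEvent struct{}
type Command struct{}
type WorkflowTask struct {
	TaskToken []byte
	Events    []HistoryEvent
}

func pollWorkflowTask() WorkflowTask                          { return WorkflowTask{} }
func handleEvents(events []HistoryEvent) []Command            { return nil }
func respondWorkflowTaskCompleted(token []byte, cs []Command) {}

func main() {
	for {
		// Events arrive attached to the Workflow Task...
		task := pollWorkflowTask()
		// ...the worker turns them into commands...
		commands := handleEvents(task.Events)
		// ...and ships the commands back when completing the task.
		respondWorkflowTaskCompleted(task.TaskToken, commands)
	}
}
```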

Unfortunately, this protocol didn't work for Workflow Update. The server cannot use events to ship
an Update request to the worker because, if the Update is rejected, it must completely disappear.
But because the history is immutable, the server cannot delete any events from it. The initial
implementation used a transient event (not written to history) instead, but that implementation
proved to be error-prone and hard to handle on the SDK side. Similarly, commands can't be used
for Update either because some SDKs assume that every command will produce *exactly* one event,
which is not true for Update rejections as they don't produce an event.

Another protocol was required to implement Workflow Update: messages are attached to the Workflow
Task and travel in both directions. They are similar to events and commands but don't have the
limitations listed above.

## `Message` proto message
This might look confusing:
```protobuf
message Message {}
```
The first `message` refers to protobuf messages, and the second `Message` is the `protocolpb.Message`
data struct used by Temporal. Most fields are self-explanatory, but a few deserve explanation.
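
For orientation, the fields discussed below, rendered as a plain Go struct (an illustration that
mirrors the proto shape, not the generated `protocolpb` code):

```go
package protocol

import "google.golang.org/protobuf/types/known/anypb"

// Message mirrors the shape of protocolpb.Message for illustration.
type Message struct {
	ID                 string // unique id of this message
	ProtocolInstanceID string // id of the object this message belongs to

	// Exactly one of EventID or CommandIndex is set (a oneof in the proto);
	// it sequences the message against events or commands.
	EventID      int64
	CommandIndex int64

	Body *anypb.Any // pluggable payload
}
```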

### `protocol_instance_id`
This field identifies the object this message belongs to. Because messages are currently used only
for Workflow Update, it is the same as `update_id`.

> #### TODO
> In the future, signals and queries might use the message protocol, too.
> In that case `protocol_instance_id` would be `query_id` or `signal_id`.

### `body`
This field is intentionally of type `Any` (not `oneof`) to support pluggable interfaces which the
Temporal server might not be aware of.
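
For example, packing a Workflow Update request into a message body could look like this (a sketch
using the public `go.temporal.io/api` and `anypb` packages):

```go
package main

import (
	"fmt"

	updatepb "go.temporal.io/api/update/v1"
	"google.golang.org/protobuf/types/known/anypb"
)

func main() {
	// Any stores a fully-qualified type URL next to the serialized payload,
	// so a receiver that knows the type can unpack it, while the server can
	// still route the message without understanding the body.
	body, err := anypb.New(&updatepb.Request{})
	if err != nil {
		panic(err)
	}
	fmt.Println(body.TypeUrl) // type.googleapis.com/temporal.api.update.v1.Request
}
```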

### `sequencing_id`
Because messages might intersect with events and commands, it is important to specify in what order
messages must be processed. This field can be:
- `event_id`, to indicate the event after which this message should be processed by the worker, or
- `command_index`, to indicate the command after which the message should be processed by the server.
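
A sketch of the worker-side half of this rule (illustrative types, not SDK code): each message is
handled right after the event its `event_id` points at.

```go
package main

import "fmt"

// Illustrative types, not SDK code.
type Event struct{ ID int64 }
type Message struct {
	Name    string
	EventID int64 // handle after the event with this id
}

func main() {
	events := []Event{{ID: 1}, {ID: 2}, {ID: 3}}
	messages := []Message{{Name: "update-request", EventID: 2}}

	for _, e := range events {
		fmt.Println("handle event", e.ID)
		for _, m := range messages {
			if m.EventID == e.ID {
				fmt.Println("handle message", m.Name)
			}
		}
	}
}
```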

> #### TODO
> `event_id` is not used as intended: it is *always* set to the id before the `WorkflowTaskStartedEvent`
> by the server, which means that all messages are processed after all events. This is because buffered
> events are reordered on the server (see `reorderBuffer()` func) and intersecting them based on `event_id`
> is not possible. When reordering is removed, this field can be set to the right value.

Review discussion on this TODO:

- **Reviewer:** I think it is important to indicate that this field is not currently used at all. The text kind of implies that, but I would like to see a clear sentence saying "it doesn't work this way".
- **@stephanos (Oct 1, 2024):** I found the "not used" part confusing: it sounds like the worker isn't using the field. I've updated the text now.
- **Reviewer:** I see. Yes, `event_id` is set by the server and respected by all SDKs; it is just not set to the intended value. `command_index`, though, is indeed not used at all: it is not set by the SDKs (at least Go) and not checked by the server.
- **@stephanos:** That makes sense 👍

> #### TODO
> `command_index` is not used: SDKs use a different approach where a special command of type
> `COMMAND_TYPE_PROTOCOL_MESSAGE` is added to a command list to indicate the place where a message
> must be processed. This command has only a `message_id` field, which points to a particular message.
>
> When the Update is rejected, `COMMAND_TYPE_PROTOCOL_MESSAGE` is *not* added to the list of commands,
> though, because of the aforementioned limitation of requiring each command to produce an event.
> The server will assume that any message that wasn't mentioned in a `COMMAND_TYPE_PROTOCOL_MESSAGE`
> command is rejected. Those messages will be processed after all commands have been processed,
> in the order they arrived.
>
> When the 1:1 limitation between commands and events is removed, `command_index` can be used.
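
A sketch of the resulting server-side ordering (hypothetical names): messages referenced by a
`COMMAND_TYPE_PROTOCOL_MESSAGE` command are handled in command order; everything left over
(e.g. Update rejections) is handled afterwards, in arrival order.

```go
package main

import "fmt"

// Illustrative types, not the server's actual ones.
type Command struct {
	IsProtocolMessage bool
	MessageID         string
}
type Message struct{ ID string }

func processCompletion(commands []Command, messages []Message) {
	pending := make(map[string]bool, len(messages))
	for _, m := range messages {
		pending[m.ID] = true
	}
	for _, c := range commands {
		if c.IsProtocolMessage {
			// Handle the referenced message and remove it from the pending set.
			if pending[c.MessageID] {
				fmt.Println("message:", c.MessageID)
				delete(pending, c.MessageID)
			}
			continue
		}
		fmt.Println("command")
	}
	// Messages never referenced by a PROTOCOL_MESSAGE command run last,
	// in the order they arrived.
	for _, m := range messages {
		if pending[m.ID] {
			fmt.Println("message:", m.ID)
		}
	}
}

func main() {
	processCompletion(
		[]Command{{IsProtocolMessage: true, MessageID: "update-accepted"}, {}},
		[]Message{{ID: "update-accepted"}, {ID: "update-rejected"}},
	)
}
```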

> #### NOTE
> All SDKs process all queries *last* (i.e. after events and messages).