Progress tracking
The main feature timely dataflow provides, in addition to moving data along dataflow edges, is the indication of progress through the stream of data.
The system alerts each recipient of timestamped data once it can determine that some previously active timestamp will never be seen again. This allows the operator to react with final messages and actions, for example sending aggregates, flushing state, or releasing held resources. The same mechanisms also alert the user as output data emerge from the dataflow graph.
We first need to develop timely dataflow's progress vocabulary.
A timely dataflow graph consists of operators, whose inputs and outputs are connected by channels. Each operator input has one associated channel, but an operator output may have many channels leading from it: multiple other operators may want to see the data it produces. Each group of records transmitted in timely dataflow has an associated timestamp.
Timely dataflow is responsible for producing information that would let each operator answer the question:
Will I ever see data with this specific timestamp on this specific input of mine?
To provide this information, timely dataflow imposes some constraints on the structure of the dataflow graph and the behavior of operators.
The high-level constraint that timely dataflow imposes is that there should be no cycles in the timely dataflow graph which permit data on some channel to eventually produce data on that channel with the same timestamp. This will let the system reason that once some timestamp has gone away, it will not be seen again.
This property can be difficult to reason about directly, so instead we will impose simpler, local constraints on the graph structure and operator behavior, and argue that these constraints imply the desired global property.
With one exception, operators may only be constructed from the full set of channels connected to their inputs, and their outputs are not available until this happens.
This constraint by itself would ensure that dataflow graphs are acyclic, as we could totally order the operators by their construction time, and edges would not be able to go backwards along this order.
Acyclic dataflow graphs are a limitation we hope to avoid. To create cycles, timely dataflow permits one specific system-provided operator to be constructed before the channel connecting its input is available. This is the feedback operator, and it ensures that timestamps are strictly advanced for all data passing through.
Each operator must respond to input messages only with output messages whose timestamps are at least as large.
To maintain the strict advancement the feedback operator introduces, every other operator must never "roll back" timestamps. This property is a bit subtle, in that more complicated operators may have multiple inputs and outputs, not all of which should be thought of as directly connected.
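The two local constraints can be sketched together. The following is only an illustration, assuming timestamps are plain `u64` round counters; the names `feedback`, `never_rolls_back`, and `around_the_loop` are hypothetical and are not part of timely dataflow's API.

```rust
/// Hypothetical feedback step: strictly advances a `u64` timestamp.
pub fn feedback(time: u64) -> Option<u64> {
    // `checked_add` refuses to wrap around, which would roll the time back.
    time.checked_add(1)
}

/// True if no output timestamp is earlier than the input timestamp.
pub fn never_rolls_back(input: u64, outputs: &[u64]) -> bool {
    outputs.iter().all(|&output| input <= output)
}

/// Sends `time` through a loop body and then through `feedback`.
/// Returns `None` if the body rolled the timestamp back or the counter overflowed.
pub fn around_the_loop(time: u64, body: impl Fn(u64) -> u64) -> Option<u64> {
    let after_body = body(time);
    if !never_rolls_back(time, &[after_body]) {
        return None;
    }
    feedback(after_body)
}
```

Because the body never moves a timestamp backwards and the feedback step always moves it forwards, a trip around the cycle cannot return data with the timestamp it started with.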
Each operator must implement the Operate<T> trait, parameterized by the timestamp type T that its inputs and outputs use for data. This trait describes everything the system needs to know about the operator in order to reason about progress around it.
// implementors can be treated as operators
pub trait Operate<T: Timestamp> {
    // preliminary information.
    fn inputs(&self) -> u64;
    fn outputs(&self) -> u64;
    // initialization methods, describing internal/external structure and capabilities.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<T::Summary>>>, Vec<CountMap<T>>);
    fn set_external_summary(&mut self, Vec<Vec<Antichain<T::Summary>>>, &mut [CountMap<T>]);
    // run-time methods, indicating changes in capabilities.
    fn push_external_progress(&mut self, &mut [CountMap<T>]);
    fn pull_internal_progress(&mut self, &mut [CountMap<T>], &mut [CountMap<T>], &mut [CountMap<T>]) -> bool;
}
Let's go through each of the groups of methods, and explain what they are about (and what Antichain and CountMap are).
The first information an operator must declare is what type it plans to use for its timestamp. There are constraints on these types, but the most important one for our purposes is that each timestamp must implement PartialOrd: there must be a way to indicate that some elements are "greater than" others.
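To make the partial order concrete, here is a minimal sketch of a two-coordinate timestamp ordered coordinate-wise. The `Pair` type and its field names are assumptions chosen for illustration, not types defined by timely dataflow.

```rust
use std::cmp::Ordering;

/// Hypothetical two-coordinate timestamp, for example (outer round, inner round).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Pair {
    pub outer: u64,
    pub inner: u64,
}

impl PartialOrd for Pair {
    /// Product order: `a <= b` only if both coordinates of `a` are `<=` those of `b`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let le = self.outer <= other.outer && self.inner <= other.inner;
        let ge = self.outer >= other.outer && self.inner >= other.inner;
        match (le, ge) {
            (true, true) => Some(Ordering::Equal),
            (true, false) => Some(Ordering::Less),
            (false, true) => Some(Ordering::Greater),
            // Neither dominates the other: the two times are incomparable.
            (false, false) => None,
        }
    }
}
```

Under this order, (1, 2) and (2, 1) are incomparable, which is exactly the situation a total order cannot express.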
An operator must also declare its number of inputs and outputs. The rest of the methods operate in terms of numbered inputs and outputs, rather than pointers to things.
pub trait Operate<T: Timestamp> {
    // preliminary information.
    fn inputs(&self) -> u64;
    fn outputs(&self) -> u64;
There are presently no constraints on the number of inputs and outputs. Some operators have no inputs, some no outputs, some have unboundedly many inputs and some unboundedly many outputs.
This is where we start to get into the heavy stuff.
Before a timely dataflow computation starts, the system needs to understand how timestamped data presented to the input of an operator might emerge from an output. For example, it may be that data presented at one input could emerge at two of the operator's outputs, but data presented at another input could emerge only at one specific output, and in that case only with the timestamp advanced.
It may be less obvious, but the system should also communicate the same information about the outside world to the operator: how might data the operator produces on an output eventually return to its inputs?
Let's look at these two methods and talk through their implications.
The first method asks the operator to summarize the connectivity of its inputs to its outputs, and to indicate any initial message sending capabilities it intends for each of its outputs.
fn get_internal_summary(&mut self) ->
(Vec<Vec<Antichain<T::Summary>>>,
Vec<CountMap<T>>);
The return type is a pair, where the first element should be thought of as a map from (input, output) to a collection of T::Summary objects. A T::Summary is a summary of how T advances. In the simplest case it could just be the identity, saying a timestamp emerges at the output unchanged. More generally, some part of the timestamp might advance, and the operator can indicate that.
The Antichain aspect is a consequence of timestamps and their summaries being only partially ordered (PartialOrd): there may be several incomparable ways in which the timestamp advances (for example, at least one of two coordinates might advance, with no guarantee that both would).
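A minimal sketch of the antichain idea follows. `MiniAntichain` and `Advance` are hypothetical stand-ins written for this illustration; they are not the library's `Antichain` or `T::Summary` types, and the real implementation may differ.

```rust
use std::cmp::Ordering;

/// Hypothetical summary for a two-coordinate timestamp:
/// how far each coordinate is guaranteed to advance.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Advance(pub u64, pub u64);

impl PartialOrd for Advance {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let le = self.0 <= other.0 && self.1 <= other.1;
        let ge = self.0 >= other.0 && self.1 >= other.1;
        match (le, ge) {
            (true, true) => Some(Ordering::Equal),
            (true, false) => Some(Ordering::Less),
            (false, true) => Some(Ordering::Greater),
            (false, false) => None,
        }
    }
}

/// Keeps only minimal, mutually incomparable elements.
pub struct MiniAntichain<T> {
    elements: Vec<T>,
}

impl<T: PartialOrd> MiniAntichain<T> {
    pub fn new() -> Self {
        MiniAntichain { elements: Vec::new() }
    }

    /// Inserts `element` unless an existing element is already `<=` it.
    /// Any existing elements that are `>=` the new one are removed.
    pub fn insert(&mut self, element: T) -> bool {
        if self.elements.iter().any(|e| e <= &element) {
            return false;
        }
        self.elements.retain(|e| !(&element <= e));
        self.elements.push(element);
        true
    }

    pub fn elements(&self) -> &[T] {
        &self.elements
    }
}
```

With the summaries `Advance(1, 0)` and `Advance(0, 1)` both present, neither subsumes the other, so both are kept; `Advance(1, 1)` would be rejected because it adds no information.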
The second element of the pair should be thought of as a map from (output, timestamp) to an initial count. The count is typically one (a zero is implied for absent keys), and simply indicates that the operator retains the right to emit data with the specified timestamp on the specified output.
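As an illustration of the shape of this return value, the sketch below describes an operator with two inputs and two outputs, where input 0 reaches both outputs unchanged and input 1 reaches only output 1 with the timestamp advanced. It assumes `u64` timestamps, a `u64` "advance by n" summary, and a `HashMap` in place of `CountMap`; the names `toy_internal_summary`, `Summary`, `Counts`, and `count` are hypothetical.

```rust
use std::collections::HashMap;

/// Stand-in for `T::Summary` over `u64` times: "advance by this many steps".
pub type Summary = u64;
/// Stand-in for `CountMap<u64>`: time -> count, with zero implied when absent.
pub type Counts = HashMap<u64, i64>;

/// Returns (summaries[input][output], initial capabilities per output).
pub fn toy_internal_summary() -> (Vec<Vec<Vec<Summary>>>, Vec<Counts>) {
    // An empty list means data on that input can never reach that output.
    let summaries = vec![
        vec![vec![0], vec![0]], // input 0: reaches both outputs, time unchanged
        vec![vec![], vec![1]],  // input 1: reaches only output 1, time advanced by one
    ];
    let mut capabilities = vec![Counts::new(), Counts::new()];
    // The operator reserves the right to send at time 0 on output 0.
    capabilities[0].insert(0, 1);
    (summaries, capabilities)
}

/// Looks up a count, treating absent keys as zero.
pub fn count(counts: &Counts, time: u64) -> i64 {
    counts.get(&time).copied().unwrap_or(0)
}
```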
The second method is essentially the dual of the first, with the system telling the operator about the connectivity summaries between its outputs and its inputs, and any initial message sending capabilities on each of its inputs.
fn set_external_summary(&mut self,
Vec<Vec<Antichain<T::Summary>>>,
&mut [CountMap<T>]);
Only once the system has full information about the operators it manages and the edges between them can it correctly invoke this method. The typical pattern is to call get_internal_summary for each managed operator, think for a bit, and then call set_external_summary for each of them.
The types might be a bit surprising. The second parameter (counting &mut self as the first) is an owned Vec, and the operator is expected to take this summary for its own use. The third parameter is a reference to a slice of CountMaps, and the operator is expected to drain each of them (but does not get to own them). The reasoning is that in the latter case the system will want to repeatedly notify the operator of changes in capabilities, and it would rather re-use the same CountMaps than repeatedly allocate and move them to the operator.
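The difference between taking ownership and draining a borrowed buffer can be sketched as follows. This is an assumption-laden toy: `ToyOperator` is hypothetical, `Vec<(u64, i64)>` stands in for `CountMap<T>`, and a `u64` stands in for an antichain of summaries.

```rust
/// Hypothetical operator that stores what the system tells it.
pub struct ToyOperator {
    external_summary: Vec<Vec<u64>>,
    pending: Vec<Vec<(u64, i64)>>,
}

impl ToyOperator {
    pub fn new(inputs: usize) -> Self {
        ToyOperator {
            external_summary: Vec::new(),
            pending: vec![Vec::new(); inputs],
        }
    }

    /// Takes the summary by value, but only drains the borrowed count buffers.
    pub fn set_external_summary(
        &mut self,
        summary: Vec<Vec<u64>>,
        counts: &mut [Vec<(u64, i64)>],
    ) {
        // Owned argument: move it into the operator, no copy needed.
        self.external_summary = summary;
        // Borrowed buffers: empty them, leaving their allocations with the caller.
        for (mine, buffer) in self.pending.iter_mut().zip(counts.iter_mut()) {
            mine.extend(buffer.drain(..));
        }
    }

    pub fn pending(&self, input: usize) -> &[(u64, i64)] {
        &self.pending[input]
    }

    pub fn external_summary(&self) -> &[Vec<u64>] {
        &self.external_summary
    }
}
```

After the call the caller still owns its buffers, now empty, and can refill them for the next notification without allocating again.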
Once a timely dataflow computation starts running, messages will be delivered and consumed, and ideally operators will start to release their message sending capabilities, or at least advance the capabilities to later timestamps.
To communicate this information, we again have two methods, corresponding to the system telling the operator about changes in the outside world, and the system requesting information from the operator about internal changes as they are reflected in the outside world.
If the set of timestamps an operator might still see on any of its inputs changes for any reason, the system indicates this by announcing the changed counts for each input.
fn push_external_progress(&mut self,
&mut [CountMap<T>]);
This communication is one-directional: the operator does not reply with any information, and is not obliged to react in any way to this information.
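One way an operator might use these announcements is to keep a running tally per input and consult it to answer the question posed earlier: could data with a given timestamp still arrive? The sketch below assumes a single input, totally ordered `u64` timestamps, and a `Vec<(u64, i64)>` in place of `CountMap`; `InputTracker` and `could_still_see` are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical per-input tally of outstanding timestamps.
pub struct InputTracker {
    outstanding: HashMap<u64, i64>,
}

impl InputTracker {
    pub fn new() -> Self {
        InputTracker { outstanding: HashMap::new() }
    }

    /// Applies and drains the announced changes; nothing is reported back.
    pub fn push_external_progress(&mut self, changes: &mut Vec<(u64, i64)>) {
        for (time, delta) in changes.drain(..) {
            *self.outstanding.entry(time).or_insert(0) += delta;
        }
        // Absent keys mean zero, so drop entries that have cancelled out.
        self.outstanding.retain(|_, count| *count != 0);
    }

    /// Data at `time` may still arrive if something outstanding is at or before it.
    pub fn could_still_see(&self, time: u64) -> bool {
        self.outstanding
            .iter()
            .any(|(&t, &count)| count > 0 && t <= time)
    }
}
```

Once `could_still_see(t)` turns false, the operator can safely emit final results for `t`, such as an aggregate.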
The system also periodically asks each operator about progress it may have made. The system is not obliged to call this method, but if it looks like some operator is holding up the show, it might be a good idea to check in and see if it has gotten anywhere.
fn pull_internal_progress(&mut self,
&mut [CountMap<T>],
&mut [CountMap<T>],
&mut [CountMap<T>]) -> bool;
There are three additional arguments to this method, and a boolean return value. The three arguments are:
1. changes to the capabilities on each output.
2. amount of data consumed on each input.
3. amount of data produced on each output.
The first argument probably makes sense: if the operator has finished sending data with some timestamp, it can release the capability (but perhaps retain some new capability).
The other two arguments involve the production and consumption of messages. Recall that we are doing all this because operators might send data to each other. These arguments are how each operator indicates that it did actually produce some data, and correspondingly, that it received (and accepts responsibility for) some data.
There is an important contract here: an operator must not express a new capability without consuming data bearing that timestamp (or one before it), and an operator must not send data for a capability it has not expressed. The system can check these invariants for the operator, as it tracks the capabilities the operator currently holds.
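A check of this contract might look roughly like the following. It is a sketch under several assumptions: one input and one output, `u64` timestamps, and the additional assumption (not stated above) that an already-held earlier capability also justifies a new, later one. `Report` and `CapabilityChecker` are hypothetical names.

```rust
use std::collections::HashMap;

/// One progress report, simplified to a single input and a single output.
pub struct Report {
    pub capability_changes: Vec<(u64, i64)>,
    pub consumed: Vec<(u64, i64)>,
    pub produced: Vec<(u64, i64)>,
}

/// Hypothetical system-side record of the capabilities an operator holds.
pub struct CapabilityChecker {
    held: HashMap<u64, i64>,
}

impl CapabilityChecker {
    pub fn new(initial: &[(u64, i64)]) -> Self {
        let mut held = HashMap::new();
        for &(time, count) in initial {
            *held.entry(time).or_insert(0) += count;
        }
        CapabilityChecker { held }
    }

    fn holds_at_or_before(&self, time: u64) -> bool {
        self.held.iter().any(|(&t, &count)| count > 0 && t <= time)
    }

    /// Validates a report and, only if it is valid, applies its capability changes.
    pub fn check_and_apply(&mut self, report: &Report) -> Result<(), String> {
        for &(time, delta) in &report.capability_changes {
            // A new capability needs consumed data (or, by assumption, a held
            // capability) at this time or an earlier one.
            let consumed = report.consumed.iter().any(|&(t, c)| c > 0 && t <= time);
            if delta > 0 && !consumed && !self.holds_at_or_before(time) {
                return Err(format!("capability at time {} is not justified", time));
            }
        }
        for &(time, count) in &report.produced {
            // Sent data must be covered by a capability held before this report
            // or expressed within it.
            let gained = report
                .capability_changes
                .iter()
                .any(|&(t, d)| d > 0 && t <= time);
            if count > 0 && !gained && !self.holds_at_or_before(time) {
                return Err(format!("data sent at time {} without a capability", time));
            }
        }
        for &(time, delta) in &report.capability_changes {
            *self.held.entry(time).or_insert(0) += delta;
        }
        self.held.retain(|_, count| *count != 0);
        Ok(())
    }
}
```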
The boolean return value indicates whether the operator has any reason, not expressed through its outputs, why the computation should not yet be declared done. For example, operators may need to write data to the screen, persist data to disk, or do some clean-up. The system can ignore this result for the purposes of progress detection, but as the system is probably managing the operator as well as monitoring its progress, this indicates that it is not yet time to shut things down.
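A managing loop that respects this return value could be sketched as below. The `Pollable` trait is a hypothetical reduction of `pull_internal_progress` to its boolean result, and `Flusher` is a made-up operator with a fixed amount of clean-up left to do.

```rust
/// Hypothetical reduction of `pull_internal_progress` to its boolean result.
pub trait Pollable {
    /// Returns true while the operator still has work the system cannot see.
    fn pull_internal_progress(&mut self) -> bool;
}

/// Made-up operator that needs a fixed number of polls to finish flushing.
pub struct Flusher {
    pub remaining: u32,
}

impl Pollable for Flusher {
    fn pull_internal_progress(&mut self) -> bool {
        if self.remaining > 0 {
            self.remaining -= 1; // do one unit of clean-up
        }
        self.remaining > 0
    }
}

/// Polls every operator each round until none reports outstanding work.
/// Returns the number of rounds used, or `None` if `max_rounds` was not enough.
pub fn run_until_quiet(
    operators: &mut [Box<dyn Pollable>],
    max_rounds: usize,
) -> Option<usize> {
    for round in 1..=max_rounds {
        let mut any_pending = false;
        for operator in operators.iter_mut() {
            // Poll every operator; do not stop at the first `true`.
            any_pending |= operator.pull_internal_progress();
        }
        if !any_pending {
            return Some(round);
        }
    }
    None
}
```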
A great deal of the power of timely dataflow comes from the ability of each operator to be implemented by another "timely dataflow system", in the language above. While this is probably not actually a fresh system, the interface of the Operate trait gives us the flexibility to implement the operator internals with progress tracking logic similar to that used to manage the operator itself.
An operator only produces connectivity summaries for paths along channels internal to the operator. This makes sense, because at this point it does not know about the connectivity from its outputs to inputs, and cannot reason about paths along channels external to the operator.
It may be beneficial to consider a similar restriction of information for the summaries provided by the system to the operator: the output-to-input path summaries could cover only channels external to the operator. This communicates no less information, because the full path summaries can be recovered by blending internal and external summaries, and it may communicate more. Whether this information is valuable remains to be seen.
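The "blending" step can be illustrated with a small fixed-point computation. This sketch assumes totally ordered `u64` timestamps, so that an antichain of summaries collapses to a single smallest "advance by n" value, with `None` meaning no path; `full_summaries` and its helpers are hypothetical and do not describe how timely dataflow actually computes path summaries.

```rust
/// `None` = no path; `Some(n)` = the timestamp advances by at least `n`.
pub type Summary = Option<u64>;

/// Composes two summaries along a path (overflow is treated as "no path").
fn followed_by(first: Summary, second: Summary) -> Summary {
    match (first, second) {
        (Some(a), Some(b)) => a.checked_add(b),
        _ => None,
    }
}

/// Picks the less restrictive of two alternative paths.
fn shorter(a: Summary, b: Summary) -> Summary {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.min(y)),
        (x, None) => x,
        (None, y) => y,
    }
}

/// Recovers full output-to-input summaries from `external[output][input]`
/// (paths outside the operator) and `internal[input][output]` (paths inside it).
/// Assumes the two tables agree on the number of inputs and outputs.
pub fn full_summaries(external: &[Vec<Summary>], internal: &[Vec<Summary>]) -> Vec<Vec<Summary>> {
    let mut full: Vec<Vec<Summary>> = external.to_vec();
    loop {
        let mut changed = false;
        for o in 0..external.len() {
            for i in 0..external[o].len() {
                let mut best = full[o][i];
                for (via_input, row) in internal.iter().enumerate() {
                    for (via_output, &inner) in row.iter().enumerate() {
                        // Leave externally, pass through the operator, then continue.
                        let first_lap = followed_by(external[o][via_input], inner);
                        best = shorter(best, followed_by(first_lap, full[via_output][i]));
                    }
                }
                if best != full[o][i] {
                    full[o][i] = best;
                    changed = true;
                }
            }
        }
        if !changed {
            return full;
        }
    }
}
```

In this toy setting the external-only table is never larger than the full one, and the full one is derived from it mechanically, which is the sense in which no information is lost.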
Similarly, it may help to communicate only changes in capabilities external to the operator, and not include capabilities that exist only because the operator itself expresses them, and has a path from its output back to its input. The operator can already see that capability, as it knows its own capabilities and the connectivity from its outputs to its inputs. This restriction on information would let an operator see that when it releases a capability internally, some expressed input capability is immediately released, without negotiating this information through its manager.