The Asynchronous Barrier Snapshotting (ABS) based release of Onyx moves away from fault tolerance and state mechanisms that track and acking individual segments, to inserting and tracking barriers that flow through the Directed Acyclic Graph (DAG).
ABS improves performance by reducing acking overhead, and allows for exactly once aggregations which do not require message de-duplication. In ABS, consistent state snapshots can be made by tracking and aligning the barriers, and snapshotting state at appropriate points of barrier alignment.
Onyx 0.10.0 uses the Asynchronous Barrier Snapshotting method described in Lightweight Asynchronous Snapshots for Distributed Dataflows, Carbone et al. to ensure fault tolerance and exactly once processing of data (not exactly once side effects!).
Every job is assigned a coordinator peer, that notifies input peers of when they should inject a barrier into their datastream (generally every n seconds). These barriers are tracked and aligned throughout the job, with the tasks performing snapshots of their state every time a barrier is aligned from all of its input channels.
Concepts:
- Barrier: a message injected into the data stream, containing the epoch id of the barrier
- Epoch: the id of the barrier, re-starting from 0 whenever the cluster has performed a reallocation.
- Coordinator - a process that injects a barrier into the data stream on a schedule.
- Barrier Alignment: occurs when barriers with a particular id have been received from all input channels on a peer.
- Snapshot: Peer state that can be stored whenever a barrier alignment has occurred.
- Channel: Network messaging channel. Channel may become blocked waiting for channel alignment.
Step 1: Coordinator peer emits barrier with epoch 3 after the coordinator period passes.
Step 2: :input1 peer synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier.
Step 3: :input2 peer synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier. :agg1 reads barrier with epoch 3 from :input1, blocks the channel.
Step 4: :agg1 synchronizes on epoch 3, snapshots state to durable storage, and re-emits the barrier to :output.
Easier to use plugin interfaces handle more of the work around checkpointing. Plugin authors previously needed to checkpointing code that wrote to ZooKeeper. This is now handled by simply implementing the checkpoint protocol function.
Resume points are a new feature that make it simple to resume state from jobs in new jobs. This allows for simplified deployment / upgrades of long running streaming jobs, simpler refactoring of jobs (e.g. split one job into two jobs, but keep the state for each respective part), and more.
Onyx Windows now perform exactly once data processing (not side effects!),
without needing deduplication and an :onyx/uniqueness-key
to be set.
Performance will be much better than 0.9.x in the future. Performance is currently limited by slow serialization and lack of batch messaging, however this will improve greatly before release.
ABS requires performant checkpointing to durable storage. The current checkpointing implementation checkpoints to ZooKeeper, which is much slower than the alternatives (S3/HDFS/etc), and has a 1MB maximum node size. The final 0.10.0 release will include checkpointing via alternative storage mechanisms.
- :onyx/uniqueness-key
- :onyx/deduplicate?
- :onyx/input-retry-timeout
- :onyx/max-pending
- :onyx/pending-timeout
- :onyx.messaging/retry-ch-buffer-size
- :onyx.messaging/ack-daemon-timeout
- :onyx.messaging/allow-short-circuit?
- :onyx.messaging.aeron/offer-idle-strategy
- :onyx.peer/backpressure-low-water-pct
- :onyx.peer/backpressure-high-water-pct
- :onyx.messaging/compress-fn
- :onyx.messaging/peer-link-idle-timeout
- :onyx.messaging/release-ch-buffer-size
- :onyx.messaging.aeron/publication-creation-timeout
- :onyx.messaging.aeron/subscriber-count
- :onyx.messaging/peer-link-gc-interval
- :onyx.messaging/ack-daemon-clear-interval
- :onyx.messaging/completion-buffer-size
- :onyx.messaging/decompress-fn
- :onyx.messaging/inbound-buffer-size
- :onyx.messaging.aeron/write-buffer-size
- Implement ability for triggers to emit segments to downstream tasks #639
- Re-implement flow condition retries #714
- Fast implementation of checkpointing in S3 #662 and HDFS #715.
- Function plugins cannot currently be used as output tasks #716
- Improve serialization with Simple Binary Encoding. Current serialization of network messages is slow.
- Some barrier behaviours should be definable on a per job basis, not just a peer-config basis #619
- Improve idle strategies for peers that are not processing work.
- Iterative computation - we do not currently provide the ability to feed segments back up the DAG (i.e. you cannot currently turn a DAG into a cyclic graph).
Supported Plugins:
onyx-seq
- Now included in onyx under onyx.plugin.seq and onyx.tasks.seq.onyx-core-async
onyx-kafka
onyx-datomic
onyx-amazon-sqs
onyx-amazon-s3
To use the supported plugins, please use version coordinates such as
[org.onyxplatform/onyx-amazon-sqs "0.10.0-technical-preview2"]
.
Supported tools and libraries:
- [
onyx-dashboard
] (https://github.com/onyx-platform/onyx-dashboard) - [
onyx-peer-http-query
] (https://github.com/onyx-platform/onyx-peer-http-query)
Currently unsupported tools and libraries:
- [
onyx-metrics
] (https://github.com/onyx-platform/onyx-metrics)
Currently Unsupported Plugins: