Commit
Merge remote-tracking branch 'origin/main' into fix/unfold-message-handling
untitaker committed Sep 26, 2024
2 parents b467071 + d854000 commit 564ba1f
Showing 21 changed files with 319 additions and 113 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/build.yml
@@ -5,6 +5,7 @@ on:
branches:
- main
- release/**
pull_request:

jobs:
dist:
@@ -15,11 +16,12 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: 3.8
python-version: 3.12
- run: |
pip install wheel
python setup.py sdist bdist_wheel
- uses: actions/[email protected]
python scripts/create_metrics_def_json.py
- run: pip install wheel setuptools
- run: python setup.py sdist bdist_wheel
- uses: actions/upload-artifact@v4
with:
name: ${{ github.sha }}
path: dist/*
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -23,8 +23,8 @@ jobs:
pip install -r requirements-linter.txt
- name: Run linter
run: |
black arroyo tests
flake8 arroyo tests
black arroyo tests scripts
flake8 arroyo tests scripts
typing:
name: "Type checking"
runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
@@ -28,7 +28,7 @@ jobs:
force_orphan: true

- name: Archive Docs
uses: actions/upload-artifact@v3.1.1
uses: actions/upload-artifact@v4
with:
name: docs
path: docs/build
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,3 +10,5 @@
.idea/
target/
/Cargo.lock

arroyo/utils/metricDefs.json
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,31 @@
# Changelog and versioning

## 2.17.6

### Various fixes & improvements

- Remove non-existent @getsentry/processing from CODEOWNERS (#386) by @onkar

## 2.17.5

### Various fixes & improvements

- chore: Fix release builds (#385) by @untitaker
- Add a basic metric for tracking the capacity in VecDeque buffer (#383) by @ayirr7
- feat: enhance metrics defs (#378) by @mj0nez
- feat: Add From<BrokerMessage<_>> impl for InvalidMessage (#377) by @evanpurkhiser
- feat: Add Noop processing strategy (#376) by @evanpurkhiser
- Update RunTask to receive a Message<TPayload> (#375) by @evanpurkhiser
- hotfix, fix master ci (66f1efc3) by @untitaker
- fix: add guard to Produce.poll to ensure next_step is called regardless of produce queue (#370) by @mj0nez
- ref: Add pre-commit hook for rustfmt (#364) by @untitaker
- update accumulator sig to return Result<TResult> instead of TResult (#359) by @john-z-yang
- ref: Use coarsetime consistently (#366) by @untitaker
- ref(rust): Backpressure metrics for threadpools (#367) by @untitaker
- ref(reduce): Refactor for timeout=0 (#363) by @untitaker
- ref(rust): Remove strategy.close (#361) by @untitaker
- ref(rust): Add join-time metric for threadpools (#362) by @untitaker

## 2.17.4

### Various fixes & improvements
2 changes: 1 addition & 1 deletion CODEOWNERS
@@ -1,2 +1,2 @@
* @getsentry/owners-snuba @getsentry/ops
/rust-arroyo @getsentry/owners-snuba @getsentry/ops @getsentry/processing
/rust-arroyo @getsentry/owners-snuba @getsentry/ops
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "rust_arroyo"
version = "2.17.4"
version = "2.17.6"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
34 changes: 34 additions & 0 deletions arroyo/processing/strategies/noop.py
@@ -0,0 +1,34 @@
from __future__ import annotations

from typing import Optional, Union

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.types import FilteredPayload, Message, TStrategyPayload


class Noop(
ProcessingStrategy[Union[FilteredPayload, object]],
):
"""
Noop strategy that takes a message and does nothing.
"""

def __init__(self) -> None:
pass

def submit(
self, message: Message[Union[FilteredPayload, TStrategyPayload]]
) -> None:
pass

def poll(self) -> None:
pass

def join(self, timeout: Optional[float] = None) -> None:
pass

def close(self) -> None:
pass

def terminate(self) -> None:
pass
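
The new Noop strategy is handy as a terminal step when a pipeline only performs side effects and there is nothing left to do with the transformed messages. The following is a minimal sketch, not part of this commit: log_payload and SideEffectOnlyFactory are hypothetical names, and it assumes RunTask's (function, next_step) constructor and the standard ProcessingStrategyFactory.create_with_partitions hook.

from typing import Mapping

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies.abstract import (
    ProcessingStrategy,
    ProcessingStrategyFactory,
)
from arroyo.processing.strategies.noop import Noop
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, Message, Partition


def log_payload(message: Message[KafkaPayload]) -> KafkaPayload:
    # Hypothetical side-effecting task; its return value is simply dropped by Noop.
    print(message.payload.value)
    return message.payload


class SideEffectOnlyFactory(ProcessingStrategyFactory[KafkaPayload]):
    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> ProcessingStrategy[KafkaPayload]:
        # Noop terminates the pipeline; this sketch does not commit offsets.
        return RunTask(log_payload, Noop())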
72 changes: 36 additions & 36 deletions arroyo/utils/metric_defs.py
@@ -1,70 +1,70 @@
from typing import Literal

MetricName = Literal[
# Number of messages in a multiprocessing batch
# Time: Number of messages in a multiprocessing batch
"arroyo.strategies.run_task_with_multiprocessing.batch.size.msg",
# Number of bytes in a multiprocessing batch
# Time: Number of bytes in a multiprocessing batch
"arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes",
# Number of messages in a multiprocessing batch after the message transformation
# Time: Number of messages in a multiprocessing batch after the message transformation
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg",
# Number of bytes in a multiprocessing batch after the message transformation
# Time: Number of bytes in a multiprocessing batch after the message transformation
"arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes",
# Number of times the consumer is spinning
# Counter: Number of times the consumer is spinning
"arroyo.consumer.run.count",
# Number of times the consumer encounted an invalid message.
# Counter: Number of times the consumer encountered an invalid message.
"arroyo.consumer.invalid_message.count",
# How long it took the Reduce step to fill up a batch
# Time: How long it took the Reduce step to fill up a batch
"arroyo.strategies.reduce.batch_time",
# Counter, incremented when a strategy after multiprocessing applies
# Counter: Incremented when a strategy after multiprocessing applies
# backpressure to multiprocessing. May be a reason why CPU cannot be
# saturated.
"arroyo.strategies.run_task_with_multiprocessing.batch.backpressure",
# Counter, incremented when multiprocessing cannot fill the input batch
# Counter: Incremented when multiprocessing cannot fill the input batch
# because not enough memory was allocated. This results in batches smaller
# than configured. Increase `input_block_size` to fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow",
# Counter, incremented when multiprocessing cannot pull results in batches
# Counter: Incremented when multiprocessing cannot pull results in batches
# equal to the input batch size, because not enough memory was allocated.
# This can be devastating for throughput. Increase `output_block_size` to
# fix.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow",
# Arroyo has decided to re-allocate a block in order to combat input buffer
# overflow. This behavior can be disabled by explicitly setting
# Counter: Arroyo has decided to re-allocate a block in order to combat input
# buffer overflow. This behavior can be disabled by explicitly setting
# `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.input.resize",
# Arroyo has decided to re-allocate a block in order to combat output buffer
# overflow. This behavior can be disabled by explicitly setting
# Counter: Arroyo has decided to re-allocate a block in order to combat output
# buffer overflow. This behavior can be disabled by explicitly setting
# `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`.
"arroyo.strategies.run_task_with_multiprocessing.batch.output.resize",
# How many batches are being processed in parallel by multiprocessing.
# Gauge: How many batches are being processed in parallel by multiprocessing.
"arroyo.strategies.run_task_with_multiprocessing.batches_in_progress",
# Counter. A subprocess by multiprocessing unexpectedly died.
# Counter: A subprocess by multiprocessing unexpectedly died.
"sigchld.detected",
# Gauge. Shows how many processes the multiprocessing strategy is
# Gauge: Shows how many processes the multiprocessing strategy is
# configured with.
"arroyo.strategies.run_task_with_multiprocessing.processes",
# Counter. Incremented when the multiprocessing pool is created (or re-created).
# Counter: Incremented when the multiprocessing pool is created (or re-created).
"arroyo.strategies.run_task_with_multiprocessing.pool.create",
# Time (unitless) spent polling librdkafka for new messages.
# Time: (unitless) spent polling librdkafka for new messages.
"arroyo.consumer.poll.time",
# Time (unitless) spent in strategies (blocking in strategy.submit or
# Time: (unitless) spent in strategies (blocking in strategy.submit or
# strategy.poll)
"arroyo.consumer.processing.time",
# Time (unitless) spent pausing the consumer due to backpressure (MessageRejected)
# Time: (unitless) spent pausing the consumer due to backpressure (MessageRejected)
"arroyo.consumer.backpressure.time",
# Time (unitless) spent in handling `InvalidMessage` exceptions and sending
# Time: (unitless) spent in handling `InvalidMessage` exceptions and sending
# messages to the the DLQ.
"arroyo.consumer.dlq.time",
# Time (unitless) spent in waiting for the strategy to exit, such as during
# Time: (unitless) spent in waiting for the strategy to exit, such as during
# shutdown or rebalancing.
"arroyo.consumer.join.time",
# Time (unitless) spent in librdkafka callbacks. This metric's timings
# Time: (unitless) spent in librdkafka callbacks. This metric's timings
# overlap other timings, and might spike at the same time.
"arroyo.consumer.callback.time",
# Time (unitless) spent in shutting down the consumer. This metric's
# Time: (unitless) spent in shutting down the consumer. This metric's
# timings overlap other timings, and might spike at the same time.
"arroyo.consumer.shutdown.time",
# A regular duration metric where each datapoint is measuring the time it
# Time: A regular duration metric where each datapoint is measuring the time it
# took to execute a single callback. This metric is distinct from the
# arroyo.consumer.*.time metrics as it does not attempt to accumulate time
# spent per second in an attempt to keep monitoring overhead low.
@@ -73,29 +73,29 @@
# executed, as 'callback_name'. Possible values are on_partitions_assigned
# and on_partitions_revoked.
"arroyo.consumer.run.callback",
# Duration metric measuring the time it took to flush in-flight messages
# Time: Duration metric measuring the time it took to flush in-flight messages
# and shut down the strategies.
"arroyo.consumer.run.close_strategy",
# Duration metric measuring the time it took to create the processing strategy.
# Time: Duration metric measuring the time it took to create the processing strategy.
"arroyo.consumer.run.create_strategy",
# How many partitions have been revoked just now.
# Counter: How many partitions have been revoked just now.
"arroyo.consumer.partitions_revoked.count",
# How many partitions have been assigned just now.
# Counter: How many partitions have been assigned just now.
"arroyo.consumer.partitions_assigned.count",
# Consumer latency in seconds. Recorded by the commit offsets strategy.
# Time: Consumer latency in seconds. Recorded by the commit offsets strategy.
"arroyo.consumer.latency",
# Counter metric for when the underlying rdkafka consumer is being paused.
# Counter: Metric for when the underlying rdkafka consumer is being paused.
#
# This flushes internal prefetch buffers.
"arroyo.consumer.pause",
# Counter metric for when the underlying rdkafka consumer is being resumed.
# Counter: Metric for when the underlying rdkafka consumer is being resumed.
#
# This might cause increased network usage as messages are being re-fetched.
"arroyo.consumer.resume",
# Queue size of background queue that librdkafka uses to prefetch messages.
# Gauge: Queue size of background queue that librdkafka uses to prefetch messages.
"arroyo.consumer.librdkafka.total_queue_size",
# Counter metric to measure how often the healthcheck file has been touched.
# Counter: Counter metric to measure how often the healthcheck file has been touched.
"arroyo.processing.strategies.healthcheck.touch",
# Number of messages dropped in the FilterStep strategy
# Counter: Number of messages dropped in the FilterStep strategy
"arroyo.strategies.filter.dropped_messages",
]
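
Each entry in metric_defs.py now states its metric type (Counter, Gauge, or Time) in its comment, which tells a metrics backend which emission method the name will arrive through. As a rough illustration only, and assuming Arroyo's Metrics interface exposes increment/gauge/timing methods plus a configure_metrics helper, a custom backend keyed by MetricName might look like this sketch:

from typing import Optional, Union

from arroyo.utils.metric_defs import MetricName
from arroyo.utils.metrics import Metrics, Tags, configure_metrics


class PrintingMetrics(Metrics):
    # Toy backend: a real one would forward to statsd, Datadog, etc.
    def increment(
        self, name: MetricName, value: Union[int, float] = 1, tags: Optional[Tags] = None
    ) -> None:
        print(f"counter {name} +{value} tags={dict(tags or {})}")

    def gauge(
        self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
    ) -> None:
        print(f"gauge {name} = {value} tags={dict(tags or {})}")

    def timing(
        self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
    ) -> None:
        print(f"timing {name} = {value} tags={dict(tags or {})}")


# Register the backend so consumers and strategies report through it.
configure_metrics(PrintingMetrics())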
10 changes: 10 additions & 0 deletions docs/source/metrics.rst
@@ -45,6 +45,16 @@ Available Metrics

.. literalinclude:: ../../arroyo/utils/metric_defs.py

For convenience Arroyo includes a machine readable version which can be loaded like:

.. code:: python

    import importlib.resources
    import json

    with importlib.resources.files("arroyo.utils").joinpath("metricDefs.json").open() as f:
        metric_defs = json.load(f)

API
=======

1 change: 1 addition & 0 deletions docs/source/strategies/index.rst
@@ -35,4 +35,5 @@ Messages
run_task_with_multiprocessing
produce
commit_offsets
noop
healthcheck
5 changes: 5 additions & 0 deletions docs/source/strategies/noop.rst
@@ -0,0 +1,5 @@
Noop
-----------------------------

.. automodule:: arroyo.processing.strategies.noop
:members:
24 changes: 5 additions & 19 deletions rust-arroyo/examples/transform_and_produce.rs
@@ -9,19 +9,18 @@ use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::processing::strategies::noop::Noop;
use rust_arroyo::processing::strategies::produce::Produce;
use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{
CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError,
SubmitError,
InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Topic, TopicOrPartition};

use std::time::Duration;

fn reverse_string(value: KafkaPayload) -> Result<KafkaPayload, InvalidMessage> {
fn reverse_string(message: Message<KafkaPayload>) -> Result<Message<KafkaPayload>, InvalidMessage> {
let value = message.payload();
let payload = value.payload().unwrap();
let str_payload = std::str::from_utf8(payload).unwrap();
let result_str = str_payload.chars().rev().collect::<String>();
@@ -33,20 +32,7 @@ fn reverse_string(value: KafkaPayload) -> Result<KafkaPayload, InvalidMessage> {
value.headers().cloned(),
Some(result_str.to_bytes().to_vec()),
);
Ok(result)
}
struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
Ok(())
}
fn terminate(&mut self) {}
fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
Ok(None)
}
Ok(message.replace(result))
}

#[tokio::main]