From 8b0bdee27257e4f7235a57b8ba4a0ee19e30d825 Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Mon, 1 Jul 2024 15:42:01 -0400 Subject: [PATCH 01/10] Update RunTask to receive a Message (#375) --- rust-arroyo/examples/transform_and_produce.rs | 5 +++-- .../src/processing/strategies/run_task.rs | 19 ++++++++++++------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/rust-arroyo/examples/transform_and_produce.rs b/rust-arroyo/examples/transform_and_produce.rs index 47b81f47..56970137 100644 --- a/rust-arroyo/examples/transform_and_produce.rs +++ b/rust-arroyo/examples/transform_and_produce.rs @@ -21,7 +21,8 @@ use rust_arroyo::types::{Message, Topic, TopicOrPartition}; use std::time::Duration; -fn reverse_string(value: KafkaPayload) -> Result { +fn reverse_string(message: Message) -> Result, InvalidMessage> { + let value = message.payload(); let payload = value.payload().unwrap(); let str_payload = std::str::from_utf8(payload).unwrap(); let result_str = str_payload.chars().rev().collect::(); @@ -33,7 +34,7 @@ fn reverse_string(value: KafkaPayload) -> Result { value.headers().cloned(), Some(result_str.to_bytes().to_vec()), ); - Ok(result) + Ok(message.replace(result)) } struct Noop {} impl ProcessingStrategy for Noop { diff --git a/rust-arroyo/src/processing/strategies/run_task.rs b/rust-arroyo/src/processing/strategies/run_task.rs index d5182ed2..73fd04b5 100644 --- a/rust-arroyo/src/processing/strategies/run_task.rs +++ b/rust-arroyo/src/processing/strategies/run_task.rs @@ -5,9 +5,13 @@ use crate::processing::strategies::{ use crate::types::Message; use std::time::Duration; +type Function = dyn Fn(Message) -> Result, InvalidMessage> + + Send + + Sync + + 'static; + pub struct RunTask { - pub function: - Box Result + Send + Sync + 'static>, + pub function: Box>, pub next_step: Box>, pub message_carried_over: Option>, pub commit_request_carried_over: Option, @@ -17,7 +21,10 @@ impl RunTask { pub fn new(function: F, next_step: N) -> Self where N: ProcessingStrategy + 'static, - F: Fn(TPayload) -> Result + Send + Sync + 'static, + F: Fn(Message) -> Result, InvalidMessage> + + Send + + Sync + + 'static, { Self { function: Box::new(function), @@ -62,9 +69,7 @@ impl ProcessingStrategy return Err(SubmitError::MessageRejected(MessageRejected { message })); } - let next_message = message - .try_map(&self.function) - .map_err(SubmitError::InvalidMessage)?; + let next_message = (self.function)(message).map_err(SubmitError::InvalidMessage)?; match self.next_step.submit(next_message) { Err(SubmitError::MessageRejected(MessageRejected { @@ -101,7 +106,7 @@ mod tests { #[test] fn test_run_task() { - fn identity(value: String) -> Result { + fn identity(value: Message) -> Result, InvalidMessage> { Ok(value) } From ab87c3a21e9a624a060c6d38b70cdb5d8b9e3c7f Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Mon, 1 Jul 2024 18:58:52 -0400 Subject: [PATCH 02/10] feat: Add Noop processing strategy (#376) This can be useful when you consume a message and do not wish to commit he offset back to Kafka [1]. [1]: The sentry uptime-checker project has this requirement as it is using Kafka as a mechanism to store configurations. By using log compaction we never actually want to commit a log offset, and want to read the entire log every time. --- arroyo/processing/strategies/noop.py | 34 +++++++++++++++++++ docs/source/strategies/index.rst | 1 + docs/source/strategies/noop.rst | 5 +++ rust-arroyo/examples/transform_and_produce.rs | 19 ++--------- rust-arroyo/src/processing/strategies/mod.rs | 1 + rust-arroyo/src/processing/strategies/noop.rs | 23 +++++++++++++ .../src/processing/strategies/produce.rs | 21 +----------- .../src/processing/strategies/run_task.rs | 22 +++--------- tests/processing/strategies/test_noop.py | 19 +++++++++++ 9 files changed, 90 insertions(+), 55 deletions(-) create mode 100644 arroyo/processing/strategies/noop.py create mode 100644 docs/source/strategies/noop.rst create mode 100644 rust-arroyo/src/processing/strategies/noop.rs create mode 100644 tests/processing/strategies/test_noop.py diff --git a/arroyo/processing/strategies/noop.py b/arroyo/processing/strategies/noop.py new file mode 100644 index 00000000..923b0b40 --- /dev/null +++ b/arroyo/processing/strategies/noop.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from typing import Optional, Union + +from arroyo.processing.strategies.abstract import ProcessingStrategy +from arroyo.types import FilteredPayload, Message, TStrategyPayload + + +class Noop( + ProcessingStrategy[Union[FilteredPayload, object]], +): + """ + Noop strategy that takes a message and does nothing. + """ + + def __init__(self) -> None: + pass + + def submit( + self, message: Message[Union[FilteredPayload, TStrategyPayload]] + ) -> None: + pass + + def poll(self) -> None: + pass + + def join(self, timeout: Optional[float] = None) -> None: + pass + + def close(self) -> None: + pass + + def terminate(self) -> None: + pass diff --git a/docs/source/strategies/index.rst b/docs/source/strategies/index.rst index 1c373df0..60d75e1a 100644 --- a/docs/source/strategies/index.rst +++ b/docs/source/strategies/index.rst @@ -35,4 +35,5 @@ Messages run_task_with_multiprocessing produce commit_offsets + noop healthcheck diff --git a/docs/source/strategies/noop.rst b/docs/source/strategies/noop.rst new file mode 100644 index 00000000..5a2dc622 --- /dev/null +++ b/docs/source/strategies/noop.rst @@ -0,0 +1,5 @@ +Noop +----------------------------- + +.. automodule:: arroyo.processing.strategies.noop + :members: diff --git a/rust-arroyo/examples/transform_and_produce.rs b/rust-arroyo/examples/transform_and_produce.rs index 56970137..a44f188b 100644 --- a/rust-arroyo/examples/transform_and_produce.rs +++ b/rust-arroyo/examples/transform_and_produce.rs @@ -9,18 +9,16 @@ use rust_arroyo::backends::kafka::config::KafkaConfig; use rust_arroyo::backends::kafka::producer::KafkaProducer; use rust_arroyo::backends::kafka::types::KafkaPayload; use rust_arroyo::backends::kafka::InitialOffset; +use rust_arroyo::processing::strategies::noop::Noop; use rust_arroyo::processing::strategies::produce::Produce; use rust_arroyo::processing::strategies::run_task::RunTask; use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig; use rust_arroyo::processing::strategies::{ - CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError, - SubmitError, + InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, }; use rust_arroyo::processing::StreamProcessor; use rust_arroyo::types::{Message, Topic, TopicOrPartition}; -use std::time::Duration; - fn reverse_string(message: Message) -> Result, InvalidMessage> { let value = message.payload(); let payload = value.payload().unwrap(); @@ -36,19 +34,6 @@ fn reverse_string(message: Message) -> Result for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join(&mut self, _timeout: Option) -> Result, StrategyError> { - Ok(None) - } -} #[tokio::main] async fn main() { diff --git a/rust-arroyo/src/processing/strategies/mod.rs b/rust-arroyo/src/processing/strategies/mod.rs index fbf9014b..180c1f0f 100644 --- a/rust-arroyo/src/processing/strategies/mod.rs +++ b/rust-arroyo/src/processing/strategies/mod.rs @@ -4,6 +4,7 @@ use std::time::Duration; pub mod commit_offsets; pub mod healthcheck; +pub mod noop; pub mod produce; pub mod reduce; pub mod run_task; diff --git a/rust-arroyo/src/processing/strategies/noop.rs b/rust-arroyo/src/processing/strategies/noop.rs new file mode 100644 index 00000000..143357f6 --- /dev/null +++ b/rust-arroyo/src/processing/strategies/noop.rs @@ -0,0 +1,23 @@ +use std::time::Duration; + +use crate::types::Message; + +use super::{CommitRequest, ProcessingStrategy, StrategyError, SubmitError}; + +/// Noop strategy that takes a message and does nothing. +/// +/// This can be useful when you do not care to commit an offset. +pub struct Noop {} + +impl ProcessingStrategy for Noop { + fn poll(&mut self) -> Result, StrategyError> { + Ok(None) + } + fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { + Ok(()) + } + fn terminate(&mut self) {} + fn join(&mut self, _timeout: Option) -> Result, StrategyError> { + Ok(None) + } +} diff --git a/rust-arroyo/src/processing/strategies/produce.rs b/rust-arroyo/src/processing/strategies/produce.rs index f3b51a9b..a433e26e 100644 --- a/rust-arroyo/src/processing/strategies/produce.rs +++ b/rust-arroyo/src/processing/strategies/produce.rs @@ -95,6 +95,7 @@ mod tests { use crate::backends::local::broker::LocalBroker; use crate::backends::local::LocalProducer; use crate::backends::storages::memory::MemoryMessageStorage; + use crate::processing::strategies::noop::Noop; use crate::processing::strategies::StrategyError; use crate::types::{BrokerMessage, InnerMessage, Partition, Topic}; use crate::utils::clock::TestingClock; @@ -152,26 +153,6 @@ mod tests { let partition = Partition::new(Topic::new("test"), 0); - struct Noop {} - impl ProcessingStrategy for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit( - &mut self, - _message: Message, - ) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join( - &mut self, - _timeout: Option, - ) -> Result, StrategyError> { - Ok(None) - } - } - let producer: KafkaProducer = KafkaProducer::new(config); let concurrency = ConcurrencyConfig::new(10); let mut strategy = Produce::new( diff --git a/rust-arroyo/src/processing/strategies/run_task.rs b/rust-arroyo/src/processing/strategies/run_task.rs index 73fd04b5..59dec1ba 100644 --- a/rust-arroyo/src/processing/strategies/run_task.rs +++ b/rust-arroyo/src/processing/strategies/run_task.rs @@ -101,7 +101,10 @@ impl ProcessingStrategy #[cfg(test)] mod tests { use super::*; - use crate::types::{BrokerMessage, InnerMessage, Message, Partition, Topic}; + use crate::{ + processing::strategies::noop::Noop, + types::{BrokerMessage, InnerMessage, Message, Partition, Topic}, + }; use chrono::Utc; #[test] @@ -110,23 +113,6 @@ mod tests { Ok(value) } - struct Noop {} - impl ProcessingStrategy for Noop { - fn poll(&mut self) -> Result, StrategyError> { - Ok(None) - } - fn submit(&mut self, _message: Message) -> Result<(), SubmitError> { - Ok(()) - } - fn terminate(&mut self) {} - fn join( - &mut self, - _timeout: Option, - ) -> Result, StrategyError> { - Ok(None) - } - } - let mut strategy = RunTask::new(identity, Noop {}); let partition = Partition::new(Topic::new("test"), 0); diff --git a/tests/processing/strategies/test_noop.py b/tests/processing/strategies/test_noop.py new file mode 100644 index 00000000..c1ec4eb7 --- /dev/null +++ b/tests/processing/strategies/test_noop.py @@ -0,0 +1,19 @@ +from datetime import datetime + +from arroyo.processing.strategies.noop import Noop +from arroyo.types import Message, Partition, Topic, Value + + +def test_noop() -> None: + """ + Test that the interface of the noop strategy is correct. + """ + now = datetime.now() + + strategy = Noop() + partition = Partition(Topic("topic"), 0) + + strategy.submit(Message(Value(b"hello", {partition: 1}, now))) + strategy.poll() + strategy.submit(Message(Value(b"world", {partition: 2}, now))) + strategy.poll() From 0b84afc07131d8b8d48abcb7c8de8cfa2a98e526 Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Tue, 2 Jul 2024 12:42:25 -0400 Subject: [PATCH 03/10] feat: Add From> impl for InvalidMessage (#377) --- rust-arroyo/src/processing/strategies/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/rust-arroyo/src/processing/strategies/mod.rs b/rust-arroyo/src/processing/strategies/mod.rs index 180c1f0f..995f5b19 100644 --- a/rust-arroyo/src/processing/strategies/mod.rs +++ b/rust-arroyo/src/processing/strategies/mod.rs @@ -1,4 +1,4 @@ -use crate::types::{Message, Partition}; +use crate::types::{BrokerMessage, Message, Partition}; use std::collections::HashMap; use std::time::Duration; @@ -27,6 +27,15 @@ pub struct InvalidMessage { pub offset: u64, } +impl From<&BrokerMessage> for InvalidMessage { + fn from(value: &BrokerMessage) -> Self { + Self { + partition: value.partition, + offset: value.offset, + } + } +} + /// Signals that we need to commit offsets #[derive(Debug, Clone, PartialEq)] pub struct CommitRequest { From cbc46dea11a3ec641a47b57311aef4512f3ec1c2 Mon Sep 17 00:00:00 2001 From: Marcel Johannesmann Date: Mon, 8 Jul 2024 17:11:56 +0200 Subject: [PATCH 04/10] feat: enhance metrics defs (#378) * feat: added metric types to comments * feat: create script to auto-generate metric definitions as JSON file * build: added data file to package and build * ci: run linters on scripts dir too and ignore auto-generated file * docs: updated metrics section * docs: fix typo * fix: strip all `#` * docs: described parsing of comment --- .github/workflows/build.yml | 2 + .github/workflows/ci.yml | 4 +- .gitignore | 2 + arroyo/utils/metric_defs.py | 72 ++++++++++++------------- docs/source/metrics.rst | 10 ++++ scripts/create_metrics_def_json.py | 85 ++++++++++++++++++++++++++++++ setup.py | 2 +- 7 files changed, 138 insertions(+), 39 deletions(-) create mode 100644 scripts/create_metrics_def_json.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index af3ec83d..d65c4a4f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,6 +16,8 @@ jobs: - uses: actions/setup-python@v2 with: python-version: 3.8 + - run: | + python scripts/create_metrics_def_json.py - run: | pip install wheel python setup.py sdist bdist_wheel diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ef8fe4e0..86d3cdbe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,8 +23,8 @@ jobs: pip install -r requirements-linter.txt - name: Run linter run: | - black arroyo tests - flake8 arroyo tests + black arroyo tests scripts + flake8 arroyo tests scripts typing: name: "Type checking" runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 398c83ee..6a2816f6 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ .idea/ target/ /Cargo.lock + +arroyo/utils/metricDefs.json diff --git a/arroyo/utils/metric_defs.py b/arroyo/utils/metric_defs.py index 05f47ac3..d6cdd33e 100644 --- a/arroyo/utils/metric_defs.py +++ b/arroyo/utils/metric_defs.py @@ -1,70 +1,70 @@ from typing import Literal MetricName = Literal[ - # Number of messages in a multiprocessing batch + # Time: Number of messages in a multiprocessing batch "arroyo.strategies.run_task_with_multiprocessing.batch.size.msg", - # Number of bytes in a multiprocessing batch + # Time: Number of bytes in a multiprocessing batch "arroyo.strategies.run_task_with_multiprocessing.batch.size.bytes", - # Number of messages in a multiprocessing batch after the message transformation + # Time: Number of messages in a multiprocessing batch after the message transformation "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.msg", - # Number of bytes in a multiprocessing batch after the message transformation + # Time: Number of bytes in a multiprocessing batch after the message transformation "arroyo.strategies.run_task_with_multiprocessing.output_batch.size.bytes", - # Number of times the consumer is spinning + # Counter: Number of times the consumer is spinning "arroyo.consumer.run.count", - # Number of times the consumer encounted an invalid message. + # Counter: Number of times the consumer encountered an invalid message. "arroyo.consumer.invalid_message.count", - # How long it took the Reduce step to fill up a batch + # Time: How long it took the Reduce step to fill up a batch "arroyo.strategies.reduce.batch_time", - # Counter, incremented when a strategy after multiprocessing applies + # Counter: Incremented when a strategy after multiprocessing applies # backpressure to multiprocessing. May be a reason why CPU cannot be # saturated. "arroyo.strategies.run_task_with_multiprocessing.batch.backpressure", - # Counter, incremented when multiprocessing cannot fill the input batch + # Counter: Incremented when multiprocessing cannot fill the input batch # because not enough memory was allocated. This results in batches smaller # than configured. Increase `input_block_size` to fix. "arroyo.strategies.run_task_with_multiprocessing.batch.input.overflow", - # Counter, incremented when multiprocessing cannot pull results in batches + # Counter: Incremented when multiprocessing cannot pull results in batches # equal to the input batch size, because not enough memory was allocated. # This can be devastating for throughput. Increase `output_block_size` to # fix. "arroyo.strategies.run_task_with_multiprocessing.batch.output.overflow", - # Arroyo has decided to re-allocate a block in order to combat input buffer - # overflow. This behavior can be disabled by explicitly setting + # Counter: Arroyo has decided to re-allocate a block in order to combat input + # buffer overflow. This behavior can be disabled by explicitly setting # `input_block_size` to a not-None value in `RunTaskWithMultiprocessing`. "arroyo.strategies.run_task_with_multiprocessing.batch.input.resize", - # Arroyo has decided to re-allocate a block in order to combat output buffer - # overflow. This behavior can be disabled by explicitly setting + # Counter: Arroyo has decided to re-allocate a block in order to combat output + # buffer overflow. This behavior can be disabled by explicitly setting # `output_block_size` to a not-None value in `RunTaskWithMultiprocessing`. "arroyo.strategies.run_task_with_multiprocessing.batch.output.resize", - # How many batches are being processed in parallel by multiprocessing. + # Gauge: How many batches are being processed in parallel by multiprocessing. "arroyo.strategies.run_task_with_multiprocessing.batches_in_progress", - # Counter. A subprocess by multiprocessing unexpectedly died. + # Counter: A subprocess by multiprocessing unexpectedly died. "sigchld.detected", - # Gauge. Shows how many processes the multiprocessing strategy is + # Gauge: Shows how many processes the multiprocessing strategy is # configured with. "arroyo.strategies.run_task_with_multiprocessing.processes", - # Counter. Incremented when the multiprocessing pool is created (or re-created). + # Counter: Incremented when the multiprocessing pool is created (or re-created). "arroyo.strategies.run_task_with_multiprocessing.pool.create", - # Time (unitless) spent polling librdkafka for new messages. + # Time: (unitless) spent polling librdkafka for new messages. "arroyo.consumer.poll.time", - # Time (unitless) spent in strategies (blocking in strategy.submit or + # Time: (unitless) spent in strategies (blocking in strategy.submit or # strategy.poll) "arroyo.consumer.processing.time", - # Time (unitless) spent pausing the consumer due to backpressure (MessageRejected) + # Time: (unitless) spent pausing the consumer due to backpressure (MessageRejected) "arroyo.consumer.backpressure.time", - # Time (unitless) spent in handling `InvalidMessage` exceptions and sending + # Time: (unitless) spent in handling `InvalidMessage` exceptions and sending # messages to the the DLQ. "arroyo.consumer.dlq.time", - # Time (unitless) spent in waiting for the strategy to exit, such as during + # Time: (unitless) spent in waiting for the strategy to exit, such as during # shutdown or rebalancing. "arroyo.consumer.join.time", - # Time (unitless) spent in librdkafka callbacks. This metric's timings + # Time: (unitless) spent in librdkafka callbacks. This metric's timings # overlap other timings, and might spike at the same time. "arroyo.consumer.callback.time", - # Time (unitless) spent in shutting down the consumer. This metric's + # Time: (unitless) spent in shutting down the consumer. This metric's # timings overlap other timings, and might spike at the same time. "arroyo.consumer.shutdown.time", - # A regular duration metric where each datapoint is measuring the time it + # Time: A regular duration metric where each datapoint is measuring the time it # took to execute a single callback. This metric is distinct from the # arroyo.consumer.*.time metrics as it does not attempt to accumulate time # spent per second in an attempt to keep monitoring overhead low. @@ -73,29 +73,29 @@ # executed, as 'callback_name'. Possible values are on_partitions_assigned # and on_partitions_revoked. "arroyo.consumer.run.callback", - # Duration metric measuring the time it took to flush in-flight messages + # Time: Duration metric measuring the time it took to flush in-flight messages # and shut down the strategies. "arroyo.consumer.run.close_strategy", - # Duration metric measuring the time it took to create the processing strategy. + # Time: Duration metric measuring the time it took to create the processing strategy. "arroyo.consumer.run.create_strategy", - # How many partitions have been revoked just now. + # Counter: How many partitions have been revoked just now. "arroyo.consumer.partitions_revoked.count", - # How many partitions have been assigned just now. + # Counter: How many partitions have been assigned just now. "arroyo.consumer.partitions_assigned.count", - # Consumer latency in seconds. Recorded by the commit offsets strategy. + # Time: Consumer latency in seconds. Recorded by the commit offsets strategy. "arroyo.consumer.latency", - # Counter metric for when the underlying rdkafka consumer is being paused. + # Counter: Metric for when the underlying rdkafka consumer is being paused. # # This flushes internal prefetch buffers. "arroyo.consumer.pause", - # Counter metric for when the underlying rdkafka consumer is being resumed. + # Counter: Metric for when the underlying rdkafka consumer is being resumed. # # This might cause increased network usage as messages are being re-fetched. "arroyo.consumer.resume", - # Queue size of background queue that librdkafka uses to prefetch messages. + # Gauge: Queue size of background queue that librdkafka uses to prefetch messages. "arroyo.consumer.librdkafka.total_queue_size", - # Counter metric to measure how often the healthcheck file has been touched. + # Counter: Counter metric to measure how often the healthcheck file has been touched. "arroyo.processing.strategies.healthcheck.touch", - # Number of messages dropped in the FilterStep strategy + # Counter: Number of messages dropped in the FilterStep strategy "arroyo.strategies.filter.dropped_messages", ] diff --git a/docs/source/metrics.rst b/docs/source/metrics.rst index 94011dbe..c005c740 100644 --- a/docs/source/metrics.rst +++ b/docs/source/metrics.rst @@ -45,6 +45,16 @@ Available Metrics .. literalinclude:: ../../arroyo/utils/metric_defs.py +For convenience Arroyo includes a machine readable version which can be loaded like: + +.. code:: python + + import importlib.resources + import json + + with importlib.resources.files("arroyo.utils").joinpath("metricDefs.json").open() as f: + metric_defs = json.load(f) + API ======= diff --git a/scripts/create_metrics_def_json.py b/scripts/create_metrics_def_json.py new file mode 100644 index 00000000..c38f9924 --- /dev/null +++ b/scripts/create_metrics_def_json.py @@ -0,0 +1,85 @@ +import itertools +import json +import re +from collections.abc import Iterable, Iterator +from pathlib import Path +from typing import Dict, List, Tuple, TypeVar + +TElement = TypeVar("TElement") + +METRICS_DEF_SRC_PATH = "arroyo/utils/metric_defs.py" +METRICS_DEF_JSON_PATH = ( + "arroyo/utils/metricDefs.json" # must match name in `package_data` +) + + +def extract_literal_content_from_source(src_file: str) -> str: + return re.search(r"\[(.|\n)*\]", src_file).group().lstrip("[\n").rstrip("\n]") # type: ignore + + +def create_comment_metric_list(content_of_literal: str) -> List[str]: + # split content on each metric - create a list of [comment_1,metric_1,...] + return re.split(r"(\"arroyo\.[\w\.]+\")", content_of_literal) + + +def batched(iterable: Iterable[TElement], n: int) -> Iterator[Tuple[TElement, ...]]: + # taken from https://docs.python.org/3/library/itertools.html#itertools.batched + # + # because CI runs on 3.8 and itertools.batched ships with 3.12 + # + # batched('ABCDEFG', 3) → ABC DEF G + if n < 1: + raise ValueError("n must be at least one") + iterator = iter(iterable) + while batch := tuple(itertools.islice(iterator, n)): + yield batch + + +def parse_metric_name(metric_name_raw: str) -> str: + return metric_name_raw.replace('"', "") + + +def parse_description_comment(comment: str) -> Tuple[str, str]: + # a metric description is a single or multi-line comment where the metric type precedes + # on the first line e.g. : + type_, description = ( + re.sub(r"(#\s|#)", "", comment) + .lstrip(",\n") + .rstrip("\n") + .split(": ", maxsplit=1) + ) + return type_, description + + +def create_machine_readable_structure( + comment_metric_list: List[str], +) -> Dict[str, Dict[str, str]]: + metrics = {} + for full_description, metric in batched( + filter(lambda x: x != ",", comment_metric_list), 2 + ): + name = parse_metric_name(metric) + type_, description = parse_description_comment(full_description) + metrics[name] = {"name": name, "type": type_, "description": description} + + return metrics + + +def main() -> None: + src_file = Path(METRICS_DEF_SRC_PATH).read_text() + + # remove indentation + src_file = src_file.replace(" " * 4, "") + + content_of_literal = extract_literal_content_from_source(src_file) + + comment_metric_list = create_comment_metric_list(content_of_literal) + + metrics = create_machine_readable_structure(comment_metric_list) + + with open(Path(METRICS_DEF_JSON_PATH), mode="w") as f: + json.dump(metrics, f) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index ea473c4f..912712fe 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ def get_requirements() -> Sequence[str]: long_description=open("README.md").read(), long_description_content_type="text/markdown", packages=find_packages(exclude=["tests", "examples"]), - package_data={"arroyo": ["py.typed"]}, + package_data={"arroyo": ["py.typed","utils/metricDefs.json"]}, zip_safe=False, install_requires=get_requirements(), classifiers=[ From ca204da69a74f2dcd2bde80367db94886cb0e9f6 Mon Sep 17 00:00:00 2001 From: Riya Chakraborty <47572810+ayirr7@users.noreply.github.com> Date: Wed, 11 Sep 2024 08:49:37 -0700 Subject: [PATCH 05/10] Add a basic metric for tracking the capacity in VecDeque buffer (#383) * add a gauge to capture buffer capacity * add capacity gauge * change to u64 * change metric name * get number of partitions * add samples * remove smampling * fix typo * remove unused imports --- rust-arroyo/src/processing/dlq.rs | 33 ++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/rust-arroyo/src/processing/dlq.rs b/rust-arroyo/src/processing/dlq.rs index 05d4dba1..7cb5b1c1 100644 --- a/rust-arroyo/src/processing/dlq.rs +++ b/rust-arroyo/src/processing/dlq.rs @@ -11,6 +11,7 @@ use tokio::task::JoinHandle; use crate::backends::kafka::producer::KafkaProducer; use crate::backends::kafka::types::KafkaPayload; use crate::backends::Producer; +use crate::gauge; use crate::types::{BrokerMessage, Partition, Topic, TopicOrPartition}; // This is a per-partition max @@ -402,6 +403,12 @@ impl BufferedMessages { return; } + // Number of partitions in the buffer map + gauge!( + "arroyo.consumer.dlq_buffer.assigned_partitions", + self.buffered_messages.len() as u64, + ); + let buffered = self.buffered_messages.entry(message.partition).or_default(); if let Some(max) = self.max_per_partition { if buffered.len() >= max { @@ -414,22 +421,46 @@ impl BufferedMessages { } buffered.push_back(message); + + // Number of elements that can be held in buffer deque without reallocating + gauge!( + "arroyo.consumer.dlq_buffer.capacity", + buffered.capacity() as u64 + ); } /// Return the message at the given offset or None if it is not found in the buffer. /// Messages up to the offset for the given partition are removed. pub fn pop(&mut self, partition: &Partition, offset: u64) -> Option> { + // Number of partitions in the buffer map + gauge!( + "arroyo.consumer.dlq_buffer.assigned_partitions", + self.buffered_messages.len() as u64, + ); + let messages = self.buffered_messages.get_mut(partition)?; while let Some(message) = messages.front() { match message.offset.cmp(&offset) { Ordering::Equal => { - return messages.pop_front(); + let first = messages.pop_front(); + + gauge!( + "arroyo.consumer.dlq_buffer.capacity", + messages.capacity() as u64 + ); + + return first; } Ordering::Greater => { return None; } Ordering::Less => { messages.pop_front(); + + gauge!( + "arroyo.consumer.dlq_buffer.capacity", + messages.capacity() as u64 + ); } }; } From 60daa37eecb71ec95fd894149bce296bb91a93ec Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 11 Sep 2024 18:03:29 +0200 Subject: [PATCH 06/10] chore: Fix release builds (#385) * Also run wheel build on PRs * bump wheel build * install setuptools --- .github/workflows/build.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d65c4a4f..2f200766 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,6 +5,7 @@ on: branches: - main - release/** + pull_request: jobs: dist: @@ -15,12 +16,11 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.12 - run: | python scripts/create_metrics_def_json.py - - run: | - pip install wheel - python setup.py sdist bdist_wheel + - run: pip install wheel setuptools + - run: python setup.py sdist bdist_wheel - uses: actions/upload-artifact@v3.1.1 with: name: ${{ github.sha }} From 2292f5754e386dda4ae04a487077fa0f6ea94185 Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Wed, 11 Sep 2024 17:37:10 +0000 Subject: [PATCH 07/10] release: 2.17.5 --- CHANGELOG.md | 20 ++++++++++++++++++++ Cargo.toml | 2 +- setup.py | 2 +- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b02236f5..c686663c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,25 @@ # Changelog and versioning +## 2.17.5 + +### Various fixes & improvements + +- chore: Fix release builds (#385) by @untitaker +- Add a basic metric for tracking the capacity in VecDeque buffer (#383) by @ayirr7 +- feat: enhance metrics defs (#378) by @mj0nez +- feat: Add From> impl for InvalidMessage (#377) by @evanpurkhiser +- feat: Add Noop processing strategy (#376) by @evanpurkhiser +- Update RunTask to receive a Message (#375) by @evanpurkhiser +- hotfix, fix master ci (66f1efc3) by @untitaker +- fix: add guard to Produce.poll to ensure next_step is called regardless of produce queue (#370) by @mj0nez +- ref: Add pre-commit hook for rustfmt (#364) by @untitaker +- update accumulator sig to return Result instead of TResult (#359) by @john-z-yang +- ref: Use coarsetime consistently (#366) by @untitaker +- ref(rust): Backpressure metrics for threadpools (#367) by @untitaker +- ref(reduce): Refactor for timeout=0 (#363) by @untitaker +- ref(rust): Remove strategy.close (#361) by @untitaker +- ref(rust): Add join-time metric for threadpools (#362) by @untitaker + ## 2.17.4 ### Various fixes & improvements diff --git a/Cargo.toml b/Cargo.toml index 78c99463..f301be47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust_arroyo" -version = "2.17.4" +version = "2.17.5" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/setup.py b/setup.py index 912712fe..c8ee832d 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ def get_requirements() -> Sequence[str]: setup( name="sentry-arroyo", - version="2.17.4", + version="2.17.5", author="Sentry", author_email="oss@sentry.io", license="Apache-2.0", From 0c1a4021637d4875408aaf54acd86614fa9f44b4 Mon Sep 17 00:00:00 2001 From: Onkar Deshpande Date: Wed, 11 Sep 2024 12:09:41 -0700 Subject: [PATCH 08/10] Remove non-existent @getsentry/processing from CODEOWNERS (#386) --- CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index b4563502..e0e400a8 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,2 +1,2 @@ * @getsentry/owners-snuba @getsentry/ops -/rust-arroyo @getsentry/owners-snuba @getsentry/ops @getsentry/processing +/rust-arroyo @getsentry/owners-snuba @getsentry/ops From 8d4d32f83ce757b60ad40a2e4fbda40642f17185 Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Wed, 11 Sep 2024 20:33:46 +0000 Subject: [PATCH 09/10] release: 2.17.6 --- CHANGELOG.md | 6 ++++++ Cargo.toml | 2 +- setup.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c686663c..7914f835 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog and versioning +## 2.17.6 + +### Various fixes & improvements + +- Remove non-existent @getsentry/processing from CODEOWNERS (#386) by @onkar + ## 2.17.5 ### Various fixes & improvements diff --git a/Cargo.toml b/Cargo.toml index f301be47..95b6143a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust_arroyo" -version = "2.17.5" +version = "2.17.6" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/setup.py b/setup.py index c8ee832d..af71904c 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ def get_requirements() -> Sequence[str]: setup( name="sentry-arroyo", - version="2.17.5", + version="2.17.6", author="Sentry", author_email="oss@sentry.io", license="Apache-2.0", From d85400046a703911a02ec172f3a136ef73f839d2 Mon Sep 17 00:00:00 2001 From: joshuarli Date: Mon, 23 Sep 2024 10:26:04 -0700 Subject: [PATCH 10/10] all-repos: update actions/upload-artifact to v4 (#381) Committed via https://github.com/asottile/all-repos --- .github/workflows/build.yml | 2 +- .github/workflows/docs.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2f200766..16bf2cb3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,7 +21,7 @@ jobs: python scripts/create_metrics_def_json.py - run: pip install wheel setuptools - run: python setup.py sdist bdist_wheel - - uses: actions/upload-artifact@v3.1.1 + - uses: actions/upload-artifact@v4 with: name: ${{ github.sha }} path: dist/* diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 31a59dd6..43925e56 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -28,7 +28,7 @@ jobs: force_orphan: true - name: Archive Docs - uses: actions/upload-artifact@v3.1.1 + uses: actions/upload-artifact@v4 with: name: docs path: docs/build