diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..73f7dbc --- /dev/null +++ b/.dockerignore @@ -0,0 +1,180 @@ +pulsar/data/standalone/ + +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. 
+#.idea/ + +*.log +.DS_Store \ No newline at end of file diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..ff98815 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @alexandrebrilhante \ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..06b57dd --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,23 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: Swatinem/rust-cache@v2 + - name: Build + run: sudo apt-get install -y protobuf-compiler && cargo build --release + - name: Install nightly rustfmt + run: rustup toolchain install nightly --component rustfmt + - name: Check format + run: cargo +nightly fmt --all --check diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2e62a1d --- /dev/null +++ b/.gitignore @@ -0,0 +1,181 @@ +pulsar/data/standalone/ +utils/ + +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +*.log +.DS_Store \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ede3046 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "tick_handler" +version = "0.1.0" +edition = "2021" + +[dependencies] +pulsar = "6.1.0" +tokio = { version = "1.37.0", features = ["full"] } \ No newline at end of file diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..635a8d0 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,20 @@ +Copyright (c) 2024 Alexandre Brilhante + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or 
substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e8563bb --- /dev/null +++ b/README.md @@ -0,0 +1,99 @@ +# tick_handler + +[![Rust](https://github.com/alexandrebrilhante/tick_handler/actions/workflows/rust.yml/badge.svg)](https://github.com/alexandrebrilhante/tick_handler/actions/workflows/rust.yml) + +This Rust application demonstrates how to set up an asynchronous Apache Pulsar producer that sends messages received from a TCP server and persists those same messages in Cassandra or PostgreSQL using a sink connector. + +This project was built as a proof-of-concept for a low-latency market data feed handler. + +This software comes bundled with a Grafana dashboard to observe the message queue and measure latencies. 
+ +## Dependencies + +Ensure your `Cargo.toml` file includes dependencies for the `tokio` and `pulsar` crates: + +```toml +[dependencies] +pulsar = "6.1.0" +tokio = { version = "1.37.0", features = ["full"] } +``` + +## Usage + +### Standalone + +```bash +cargo build --release && cargo run --release +``` + +### Complete Setup + +#### Cassandra + +Ensure your Cassandra instance is set up: + +```sql +CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; + +USE pulsar_test_keyspace; + +CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text); +``` + +```bash +pulsar-daemon start standalone + +docker run -d --rm --name=cassandra -p 9042:9042 cassandra:latest + +docker exec -ti cassandra cqlsh localhost + +CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; + +USE pulsar_test_keyspace; + +CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text); + +pulsar-admin sinks create \ + --tenant public \ + --namespace default \ + --name cassandra-test-sink \ + --archive $PWD/pulsar/connectors/pulsar-io-cassandra-3.2.2.nar \ + --sink-config-file $PWD/pulsar/connectors/cassandra-sink.yml \ + --inputs test + +``` + +#### PostgreSQL + +Ensure your PostgreSQL database is set up: + +```sql +psql -U postgres postgres + +CREATE TABLE IF NOT EXISTS pulsar_postgres_jdbc_sink ( + id serial PRIMARY KEY, + name TEXT NOT NULL +); +``` + +```bash +pulsar-daemon start standalone + +docker pull postgres:12 + +docker run -d -it --rm \ + --name pulsar-postgres \ + -p 5432:5432 \ + -e POSTGRES_PASSWORD=postgres \ + -e POSTGRES_USER=postgres \ + postgres:latest + +pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema + +pulsar-admin sinks create \ + --archive $PWD/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar \ + --inputs test \ + --name pulsar-postgres-jdbc-sink \ + --sink-config-file $PWD/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml \ +
--parallelism 1 +``` \ No newline at end of file diff --git a/dashboard/docker-compose.yml b/dashboard/docker-compose.yml new file mode 100644 index 0000000..3a3539e --- /dev/null +++ b/dashboard/docker-compose.yml @@ -0,0 +1,19 @@ +services: + prometheus: + image: prom/prometheus:latest + ports: + - 9000:9090 + volumes: + - ./:/etc/prometheus/ + command: --config.file=/etc/prometheus/prometheus.yml + + grafana: + image: streamnative/apache-pulsar-grafana-dashboard:latest + stdin_open: true + tty: true + ports: + - 3000:3000 + environment: + - PULSAR_PROMETHEUS_URL=http://localhost:9090 + - PULSAR_CLUSTER=standalone + restart: unless-stopped \ No newline at end of file diff --git a/dashboard/prometheus.yml b/dashboard/prometheus.yml new file mode 100644 index 0000000..21b6235 --- /dev/null +++ b/dashboard/prometheus.yml @@ -0,0 +1,30 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + external_labels: + cluster: "standalone" + +scrape_configs: + - job_name: "proxy" + honor_labels: true + static_configs: + - targets: + - "192.168.64.1:8080" + + - job_name: "broker" + honor_labels: true + static_configs: + - targets: + - "192.168.64.1:8080" + + - job_name: "bookie" + honor_labels: true + static_configs: + - targets: + - "192.168.64.1:8080" + + - job_name: "zookeeper" + honor_labels: true + static_configs: + - targets: + - "192.168.64.1:8080" diff --git a/db/docker-compose.yml b/db/docker-compose.yml new file mode 100644 index 0000000..b64aa1f --- /dev/null +++ b/db/docker-compose.yml @@ -0,0 +1,25 @@ +services: + postgres: + image: postgres:latest + container_name: postgres + restart: always + ports: + - "5432:5432" + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + volumes: + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + - ./postgres-data:/var/lib/postgresql/data + pgadmin: + image: dpage/pgadmin4:latest + container_name: pgadmin4 + restart: always + ports: + - "80:80" + environment: + PGADMIN_DEFAULT_EMAIL: 
alexandre.brilhante@gmail.com + PGADMIN_DEFAULT_PASSWORD: admin + +volumes: + postgres-data: \ No newline at end of file diff --git a/db/init.sql b/db/init.sql new file mode 100644 index 0000000..1741d67 --- /dev/null +++ b/db/init.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS pulsar_postgres_jdbc_sink ( + id serial PRIMARY KEY, + name VARCHAR(255) NOT NULL +); \ No newline at end of file diff --git a/logs/.gitkeep b/logs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/pulsar/connectors/cassandra-sink.yml b/pulsar/connectors/cassandra-sink.yml new file mode 100644 index 0000000..e895080 --- /dev/null +++ b/pulsar/connectors/cassandra-sink.yml @@ -0,0 +1,6 @@ +configs: + roots: "localhost:9042" + keyspace: "pulsar_test_keyspace" + columnFamily: "pulsar_test_table" + keyname: "key" + columnName: "col" \ No newline at end of file diff --git a/pulsar/connectors/pulsar-io-cassandra-3.2.2.nar b/pulsar/connectors/pulsar-io-cassandra-3.2.2.nar new file mode 100644 index 0000000..a4389f7 Binary files /dev/null and b/pulsar/connectors/pulsar-io-cassandra-3.2.2.nar differ diff --git a/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar b/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar new file mode 100644 index 0000000..11443fb Binary files /dev/null and b/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar differ diff --git a/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml b/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml new file mode 100644 index 0000000..8e8c27c --- /dev/null +++ b/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml @@ -0,0 +1,5 @@ +configs: + userName: "alexandre-brilhante" + password: "postgres" + jdbcUrl: "jdbc:postgresql://localhost:5432/postgres" + tableName: "pulsar_postgres_jdbc_sink" diff --git a/pulsar/connectors/schema.avsc b/pulsar/connectors/schema.avsc new file mode 100644 index 0000000..92318e8 --- /dev/null +++ b/pulsar/connectors/schema.avsc @@ -0,0 +1,24 @@ +{ + "type": "AVRO", + "schema": { + "type": "record", + 
"name": "test", + "fields": [ + { + "name": "id", + "type": [ + "null", + "int" + ] + }, + { + "name": "name", + "type": [ + "null", + "string" + ] + } + ] + }, + "properties": {} +} \ No newline at end of file diff --git a/pulsar/docker-compose.yml b/pulsar/docker-compose.yml new file mode 100644 index 0000000..c0968be --- /dev/null +++ b/pulsar/docker-compose.yml @@ -0,0 +1,91 @@ +networks: + pulsar: + driver: bridge +services: + zookeeper: + image: apachepulsar/pulsar-all:latest + container_name: zookeeper + restart: on-failure + networks: + - pulsar + volumes: + - ./data/zookeeper:/pulsar/data/zookeeper + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m + command: > + bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \ + bin/generate-zookeeper-config.sh conf/zookeeper.conf && \ + exec bin/pulsar zookeeper" + healthcheck: + test: ["CMD", "bin/pulsar-zookeeper-ruok.sh"] + interval: 10s + timeout: 5s + retries: 30 + + pulsar-init: + container_name: pulsar-init + hostname: pulsar-init + image: apachepulsar/pulsar-all:latest + networks: + - pulsar + command: > + bin/pulsar initialize-cluster-metadata \ + --cluster cluster-a \ + --zookeeper zookeeper:2181 \ + --configuration-store zookeeper:2181 \ + --web-service-url http://broker:8080 \ + --broker-service-url pulsar://broker:6650 + depends_on: + zookeeper: + condition: service_healthy + + bookie: + image: apachepulsar/pulsar-all:latest + container_name: bookie + restart: on-failure + networks: + - pulsar + environment: + - clusterName=cluster-a + - zkServers=zookeeper:2181 + - metadataServiceUri=metadata-store:zk:zookeeper:2181 + - advertisedAddress=bookie + - BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + depends_on: + zookeeper: + condition: service_healthy + pulsar-init: + condition: service_completed_successfully + volumes: + - ./data/bookkeeper:/pulsar/data/bookkeeper + command: bash -c "bin/apply-config-from-env.py 
conf/bookkeeper.conf && exec bin/pulsar bookie" + + broker: + image: apachepulsar/pulsar-all:latest + container_name: broker + hostname: broker + restart: on-failure + networks: + - pulsar + environment: + - metadataStoreUrl=zk:zookeeper:2181 + - zookeeperServers=zookeeper:2181 + - clusterName=cluster-a + - managedLedgerDefaultEnsembleSize=1 + - managedLedgerDefaultWriteQuorum=1 + - managedLedgerDefaultAckQuorum=1 + - advertisedAddress=broker + - advertisedListeners=external:pulsar://127.0.0.1:6650 + - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m + depends_on: + zookeeper: + condition: service_healthy + bookie: + condition: service_started + ports: + - "6650:6650" + - "8080:8080" + volumes: + - ./pulsar/connectors/:/pulsar/data/ + command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker" \ No newline at end of file diff --git a/scripts/run_cassandra.sh b/scripts/run_cassandra.sh new file mode 100644 index 0000000..d04fe85 --- /dev/null +++ b/scripts/run_cassandra.sh @@ -0,0 +1,21 @@ +#!/bin/zsh + +pulsar-daemon start standalone + +docker run -d --rm --name=cassandra -p 9042:9042 cassandra:latest + +docker exec -ti cassandra cqlsh localhost + +CREATE KEYSPACE pulsar_test_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; + +USE pulsar_test_keyspace; + +CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text); + +pulsar-admin sinks create \ + --tenant public \ + --namespace default \ + --name cassandra-test-sink \ + --archive $PWD/pulsar/connectors/pulsar-io-cassandra-3.2.2.nar \ + --sink-config-file $PWD/pulsar/connectors/cassandra-sink.yml \ + --inputs test diff --git a/scripts/run_postgres.sh b/scripts/run_postgres.sh new file mode 100644 index 0000000..1c9b26a --- /dev/null +++ b/scripts/run_postgres.sh @@ -0,0 +1,31 @@ +#!/bin/zsh + +pulsar-daemon start standalone + +docker pull postgres:12 + +docker run -d -it --rm \ + --name pulsar-postgres \ + -p 5432:5432 \ + -e 
POSTGRES_PASSWORD=postgres \ + -e POSTGRES_USER=postgres \ + postgres:latest + +docker exec -it pulsar-postgres /bin/bash + +psql -U postgres postgres + +create table if not exists pulsar_postgres_jdbc_sink +( +id serial PRIMARY KEY, +name VARCHAR(255) NOT NULL +); + +pulsar-admin schemas upload pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema + +pulsar-admin sinks create \ + --archive $PWD/pulsar/connectors/pulsar-io-jdbc-postgres-3.2.2.nar \ + --inputs test \ + --name pulsar-postgres-jdbc-sink \ + --sink-config-file $PWD/pulsar/connectors/pulsar-postgres-jdbc-sink.yaml \ + --parallelism 1 diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..f483a15 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,145 @@
use pulsar::{producer::ProducerOptions, Pulsar, TokioExecutor};
use std::error::Error;
use tokio::{io::AsyncReadExt, net::TcpListener, sync::mpsc};

/// Builds a Pulsar client for the broker at `addr`, driven by the Tokio executor.
///
/// # Arguments
///
/// * `addr` - The broker service URL, e.g. `pulsar://localhost:6650`.
///
/// # Returns
///
/// A `Result` containing the `Pulsar` client or an error.
///
/// # Errors
///
/// Returns the underlying `pulsar::Error` (boxed) when the client cannot be built.
async fn create_pulsar(addr: &str) -> Result<Pulsar<TokioExecutor>, Box<dyn Error>> {
    // `?` boxes the `pulsar::Error` into `Box<dyn Error>` for us.
    let client = Pulsar::builder(addr, TokioExecutor).build().await?;
    Ok(client)
}

/// Builds a producer named `"producer"` that publishes to `topic_name`.
///
/// # Arguments
///
/// * `pulsar` - The Pulsar client; it is consumed by this call.
/// * `topic_name` - The fully qualified name of the topic to publish to.
///
/// # Returns
///
/// A `Result` containing the `Producer` instance or an error.
///
/// # Errors
///
/// Returns the underlying `pulsar::Error` (boxed) when the producer cannot be created.
async fn create_producer(
    pulsar: Pulsar<TokioExecutor>,
    topic_name: &str,
) -> Result<pulsar::Producer<TokioExecutor>, Box<dyn Error>> {
    // Only the batch size is overridden; every other option keeps its default.
    let options = ProducerOptions {
        batch_size: Some(4),
        ..Default::default()
    };

    let producer = pulsar
        .producer()
        .with_topic(topic_name)
        .with_name("producer")
        .with_options(options)
        .build()
        .await?;

    Ok(producer)
}

/// Creates a channel for sending and receiving messages.
+/// +/// # Returns +/// +/// A tuple containing the sender and receiver of the channel. +async fn create_channel() -> (mpsc::Sender, mpsc::Receiver) { + mpsc::channel(100) +} + +/// Creates a TCP listener with the given address. +/// +/// # Arguments +/// +/// * `addr` - The address to bind the listener to. +/// +/// # Returns +/// +/// A `Result` containing the `TcpListener` instance or an error. +async fn create_listener(addr: &str) -> Result> { + TcpListener::bind(addr) + .await + .map_err(|e: std::io::Error| e.into()) +} + +/// Sends messages from the receiver to the Pulsar producer. +/// +/// # Arguments +/// +/// * `producer` - The Pulsar producer. +/// * `rx` - The receiver of the channel. +async fn send_messages( + mut producer: pulsar::Producer, + mut rx: mpsc::Receiver, +) { + while let Some(message) = rx.recv().await { + if let Err(e) = producer.send(message).await { + eprintln!("Failed to send message to Pulsar; err = {:?}...", e); + } + } +} + +/// Handles a connection from a TCP socket. +/// +/// # Arguments +/// +/// * `socket` - The TCP socket. +/// * `tx` - The sender of the channel. 
+async fn handle_connection(mut socket: tokio::net::TcpStream, tx: mpsc::Sender) { + let mut buf: [u8; 1024] = [0; 1024]; + + loop { + match socket.read(&mut buf).await { + Ok(n) if n == 0 => return, + Ok(n) => { + let message: String = String::from_utf8_lossy(&buf[0..n]).to_string(); + + if tx.send(message).await.is_err() { + eprintln!("Failed to send message to channel..."); + return; + } + } + Err(e) => { + eprintln!("Failed to read from socket; err = {:?}...", e); + return; + } + }; + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr: &str = "pulsar://localhost:6650"; + let pulsar: Pulsar = create_pulsar(addr).await?; + + let topic_name: &str = "persistent://public/default/test"; + let producer: pulsar::Producer = create_producer(pulsar, topic_name).await?; + + let (tx, rx) = create_channel().await; + + let _producer_task: tokio::task::JoinHandle<()> = tokio::spawn(async move { + send_messages(producer, rx).await; + }); + + let listener: TcpListener = create_listener("127.0.0.1:9999").await?; + + loop { + let (socket, _addr) = listener + .accept() + .await + .expect("Failed to accept connection..."); + + let tx: mpsc::Sender = tx.clone(); + + tokio::spawn(async move { + handle_connection(socket, tx).await; + }); + } +}