diff --git a/.env.template b/.env.template index 5e7d72e..fed3a2d 100644 --- a/.env.template +++ b/.env.template @@ -1,3 +1,4 @@ PULSAR_ADDRESS=pulsar://localhost:6650 +PULSAR_TOPIC_NAME=test PULSAR_TOPIC=persistent://public/default/test TCP_SOCKET=127.0.0.1:9999 \ No newline at end of file diff --git a/README.md b/README.md index 5b6c0f7..9df370b 100644 --- a/README.md +++ b/README.md @@ -43,15 +43,14 @@ CREATE TABLE pulsar_cassandra_sink (key text PRIMARY KEY, col text); ```bash pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ - --tenant public \ - --namespace default \ - --name cassandra-sink \ --archive $PWD/pulsar/connectors/cassandra/pulsar-io-cassandra-3.2.2.nar \ + --inputs $PULSAR_TOPIC_NAME \ + --name cassandra-sink \ --sink-config-file $PWD/pulsar/connectors/cassandra/cassandra-sink.yml \ - --inputs test + --parallelism 1 ``` #### PostgreSQL @@ -68,11 +67,11 @@ CREATE TABLE IF NOT EXISTS pulsar_postgres_jdbc_sink ( ```bash pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ --archive $PWD/pulsar/connectors/postgres/pulsar-io-jdbc-postgres-3.2.2.nar \ - --inputs test \ + --inputs $PULSAR_TOPIC_NAME \ --name postgres-sink \ --sink-config-file $PWD/pulsar/connectors/postgres/postgres-sink.yaml \ --parallelism 1 @@ -92,11 +91,11 @@ CREATE TABLE IF NOT EXISTS pulsar_questdb_sink ( ```bash pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ --archive $PWD/pulsar/connectors/postgres/pulsar-io-jdbc-postgres-3.2.2.nar \ - --inputs test \ + --inputs $PULSAR_TOPIC_NAME \ --name questdb-sink \ --sink-config-file $PWD/pulsar/connectors/questdb/questdb-sink.yaml \ --parallelism 1 diff --git a/pulsar/data/zookeeper/version-2/log.1 b/pulsar/data/zookeeper/version-2/log.1 deleted file mode 100644 index 00618c3..0000000 Binary files a/pulsar/data/zookeeper/version-2/log.1 and /dev/null differ diff --git a/pulsar/data/zookeeper/version-2/snapshot.0 b/pulsar/data/zookeeper/version-2/snapshot.0 deleted file mode 100644 index 96c3844..0000000 Binary files a/pulsar/data/zookeeper/version-2/snapshot.0 and /dev/null differ diff --git a/scripts/run_cassandra.sh b/scripts/run_cassandra.sh index 040342d..c7a4479 100644 --- a/scripts/run_cassandra.sh +++ b/scripts/run_cassandra.sh @@ -2,12 +2,11 @@ pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ - --tenant public \ - --namespace default \ - --name cassandra-sink \ --archive $PWD/pulsar/connectors/cassandra/pulsar-io-cassandra-3.2.2.nar \ + --inputs $PULSAR_TOPIC_NAME \ + --name cassandra-sink \ --sink-config-file $PWD/pulsar/connectors/cassandra/cassandra-sink.yml \ - --inputs test + --parallelism 1 diff --git a/scripts/run_postgres.sh b/scripts/run_postgres.sh index 13c8bf1..097111c 100644 --- a/scripts/run_postgres.sh +++ b/scripts/run_postgres.sh @@ -2,11 +2,11 @@ pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ --archive $PWD/pulsar/connectors/postgres/pulsar-io-jdbc-postgres-3.2.2.nar \ - --inputs test \ + --inputs $PULSAR_TOPIC_NAME \ --name postgres-sink \ --sink-config-file $PWD/pulsar/connectors/postgres/postgres-sink.yaml \ --parallelism 1 diff --git a/scripts/run_questdb.sh b/scripts/run_questdb.sh index 276609c..6b73a2c 100644 --- a/scripts/run_questdb.sh +++ b/scripts/run_questdb.sh @@ -2,11 +2,11 @@ pulsar-daemon start standalone -pulsar-admin schemas upload test -f ./connectors/avro-schema +pulsar-admin schemas upload test -f $PWD/pulsar/connectors/avro-schema pulsar-admin sinks create \ --archive $PWD/pulsar/connectors/postgres/pulsar-io-jdbc-postgres-3.2.2.nar \ - --inputs test \ + --inputs $PULSAR_TOPIC_NAME \ --name questdb-sink \ --sink-config-file $PWD/pulsar/connectors/questdb/questdb-sink.yaml \ --parallelism 1 diff --git a/src/main.rs b/src/main.rs index 9c8ab5d..14ef397 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,7 +81,7 @@ async fn send_messages( mut rx: mpsc::Receiver, ) { while let Some(message) = rx.recv().await { - if let Err(e) = producer.send(message).await { + if let Err(e) = producer.send_non_blocking(message).await { eprintln!("Failed to send message to Pulsar; err = {:?}...", e); } }