diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh
index 3b31afeef253..31896410a762 100755
--- a/ci/scripts/common.sh
+++ b/ci/scripts/common.sh
@@ -114,3 +114,20 @@ get_latest_kafka_download_url() {
     local download_url="https://downloads.apache.org/kafka/${latest_version}/kafka_2.13-${latest_version}.tgz"
     echo "$download_url"
 }
+
+get_latest_cassandra_version() {
+    local versions
+    # -f: fail on HTTP errors instead of parsing an error page; the pipeline is non-zero when no version matches
+    versions=$(curl -fsS https://downloads.apache.org/cassandra/ | grep -Eo 'href="[0-9]+\.[0-9]+\.[0-9]+/"' | grep -Eo "[0-9]+\.[0-9]+\.[0-9]+") || return 1
+    # Sort the version numbers and get the latest one
+    local latest_version
+    latest_version=$(echo "$versions" | sort -V | tail -n1)
+    echo "$latest_version"
+}
+
+get_latest_cassandra_download_url() {
+    local latest_version
+    latest_version=$(get_latest_cassandra_version) || return 1
+    local download_url="https://downloads.apache.org/cassandra/${latest_version}/apache-cassandra-${latest_version}-bin.tar.gz"
+    echo "$download_url"
+}
diff --git a/ci/scripts/e2e-cassandra-sink-test.sh b/ci/scripts/e2e-cassandra-sink-test.sh
index cae03843c470..9425d2a08d12 100755
--- a/ci/scripts/e2e-cassandra-sink-test.sh
+++ b/ci/scripts/e2e-cassandra-sink-test.sh
@@ -33,40 +33,32 @@ tar xf ./risingwave-connector.tar.gz -C ./connector-node
 
 echo "--- starting risingwave cluster"
 risedev ci-start ci-sink-test
-sleep 1
+# Wait for the Cassandra server to start
+sleep 40
 
-echo "--- create cassandra table"
-curl https://downloads.apache.org/cassandra/4.1.3/apache-cassandra-4.1.3-bin.tar.gz --output apache-cassandra-4.1.3-bin.tar.gz
-tar xfvz apache-cassandra-4.1.3-bin.tar.gz
-# remove bundled packages, and use installed packages, because Python 3.12 has removed asyncore, but I failed to install libev support for bundled Python driver.
-rm apache-cassandra-4.1.3/lib/six-1.12.0-py2.py3-none-any.zip
-rm apache-cassandra-4.1.3/lib/cassandra-driver-internal-only-3.25.0.zip
-apt-get install -y libev4 libev-dev
-pip3 install --break-system-packages cassandra-driver
+echo "--- install cassandra"
+wget "$(get_latest_cassandra_download_url)" -O cassandra_latest.tar.gz
+tar xfvz cassandra_latest.tar.gz
+LATEST_CASSANDRA_VERSION=$(get_latest_cassandra_version)
+export LATEST_CASSANDRA_VERSION
+export CASSANDRA_DIR="./apache-cassandra-${LATEST_CASSANDRA_VERSION}"
+
+# cqlsh is run under Python 3.11: Python 3.12 removed asyncore, which the bundled driver relies on
+apt-get install -y software-properties-common
+add-apt-repository -y ppa:deadsnakes/ppa
+apt-get update
+apt-get install -y python3.11
+apt-get install -y python3.11-venv
+python3.11 -m venv cqlsh_env
+source cqlsh_env/bin/activate
 
-cd apache-cassandra-4.1.3/bin
 export CQLSH_HOST=cassandra-server
 export CQLSH_PORT=9042
-./cqlsh -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;
-CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
 
 echo "--- testing sinks"
-cd ../../
 sqllogictest -p 4566 -d dev './e2e_test/sink/cassandra_sink.slt'
 
-sleep 1
-cd apache-cassandra-4.1.3/bin
-./cqlsh -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
-if cat ./query_result.csv | awk -F "," '{
-    exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01 01:01:01.000+0000" && $9 == "False\r"); }'; then
-    echo "Cassandra sink check passed"
-else
-    echo "The output is not as expected."
-    echo "output:"
-    cat ./query_result.csv
-    exit 1
-fi
+deactivate
 
 echo "--- Kill cluster"
-cd ../../
-risedev ci-kill
\ No newline at end of file
+risedev ci-kill
diff --git a/e2e_test/sink/cassandra_sink.slt b/e2e_test/sink/cassandra_sink.slt
index 7091e8da7078..fe5ca331b591 100644
--- a/e2e_test/sink/cassandra_sink.slt
+++ b/e2e_test/sink/cassandra_sink.slt
@@ -1,13 +1,19 @@
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "CREATE KEYSPACE demo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};use demo;CREATE table demo_bhv_table(v1 int primary key,v2 smallint,v3 bigint,v4 float,v5 double,v6 text,v7 date,v8 timestamp,v9 boolean);"
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "use demo;CREATE table \"Test_uppercase\"(\"TEST_V1\" int primary key, \"TEST_V2\" int,\"TEST_V3\" int);"
+
 statement ok
 CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);
 
 statement ok
-CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+CREATE TABLE t7 ("TEST_V1" int primary key, "TEST_V2" int, "TEST_V3" int);
 
 statement ok
 CREATE SINK s6
 FROM
-    mv6 WITH (
+    t6 WITH (
     connector = 'cassandra',
     type = 'append-only',
     force_append_only='true',
@@ -17,9 +23,25 @@ FROM
     cassandra.datacenter = 'datacenter1',
 );
 
+statement ok
+CREATE SINK s7
+FROM
+    t7 WITH (
+    connector = 'cassandra',
+    type = 'append-only',
+    force_append_only='true',
+    cassandra.url = 'cassandra-server:9042',
+    cassandra.keyspace = 'demo',
+    cassandra.table = 'Test_uppercase',
+    cassandra.datacenter = 'datacenter1',
+);
+
 statement ok
 INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);
 
+statement ok
+INSERT INTO t7 VALUES (1, 1, 1);
+
 statement ok
 FLUSH;
 
@@ -27,7 +49,27 @@ statement ok
 DROP SINK s6;
 
 statement ok
-DROP MATERIALIZED VIEW mv6;
+DROP TABLE t6;
+
+statement ok
+DROP SINK s7;
 
 statement ok
-DROP TABLE t6;
\ No newline at end of file
+DROP TABLE t7;
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.demo_bhv_table TO './query_result.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
+
+system ok
+${CASSANDRA_DIR}/bin/cqlsh --request-timeout=20 -e "COPY demo.\"Test_uppercase\" TO './query_result2.csv' WITH HEADER = false AND ENCODING = 'UTF-8';"
+
+system ok
+cat ./query_result.csv
+----
+1,1,1,1.1,1.2,test,2013-01-01,2013-01-01 01:01:01.000+0000,False
+
+
+system ok
+cat ./query_result2.csv
+----
+1,1,1
\ No newline at end of file
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
index bfc40111818a..afc29e2f5f43 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraConfig.java
@@ -57,7 +57,7 @@ public CassandraConfig(
             @JsonProperty(value = "type") String type) {
         this.url = url;
         this.keyspace = keyspace;
-        this.table = table;
+        this.table = CassandraUtil.convertCQLIdentifiers(table);
         this.datacenter = datacenter;
         this.type = type;
     }
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
index 2f8a035911f2..e277505400b5 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraSink.java
@@ -193,8 +193,11 @@ public void drop() {
 
     private String createInsertStatement(String tableName, TableSchema tableSchema) {
         String[] columnNames = tableSchema.getColumnNames();
-        String columnNamesString = String.join(", ", columnNames);
+        String columnNamesString =
+                Arrays.stream(columnNames)
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName))
+                        .collect(Collectors.joining(", "));
         String placeholdersString = String.join(", ", Collections.nCopies(columnNames.length, "?"));
         return String.format(
                 "INSERT INTO %s (%s) VALUES (%s)",
                 tableName, columnNamesString, placeholdersString);
@@ -204,11 +207,11 @@ private String createUpdateStatement(String tableName, TableSchema tableSchema)
         List<String> primaryKeys = tableSchema.getPrimaryKeys();
         String setClause = // cassandra does not allow SET on primary keys
                 nonKeyColumns.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(", "));
         String whereClause =
                 primaryKeys.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(" AND "));
         return String.format("UPDATE %s SET %s WHERE %s", tableName, setClause, whereClause);
     }
@@ -217,7 +220,7 @@ private static String createDeleteStatement(String tableName, TableSchema tableS
         List<String> primaryKeys = tableSchema.getPrimaryKeys();
         String whereClause =
                 primaryKeys.stream()
-                        .map(columnName -> columnName + " = ?")
+                        .map(columnName -> CassandraUtil.convertCQLIdentifiers(columnName) + " = ?")
                         .collect(Collectors.joining(" AND "));
         return String.format("DELETE FROM %s WHERE %s", tableName, whereClause);
     }
diff --git a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
index a6be8f7fc89c..c1c7cae85c03 100644
--- a/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
+++ b/java/connector-node/risingwave-sink-cassandra/src/main/java/com/risingwave/connector/CassandraUtil.java
@@ -166,4 +166,12 @@ public static Object convertRow(Object value, TypeName typeName) {
                         .asRuntimeException();
         }
     }
+
+    /**
+     * Wraps an identifier in double quotes so CQL treats it case-sensitively. Embedded double
+     * quotes are escaped by doubling them; a null identifier renders as "null".
+     */
+    public static String convertCQLIdentifiers(String identifier) {
+        return "\"" + String.valueOf(identifier).replace("\"", "\"\"") + "\"";
+    }
 }