From 9c69dece97a59f3ac23920473499f7339a0ea68f Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Mon, 25 Nov 2024 16:08:00 -0500 Subject: [PATCH] Added a static async SCB delete delay to address intermittent connection issues on spark worker nodes. (#332) --- README.md | 39 ++++++++++--------- RELEASE.md | 3 ++ .../cql/statement/OriginSelectStatement.java | 4 +- .../com/datastax/cdm/data/DataUtility.java | 32 +++++++++++---- .../datastax/cdm/data/DataUtilityTest.java | 10 ++++- 5 files changed, 58 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 9e079ca4..1963bff1 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ tar -xvzf spark-3.5.3-bin-hadoop3-scala2.13.tgz ``` > [!CAUTION] -> If the above Spark and Scala version does not match, you may see an exception similar like below when running the CDM jobs, +> If the above Spark and Scala version does not match, you may see an exception like below when running the CDM jobs, ``` Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.releaseFence()V ``` @@ -41,24 +41,24 @@ Exception in thread "main" java.lang.NoSuchMethodError: scala.runtime.Statics.re # Steps for Data-Migration: -1. `cdm.properties` file needs to be configured as applicable for the environment. Parameter descriptions and defaults are described in the file. The file can have any name, it does not need to be `cdm.properties`. - > * A simplified sample properties file configuration can be found here as [cdm.properties](./src/resources/cdm.properties) - > * A complete sample properties file configuration can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties) +1. `cdm.properties` file needs to be configured as applicable for the environment. The file can have any name, it does not need to be `cdm.properties`. + > * A sample properties file with default values can be found here as [cdm.properties](./src/resources/cdm.properties) + > * A complete reference properties file with default values can be found here as [cdm-detailed.properties](./src/resources/cdm-detailed.properties) 2. Place the properties file where it can be accessed while running the job via spark-submit. -3. Run the below job using `spark-submit` command as shown below: +3. Run the job using `spark-submit` command as shown below: ``` spark-submit --properties-file cdm.properties \ --conf spark.cdm.schema.origin.keyspaceTable="." \ --master "local[*]" --driver-memory 25G --executor-memory 25G \ ---class com.datastax.cdm.job.Migrate cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt +--class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt ``` **Note:** - Above command generates a log file `logfile_name_*.txt` to avoid log output on the console. - Update the memory options (driver & executor memory) based on your use-case -- To track details of a run in the `target` keyspace, pass param `--conf spark.cdm.trackRun=true` -- To filter and migrate data only in a specific token range, you can pass the below two additional params to the `Migration` or `Validation` jobs +- To track details of a run (recorded on the `target` keyspace), pass param `--conf spark.cdm.trackRun=true` +- To filter records only for a specific token range, pass the below two additional params to the `Migration` OR `Validation` job ``` --conf spark.cdm.filter.cassandra.partition.min= @@ -73,7 +73,7 @@ spark-submit --properties-file cdm.properties \ spark-submit --properties-file cdm.properties \ --conf spark.cdm.schema.origin.keyspaceTable="." \ --master "local[*]" --driver-memory 25G --executor-memory 25G \ ---class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt +--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt ``` - Validation job will report differences as “ERRORS” in the log file as shown below. @@ -95,13 +95,13 @@ spark-submit --properties-file cdm.properties \ --conf spark.executor.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \ --conf spark.driver.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \ --master "local[*]" --driver-memory 25G --executor-memory 25G \ ---class com.datastax.cdm.job.DiffData cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt +--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt ``` - The Validation job can also be run in an AutoCorrect mode. This mode can - - Add any missing records from origin to target - - Update any mismatched records between origin and target (makes target same as origin). -- Enable/disable this feature using one or both of the below setting in the config file + - Add any missing records from `origin` to `target` + - Update any mismatched records between `origin` and `target` +- Enable/disable this feature using one or both of the below params in the properties file ``` spark.cdm.autocorrect.missing false|true spark.cdm.autocorrect.mismatch false|true @@ -117,24 +117,27 @@ spark-submit --properties-file cdm.properties \ --conf spark.cdm.schema.origin.keyspaceTable="." \ --conf spark.cdm.trackRun.previousRunId= \ --master "local[*]" --driver-memory 25G --executor-memory 25G \ - --class com.datastax.cdm.job. cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt + --class com.datastax.cdm.job. cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt ``` # Perform large-field Guardrail violation checks -- The tool can be used to identify large fields from a table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field), use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below +- This mode can help identify large fields on an `origin` table that may break you cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field), use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below ``` spark-submit --properties-file cdm.properties \ --conf spark.cdm.schema.origin.keyspaceTable="." \ --conf spark.cdm.feature.guardrail.colSizeInKB=10000 \ --master "local[*]" --driver-memory 25G --executor-memory 25G \ ---class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-4.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt +--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt ``` +> [!NOTE] +> This mode only operates on one database i.e. `origin`, there is no `target` in this mode + # Features - Auto-detects table schema (column names, types, keys, collections, UDTs, etc.) - Including counter table [Counter tables](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_using/useCountersConcept.html) -- Rerun job from where the previous job had stopped for any reason (killed, had exceptions, etc.) +- Rerun/Resume a previous job that may have stopped for any reason (killed, had exceptions, etc.) - If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run - Preserve [writetimes](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__retrieving-the-datetime-a-write-occurred-p) and [TTLs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/cql_commands/cqlSelect.html#cqlSelect__ref-select-ttl-p) - Supports migration/validation of advanced DataTypes ([Sets](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__set), [Lists](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__list), [Maps](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__map), [UDTs](https://docs.datastax.com/en/dse/6.8/cql/cql/cql_reference/refDataTypes.html#refDataTypes__udt)) @@ -184,7 +187,7 @@ Below recommendations may only be useful when migrating large tables where the d 1. Clone this repo 2. Move to the repo folder `cd cassandra-data-migrator` 3. Run the build `mvn clean package` (Needs Maven 3.9.x) -4. The fat jar (`cassandra-data-migrator-4.x.x.jar`) file should now be present in the `target` folder +4. The fat jar (`cassandra-data-migrator-5.x.x.jar`) file should now be present in the `target` folder # Contributors Checkout all our wonderful contributors [here](./CONTRIBUTING.md#contributors). diff --git a/RELEASE.md b/RELEASE.md index 09aea421..267605c8 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,7 @@ # Release Notes +## [5.1.2] - 2024-11-26 +- Bug fix: SCB file on some Spark worker nodes may get deleted before the connection is established, which may cause connection exception on that worker node. Added a static async SCB delete delay to address such issues. + ## [5.1.1] - 2024-11-22 - Bug fix: Writetime filter does not work as expected when custom writetimestamp is also used (issue #327). - Removed deprecated properties `printStatsAfter` and `printStatsPerPart`. Run metrics should now be tracked using the `trackRun` feature instead. diff --git a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java index 6cb5aa4c..b1c7e76a 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/OriginSelectStatement.java @@ -122,8 +122,8 @@ public boolean shouldFilterRecord(Record record) { } if (originWriteTimeStamp < minWriteTimeStampFilter || originWriteTimeStamp > maxWriteTimeStampFilter) { if (logger.isInfoEnabled()) - logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", record.getPk(), - originWriteTimeStamp); + logger.info("Timestamp filter removing record with primary key: {} with write timestamp: {}", + record.getPk(), originWriteTimeStamp); return true; } } diff --git a/src/main/java/com/datastax/cdm/data/DataUtility.java b/src/main/java/com/datastax/cdm/data/DataUtility.java index 0e5708b2..16d13c61 100644 --- a/src/main/java/com/datastax/cdm/data/DataUtility.java +++ b/src/main/java/com/datastax/cdm/data/DataUtility.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -39,6 +42,7 @@ public class DataUtility { public static final Logger logger = LoggerFactory.getLogger(DataUtility.class.getName()); protected static final String SCB_FILE_NAME = "_temp_cdm_scb_do_not_touch.zip"; + protected static final int SCB_DELETE_DELAY = 5; public static boolean diff(Object obj1, Object obj2) { if (obj1 == null && obj2 == null) { @@ -157,15 +161,27 @@ public static String getMyClassMethodLine(Exception e) { return "Unknown"; } + public static void deleteGeneratedSCB(long runId, int waitSeconds) { + CompletableFuture.runAsync(() -> { + try { + File originFile = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME); + File targetFile = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME); + + if (originFile.exists() || targetFile.exists()) { + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(waitSeconds)); + if (originFile.exists()) + originFile.delete(); + if (targetFile.exists()) + targetFile.delete(); + } + } catch (Exception e) { + logger.error("Unable to delete generated SCB files: {}", e.getMessage()); + } + }); + } + public static void deleteGeneratedSCB(long runId) { - File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(runId) + SCB_FILE_NAME); - if (file.exists()) { - file.delete(); - } - file = new File(PKFactory.Side.TARGET + "_" + Long.toString(runId) + SCB_FILE_NAME); - if (file.exists()) { - file.delete(); - } + deleteGeneratedSCB(runId, SCB_DELETE_DELAY); } public static File generateSCB(String host, String port, String trustStorePassword, String trustStorePath, diff --git a/src/test/java/com/datastax/cdm/data/DataUtilityTest.java b/src/test/java/com/datastax/cdm/data/DataUtilityTest.java index 493fd985..205f27cd 100644 --- a/src/test/java/com/datastax/cdm/data/DataUtilityTest.java +++ b/src/test/java/com/datastax/cdm/data/DataUtilityTest.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -171,7 +173,9 @@ public void generateSCBOrigin() throws IOException { File file = new File(PKFactory.Side.ORIGIN + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME); assertTrue(file.exists()); - DataUtility.deleteGeneratedSCB(0); + DataUtility.deleteGeneratedSCB(0, 0); + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + assertFalse(file.exists()); } @@ -183,7 +187,9 @@ public void generateSCBTarget() throws IOException { File file = new File(PKFactory.Side.TARGET + "_" + Long.toString(0) + DataUtility.SCB_FILE_NAME); assertTrue(file.exists()); - DataUtility.deleteGeneratedSCB(0); + DataUtility.deleteGeneratedSCB(0, 0); + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + assertFalse(file.exists()); } }