Skip to content

Commit

Permalink
Major refactor to make CDM work in cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
pravinbhat committed Oct 23, 2024
1 parent d85c43f commit 48d72a6
Show file tree
Hide file tree
Showing 22 changed files with 160 additions and 236 deletions.
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,21 @@ spark-submit --properties-file cdm.properties \
- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reasons), it could lead to duplicate entries in `list` type columns. Note this is [due to a Cassandra/DSE bug](https://issues.apache.org/jira/browse/CASSANDRA-11368) and not a CDM issue. This issue can be addressed by enabling and setting a positive value for `spark.cdm.transform.custom.writetime.incrementBy` param. This param was specifically added to address this issue.
- When you rerun job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reasons, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from previous run as well.

# Performance FAQ
- Below recommendations may only be needed while migrating large tables where the default performance is not good enough.
# Performance recommendations
Below recommendations may only be useful when migrating large tables where the default performance is not good enough
- Performance bottleneck are usually the result of
- Low resource availability on `Origin` OR `Target` cluster
- Low resource availability on CDM VMs, [see recommendations here](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines)
- Bad schema design which could be caused by out of balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count.
- Incorrect configuration of below properties
- Incorrect configuration of below properties may negatively impact performance
- `numParts`: Default is 5K, but ideal value is usually around table-size/10MB.
- `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where average row-size is > 20 KB. Similarly, this should be set to a value > 5, if row-size is small (< 1KB) and most partitions have several rows (100+).
- `fetchSizeInRows`: Default is 1K & this usually fine. However you can reduce this if your table has many large rows (over 100KB).
- `ratelimit`: Default is 20K. Once you set all the other properties appropriately, set this value to the highest possible value that your cluster (origin & target) is able to handle.
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impacts performance
- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` cluster.
- Note: For additional performance tuning, refer to details mentioned in [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties)
- `fetchSizeInRows`: Default is 1K and this usually works fine. However you can reduce this as needed if your table has many large rows (over 100KB).
- `ratelimit`: Default is 20K, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle.
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
- We typically recommend [this infrastructure](https://docs.datastax.com/en/data-migration/deployment-infrastructure.html#_machines) for CDM VMs and [this starter conf](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm.properties). You can then optimize the job further based on CDM params info provided above and the observed load and throughput on `Origin` and `Target` clusters

Note: For additional performance tuning, refer to details mentioned in the [cdm-detailed.properties file here](https://github.com/datastax/cassandra-data-migrator/blob/main/src/resources/cdm-detailed.properties)

# Building Jar for local development
1. Clone this repo
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/datastax/cdm/data/PKFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ public EnhancedPK getTargetPK(Row originRow) {
}
}

public EnhancedPK toEnhancedPK(List<Object> pkValues, List<Class> pkClasses) {
return new EnhancedPK(this, pkValues, pkClasses, null, null, null);
}

public String getWhereClause(Side side) {
StringBuilder sb;
List<String> pkNames;
Expand Down
62 changes: 10 additions & 52 deletions src/main/java/com/datastax/cdm/feature/Guardrail.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.data.Record;
import com.datastax.cdm.properties.IPropertyHelper;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.schema.CqlTable;
Expand All @@ -38,14 +37,7 @@ public class Guardrail extends AbstractFeature {
private DecimalFormat decimalFormat = new DecimalFormat("0.###");

private Double colSizeInKB;

private CqlTable originTable;
private CqlTable targetTable;

private ExplodeMap explodeMap = null;
private int explodeMapIndex = -1;
private int explodeMapKeyIndex = -1;
private int explodeMapValueIndex = -1;

@Override
public boolean loadProperties(IPropertyHelper propertyHelper) {
Expand Down Expand Up @@ -79,13 +71,7 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
logger.error("originTable is null, or is not an origin table");
return false;
}
if (null == targetTable || targetTable.isOrigin()) {
logger.error("targetTable is null, or is an origin table");
return false;
}

this.originTable = originTable;
this.targetTable = targetTable;

isValid = true;
if (!validateProperties()) {
Expand All @@ -100,45 +86,29 @@ public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable)
}

private Map<String, Integer> check(Map<String, Integer> currentChecks, int targetIndex, Object targetValue) {
int colSize = targetTable.byteCount(targetIndex, targetValue);
int colSize = originTable.byteCount(targetIndex, targetValue);
if (logTrace)
logger.trace("Column {} at targetIndex {} has size {} bytes",
targetTable.getColumnNames(false).get(targetIndex), targetIndex, colSize);
originTable.getColumnNames(false).get(targetIndex), targetIndex, colSize);
if (colSize > colSizeInKB * BASE_FACTOR) {
if (null == currentChecks)
currentChecks = new HashMap<String, Integer>();
currentChecks.put(targetTable.getColumnNames(false).get(targetIndex), colSize);
currentChecks.put(originTable.getColumnNames(false).get(targetIndex), colSize);
}
return currentChecks;
}

public String guardrailChecks(Record record) {
public String guardrailChecks(Row row) {
if (!isEnabled)
return null;
if (null == record)
return CLEAN_CHECK;
if (null == record.getOriginRow())
return CLEAN_CHECK;
Map<String, Integer> largeColumns = null;

// As the order of feature loading is not guaranteed, we wait until the first record to figure out the
// explodeMap
if (null == explodeMap)
calcExplodeMap();

Row row = record.getOriginRow();
Map<String, Integer> largeColumns = null;
for (int i = 0; i < originTable.getColumnNames(false).size(); i++) {
if (i == explodeMapIndex) {
// Exploded columns are already converted to target type
largeColumns = check(largeColumns, explodeMapKeyIndex, record.getPk().getExplodeMapKey());
largeColumns = check(largeColumns, explodeMapValueIndex, record.getPk().getExplodeMapValue());
} else {
int targetIndex = originTable.getCorrespondingIndex(i);
if (targetIndex < 0)
continue; // TTL and WRITETIME columns for example
Object targetObject = originTable.getAndConvertData(i, row);
largeColumns = check(largeColumns, targetIndex, targetObject);
}
int targetIndex = originTable.getCorrespondingIndex(i);
if (targetIndex < 0)
continue; // TTL and WRITETIME columns for example
Object targetObject = originTable.getAndConvertData(i, row);
largeColumns = check(largeColumns, targetIndex, targetObject);
}

if (null == largeColumns || largeColumns.isEmpty())
Expand All @@ -157,16 +127,4 @@ public String guardrailChecks(Record record) {
return sb.toString();
}

private void calcExplodeMap() {
this.explodeMap = (ExplodeMap) originTable.getFeature(Featureset.EXPLODE_MAP);
if (null != explodeMap && explodeMap.isEnabled()) {
explodeMapIndex = explodeMap.getOriginColumnIndex();
explodeMapKeyIndex = explodeMap.getKeyColumnIndex();
explodeMapValueIndex = explodeMap.getValueColumnIndex();
if (logDebug)
logger.debug(
"ExplodeMap is enabled. explodeMapIndex={}, explodeMapKeyIndex={}, explodeMapValueIndex={}",
explodeMapIndex, explodeMapKeyIndex, explodeMapValueIndex);
}
}
}
56 changes: 32 additions & 24 deletions src/main/java/com/datastax/cdm/job/AbstractJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package com.datastax.cdm.job;

import java.math.BigInteger;
import java.util.Collection;

import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +29,8 @@
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.cdm.schema.CqlTable;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.RateLimiter;

Expand All @@ -38,19 +40,18 @@ public abstract class AbstractJobSession<T> extends BaseJobSession {
protected EnhancedSession originSession;
protected EnhancedSession targetSession;
protected Guardrail guardrailFeature;
protected boolean guardrailEnabled;
protected JobCounter jobCounter;
protected Long printStatsAfter;
protected TrackRun trackRunFeature;
protected long runId;

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
this(originSession, targetSession, sc, false);
protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) {
this(originSession, targetSession, propHelper, false);
}

protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc,
protected AbstractJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper,
boolean isJobMigrateRowsFromFile) {
super(sc);
super(propHelper);

if (originSession == null) {
return;
Expand All @@ -73,36 +74,43 @@ protected AbstractJobSession(CqlSession originSession, CqlSession targetSession,
logger.info("PARAM -- Origin Rate Limit: {}", rateLimiterOrigin.getRate());
logger.info("PARAM -- Target Rate Limit: {}", rateLimiterTarget.getRate());

CqlTable cqlTableOrigin, cqlTableTarget = null;
this.originSession = new EnhancedSession(propertyHelper, originSession, true);
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
this.originSession.getCqlTable().setOtherCqlTable(this.targetSession.getCqlTable());
this.targetSession.getCqlTable().setOtherCqlTable(this.originSession.getCqlTable());
this.originSession.getCqlTable().setFeatureMap(featureMap);
this.targetSession.getCqlTable().setFeatureMap(featureMap);
cqlTableOrigin = this.originSession.getCqlTable();
cqlTableOrigin.setFeatureMap(featureMap);

boolean allFeaturesValid = true;
for (Feature f : featureMap.values()) {
if (!f.initializeAndValidate(this.originSession.getCqlTable(), this.targetSession.getCqlTable())) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
if (targetSession != null) {
this.targetSession = new EnhancedSession(propertyHelper, targetSession, false);
cqlTableTarget = this.targetSession.getCqlTable();
cqlTableOrigin.setOtherCqlTable(cqlTableTarget);
cqlTableTarget.setOtherCqlTable(cqlTableOrigin);
cqlTableTarget.setFeatureMap(featureMap);
PKFactory pkFactory = new PKFactory(propertyHelper, cqlTableOrigin, cqlTableTarget);
this.originSession.setPKFactory(pkFactory);
this.targetSession.setPKFactory(pkFactory);
for (Feature f : featureMap.values()) {
if (!f.initializeAndValidate(cqlTableOrigin, cqlTableTarget)) {
allFeaturesValid = false;
logger.error("Feature {} is not valid. Please check the configuration.", f.getClass().getName());
}
}
}

if (!allFeaturesValid) {
throw new RuntimeException("One or more features are not valid. Please check the configuration.");
}

PKFactory pkFactory = new PKFactory(propertyHelper, this.originSession.getCqlTable(),
this.targetSession.getCqlTable());
this.originSession.setPKFactory(pkFactory);
this.targetSession.setPKFactory(pkFactory);
this.guardrailFeature = (Guardrail) cqlTableOrigin.getFeature(Featureset.GUARDRAIL_CHECK);
}

// Guardrail is referenced by many jobs, and is evaluated against the target
// table
this.guardrailFeature = (Guardrail) this.targetSession.getCqlTable().getFeature(Featureset.GUARDRAIL_CHECK);
this.guardrailEnabled = this.guardrailFeature.isEnabled();
public void processSlice(SplitPartitions.Partition slice, TrackRun trackRunFeature, long runId) {
this.trackRunFeature = trackRunFeature;
this.runId = runId;
this.processSlice(slice.getMin(), slice.getMax());
}

public abstract void processSlice(T slice);
protected abstract void processSlice(BigInteger min, BigInteger max);

public synchronized void initCdmRun(long runId, long prevRunId, Collection<SplitPartitions.Partition> parts,
TrackRun trackRunFeature, TrackRun.RUN_TYPE runType) {
Expand Down
10 changes: 3 additions & 7 deletions src/main/java/com/datastax/cdm/job/BaseJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.util.Map;

import org.apache.logging.log4j.ThreadContext;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.feature.Feature;
import com.datastax.cdm.feature.FeatureFactory;
Expand All @@ -34,14 +31,13 @@ public abstract class BaseJobSession {

public static final String THREAD_CONTEXT_LABEL = "ThreadLabel";
protected static final String NEW_LINE = System.lineSeparator();
private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
protected PropertyHelper propertyHelper = PropertyHelper.getInstance();
protected PropertyHelper propertyHelper;
protected Map<Featureset, Feature> featureMap;
protected RateLimiter rateLimiterOrigin;
protected RateLimiter rateLimiterTarget;

protected BaseJobSession(SparkConf sc) {
propertyHelper.initializeSparkConf(sc);
protected BaseJobSession(PropertyHelper propHelper) {
propertyHelper = propHelper;
this.featureMap = calcFeatureMap(propertyHelper);
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel());
}
Expand Down
24 changes: 4 additions & 20 deletions src/main/java/com/datastax/cdm/job/CopyJobSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.concurrent.CompletionStage;

import org.apache.logging.log4j.ThreadContext;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,8 +29,8 @@
import com.datastax.cdm.cql.statement.TargetUpsertStatement;
import com.datastax.cdm.data.PKFactory;
import com.datastax.cdm.data.Record;
import com.datastax.cdm.feature.Guardrail;
import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
Expand All @@ -50,8 +49,8 @@ public class CopyJobSession extends AbstractJobSession<SplitPartitions.Partition
private TargetUpsertStatement targetUpsertStatement;
private TargetSelectByPKStatement targetSelectByPKStatement;

protected CopyJobSession(CqlSession originSession, CqlSession targetSession, SparkConf sc) {
super(originSession, targetSession, sc);
protected CopyJobSession(CqlSession originSession, CqlSession targetSession, PropertyHelper propHelper) {
super(originSession, targetSession, propHelper);
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.WRITE,
JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR, JobCounter.CounterType.UNFLUSHED);

Expand All @@ -65,19 +64,13 @@ protected CopyJobSession(CqlSession originSession, CqlSession targetSession, Spa
logger.info("CQL -- target upsert: {}", this.targetSession.getTargetUpsertStatement().getCQL());
}

@Override
public void processSlice(SplitPartitions.Partition slice) {
this.getDataAndInsert(slice.getMin(), slice.getMax());
}

private void getDataAndInsert(BigInteger min, BigInteger max) {
protected void processSlice(BigInteger min, BigInteger max) {
ThreadContext.put(THREAD_CONTEXT_LABEL, getThreadLabel(min, max));
logger.info("ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max);
if (null != trackRunFeature)
trackRunFeature.updateCdmRun(runId, min, TrackRun.RUN_STATUS.STARTED);

BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);
String guardrailCheck;
jobCounter.threadReset();

try {
Expand All @@ -100,15 +93,6 @@ private void getDataAndInsert(BigInteger min, BigInteger max) {
}

for (Record r : pkFactory.toValidRecordList(record)) {
if (guardrailEnabled) {
guardrailCheck = guardrailFeature.guardrailChecks(r);
if (guardrailCheck != null && guardrailCheck != Guardrail.CLEAN_CHECK) {
logger.error("Guardrails failed for PrimaryKey {}; {}", r.getPk(), guardrailCheck);
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
continue;
}
}

BoundStatement boundUpsert = bind(r);
if (null == boundUpsert) {
jobCounter.threadIncrement(JobCounter.CounterType.SKIPPED);
Expand Down
Loading

0 comments on commit 48d72a6

Please sign in to comment.