CC-9148: Implement DLQ Functionality [DRAFT] #274

Open · wants to merge 2 commits into master
build.gradle (7 additions, 4 deletions)

@@ -133,6 +133,7 @@ project(':kcbq-connector') {
}

repositories {
mavenLocal()
mavenCentral()
}

@@ -200,16 +201,18 @@ project(':kcbq-connector') {
}

dependencies {
implementation('org.apache.kafka:connect-api:2.6.0-SNAPSHOT') {
force = true
}

compile (
project(':kcbq-api'),

"org.apache.kafka:kafka-clients:$kafkaVersion",
"com.google.cloud:google-cloud-bigquery:$googleCloudVersion",
"com.google.cloud:google-cloud-storage:$googleCloudVersion",
"com.google.auth:google-auth-library-oauth2-http:$googleAuthVersion",
"com.google.code.gson:gson:$googleCloudGsonVersion",
"io.debezium:debezium-core:$debeziumVersion",
"org.apache.kafka:connect-api:$kafkaVersion",
"org.apache.kafka:kafka-clients:$kafkaVersion",
"org.slf4j:slf4j-api:$slf4jVersion",
)

@@ -256,9 +259,9 @@ project('kcbq-api') {
}

dependencies {
implementation("org.apache.kafka:connect-api:2.6.0-SNAPSHOT")
compile (
"com.google.cloud:google-cloud-bigquery:$googleCloudVersion",
"org.apache.kafka:connect-api:$kafkaVersion"
)
}

gradle/wrapper/gradle-wrapper.properties (2 additions, 1 deletion)

@@ -1,5 +1,6 @@
#Mon Jun 01 16:39:15 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-5.5-all.zip
BigQuerySinkTask.java

@@ -32,6 +32,8 @@
import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder;
import com.wepay.kafka.connect.bigquery.convert.RecordConverter;
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException;
import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer;
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;
@@ -50,6 +52,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
@@ -75,18 +78,23 @@
public class BigQuerySinkTask extends SinkTask {
private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTask.class);

// Visible for testing
ErrantRecordReporter reporter;
// Visible for testing
RecordConverter<Map<String, Object>> recordConverter;
// Visible for testing
KCBQThreadPoolExecutor executor;

private SchemaRetriever schemaRetriever;
private BigQueryWriter bigQueryWriter;
private GCSToBQWriter gcsToBQWriter;
private BigQuerySinkTaskConfig config;
private RecordConverter<Map<String, Object>> recordConverter;
private Map<String, TableId> topicsToBaseTableIds;
private boolean useMessageTimeDatePartitioning;
private boolean usePartitionDecorator;

private TopicPartitionManager topicPartitionManager;

private KCBQThreadPoolExecutor executor;
private static final int EXECUTOR_SHUTDOWN_TIMEOUT_SEC = 30;

private final BigQuery testBigQuery;
@@ -128,6 +136,12 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
executor.awaitCurrentTasks();
} catch (InterruptedException err) {
throw new ConnectException("Interrupted while waiting for write tasks to complete.", err);
} catch (BigQueryConnectException e) {
// Route rows that failed schema validation to the DLQ when a reporter is configured;
// otherwise fail the task as before.
if (e.isInvalidSchema() && reporter != null) {
e.getFailedRowsMap().forEach((row, record) -> reporter.report(record, e));
} else {
throw e;
}
}

topicPartitionManager.resumeAll();
@@ -161,7 +175,17 @@ private PartitionedTableId getRecordTable(SinkRecord record) {
}

private RowToInsert getRecordRow(SinkRecord record) {
Map<String, Object> convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
Map<String, Object> convertedRecord;
try {
convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE);
} catch (ConversionConnectException e) {
if (reporter != null) {
reporter.report(record, e);
} else {
throw e;
}
return null;
}
Optional<String> kafkaKeyFieldName = config.getKafkaKeyFieldName();
if (kafkaKeyFieldName.isPresent()) {
convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY));
@@ -221,7 +245,11 @@ public void put(Collection<SinkRecord> records) {
}
tableWriterBuilders.put(table, tableWriterBuilder);
}
tableWriterBuilders.get(table).addRow(getRecordRow(record));
RowToInsert row = getRecordRow(record);
if (row == null) {
// Record was already reported to the DLQ; skip it and keep processing the rest of the batch.
continue;
}
tableWriterBuilders.get(table).addToRowMap(row, record);
}
}

@@ -344,6 +372,13 @@ public void start(Map<String, String> properties) {
if (hasGCSBQTask) {
startGCSToBQLoadTask();
}

try {
reporter = context.errantRecordReporter(); // may be null if DLQ not enabled
} catch (NoClassDefFoundError e) {
// Will occur in Connect runtimes earlier than 2.6
reporter = null;
}
}

private void startGCSToBQLoadTask() {
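For reviewers trying this out: `context.errantRecordReporter()` only returns a non-null reporter when the connector is configured with Kafka Connect's dead letter queue settings and the worker runs Connect 2.6 or later. Below is a minimal, illustrative sketch of such a configuration expressed as a Java properties map; the `errors.*` keys are the standard Connect error-handling settings, while the connector class, source topic, and DLQ topic names are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class DlqConfigSketch {
  public static void main(String[] args) {
    // Standard Kafka Connect error-handling settings (KIP-298). With a DLQ topic
    // configured, SinkTaskContext#errantRecordReporter() returns a non-null reporter
    // on Connect 2.6+ runtimes, which enables the reporting paths added in this PR.
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"); // placeholder
    props.put("topics", "events");                                                          // placeholder
    props.put("errors.tolerance", "all");
    props.put("errors.deadletterqueue.topic.name", "bq_sink_dlq");                          // placeholder
    props.put("errors.deadletterqueue.context.headers.enable", "true");
    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```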
BigQueryConnectException.java

@@ -20,16 +20,25 @@

import com.google.cloud.bigquery.BigQueryError;

import com.google.cloud.bigquery.InsertAllRequest;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Class for exceptions that occur while interacting with BigQuery, such as login failures, schema
* update failures, and table insertion failures.
*/
public class BigQueryConnectException extends ConnectException {

private Map<InsertAllRequest.RowToInsert, SinkRecord> failedRowsMap = new HashMap<>();
private boolean invalidSchema = false;

public BigQueryConnectException(String msg) {
super(msg);
}
@@ -42,8 +51,26 @@ public BigQueryConnectException(Throwable thr) {
super(thr);
}

public BigQueryConnectException(Map<Long, List<BigQueryError>> errors) {
public BigQueryConnectException(Map<Long, List<BigQueryError>> errors,
List<InsertAllRequest.RowToInsert> failedRows) {
super(formatInsertAllErrors(errors));
// Collectors.toMap rejects a null value mapper; record the failed rows here and let
// the caller attach the originating SinkRecords via setFailedRowsMap().
failedRows.forEach(row -> failedRowsMap.put(row, null));
}

public Map<InsertAllRequest.RowToInsert, SinkRecord> getFailedRowsMap() {
return failedRowsMap;
}

public void setFailedRowsMap(Map<InsertAllRequest.RowToInsert, SinkRecord> failedRowsMap) {
this.failedRowsMap = failedRowsMap;
}

public boolean isInvalidSchema() {
return invalidSchema;
}

public void setInvalidSchema(boolean invalidSchema) {
this.invalidSchema = invalidSchema;
}

private static String formatInsertAllErrors(Map<Long, List<BigQueryError>> errorsMap) {
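To make the intended round trip of the new fields concrete, here is a small standalone sketch (not part of this PR; the helper method and example values are hypothetical) of how a writer-side caller could flag an invalid-schema failure and how the failed rows map is read back, mirroring what `KCBQThreadPoolExecutor` and `BigQuerySinkTask#flush()` do elsewhere in this diff.

```java
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collections;
import java.util.Map;

public class FailedRowRoutingSketch {
  // Hypothetical caller: wraps rows that failed with a schema error so that
  // BigQuerySinkTask#flush() can hand the originating SinkRecords to the
  // ErrantRecordReporter instead of failing the task.
  static BigQueryConnectException invalidSchemaFailure(Map<RowToInsert, SinkRecord> failed) {
    BigQueryConnectException e = new BigQueryConnectException("Invalid schema for one or more rows");
    e.setInvalidSchema(true);
    e.setFailedRowsMap(failed);
    return e;
  }

  public static void main(String[] args) {
    RowToInsert row = RowToInsert.of(Collections.singletonMap("field", "value"));
    SinkRecord record = new SinkRecord("topic", 0, null, null, null, null, 0L);
    BigQueryConnectException e = invalidSchemaFailure(Collections.singletonMap(row, record));
    System.out.println(e.isInvalidSchema() + " -> " + e.getFailedRowsMap().size() + " failed row(s)");
  }
}
```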
GCSBatchTableWriter.java

@@ -26,10 +26,12 @@
import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;
import org.apache.kafka.connect.errors.ConnectException;

import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -94,7 +96,7 @@ public static class Builder implements TableWriterBuilder {

private final TableId tableId;

private List<RowToInsert> rows;
private Map<RowToInsert, SinkRecord> rowMap;
private final RecordConverter<Map<String, Object>> recordConverter;
private final GCSToBQWriter writer;

@@ -121,7 +123,7 @@ public Builder(GCSToBQWriter writer,

this.tableId = tableId;

this.rows = new ArrayList<>();
this.rowMap = new HashMap<>();
this.recordConverter = recordConverter;
this.writer = writer;
}
@@ -132,15 +134,16 @@ public Builder setBlobName(String blobName) {
}

/**
* Adds a record to the builder.
* @param rowToInsert the row to add
* Adds a row and its corresponding sink record to the builder.
* @param row the row to add.
* @param record the corresponding sink record.
*/
public void addRow(RowToInsert rowToInsert) {
rows.add(rowToInsert);
public void addToRowMap(RowToInsert row, SinkRecord record) {
rowMap.put(row, record);
}

public GCSBatchTableWriter build() {
return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic);
return new GCSBatchTableWriter(new ArrayList<>(rowMap.keySet()), writer, tableId, bucketName, blobName, topic);
}
}
}
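As a usage note on the `Builder` change, the standalone sketch below (example row and record values are made up) mirrors what `addToRowMap` and `build()` now do: the map keeps the row-to-record association that DLQ reporting needs, and only the keys are passed on to the writer. If two `RowToInsert` values compare equal they would collapse into a single entry, which is a behavioural difference from the previous `List`.

```java
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowMapSketch {
  public static void main(String[] args) {
    // Mirrors Builder#addToRowMap: pending rows are keyed by RowToInsert so the
    // originating SinkRecord stays recoverable for DLQ reporting.
    Map<RowToInsert, SinkRecord> rowMap = new HashMap<>();
    RowToInsert row = RowToInsert.of(Collections.singletonMap("field", "value"));
    SinkRecord record = new SinkRecord("topic", 0, null, null, null, null, 0L);
    rowMap.put(row, record);

    // Mirrors Builder#build(): only the rows themselves reach GCSBatchTableWriter.
    List<RowToInsert> rows = new ArrayList<>(rowMap.keySet());
    System.out.println(rows.size() + " row(s) queued for " + record.topic());
  }
}
```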
KCBQThreadPoolExecutor.java

@@ -18,14 +18,18 @@
*/


import com.google.cloud.bigquery.InsertAllRequest;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -91,12 +95,36 @@ public void awaitCurrentTasks() throws InterruptedException, BigQueryConnectExce
execute(new CountDownRunnable(countDownLatch));
}
countDownLatch.await();
if (encounteredErrors.size() > 0) {
String errorString = createErrorString(encounteredErrors);
String errorString = createErrorString(encounteredErrors);


Map<InsertAllRequest.RowToInsert, SinkRecord> failedRowMap = new HashMap<>();
for (Throwable error : encounteredErrors) {
if (error instanceof BigQueryConnectException) {
BigQueryConnectException bqce = (BigQueryConnectException) error;
if (bqce.isInvalidSchema()) {
failedRowMap.putAll(bqce.getFailedRowsMap());
} else {
encounteredErrors.clear();
throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: "
+ errorString + "; See logs for more detail");
}
} else {
encounteredErrors.clear();
throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: "
+ errorString + "; See logs for more detail");
}
}

if (!failedRowMap.isEmpty()) {
encounteredErrors.clear();
throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: "
+ errorString + "; See logs for more detail");
BigQueryConnectException e = new BigQueryConnectException("Invalid schema for one or more "
+ "rows");
e.setInvalidSchema(true);
e.setFailedRowsMap(failedRowMap);
throw e;
}

}

private static String createErrorString(Collection<Throwable> errors) {