From 3d9d92e2df7f8f9299901382abb21101dd82191c Mon Sep 17 00:00:00 2001 From: Aakash Shah Date: Wed, 10 Jun 2020 00:29:54 -0700 Subject: [PATCH 1/2] CC-9148: Implement DLQ Functionality Signed-off-by: Aakash Shah --- build.gradle | 11 +++-- gradle/wrapper/gradle-wrapper.properties | 3 +- .../connect/bigquery/BigQuerySinkTask.java | 47 +++++++++++++++++-- .../exception/BigQueryConnectException.java | 23 +++++++++ .../write/row/AdaptiveBigQueryWriter.java | 37 +++------------ .../bigquery/write/row/BigQueryWriter.java | 40 ++++++++++++++++ .../bigquery/BigQuerySinkTaskTest.java | 37 +++++++++++++++ 7 files changed, 158 insertions(+), 40 deletions(-) diff --git a/build.gradle b/build.gradle index 1a824d4e9..31d79cd51 100644 --- a/build.gradle +++ b/build.gradle @@ -133,6 +133,7 @@ project(':kcbq-connector') { } repositories { + mavenLocal() mavenCentral() } @@ -200,16 +201,18 @@ project(':kcbq-connector') { } dependencies { + implementation('org.apache.kafka:connect-api:2.6.0-SNAPSHOT') { + force = true + } + compile ( project(':kcbq-api'), - + "org.apache.kafka:kafka-clients:$kafkaVersion", "com.google.cloud:google-cloud-bigquery:$googleCloudVersion", "com.google.cloud:google-cloud-storage:$googleCloudVersion", "com.google.auth:google-auth-library-oauth2-http:$googleAuthVersion", "com.google.code.gson:gson:$googleCloudGsonVersion", "io.debezium:debezium-core:$debeziumVersion", - "org.apache.kafka:connect-api:$kafkaVersion", - "org.apache.kafka:kafka-clients:$kafkaVersion", "org.slf4j:slf4j-api:$slf4jVersion", ) @@ -256,9 +259,9 @@ project('kcbq-api') { } dependencies { + implementation("org.apache.kafka:connect-api:2.6.0-SNAPSHOT") compile ( "com.google.cloud:google-cloud-bigquery:$googleCloudVersion", - "org.apache.kafka:connect-api:$kafkaVersion" ) } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b0acbdcd7..8edf97297 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Mon Jun 01 16:39:15 PDT 2020 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.5-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-5.5-all.zip diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index bf4b43d6d..01e80a64e 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -32,6 +32,8 @@ import com.wepay.kafka.connect.bigquery.convert.KafkaDataBuilder; import com.wepay.kafka.connect.bigquery.convert.RecordConverter; import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; +import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; @@ -50,6 +52,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; 
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.slf4j.Logger; @@ -75,11 +78,15 @@ public class BigQuerySinkTask extends SinkTask { private static final Logger logger = LoggerFactory.getLogger(BigQuerySinkTask.class); + // Visible for testing + ErrantRecordReporter reporter; + // Visible for testing + RecordConverter> recordConverter; + private SchemaRetriever schemaRetriever; private BigQueryWriter bigQueryWriter; private GCSToBQWriter gcsToBQWriter; private BigQuerySinkTaskConfig config; - private RecordConverter> recordConverter; private Map topicsToBaseTableIds; private boolean useMessageTimeDatePartitioning; private boolean usePartitionDecorator; @@ -161,7 +168,17 @@ private PartitionedTableId getRecordTable(SinkRecord record) { } private RowToInsert getRecordRow(SinkRecord record) { - Map convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); + Map convertedRecord; + try { + convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); + } catch(ConversionConnectException e) { + if (reporter != null) { + reporter.report(record, e); + } else { + throw e; + } + return null; + } Optional kafkaKeyFieldName = config.getKafkaKeyFieldName(); if (kafkaKeyFieldName.isPresent()) { convertedRecord.put(kafkaKeyFieldName.get(), recordConverter.convertRecord(record, KafkaSchemaRecordType.KEY)); @@ -189,6 +206,8 @@ public void put(Collection records) { // create tableWriters Map tableWriterBuilders = new HashMap<>(); + // RowToInsert to SinkRecord map + Map rowAndSinkRecord = new HashMap<>(); for (SinkRecord record : records) { if (record.value() != null) { @@ -221,13 +240,26 @@ public void put(Collection records) { } tableWriterBuilders.put(table, tableWriterBuilder); } - tableWriterBuilders.get(table).addRow(getRecordRow(record)); + RowToInsert row = getRecordRow(record); + rowAndSinkRecord.put(row, record); + if (row == null) { + return; + } + tableWriterBuilders.get(table).addRow(row); } } // add tableWriters to the executor work queue for (TableWriterBuilder builder : tableWriterBuilders.values()) { - executor.execute(builder.build()); + try { + executor.execute(builder.build()); + } catch (BigQueryConnectException e) { + if (e.isInvalidSchema()) { + for (RowToInsert row : e.getFailedRows()) { + reporter.report(rowAndSinkRecord.get(row), e); + } + } + } } // check if we should pause topics @@ -344,6 +376,13 @@ public void start(Map properties) { if (hasGCSBQTask) { startGCSToBQLoadTask(); } + + try { + reporter = context.errantRecordReporter(); // may be null if DLQ not enabled + } catch (NoClassDefFoundError e) { + // Will occur in Connect runtimes earlier than 2.6 + reporter = null; + } } private void startGCSToBQLoadTask() { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java index 40fefd7da..084c197fd 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java @@ -20,8 +20,10 @@ import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.InsertAllRequest; import org.apache.kafka.connect.errors.ConnectException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -30,6 +32,10 @@ * update failures, and table 
insertion failures. */ public class BigQueryConnectException extends ConnectException { + + private List failedRows = new ArrayList<>(); + private boolean invalidSchema = false; + public BigQueryConnectException(String msg) { super(msg); } @@ -46,6 +52,23 @@ public BigQueryConnectException(Map> errors) { super(formatInsertAllErrors(errors)); } + public BigQueryConnectException( + Map> errors, + List failedRows + ) { + super(formatInsertAllErrors(errors)); + this.failedRows = failedRows; + this.invalidSchema = true; + } + + public List getFailedRows() { + return failedRows; + } + + public boolean isInvalidSchema() { + return invalidSchema; + } + private static String formatInsertAllErrors(Map> errorsMap) { StringBuilder messageBuilder = new StringBuilder(); messageBuilder.append("table insertion failed for the following rows:"); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 2b64085d5..1829c718c 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -103,8 +103,12 @@ public Map> performWriteRequest( writeResponse = bigQuery.insertAll(request); // Should only perform one schema update attempt. if (writeResponse.hasErrors() - && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) { - attemptSchemaUpdate(tableId, topic); + && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { + if (autoUpdateSchemas) { + attemptSchemaUpdate(tableId, topic); + } else { + return writeResponse.getInsertErrors(); + } } } catch (BigQueryException exception) { // Should only perform one table creation attempt. @@ -168,33 +172,4 @@ private void attemptTableCreate(TableId tableId, String topic) { "Failed to create table " + tableId, exception); } } - - /* - * Currently, the only way to determine the cause of an insert all failure is by examining the map - * object returned by the insertErrors() method of an insert all response. The only way to - * determine the cause of each individual error is by manually examining each error's reason() and - * message() strings, and guessing what they mean. Ultimately, the goal of this method is to - * return whether or not an insertion failed due solely to a mismatch between the schemas of the - * inserted rows and the schema of the actual BigQuery table. - * This is why we can't have nice things, Google. - */ - private boolean onlyContainsInvalidSchemaErrors(Map> errors) { - boolean invalidSchemaError = false; - for (List errorList : errors.values()) { - for (BigQueryError error : errorList) { - if (error.getReason().equals("invalid") && error.getMessage().contains("no such field")) { - invalidSchemaError = true; - } else if (!error.getReason().equals("stopped")) { - /* if some rows are in the old schema format, and others aren't, the old schema - * formatted rows will show up as error: stopped. We still want to continue if this is - * the case, because these errors don't represent a unique error if there are also - * invalidSchemaErrors. - */ - return false; - } - } - } - // if we only saw "stopped" errors, we want to return false. 
(otherwise, return true) - return invalidSchemaError; - } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java index d766b1b2a..4d0183ed5 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java @@ -124,9 +124,20 @@ public void writeRows(PartitionedTableId table, rows.size() - failedRowsMap.size(), failedRowsMap.size()); // update insert rows and retry in case of partial failure rows = getFailedRows(rows, failedRowsMap.keySet(), topic, table); + // if the rows are failing due to invalid schema, throw an exception + if (onlyContainsInvalidSchemaErrors(failedRowsMap)) { + throw new BigQueryConnectException(failedRowsMap, rows); + } mostRecentException = new BigQueryConnectException(failedRowsMap); retryCount++; } else { + // if the rows are failing due to invalid schema, throw an exception with the failed rows + if (onlyContainsInvalidSchemaErrors(failedRowsMap)) { + throw new BigQueryConnectException( + failedRowsMap, + getFailedRows(rows, failedRowsMap.keySet(), topic, table) + ); + } // throw an exception in case of complete failure throw new BigQueryConnectException(failedRowsMap); } @@ -203,4 +214,33 @@ private void waitRandomTime() throws InterruptedException { // wait Thread.sleep(retryWaitMs + random.nextInt(WAIT_MAX_JITTER)); } + + /* + * Currently, the only way to determine the cause of an insert all failure is by examining the map + * object returned by the insertErrors() method of an insert all response. The only way to + * determine the cause of each individual error is by manually examining each error's reason() and + * message() strings, and guessing what they mean. Ultimately, the goal of this method is to + * return whether or not an insertion failed due solely to a mismatch between the schemas of the + * inserted rows and the schema of the actual BigQuery table. + * This is why we can't have nice things, Google. + */ + public boolean onlyContainsInvalidSchemaErrors(Map> errors) { + boolean invalidSchemaError = false; + for (List errorList : errors.values()) { + for (BigQueryError error : errorList) { + if (error.getReason().equals("invalid") && error.getMessage().contains("no such field")) { + invalidSchemaError = true; + } else if (!error.getReason().equals("stopped")) { + /* if some rows are in the old schema format, and others aren't, the old schema + * formatted rows will show up as error: stopped. We still want to continue if this is + * the case, because these errors don't represent a unique error if there are also + * invalidSchemaErrors. + */ + return false; + } + } + } + // if we only saw "stopped" errors, we want to return false. 
(otherwise, return true) + return invalidSchemaError; + } } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index af6547568..133c2ab9b 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -39,7 +39,9 @@ import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; +import com.wepay.kafka.connect.bigquery.convert.BigQueryRecordConverter; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; import org.apache.kafka.common.config.ConfigException; @@ -48,6 +50,7 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.ErrantRecordReporter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; @@ -69,6 +72,36 @@ public static void initializePropertiesFactory() { propertiesFactory = new SinkTaskPropertiesFactory(); } + @Test + public void testDataNotStructReport() { + final String topic = "test-topic"; + + Map properties = propertiesFactory.getProperties(); + properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); + properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch"); + + BigQuery bigQuery = mock(BigQuery.class); + Storage storage = mock(Storage.class); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + testTask.recordConverter = mock(BigQueryRecordConverter.class); + testTask.reporter = mock(ErrantRecordReporter.class); + + when(testTask.recordConverter.convertRecord(any(), any())).thenThrow(ConversionConnectException.class); + + testTask.put(Collections.singletonList(spoofSinkRecord(topic))); + + verify(testTask.reporter, times(1)).report(any(), any()); + } + @Test public void testSimplePut() { final String topic = "test-topic"; @@ -133,11 +166,13 @@ public void testEmptyPut() { Map properties = propertiesFactory.getProperties(); BigQuery bigQuery = mock(BigQuery.class); Storage storage = mock(Storage.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); SchemaManager schemaManager = mock(SchemaManager.class); BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.emptyList()); @@ -155,11 +190,13 @@ public void testEmptyRecordPut() { Map properties = propertiesFactory.getProperties(); BigQuery bigQuery = mock(BigQuery.class); Storage storage = mock(Storage.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); SchemaRetriever schemaRetriever = 
mock(SchemaRetriever.class); SchemaManager schemaManager = mock(SchemaManager.class); BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); testTask.start(properties); SinkRecord emptyRecord = spoofSinkRecord(topic, simpleSchema, null); From c20670a3dddf5d463b89fdaecacb0116d084b692 Mon Sep 17 00:00:00 2001 From: Aakash Shah Date: Tue, 16 Jun 2020 11:03:12 -0700 Subject: [PATCH 2/2] fixed schema mismatch dlq --- .../connect/bigquery/BigQuerySinkTask.java | 24 ++++----- .../exception/BigQueryConnectException.java | 26 ++++++---- .../write/batch/GCSBatchTableWriter.java | 17 ++++--- .../write/batch/KCBQThreadPoolExecutor.java | 36 +++++++++++-- .../bigquery/write/batch/TableWriter.java | 36 +++++++++---- .../write/batch/TableWriterBuilder.java | 6 ++- .../bigquery/write/row/BigQueryWriter.java | 50 +++++++++---------- .../bigquery/BigQuerySinkTaskTest.java | 39 +++++++++++++++ 8 files changed, 158 insertions(+), 76 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 01e80a64e..37269dcda 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -82,6 +82,8 @@ public class BigQuerySinkTask extends SinkTask { ErrantRecordReporter reporter; // Visible for testing RecordConverter> recordConverter; + // Visible for testing + KCBQThreadPoolExecutor executor; private SchemaRetriever schemaRetriever; private BigQueryWriter bigQueryWriter; @@ -93,7 +95,6 @@ public class BigQuerySinkTask extends SinkTask { private TopicPartitionManager topicPartitionManager; - private KCBQThreadPoolExecutor executor; private static final int EXECUTOR_SHUTDOWN_TIMEOUT_SEC = 30; private final BigQuery testBigQuery; @@ -135,6 +136,12 @@ public void flush(Map offsets) { executor.awaitCurrentTasks(); } catch (InterruptedException err) { throw new ConnectException("Interrupted while waiting for write tasks to complete.", err); + } catch (BigQueryConnectException e) { + if (e.isInvalidSchema()) { + e.getFailedRowsMap().forEach((row, record) -> reporter.report(record, e)); + } else { + throw e; + } } topicPartitionManager.resumeAll(); @@ -206,8 +213,6 @@ public void put(Collection records) { // create tableWriters Map tableWriterBuilders = new HashMap<>(); - // RowToInsert to SinkRecord map - Map rowAndSinkRecord = new HashMap<>(); for (SinkRecord record : records) { if (record.value() != null) { @@ -241,25 +246,16 @@ public void put(Collection records) { tableWriterBuilders.put(table, tableWriterBuilder); } RowToInsert row = getRecordRow(record); - rowAndSinkRecord.put(row, record); if (row == null) { return; } - tableWriterBuilders.get(table).addRow(row); + tableWriterBuilders.get(table).addToRowMap(row, record); } } // add tableWriters to the executor work queue for (TableWriterBuilder builder : tableWriterBuilders.values()) { - try { - executor.execute(builder.build()); - } catch (BigQueryConnectException e) { - if (e.isInvalidSchema()) { - for (RowToInsert row : e.getFailedRows()) { - reporter.report(rowAndSinkRecord.get(row), e); - } - } - } + executor.execute(builder.build()); } // check if we should pause topics diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java 
b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java index 084c197fd..07db0176b 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/exception/BigQueryConnectException.java @@ -22,10 +22,13 @@ import com.google.cloud.bigquery.InsertAllRequest; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Class for exceptions that occur while interacting with BigQuery, such as login failures, schema @@ -33,7 +36,7 @@ */ public class BigQueryConnectException extends ConnectException { - private List failedRows = new ArrayList<>(); + private Map failedRowsMap = new HashMap<>(); private boolean invalidSchema = false; public BigQueryConnectException(String msg) { @@ -48,27 +51,28 @@ public BigQueryConnectException(Throwable thr) { super(thr); } - public BigQueryConnectException(Map> errors) { + public BigQueryConnectException(Map> errors, + List failedRows) { super(formatInsertAllErrors(errors)); + failedRowsMap = failedRows.stream().collect(Collectors.toMap(row -> row, null)); } - public BigQueryConnectException( - Map> errors, - List failedRows - ) { - super(formatInsertAllErrors(errors)); - this.failedRows = failedRows; - this.invalidSchema = true; + public Map getFailedRowsMap() { + return failedRowsMap; } - public List getFailedRows() { - return failedRows; + public void setFailedRowsMap(Map failedRowsMap) { + this.failedRowsMap = failedRowsMap; } public boolean isInvalidSchema() { return invalidSchema; } + public void setInvalidSchema(boolean invalidSchema) { + this.invalidSchema = invalidSchema; + } + private static String formatInsertAllErrors(Map> errorsMap) { StringBuilder messageBuilder = new StringBuilder(); messageBuilder.append("table insertion failed for the following rows:"); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java index 48a9512a6..546fae96f 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java @@ -26,10 +26,12 @@ import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,7 +96,7 @@ public static class Builder implements TableWriterBuilder { private final TableId tableId; - private List rows; + private Map rowMap; private final RecordConverter> recordConverter; private final GCSToBQWriter writer; @@ -121,7 +123,7 @@ public Builder(GCSToBQWriter writer, this.tableId = tableId; - this.rows = new ArrayList<>(); + this.rowMap = new HashMap<>(); this.recordConverter = recordConverter; this.writer = writer; } @@ -132,15 +134,16 @@ public Builder setBlobName(String blobName) { } /** - * Adds a record to the builder. - * @param rowToInsert the row to add + * Add a row and its corresponding record to the builder. 
+ * @param row the row to add. + * @param record the corresponding sink record. */ - public void addRow(RowToInsert rowToInsert) { - rows.add(rowToInsert); + public void addToRowMap(RowToInsert row, SinkRecord record) { + rowMap.put(row, record); } public GCSBatchTableWriter build() { - return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic); + return new GCSBatchTableWriter(new ArrayList<>(rowMap.keySet()), writer, tableId, bucketName, blobName, topic); } } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java index 637ac2ea4..0f0f954ab 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/KCBQThreadPoolExecutor.java @@ -18,14 +18,18 @@ */ +import com.google.cloud.bigquery.InsertAllRequest; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -91,12 +95,36 @@ public void awaitCurrentTasks() throws InterruptedException, BigQueryConnectExce execute(new CountDownRunnable(countDownLatch)); } countDownLatch.await(); - if (encounteredErrors.size() > 0) { - String errorString = createErrorString(encounteredErrors); + String errorString = createErrorString(encounteredErrors); + + + Map failedRowMap = new HashMap<>(); + for (Throwable error : encounteredErrors) { + if (error instanceof BigQueryConnectException) { + BigQueryConnectException bqce = (BigQueryConnectException) error; + if (bqce.isInvalidSchema()) { + failedRowMap.putAll(bqce.getFailedRowsMap()); + } else { + encounteredErrors.clear(); + throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: " + + errorString + "; See logs for more detail"); + } + } else { + encounteredErrors.clear(); + throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: " + + errorString + "; See logs for more detail"); + } + } + + if (!failedRowMap.isEmpty()) { encounteredErrors.clear(); - throw new BigQueryConnectException("Some write threads encountered unrecoverable errors: " - + errorString + "; See logs for more detail"); + BigQueryConnectException e = new BigQueryConnectException("Invalid schema for one or more " + + "rows"); + e.setInvalidSchema(true); + e.setFailedRowsMap(failedRowMap); + throw e; } + } private static String createErrorString(Collection errors) { diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java index 0f418bed0..0bde388d9 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriter.java @@ -22,6 +22,7 @@ import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import 
com.wepay.kafka.connect.bigquery.convert.RecordConverter; +import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; import com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter; @@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,20 +51,23 @@ public class TableWriter implements Runnable { private final List rows; private final String topic; + private Map rowMap; + /** * @param writer the {@link BigQueryWriter} to use. * @param table the BigQuery table to write to. - * @param rows the rows to write. + * @param rowMap the map containing the rows to write and their corresponding sink records. * @param topic the kafka source topic of this data. */ public TableWriter(BigQueryWriter writer, PartitionedTableId table, - List rows, + Map rowMap, String topic) { this.writer = writer; this.table = table; - this.rows = rows; + this.rowMap = rowMap; this.topic = topic; + this.rows = new ArrayList<>(rowMap.keySet()); } @Override @@ -86,6 +91,15 @@ public void run() { failureCount++; currentBatchSize = getNewBatchSize(currentBatchSize); } + } catch (BigQueryConnectException e) { + if (e.isInvalidSchema()) { + Map failedRowsMap = e.getFailedRowsMap(); + for (RowToInsert row : failedRowsMap.keySet()) { + failedRowsMap.put(row, rowMap.get(row)); + } + e.setFailedRowsMap(failedRowsMap); + } + throw e; } } } catch (InterruptedException err) { @@ -150,7 +164,7 @@ public static class Builder implements TableWriterBuilder { private final PartitionedTableId table; private final String topic; - private List rows; + private Map rowMap; private RecordConverter> recordConverter; @@ -166,17 +180,17 @@ public Builder(BigQueryWriter writer, PartitionedTableId table, String topic, this.table = table; this.topic = topic; - this.rows = new ArrayList<>(); - + this.rowMap = new HashMap<>(); this.recordConverter = recordConverter; } /** - * Add a record to the builder. - * @param rowToInsert the row to add + * Add a row and its corresponding record to the builder. + * @param row the row to add + * @param record the corresponding sink record */ - public void addRow(RowToInsert rowToInsert) { - rows.add(rowToInsert); + public void addToRowMap(RowToInsert row, SinkRecord record) { + rowMap.put(row, record); } /** @@ -184,7 +198,7 @@ public void addRow(RowToInsert rowToInsert) { * @return a TableWriter containing the given writer, table, topic, and all added rows. */ public TableWriter build() { - return new TableWriter(writer, table, rows, topic); + return new TableWriter(writer, table, rowMap, topic); } } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java index 12bdd4c16..1a01b3db5 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/TableWriterBuilder.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; +import org.apache.kafka.connect.sink.SinkRecord; /** * Interface for building a {@link TableWriter} or TableWriterGCS. @@ -26,10 +27,11 @@ public interface TableWriterBuilder { /** - * Add a record to the builder. + * Add a row and its corresponding record to the builder. * @param rowToInsert the row to add. 
+ * @param sinkRecord the corresponding sink record. */ - void addRow(RowToInsert rowToInsert); + void addToRowMap(RowToInsert rowToInsert, SinkRecord sinkRecord); /** * Create a {@link TableWriter} from this builder. diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java index 4d0183ed5..3c88f8787 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriter.java @@ -124,22 +124,18 @@ public void writeRows(PartitionedTableId table, rows.size() - failedRowsMap.size(), failedRowsMap.size()); // update insert rows and retry in case of partial failure rows = getFailedRows(rows, failedRowsMap.keySet(), topic, table); - // if the rows are failing due to invalid schema, throw an exception - if (onlyContainsInvalidSchemaErrors(failedRowsMap)) { - throw new BigQueryConnectException(failedRowsMap, rows); - } - mostRecentException = new BigQueryConnectException(failedRowsMap); + mostRecentException = new BigQueryConnectException(failedRowsMap, rows); retryCount++; } else { - // if the rows are failing due to invalid schema, throw an exception with the failed rows + // create an exception in case of complete failure + BigQueryConnectException exception = new BigQueryConnectException(failedRowsMap, + getFailedRows(rows, failedRowsMap.keySet(), topic, table)); + + // if the rows are failing due to invalid schema, set the invalid schema field to true if (onlyContainsInvalidSchemaErrors(failedRowsMap)) { - throw new BigQueryConnectException( - failedRowsMap, - getFailedRows(rows, failedRowsMap.keySet(), topic, table) - ); + exception.setInvalidSchema(true); } - // throw an exception in case of complete failure - throw new BigQueryConnectException(failedRowsMap); + throw exception; } } catch (BigQueryException err) { mostRecentException = err; @@ -175,27 +171,16 @@ public void writeRows(PartitionedTableId table, mostRecentException); } - /** - * Decide whether the failure is a partial failure or complete failure - * @param rows The rows to write. - * @param failedRowsMap A map from failed row index to the BigQueryError. - * @return isPartialFailure. - */ - private boolean isPartialFailure(List rows, - Map> failedRowsMap) { - return failedRowsMap.size() < rows.size(); - } - /** * Filter out succeed rows, and return a list of failed rows. * @param rows The rows to write. * @param failRowsSet A set of failed row index. * @return A list of failed rows. */ - private List getFailedRows(List rows, - Set failRowsSet, - String topic, - PartitionedTableId table) { + public List getFailedRows(List rows, + Set failRowsSet, + String topic, + PartitionedTableId table) { List failRows = new ArrayList<>(); for (int index = 0; index < rows.size(); index++) { if (failRowsSet.contains((long)index)) { @@ -206,6 +191,17 @@ private List getFailedRows(List rows, + Map> failedRowsMap) { + return failedRowsMap.size() < rows.size(); + } + /** * Wait at least {@link #retryWaitMs}, with up to an additional 1 second of random jitter. * @throws InterruptedException if interrupted. 
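Editorial note (not part of the change set): the schema-mismatch handling introduced by this second commit is spread across TableWriter, KCBQThreadPoolExecutor, and BigQuerySinkTask.flush(), which makes it hard to follow in hunk order. The fragment below is an illustrative consolidation under stated assumptions: it relies only on the BigQueryConnectException accessors added in this patch (isInvalidSchema, getFailedRowsMap, setFailedRowsMap) plus public Connect and BigQuery client types, and the sketch's own class and method names are hypothetical.

// Illustrative consolidation of the second commit's failed-row flow -- not patch code.
// Assumes the BigQueryConnectException accessors introduced in this change set; the
// class/method names below (SchemaMismatchFlowSketch, attachRecordsAndRethrow,
// reportOrRethrow) are hypothetical stand-ins.
import java.util.HashMap;
import java.util.Map;

import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;

final class SchemaMismatchFlowSketch {

  /** Writer side: re-key the failed rows by their originating SinkRecord before rethrowing. */
  static void attachRecordsAndRethrow(BigQueryConnectException e,
                                      Map<RowToInsert, SinkRecord> rowToRecord) {
    if (e.isInvalidSchema()) {
      Map<RowToInsert, SinkRecord> failed = new HashMap<>();
      for (RowToInsert row : e.getFailedRowsMap().keySet()) {
        failed.put(row, rowToRecord.get(row));   // map each rejected row back to its record
      }
      e.setFailedRowsMap(failed);
    }
    throw e;   // BigQueryConnectException is unchecked (extends ConnectException)
  }

  /** Task side: during flush(), forward every failed record to the reporter (DLQ) instead of failing. */
  static void reportOrRethrow(BigQueryConnectException e, ErrantRecordReporter reporter) {
    if (e.isInvalidSchema() && reporter != null) {
      e.getFailedRowsMap().forEach((row, record) -> reporter.report(record, e));
    } else {
      throw e;   // non-schema failures (or no DLQ configured) still fail the task
    }
  }
}

Read top to bottom: the writer records which RowToInsert came from which SinkRecord, the exception carries that map out of the thread pool via awaitCurrentTasks(), and flush() hands each offending record to the ErrantRecordReporter rather than failing the whole batch.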
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 133c2ab9b..d75e5dbba 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -43,6 +44,8 @@ import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException; import com.wepay.kafka.connect.bigquery.exception.SinkConfigConnectException; +import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor; +import com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.record.TimestampType; @@ -61,6 +64,7 @@ import org.mockito.Captor; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.RejectedExecutionException; @@ -72,6 +76,41 @@ public static void initializePropertiesFactory() { propertiesFactory = new SinkTaskPropertiesFactory(); } + @Test + public void testSchemaMismatch() throws InterruptedException { + final String topic = "test-topic"; + + Map properties = propertiesFactory.getProperties(); + properties.put(BigQuerySinkConfig.TOPICS_CONFIG, topic); + properties.put(BigQuerySinkConfig.DATASETS_CONFIG, ".*=scratch"); + + BigQuery bigQuery = mock(BigQuery.class); + Storage storage = mock(Storage.class); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + + testTask.reporter = mock(ErrantRecordReporter.class); + testTask.executor = mock(KCBQThreadPoolExecutor.class); + + BigQueryConnectException exception = new BigQueryConnectException("exception!"); + Map failedRowsMap = new HashMap<>(); + failedRowsMap.put(mock(InsertAllRequest.RowToInsert.class), mock(SinkRecord.class)); + exception.setInvalidSchema(true); + exception.setFailedRowsMap(failedRowsMap); + + doThrow(exception).when(testTask.executor).awaitCurrentTasks(); + testTask.flush(Collections.emptyMap()); + + verify(testTask.reporter, times(1)).report(any(), any()); + } + @Test public void testDataNotStructReport() { final String topic = "test-topic";
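Editorial note (not part of the change set): both commits rest on the ErrantRecordReporter added to the Connect API in Kafka 2.6 (KIP-610). As a stand-alone reference, the minimal sketch below shows the acquire-and-report pattern the first commit wires into BigQuerySinkTask. It is illustrative only: DlqAwareSinkTask and convertAndBuffer are hypothetical names, and only the Connect API types (SinkTask, ErrantRecordReporter, SinkRecord) are real. As the patch's own comment notes, the reporter may be null when no DLQ is enabled; in practice it becomes available once dead-letter-queue error handling is configured on the connector (for example errors.deadletterqueue.topic.name, typically alongside errors.tolerance=all).

// Minimal sketch of the KIP-610 acquire-and-report pattern -- not patch code.
// DlqAwareSinkTask and convertAndBuffer are hypothetical; the patch itself catches only
// NoClassDefFoundError, and the additional NoSuchMethodError catch here is a defensive
// assumption for pre-2.6 Connect runtimes.
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class DlqAwareSinkTask extends SinkTask {

  private ErrantRecordReporter reporter;

  @Override
  public void start(Map<String, String> props) {
    try {
      // Available from Kafka Connect 2.6; returns null when no DLQ/error handling is configured.
      reporter = context.errantRecordReporter();
    } catch (NoSuchMethodError | NoClassDefFoundError e) {
      // Older Connect runtimes do not expose the reporter API at all.
      reporter = null;
    }
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      try {
        convertAndBuffer(record);   // hypothetical per-record conversion/buffering step
      } catch (RuntimeException e) {
        if (reporter != null) {
          // Route the bad record to the configured error handler (e.g. a DLQ topic) and keep going.
          reporter.report(record, e);
        } else {
          // Without a reporter, preserve the pre-DLQ behavior and fail the task.
          throw e;
        }
      }
    }
  }

  // Stand-in for the connector's record conversion and table-writer buffering.
  protected abstract void convertAndBuffer(SinkRecord record);
}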