Merge branch 'master' into patch-2
Jeroendevr authored Jul 12, 2024
2 parents 0ef7234 + 25d0030 commit d676f4b
Showing 38 changed files with 1,121 additions and 142 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
| 0.40.11 | 2024-07-08 | [\#41041](https://github.com/airbytehq/airbyte/pull/41041) | Destinations: Fix truncate refreshes incorrectly discarding data if successful attempt had 0 records |
@@ -1 +1 @@
version=0.41.1
version=0.41.2
@@ -137,7 +137,7 @@ constructor(
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null || stream.minimumGenerationId == null) {
throw ConfigErrorException(
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.7"
)
}
if (
@@ -1814,6 +1814,9 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
)
)
)
.withSyncId(42)
.withGenerationId(43)
.withMinimumGenerationId(0)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
)
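For context, the toStreamConfig change earlier in this commit rejects any ConfiguredAirbyteStream whose generationId or minimumGenerationId is null, which is why the integration-test fixture above now populates those fields. A minimal sketch of such a fixture, not part of this diff (stream name, namespace, and schema value are placeholders; Jsons.deserialize is assumed from Airbyte's commons-json helpers; imports omitted):

// Hypothetical fixture; only the generation-related fields mirror this commit.
val configuredStream = ConfiguredAirbyteStream()
    .withStream(
        AirbyteStream()
            .withNamespace("test_schema") // placeholder
            .withName("users")            // placeholder
            .withJsonSchema(Jsons.deserialize("""{"type": "object"}"""))
    )
    .withSyncId(42)             // arbitrary identifier for this sync
    .withGenerationId(43)       // generation being written by this sync
    .withMinimumGenerationId(0) // 0 = keep existing generations (no truncate refresh)
    .withSyncMode(SyncMode.INCREMENTAL)
    .withDestinationSyncMode(DestinationSyncMode.APPEND)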
8 changes: 7 additions & 1 deletion airbyte-ci/connectors/pipelines/pipelines/hacks.py
@@ -10,6 +10,7 @@
from typing import TYPE_CHECKING, Callable, List

import asyncclick as click
from connector_ops.utils import ConnectorLanguage # type: ignore
from pipelines import consts
from pipelines.helpers.github import update_commit_status_check

@@ -81,7 +82,12 @@ def do_regression_test_status_check_maybe(ctx: click.Context, status_check_name:
Only required for certified connectors.
"""
if any([connector.support_level == "certified" for connector in ctx.obj["selected_connectors_with_modified_files"]]):
if any(
[
(connector.language == ConnectorLanguage.PYTHON and connector.support_level == "certified")
for connector in ctx.obj["selected_connectors_with_modified_files"]
]
):
update_commit_status_check(
ctx.obj["git_revision"],
"failure",
@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.40.11'
cdkVersionRequired = '0.41.2'
features = [
'db-destinations',
'datastore-bigquery',
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.8.2
dockerImageTag: 2.8.3
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.38.3'
cdkVersionRequired = '0.41.2'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 3.2.0
dockerImageTag: 3.3.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -37,6 +37,7 @@ data:
releaseStage: generally_available
supportLevel: certified
supportsDbt: true
supportsRefreshes: true
tags:
- language:java
connectorTestSuitesOptions:
@@ -184,6 +184,12 @@ class RedshiftDestination : BaseConnector(), Destination {
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty(),
),
initialTempRawTableStatus =
InitialRawTableStatus(
rawTableExists = false,
hasUnprocessedRecords = true,
maxProcessedTimestamp = Optional.empty(),
),
isSchemaMismatch = true,
isFinalTableEmpty = true,
destinationState =
@@ -284,7 +290,8 @@ class RedshiftDestination : BaseConnector(), Destination {
)
}

private fun getDatabase(dataSource: DataSource): JdbcDatabase {
@VisibleForTesting
fun getDatabase(dataSource: DataSource): JdbcDatabase {
return DefaultJdbcDatabase(dataSource)
}

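Opening getDatabase up with @VisibleForTesting lets unit tests assert on how the destination wraps a DataSource. A tiny illustrative helper, not part of this commit (the function name is invented; any mocking framework would do for the DataSource; imports omitted):

// Hypothetical assertion a unit test could now make against RedshiftDestination.getDatabase.
fun verifyDatabaseWrapper(destination: RedshiftDestination, dataSource: DataSource) {
    val database = destination.getDatabase(dataSource)
    check(database is DefaultJdbcDatabase) { "expected getDatabase to return the DefaultJdbcDatabase wrapper" }
}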
@@ -19,7 +19,6 @@ import io.airbyte.integrations.destination.redshift.manifest.Entry
import io.airbyte.integrations.destination.redshift.manifest.Manifest
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.time.ZoneOffset
@@ -41,17 +40,70 @@ class RedshiftStagingStorageOperation(
private val writeDatetime: ZonedDateTime = Instant.now().atZone(ZoneOffset.UTC)
private val objectMapper = ObjectMapper()

override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) {
override fun prepareStage(streamId: StreamId, suffix: String, replace: Boolean) {
// create raw table
destinationHandler.execute(Sql.of(createRawTableQuery(streamId)))
if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId)))
destinationHandler.execute(Sql.of(createRawTableQuery(streamId, suffix)))
if (replace) {
destinationHandler.execute(Sql.of(truncateRawTableQuery(streamId, suffix)))
}
// create bucket for staging files
s3StorageOperations.createBucketIfNotExists()
}

override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
override fun overwriteStage(streamId: StreamId, suffix: String) {
destinationHandler.execute(
Sql.transactionally(
"""DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" """,
"""ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" RENAME TO "${streamId.rawName}" """
)
)
}

override fun transferFromTempStage(streamId: StreamId, suffix: String) {
destinationHandler.execute(
// ALTER TABLE ... APPEND is an efficient way to move records from one table to another.
// Instead of naively duplicating the data, it actually moves the underlying data
// blocks.
// (https://docs.aws.amazon.com/redshift/latest/dg/r_ALTER_TABLE_APPEND.html)
// But it can't run inside transactions, so run these statements separately.
Sql.separately(
// Note for future developers:
// ALTER TABLE ... APPEND has some interesting restrictions where both tables need
// the exact same structure (clustering, columns, etc.), so if we want to change
// those in the future, this might be tricky/annoying?
// If we have issues at that point, we can always switch to a simple
// `INSERT INTO ... SELECT * FROM ...` query.
"""
ALTER TABLE "${streamId.rawNamespace}"."${streamId.rawName}"
APPEND FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix"
""".trimIndent(),
"""DROP TABLE IF EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """,
),
// Skip the case-sensitivity thing - ALTER TABLE ... APPEND can't be run in a
// transaction, so we can't run the SET statement.
// We're only working with schema/table names, so it's fine to just quote the
// identifiers instead of relying on this option.
forceCaseSensitiveIdentifier = false
)
}

override fun getStageGeneration(streamId: StreamId, suffix: String): Long? {
val generation =
destinationHandler.query(
"""SELECT ${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID} FROM "${streamId.rawNamespace}"."${streamId.rawName}$suffix" LIMIT 1"""
)
if (generation.isEmpty()) {
return null
}

return generation.first()[JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID].asLong()
}

override fun writeToStage(
streamConfig: StreamConfig,
suffix: String,
data: SerializableBuffer
) {
val streamId = streamConfig.id
val objectPath: String = getStagingPath(streamId)
log.info {
@@ -61,13 +113,19 @@
s3StorageOperations.uploadRecordsToBucket(data, streamId.rawNamespace, objectPath)

log.info {
"Starting copy to target table from stage: ${streamId.rawName} in destination from stage: $objectPath/$filename."
"Starting copy to target table from stage: ${streamId.rawName}$suffix in destination from stage: $objectPath/$filename."
}
val manifestContents = createManifest(listOf(filename), objectPath)
val manifestPath = putManifest(manifestContents, objectPath)
executeCopy(manifestPath, destinationHandler, streamId.rawNamespace, streamId.rawName)
executeCopy(
manifestPath,
destinationHandler,
streamId.rawNamespace,
streamId.rawName,
suffix
)
log.info {
"Copy to target table ${streamId.rawNamespace}.${streamId.rawName} in destination complete."
"Copy to target table ${streamId.rawNamespace}.${streamId.rawName}$suffix in destination complete."
}
}

@@ -172,6 +230,7 @@
destinationHandler: RedshiftDestinationHandler,
schemaName: String,
tableName: String,
suffix: String,
) {
val accessKeyId =
s3Config.s3CredentialConfig!!.s3CredentialsProvider.credentials.awsAccessKeyId
@@ -180,7 +239,7 @@

val copyQuery =
"""
COPY $schemaName.$tableName FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
COPY $schemaName.$tableName$suffix FROM '${getFullS3Path(s3Config.bucketName!!, manifestPath)}'
CREDENTIALS 'aws_access_key_id=$accessKeyId;aws_secret_access_key=$secretAccessKey'
CSV GZIP
REGION '${s3Config.bucketRegion}' TIMEFORMAT 'auto'
@@ -195,9 +254,9 @@
companion object {
private val nameTransformer = RedshiftSQLNameTransformer()

private fun createRawTableQuery(streamId: StreamId): String {
private fun createRawTableQuery(streamId: StreamId, suffix: String): String {
return """
CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}" (
CREATE TABLE IF NOT EXISTS "${streamId.rawNamespace}"."${streamId.rawName}$suffix" (
${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID} VARCHAR(36),
${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT} TIMESTAMPTZ DEFAULT GETDATE(),
${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} TIMESTAMPTZ,
@@ -208,12 +267,8 @@
""".trimIndent()
}

private fun truncateRawTableQuery(streamId: StreamId): String {
return String.format(
"""TRUNCATE TABLE "%s"."%s";""",
streamId.rawNamespace,
streamId.rawName
)
private fun truncateRawTableQuery(streamId: StreamId, suffix: String): String {
return """TRUNCATE TABLE "${streamId.rawNamespace}"."${streamId.rawName}$suffix" """
}

private fun getFullS3Path(s3BucketName: String, s3StagingFile: String): String {
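Taken together, the new methods give the Redshift staging operation a suffixed temp raw table lifecycle for refreshes: stage into a temp table, then either swap it into place (truncate refresh) or move its rows across with ALTER TABLE ... APPEND when resuming without discarding data. A rough sketch of how a caller might drive this, not part of this commit — the suffix constant, helper names, and the generationId comparison are assumptions, and the real orchestration lives in the CDK rather than in this file (imports omitted):

// Illustrative driver code only; "_airbyte_tmp" and the function names are assumed.
const val TMP_TABLE_SUFFIX = "_airbyte_tmp"

fun stageTruncateRefresh(
    storageOp: RedshiftStagingStorageOperation,
    streamConfig: StreamConfig,
    buffer: SerializableBuffer,
) {
    val streamId = streamConfig.id
    // Create the suffixed temp raw table, clearing any stale rows from a previous attempt.
    storageOp.prepareStage(streamId, TMP_TABLE_SUFFIX, replace = true)
    // COPY staged files into the temp raw table.
    storageOp.writeToStage(streamConfig, TMP_TABLE_SUFFIX, buffer)
    // On success, drop the real raw table and rename the temp table into place.
    storageOp.overwriteStage(streamId, TMP_TABLE_SUFFIX)
}

fun resumeInterruptedRefresh(
    storageOp: RedshiftStagingStorageOperation,
    streamConfig: StreamConfig,
) {
    val streamId = streamConfig.id
    // If the temp table already holds rows from the current generation, keep them:
    // ALTER TABLE ... APPEND moves the underlying blocks instead of rewriting the data.
    val stagedGeneration = storageOp.getStageGeneration(streamId, TMP_TABLE_SUFFIX)
    if (stagedGeneration != null && stagedGeneration == streamConfig.generationId) {
        storageOp.transferFromTempStage(streamId, TMP_TABLE_SUFFIX)
    }
}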
@@ -84,7 +84,17 @@ class RedshiftDestinationHandler(
execute(sql, logStatements = true)
}

fun execute(sql: Sql, logStatements: Boolean) {
/**
* @param forceCaseSensitiveIdentifier Whether to enable `forceCaseSensitiveIdentifier` on all
* transactions. This option is most useful for accessing fields within a `SUPER` value; for
* accessing schemas/tables/columns, quoting the identifier is sufficient to force
* case-sensitivity, so this option is not necessary.
*/
fun execute(
sql: Sql,
logStatements: Boolean = true,
forceCaseSensitiveIdentifier: Boolean = true
) {
val transactions = sql.transactions
val queryId = UUID.randomUUID()
for (transaction in transactions) {
@@ -103,12 +113,20 @@
// characters, even after
// specifying quotes.
// see https://github.com/airbytehq/airbyte/issues/33900
modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n")
if (forceCaseSensitiveIdentifier) {
modifiedStatements.add("SET enable_case_sensitive_identifier to TRUE;\n")
}
modifiedStatements.addAll(transaction)
jdbcDatabase.executeWithinTransaction(
modifiedStatements,
logStatements = logStatements
)
if (modifiedStatements.size != 1) {
jdbcDatabase.executeWithinTransaction(
modifiedStatements,
logStatements = logStatements
)
} else {
// Redshift doesn't allow some statements to run in a transaction at all,
// so handle the single-statement case specially.
jdbcDatabase.execute(modifiedStatements.first())
}
} catch (e: SQLException) {
log.error(e) { "Sql $queryId-$transactionId failed" }
// This is a big hammer for something that should be much more targetted, only when
@@ -155,6 +173,8 @@
)
}

fun query(sql: String): List<JsonNode> = jdbcDatabase.queryJsons(sql)

private fun toJdbcTypeName(airbyteProtocolType: AirbyteProtocolType): String {
return when (airbyteProtocolType) {
AirbyteProtocolType.STRING -> "varchar"
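These handler changes are what make transferFromTempStage workable: ALTER TABLE ... APPEND cannot run inside a transaction, so Sql.separately plus the new single-statement branch lets each statement run on its own, and forceCaseSensitiveIdentifier = false skips the SET enable_case_sensitive_identifier prelude. A short usage sketch, not part of this commit (schema/table names are placeholders, the literal _airbyte_generation_id column is assumed to match JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, imports omitted):

// Illustrative only; quoting the identifiers keeps them case-sensitive without the SET statement.
fun appendTempRawTable(handler: RedshiftDestinationHandler) {
    handler.execute(
        Sql.separately(
            """ALTER TABLE "my_schema"."my_raw_table" APPEND FROM "my_schema"."my_raw_table_airbyte_tmp" """,
            """DROP TABLE IF EXISTS "my_schema"."my_raw_table_airbyte_tmp" """,
        ),
        forceCaseSensitiveIdentifier = false,
    )

    // The new query() helper returns rows as JsonNodes, e.g. to inspect the staged generation.
    val rows = handler.query(
        """SELECT _airbyte_generation_id FROM "my_schema"."my_raw_table" LIMIT 1"""
    )
    if (rows.isNotEmpty()) {
        println("staged generation: ${rows.first()["_airbyte_generation_id"].asLong()}")
    }
}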