diff --git a/build.sbt b/build.sbt index 98615ef25..6b64c8824 100644 --- a/build.sbt +++ b/build.sbt @@ -40,7 +40,7 @@ lazy val commonSettings = Seq( ) ) -lazy val root = (project in file(".")).aggregate(spark, server) +lazy val root = (project in file(".")).aggregate(client, spark, server) lazy val client = (project in file("client")) settings( name := "delta-sharing-client", diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index e9e8221c1..bd079043d 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -100,7 +100,9 @@ class DeltaSharingRestClient( numRetries: Int = 10, maxRetryDuration: Long = Long.MaxValue, sslTrustAll: Boolean = false, - forStreaming: Boolean = false) extends DeltaSharingClient { + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) extends DeltaSharingClient with Logging { @volatile private var created = false @@ -193,9 +195,10 @@ class DeltaSharingRestClient( val target = getTargetUrl(s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/" + s"$encodedTableName/version$encodedParam") - val (version, _) = getResponse(new HttpGet(target), true, true) + val (version, _, _) = getResponse(new HttpGet(target), true, true) version.getOrElse { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") + throw new IllegalStateException(s"Cannot find " + + s"${DeltaSharingRestClient.RESPONSE_TABLE_VERSION_HEADER_KEY} in the header") } } @@ -205,7 +208,18 @@ class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/metadata") - val (version, lines) = getNDJson(target) + val (version, respondedFormat, lines) = getNDJson(target) + if (responseFormat != respondedFormat) { + // This could only happen when the asked format is delta and the server doesn't support + // the requested format. + logWarning(s"RespondedFormat($respondedFormat) is different from requested responseFormat(" + + s"$responseFormat) for getMetadata.${table.share}.${table.schema}.${table.name}.") + } + // To ensure that it works with delta sharing server that doesn't support the requested format. + if (respondedFormat == DeltaSharingRestClient.RESPONSE_FORMAT_DELTA) { + return DeltaTableMetadata(version, lines = lines) + } + val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol checkProtocol(protocol) val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData @@ -235,7 +249,7 @@ class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") - val (version, lines) = getNDJson( + val (version, respondedFormat, lines) = getNDJson( target, QueryTableRequest( predicates, @@ -247,6 +261,15 @@ class DeltaSharingRestClient( jsonPredicateHints ) ) + if (responseFormat != respondedFormat) { + logWarning(s"RespondedFormat($respondedFormat) is different from requested responseFormat(" + + s"$responseFormat) for getFiles(versionAsOf-$versionAsOf, timestampAsOf-$timestampAsOf " + + s"for table ${table.share}.${table.schema}.${table.name}.") + } + // To ensure that it works with delta sharing server that doesn't support the requested format. + if (respondedFormat == DeltaSharingRestClient.RESPONSE_FORMAT_DELTA) { + return DeltaTableFiles(version, lines = lines) + } require(versionAsOf.isEmpty || versionAsOf.get == version) val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol checkProtocol(protocol) @@ -265,8 +288,17 @@ class DeltaSharingRestClient( val encodedTableName = URLEncoder.encode(table.name, "UTF-8") val target = getTargetUrl( s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/query") - val (version, lines) = getNDJson( + val (version, respondedFormat, lines) = getNDJson( target, QueryTableRequest(Nil, None, None, None, Some(startingVersion), endingVersion, None)) + if (responseFormat != respondedFormat) { + logWarning(s"RespondedFormat($respondedFormat) is different from requested responseFormat(" + + s"$responseFormat) for getFiles(startingVersion-$startingVersion, endingVersion-" + + s"$endingVersion) for table ${table.share}.${table.schema}.${table.name}.") + } + // To ensure that it works with delta sharing server that doesn't support the requested format. + if (respondedFormat == DeltaSharingRestClient.RESPONSE_FORMAT_DELTA) { + return DeltaTableFiles(version, lines = lines) + } val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol checkProtocol(protocol) val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData @@ -300,7 +332,16 @@ class DeltaSharingRestClient( val target = getTargetUrl( s"/shares/$encodedShare/schemas/$encodedSchema/tables/$encodedTable/changes?$encodedParams") - val (version, lines) = getNDJson(target, requireVersion = false) + val (version, respondedFormat, lines) = getNDJson(target, requireVersion = false) + if (responseFormat != respondedFormat) { + logWarning(s"RespondedFormat($respondedFormat) is different from requested responseFormat(" + + s"$responseFormat) for getCDFFiles(cdfOptions-$cdfOptions) for table " + + s"${table.share}.${table.schema}.${table.name}.") + } + // To ensure that it works with delta sharing server that doesn't support the requested format. + if (respondedFormat == DeltaSharingRestClient.RESPONSE_FORMAT_DELTA) { + return DeltaTableFiles(version, lines = lines) + } val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol checkProtocol(protocol) val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData @@ -340,30 +381,60 @@ class DeltaSharingRestClient( }.mkString("&") } - private def getNDJson(target: String, requireVersion: Boolean = true): (Long, Seq[String]) = { - val (version, lines) = getResponse(new HttpGet(target)) - version.getOrElse { - if (requireVersion) { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") - } else { - 0L - } - } -> lines + private def getNDJson( + target: String, requireVersion: Boolean = true): (Long, String, Seq[String]) = { + val (version, capabilities, lines) = getResponse(new HttpGet(target)) + ( + version.getOrElse { + if (requireVersion) { + throw new IllegalStateException(s"Cannot find " + + s"${DeltaSharingRestClient.RESPONSE_TABLE_VERSION_HEADER_KEY} in the header") + } else { + 0L + } + }, + getRespondedFormat(capabilities), + lines + ) } - private def getNDJson[T: Manifest](target: String, data: T): (Long, Seq[String]) = { + private def getNDJson[T: Manifest](target: String, data: T): (Long, String, Seq[String]) = { val httpPost = new HttpPost(target) val json = JsonUtils.toJson(data) httpPost.setHeader("Content-type", "application/json") httpPost.setEntity(new StringEntity(json, UTF_8)) - val (version, lines) = getResponse(httpPost) - version.getOrElse { - throw new IllegalStateException("Cannot find Delta-Table-Version in the header") - } -> lines + val (version, capabilities, lines) = getResponse(httpPost) + ( + version.getOrElse { + throw new IllegalStateException("Cannot find Delta-Table-Version in the header") + }, + getRespondedFormat(capabilities), + lines + ) + } + + private def getRespondedFormat(capabilities: Option[String]): String = { + val capabilitiesMap = getDeltaSharingCapabilitiesMap(capabilities) + capabilitiesMap.get(DeltaSharingRestClient.RESPONSE_FORMAT).getOrElse( + DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) + } + private def getDeltaSharingCapabilitiesMap(capabilities: Option[String]): Map[String, String] = { + if (capabilities.isEmpty) { + return Map.empty[String, String] + } + capabilities.get.toLowerCase().split(",").map { capability => + val splits = capability.split("=") + if (splits.size == 2) { + (splits(0), splits(1)) + } else { + ("", "") + } + }.toMap } private def getJson[R: Manifest](target: String): R = { - val (_, response) = getResponse(new HttpGet(target), false, true) + val (_, _, response) = getResponse(new HttpGet(target), false, true) if (response.size != 1) { throw new IllegalStateException( "Unexpected response for target: " + target + ", response=" + response @@ -394,8 +465,7 @@ class DeltaSharingRestClient( } } - // TODO: [linzhou] mark this as private once tests are migrated. - def prepareHeaders(httpRequest: HttpRequestBase): HttpRequestBase = { + private[client] def prepareHeaders(httpRequest: HttpRequestBase): HttpRequestBase = { val customeHeaders = profileProvider.getCustomHeaders if (customeHeaders.contains(HttpHeaders.AUTHORIZATION) || customeHeaders.contains(HttpHeaders.USER_AGENT)) { @@ -406,7 +476,8 @@ class DeltaSharingRestClient( } val headers = Map( HttpHeaders.AUTHORIZATION -> s"Bearer ${profileProvider.getProfile.bearerToken}", - HttpHeaders.USER_AGENT -> getUserAgent() + HttpHeaders.USER_AGENT -> getUserAgent(), + DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER -> getDeltaSharingCapabilities() ) ++ customeHeaders headers.foreach(header => httpRequest.setHeader(header._1, header._2)) @@ -426,7 +497,7 @@ class DeltaSharingRestClient( httpRequest: HttpRequestBase, allowNoContent: Boolean = false, fetchAsOneString: Boolean = false - ): (Option[Long], Seq[String]) = { + ): (Option[Long], Option[String], Seq[String]) = { RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) { val profile = profileProvider.getProfile val response = client.execute( @@ -476,7 +547,15 @@ class DeltaSharingRestClient( s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo", statusCode) } - Option(response.getFirstHeader("Delta-Table-Version")).map(_.getValue.toLong) -> lines + ( + Option( + response.getFirstHeader(DeltaSharingRestClient.RESPONSE_TABLE_VERSION_HEADER_KEY) + ).map(_.getValue.toLong), + Option( + response.getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER) + ).map(_.getValue), + lines + ) } finally { response.close() } @@ -494,6 +573,13 @@ class DeltaSharingRestClient( s"$sparkAgent/$VERSION" + DeltaSharingRestClient.USER_AGENT } + // The value for delta-sharing-capabilities header, semicolon separated capabilities. + // Each capability is in the format of "key=value1,value2", values are separated by comma. + // Example: "capability1=value1;capability2=value3,value4,value5" + private def getDeltaSharingCapabilities(): String = { + s"${DeltaSharingRestClient.RESPONSE_FORMAT}=$responseFormat" + } + def close(): Unit = { if (created) { try client.close() finally created = false @@ -509,6 +595,11 @@ object DeltaSharingRestClient extends Logging { val CURRENT = 1 val SPARK_STRUCTURED_STREAMING = "Delta-Sharing-SparkStructuredStreaming" + val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" + val RESPONSE_TABLE_VERSION_HEADER_KEY = "Delta-Table-Version" + val RESPONSE_FORMAT = "responseformat" + val RESPONSE_FORMAT_DELTA = "delta" + val RESPONSE_FORMAT_PARQUET = "parquet" lazy val USER_AGENT = { try { @@ -546,7 +637,11 @@ object DeltaSharingRestClient extends Logging { if (value == null) "" else value.replace(' ', '_') } - def apply(profileFile: String, forStreaming: Boolean = false): DeltaSharingClient = { + def apply( + profileFile: String, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ): DeltaSharingClient = { val sqlConf = SparkSession.active.sessionState.conf val profileProviderClass = ConfUtils.profileProviderClass(sqlConf) @@ -565,14 +660,21 @@ object DeltaSharingRestClient extends Logging { val clientClass = ConfUtils.clientClass(sqlConf) Class.forName(clientClass) - .getConstructor(classOf[DeltaSharingProfileProvider], - classOf[Int], classOf[Int], classOf[Long], classOf[Boolean], classOf[Boolean]) - .newInstance(profileProvider, + .getConstructor( + classOf[DeltaSharingProfileProvider], + classOf[Int], + classOf[Int], + classOf[Long], + classOf[Boolean], + classOf[Boolean], + classOf[String] + ).newInstance(profileProvider, java.lang.Integer.valueOf(timeoutInSeconds), java.lang.Integer.valueOf(numRetries), java.lang.Long.valueOf(maxRetryDurationMillis), java.lang.Boolean.valueOf(sslTrustAll), - java.lang.Boolean.valueOf(forStreaming)) - .asInstanceOf[DeltaSharingClient] + java.lang.Boolean.valueOf(forStreaming), + responseFormat + ).asInstanceOf[DeltaSharingClient] } } diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index 2f4a1f81a..f06cf0ebe 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -38,18 +38,20 @@ private[sharing] object CDFColumnInfo { private[sharing] case class DeltaTableMetadata( version: Long, - protocol: Protocol, - metadata: Metadata) + protocol: Protocol = null, + metadata: Metadata = null, + lines: Seq[String] = Nil) private[sharing] case class DeltaTableFiles( version: Long, - protocol: Protocol, - metadata: Metadata, + protocol: Protocol = null, + metadata: Metadata = null, files: Seq[AddFile] = Nil, addFiles: Seq[AddFileForCDF] = Nil, cdfFiles: Seq[AddCDCFile] = Nil, removeFiles: Seq[RemoveFile] = Nil, - additionalMetadatas: Seq[Metadata] = Nil) + additionalMetadatas: Seq[Metadata] = Nil, + lines: Seq[String] = Nil) private[sharing] case class Share(name: String) diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala new file mode 100644 index 000000000..a9e75c4b2 --- /dev/null +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientDeltaSuite.scala @@ -0,0 +1,334 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.client + +import java.sql.Timestamp + +import org.apache.http.HttpHeaders +import org.apache.http.client.methods.HttpGet + +import io.delta.sharing.client.model._ +import io.delta.sharing.client.util.UnexpectedHttpStatus + +// scalastyle:off maxLineLength +class DeltaSharingRestClientDeltaSuite extends DeltaSharingIntegrationTest { + + private def getDeltaSharingClientWithDeltaResponse: DeltaSharingRestClient = { + new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + integrationTest("Check headers") { + val httpRequest = new HttpGet("random_url") + + val client = new DeltaSharingRestClient(testProfileProvider) + var h = client.prepareHeaders(httpRequest).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER) + // scalastyle:off caselocale + assert(h.getValue.toLowerCase().contains(s"responseformat=${DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET}")) + + val deltaClient = new DeltaSharingRestClient( + testProfileProvider, + responseFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + h = deltaClient.prepareHeaders(httpRequest).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER) + // scalastyle:off caselocale + assert(h.getValue.toLowerCase().contains(s"responseformat=${DeltaSharingRestClient.RESPONSE_FORMAT_DELTA}")) + } + + integrationTest("getMetadata") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableMatadata = + client.getMetadata(Table(name = "table2", schema = "default", share = "share2")) + assert(null == tableMatadata.protocol) + assert(null == tableMatadata.metadata) + assert(2 == tableMatadata.lines.size) + assert(tableMatadata.lines(0) == """{"protocol":{"minReaderVersion":1}}""") + assert(tableMatadata.lines(1) == """{"metaData":{"id":"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1619652806049}}""") + } finally { + client.close() + } + } + + integrationTest("getMetadata with configuration") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableMatadata = + client.getMetadata(Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8")) + assert(tableMatadata.lines(1) == """{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1651272615011}}""") + } finally { + client.close() + } + } + + private def checkDeltaTableFilesBasics( + tableFiles: DeltaTableFiles, + expectedVersion: Int, + expectedNumLines: Int): Unit = { + assert(expectedVersion == tableFiles.version) + assert(null == tableFiles.protocol) + assert(null == tableFiles.metadata) + assert(Nil == tableFiles.files) + assert(expectedNumLines == tableFiles.lines.size) + if (expectedNumLines > 0) { + assert(tableFiles.lines(0) == """{"protocol":{"minReaderVersion":1}}""") + } + } + + integrationTest("getFiles") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = + client.getFiles(Table(name = "table2", schema = "default", share = "share2"), Nil, None, None, None, None) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 2, expectedNumLines = 4) + assert(tableFiles.lines(1) == """{"metaData":{"id":"f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"eventTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1619652806049}}""") + assert(tableFiles.lines(2).startsWith("""{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-8b0086f2-7b27-4935-ac5a-8ed6215a6640.c000.snappy.parquet?X-Amz-Algorithm=""")) + assert(tableFiles.lines(2).contains("""","id":"9f1a49539c5cffe1ea7f9e055d5c003c","partitionValues":{"date":"2021-04-28"},"size":573,"modificationTime":1619652839000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:57.955Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:57.955Z\"},\"nullCount\":{\"eventTime\":0}}","expirationTimestamp":""")) + assert(tableFiles.lines(3).startsWith("""{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/table2/date%3D2021-04-28/part-00000-591723a8-6a27-4240-a90e-57426f4736d2.c000.snappy.parquet?X-Amz-Algorithm=""")) + assert(tableFiles.lines(3).contains("""","id":"cd2209b32f5ed5305922dd50f5908a75","partitionValues":{"date":"2021-04-28"},"size":573,"modificationTime":1619652832000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"maxValues\":{\"eventTime\":\"2021-04-28T23:33:48.719Z\"},\"nullCount\":{\"eventTime\":0}}","expirationTimestamp":""")) + } finally { + client.close() + } + } + + integrationTest("getFiles with version") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + Nil, + None, + Some(1L), + None, + None) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 1, expectedNumLines = 5) + assert(tableFiles.lines(1) == """{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1651272615011}}""") + val commonPrefix = """{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_cdf_enabled/part-""" + assert(tableFiles.lines(2).startsWith(commonPrefix)) + assert(tableFiles.lines(3).startsWith(commonPrefix)) + assert(tableFiles.lines(4).startsWith(commonPrefix)) + } finally { + client.close() + } + } + + private def checkCdfTableCdfEnabledTableV0Metadata(line: String, version: String = "1"): Unit = { + assert(line == """{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"version":""" + version + ""","createdTime":1651272615011}}""") + } + + private def checkCdfTableCdfEnabledTableV1(lines: Seq[String]): Unit = { + // VERSION 1: INSERT 3 files + val addFilePrefix = """{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_cdf_enabled/part-0000""" + // VERSION 1: INSERT 3 files + for (lineNum <- 0 to 2) { + assert(lines(lineNum).startsWith(addFilePrefix)) + assert(lines(lineNum).contains(""""version":1,"timestamp":1651272635000,"expirationTimestamp":""")) + } + } + + private def checkCdfTableCdfEnabledTableV2ToV3(lines: Seq[String]): Unit = { + // VERSION 2: REMOVE 1 file + val removeFilePrefix = """{"remove":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_cdf_enabled/part-0000""" + assert(lines(0).startsWith(removeFilePrefix)) + assert(lines(0).contains(""""version":2,"timestamp":1651272655000,"expirationTimestamp":""")) + // VERSION 3: UPDATE 1 row: 1 remove + 1 add + assert(lines(1).startsWith(removeFilePrefix)) + assert(lines(1).contains(""""version":3,"timestamp":1651272660000,"expirationTimestamp":""")) + val addFilePrefix = """{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_cdf_enabled/part-0000""" + assert(lines(2).startsWith(addFilePrefix)) + assert(lines(2).contains(""""version":3,"timestamp":1651272660000,"expirationTimestamp":""")) + } + + private def checkCdfTableCdfEnabledTableV5Metadata(line: String): Unit = { + // VERSION 5: set delta.enableChangeDataFeed -> true + assert(line == """{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"version":5,"createdTime":1651272615011}}""") + } + private def checkCdfTableCdfEnabledTableV4ToV5(lines: Seq[String]): Unit = { + // VERSION 4: set delta.enableChangeDataFeed -> false + assert(lines(0) == """{"metaData":{"id":"16736144-3306-4577-807a-d3f899b77670","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"false"},"version":4,"createdTime":1651272615011}}""") + checkCdfTableCdfEnabledTableV5Metadata(lines(1)) + } + + integrationTest("getFiles with startingVersion - success") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), 1L, None + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 1, expectedNumLines = 10) + checkCdfTableCdfEnabledTableV0Metadata(tableFiles.lines(1)) + checkCdfTableCdfEnabledTableV1(tableFiles.lines.slice(2, 5)) + checkCdfTableCdfEnabledTableV2ToV3(tableFiles.lines.slice(5, 8)) + checkCdfTableCdfEnabledTableV4ToV5(tableFiles.lines.slice(8, 10)) + } finally { + client.close() + } + } + + integrationTest("getFiles with startingVersion/endingVersion - success 1") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), 1L, Some(1L) + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 1, expectedNumLines = 5) + checkCdfTableCdfEnabledTableV0Metadata(tableFiles.lines(1)) + checkCdfTableCdfEnabledTableV1(tableFiles.lines.slice(2, 5)) + } finally { + client.close() + } + } + + integrationTest("getFiles with startingVersion/endingVersion - success 2") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), 2L, Some(3L) + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 2, expectedNumLines = 5) + checkCdfTableCdfEnabledTableV0Metadata(tableFiles.lines(1), version = "2") + checkCdfTableCdfEnabledTableV2ToV3(tableFiles.lines.slice(2, 5)) + } finally { + client.close() + } + } + + integrationTest("getFiles with startingVersion/endingVersion - success 3") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val tableFiles = client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), 4L, Some(5L) + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 4, expectedNumLines = 3) + checkCdfTableCdfEnabledTableV4ToV5(tableFiles.lines.slice(1, 3)) + } finally { + client.close() + } + } + + private def checkCdfTableCdfEnabledTableV2V3Cdc(lines: Seq[String]): Unit = { + // VERSION 2: REMOVE 1 row + val cdcFilePrefix = """{"cdc":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_cdf_enabled/_change_data/cdc-00000""" + assert(lines(0).startsWith(cdcFilePrefix)) + assert(lines(0).contains(""""version":2,"timestamp":1651272655000,"expirationTimestamp":""")) + // VERSION 3: UPDATE 1 row: 1 remove + 1 add + assert(lines(1).startsWith(cdcFilePrefix)) + assert(lines(1).contains(""""version":3,"timestamp":1651272660000,"expirationTimestamp":""")) + } + + integrationTest("getCDFFiles") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val cdfOptions = Map("startingVersion" -> "0", "endingVersion" -> "3") + val tableFiles = client.getCDFFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + cdfOptions, + false + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 0, expectedNumLines = 7) + checkCdfTableCdfEnabledTableV5Metadata(tableFiles.lines(1)) + checkCdfTableCdfEnabledTableV2V3Cdc(tableFiles.lines.slice(2, 4)) + checkCdfTableCdfEnabledTableV1(tableFiles.lines.slice(4, 7)) + } finally { + client.close() + } + } + + private def checkNotnullToNullTableV0Metadata(line: String): Unit = { + assert(line == """{"metaData":{"id":"1e2201ff-12ad-4c3b-a539-4d34e9e36680","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"version":0,"createdTime":1668327039667}}""") + } + + private def checkNotnullToNullTableV2V3Metadata(line: String, version: String): Unit = { + assert(line == """{"metaData":{"id":"1e2201ff-12ad-4c3b-a539-4d34e9e36680","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"version":""" + version + ""","createdTime":1668327039667}}""") + } + private def checkNotnullToNullTableAddFiles(lines: Seq[String]): Unit = { + // VERSION 1: INSERT 1 file + // VERSION 3: INSERT 1 file + val addFilePrefix = """{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/streaming_notnull_to_null/part-00000-""" + assert(lines(0).startsWith(addFilePrefix)) + assert(lines(1).startsWith(addFilePrefix)) + assert(lines(0).contains(""""version":1,"timestamp":1668327046000,"expirationTimestamp":""")) + assert(lines(1).contains(""""version":3,"timestamp":1668327050000,"expirationTimestamp":""")) + } + + integrationTest("getCDFFiles: more metadatas returned for includeHistoricalMetadata=true") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val cdfOptions = Map("startingVersion" -> "0") + val tableFiles = client.getCDFFiles( + Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"), + cdfOptions, + includeHistoricalMetadata = true + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 0, expectedNumLines = 5) + checkNotnullToNullTableV0Metadata(tableFiles.lines(1)) + checkNotnullToNullTableV2V3Metadata(tableFiles.lines(2), version = "2") + checkNotnullToNullTableAddFiles(tableFiles.lines.slice(3, 5)) + } finally { + client.close() + } + } + + integrationTest("getCDFFiles: no additional metadatas returned for includeHistoricalMetadata=false") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val cdfOptions = Map("startingVersion" -> "0") + val tableFiles = client.getCDFFiles( + Table(name = "streaming_notnull_to_null", schema = "default", share = "share8"), + cdfOptions, + includeHistoricalMetadata = false + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 0, expectedNumLines = 4) + checkNotnullToNullTableV2V3Metadata(tableFiles.lines(1), version = "3") + checkNotnullToNullTableAddFiles(tableFiles.lines.slice(2, 4)) + } finally { + client.close() + } + } + + integrationTest("getCDFFiles: cdf_table_with_vacuum") { + val client = getDeltaSharingClientWithDeltaResponse + try { + val cdfOptions = Map("startingVersion" -> "0") + val tableFiles = client.getCDFFiles( + Table(name = "cdf_table_with_vacuum", schema = "default", share = "share8"), + cdfOptions, + false + ) + checkDeltaTableFilesBasics(tableFiles, expectedVersion = 0, expectedNumLines = 8) + assert(tableFiles.lines(1) == """{"metaData":{"id":"b960061d-dc64-4b29-8fb0-d0ddc1b29cc2","format":{"provider":"parquet"},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed":"true"},"version":4,"createdTime":1655408042120}}""") + val cdcFilePrefix = """{"cdc":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_with_vacuum/_change_data/cdc-00000""" + assert(tableFiles.lines(2).startsWith(cdcFilePrefix)) + assert(tableFiles.lines(2).contains(""""version":2,"timestamp":1655410824000,"expirationTimestamp":""")) + assert(tableFiles.lines(3).startsWith(cdcFilePrefix)) + assert(tableFiles.lines(3).contains(""""version":4,"timestamp":1655410847000,"expirationTimestamp":""")) + val addFilePrefix = """{"add":{"path":"https://delta-exchange-test.s3.us-west-2.amazonaws.com/delta-exchange-test/cdf_table_with_vacuum/part-0000""" + for (lineNum <- 4 to 7) { + assert(tableFiles.lines(lineNum).startsWith(addFilePrefix)) + } + assert(tableFiles.lines(4).contains(""""version":1,"timestamp":1655408048000,"expirationTimestamp":""")) + assert(tableFiles.lines(5).contains(""""version":1,"timestamp":1655408048000,"expirationTimestamp":""")) + assert(tableFiles.lines(6).contains(""""version":3,"timestamp":1655410829000,"expirationTimestamp":""")) + assert(tableFiles.lines(7).contains(""""version":3,"timestamp":1655410829000,"expirationTimestamp":""")) + } finally { + client.close() + } + } +} diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index cc642c2d4..ba917aed4 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -99,16 +99,25 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } integrationTest("getTableVersion - exceptions") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - val errorMessage = intercept[UnexpectedHttpStatus] { - client.getTableVersion(Table(name = "table1", schema = "default", share = "share1"), - startingTimestamp = Some("2020-01-01T00:00:00Z")) - }.getMessage - assert(errorMessage.contains("400 Bad Request")) - assert(errorMessage.contains("Reading table by version or timestamp is not supported")) - } finally { - client.close() + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat + ) + try { + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getTableVersion(Table(name = "table1", schema = "default", share = "share1"), + startingTimestamp = Some("2020-01-01T00:00:00Z")) + }.getMessage + assert(errorMessage.contains("400 Bad Request")) + assert(errorMessage.contains("Reading table by version or timestamp is not supported")) + } finally { + client.close() + } } } @@ -238,62 +247,89 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } integrationTest("getFiles with version exception") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - val errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "table1", schema = "default", share = "share1"), - Nil, - None, - Some(1L), - None, - None + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) - } finally { - client.close() + try { + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "table1", schema = "default", share = "share1"), + Nil, + None, + Some(1L), + None, + None + ) + }.getMessage + assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) + } finally { + client.close() + } } } integrationTest("getFiles with timestamp parsed, but too early") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - // This is to test that timestamp is correctly passed to the server and parsed. - // The error message is expected as we are using a timestamp much smaller than the earliest - // version of the table. - // Because with undecided timezone, the timestamp string can be mapped to different versions - val errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), - Nil, - None, - None, - Some("2000-01-01T00:00:00Z"), - None) - }.getMessage - assert(errorMessage.contains("The provided timestamp")) - } finally { - client.close() + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat + ) + try { + // This is to test that timestamp is correctly passed to the server and parsed. + // The error message is expected as we are using a timestamp much smaller than the earliest + // version of the table. + // Because with undecided timezone, the timestamp string can be mapped to different versions + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + Nil, + None, + None, + Some("2000-01-01T00:00:00Z"), + None) + }.getMessage + assert(errorMessage.contains("The provided timestamp")) + } finally { + client.close() + } } } integrationTest("getFiles with timestamp not supported on table1") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - val errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "table1", schema = "default", share = "share1"), - Nil, - None, - None, - Some("abc"), - None + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) - } finally { - client.close() + try { + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "table1", schema = "default", share = "share1"), + Nil, + None, + None, + Some("abc"), + None + ) + }.getMessage + assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) + } finally { + client.close() + } } } @@ -514,37 +550,46 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } integrationTest("getFiles with startingVersion/endingVersion - exception") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - var errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "table1", schema = "default", share = "share1"), - 1, - None - ) - }.getMessage - assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) - - errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), - -1, - None + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("startingVersion cannot be negative")) - - errorMessage = intercept[UnexpectedHttpStatus] { - client.getFiles( - Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), - 2, - Some(1) - ) - }.getMessage - assert(errorMessage.contains("startingVersion(2) must be smaller than or equal to " + - "endingVersion(1)")) - } finally { - client.close() + try { + var errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "table1", schema = "default", share = "share1"), + 1, + None + ) + }.getMessage + assert(errorMessage.contains("Reading table by version or timestamp is not supported because change data feed is not enabled on table: share1.default.table1")) + + errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + -1, + None + ) + }.getMessage + assert(errorMessage.contains("startingVersion cannot be negative")) + + errorMessage = intercept[UnexpectedHttpStatus] { + client.getFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + 2, + Some(1) + ) + }.getMessage + assert(errorMessage.contains("startingVersion(2) must be smaller than or equal to " + + "endingVersion(1)")) + } finally { + client.close() + } } } @@ -717,77 +762,113 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } integrationTest("getCDFFiles: cdf_table_missing_log") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - val errorMessage = intercept[UnexpectedHttpStatus] { - val cdfOptions = Map("startingVersion" -> "1") - client.getCDFFiles( - Table(name = "cdf_table_missing_log", schema = "default", share = "share8"), - cdfOptions, - false + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) - assert(errorMessage.contains("table files missing")) - } finally { - client.close() + try { + val errorMessage = intercept[UnexpectedHttpStatus] { + val cdfOptions = Map("startingVersion" -> "1") + client.getCDFFiles( + Table(name = "cdf_table_missing_log", schema = "default", share = "share8"), + cdfOptions, + false + ) + }.getMessage + assert(errorMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST"""")) + assert(errorMessage.contains("table files missing")) + } finally { + client.close() + } } } integrationTest("getCDFFiles with startingTimestamp") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - // This is to test that timestamp is correctly passed to the server and parsed. - // The error message is expected as we are using a timestamp much smaller than the earliest - // version of the table. - val cdfOptions = Map("startingTimestamp" -> "2000-01-01T00:00:00Z") - val errorMessage = intercept[UnexpectedHttpStatus] { - val tableFiles = client.getCDFFiles( - Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), - cdfOptions, - false + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("Please use a timestamp greater")) - } finally { - client.close() + try { + // This is to test that timestamp is correctly passed to the server and parsed. + // The error message is expected as we are using a timestamp much smaller than the earliest + // version of the table. + val cdfOptions = Map("startingTimestamp" -> "2000-01-01T00:00:00Z") + val errorMessage = intercept[UnexpectedHttpStatus] { + val tableFiles = client.getCDFFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + cdfOptions, + false + ) + }.getMessage + assert(errorMessage.contains("Please use a timestamp greater")) + } finally { + client.close() + } } } integrationTest("getCDFFiles with endingTimestamp") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - // This is to test that timestamp is correctly passed to the server and parsed. - // The error message is expected as we are using a timestamp much larger than the latest - // version of the table. - val cdfOptions = Map("startingVersion" -> "0", "endingTimestamp" -> "2100-01-01T00:00:00Z") - val errorMessage = intercept[UnexpectedHttpStatus] { - val tableFiles = client.getCDFFiles( - Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), - cdfOptions, - false + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("Please use a timestamp less than or equal to")) - } finally { - client.close() + try { + // This is to test that timestamp is correctly passed to the server and parsed. + // The error message is expected as we are using a timestamp much larger than the latest + // version of the table. + val cdfOptions = Map("startingVersion" -> "0", "endingTimestamp" -> "2100-01-01T00:00:00Z") + val errorMessage = intercept[UnexpectedHttpStatus] { + val tableFiles = client.getCDFFiles( + Table(name = "cdf_table_cdf_enabled", schema = "default", share = "share8"), + cdfOptions, + false + ) + }.getMessage + assert(errorMessage.contains("Please use a timestamp less than or equal to")) + } finally { + client.close() + } } } integrationTest("getCDFFiles_exceptions") { - val client = new DeltaSharingRestClient(testProfileProvider, sslTrustAll = true) - try { - val cdfOptions = Map("startingVersion" -> "0") - val errorMessage = intercept[UnexpectedHttpStatus] { - client.getCDFFiles( - Table(name = "table1", schema = "default", share = "share1"), - cdfOptions, - false + Seq( + DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ).foreach { + responseFormat => + val client = new DeltaSharingRestClient( + testProfileProvider, + sslTrustAll = true, + responseFormat = responseFormat ) - }.getMessage - assert(errorMessage.contains("cdf is not enabled on table share1.default.table1")) - } finally { - client.close() + try { + val cdfOptions = Map("startingVersion" -> "0") + val errorMessage = intercept[UnexpectedHttpStatus] { + client.getCDFFiles( + Table(name = "table1", schema = "default", share = "share1"), + cdfOptions, + false + ) + }.getMessage + assert(errorMessage.contains("cdf is not enabled on table share1.default.table1")) + } finally { + client.close() + } } } } diff --git a/server/src/main/protobuf/protocol.proto b/server/src/main/protobuf/protocol.proto index 07e273fdd..9d31e5b30 100644 --- a/server/src/main/protobuf/protocol.proto +++ b/server/src/main/protobuf/protocol.proto @@ -63,6 +63,9 @@ message QueryTableRequest { // Query all data change files until endingVersion, inclusive. Only used when startingVersion // is set, otherwise will be ignored. optional int64 endingVersion = 7; + + // TODO: update this. The format of the response, supported formats: parquet, delta. + optional bool queryDeltaLog = 8; } message ListSharesResponse { diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala index 709a24a0c..f8da4925d 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -49,7 +49,9 @@ private[sharing] class DeltaSharingDataSource val options = new DeltaSharingOptions(parameters) val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path) + val deltaLog = RemoteDeltaLog( + path, forStreaming = false, responseFormat = options.responseFormat + ) deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) } @@ -68,7 +70,9 @@ private[sharing] class DeltaSharingDataSource } val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path, forStreaming = true) + val deltaLog = RemoteDeltaLog( + path, forStreaming = true, responseFormat = options.responseFormat + ) val schemaToUse = deltaLog.snapshot().schema if (schemaToUse.isEmpty) { throw DeltaSharingErrors.schemaNotSetException @@ -93,7 +97,7 @@ private[sharing] class DeltaSharingDataSource } val options = new DeltaSharingOptions(parameters) val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path, forStreaming = true) + val deltaLog = RemoteDeltaLog(path, forStreaming = true, options.responseFormat) DeltaSharingSource(SparkSession.active, deltaLog, options) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala index a6b29e337..5aa425d0b 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala @@ -95,6 +95,8 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser { val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP).map(getFormattedTimestamp(_)) + val responseFormat = options.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET) + def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined // Parse the input timestamp string and TimestampType, and generate a formatted timestamp string @@ -182,6 +184,11 @@ object DeltaSharingOptions extends Logging { val TIME_TRAVEL_VERSION = "versionAsOf" val TIME_TRAVEL_TIMESTAMP = "timestampAsOf" + val RESPONSE_FORMAT = "responseFormat" + + val RESPONSE_FORMAT_PARQUET = "parquet" + val RESPONSE_FORMAT_DELTA = "delta" + val validCdfOptions = Map( CDF_READ_OPTION -> "", CDF_READ_OPTION_LEGACY -> "", diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala index c76677f12..19f28540d 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala @@ -130,9 +130,12 @@ private[sharing] object RemoteDeltaLog { (profileFile, tableSplits(0), tableSplits(1), tableSplits(2)) } - def apply(path: String, forStreaming: Boolean = false): RemoteDeltaLog = { + def apply( + path: String, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET): RemoteDeltaLog = { val (profileFile, share, schema, table) = parsePath(path) - val client = DeltaSharingRestClient(profileFile, forStreaming) + val client = DeltaSharingRestClient(profileFile, forStreaming, responseFormat) val deltaSharingTable = DeltaSharingTable(name = table, schema = schema, share = share) new RemoteDeltaLog(deltaSharingTable, new Path(path), client) } diff --git a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala index 0bbf2c3b0..08a788868 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala @@ -41,7 +41,9 @@ class TestDeltaSharingClient( numRetries: Int = 10, maxRetryDuration: Long = Long.MaxValue, sslTrustAll: Boolean = false, - includeHistoricalMetadata: Boolean = false) extends DeltaSharingClient { + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET + ) extends DeltaSharingClient { private val metadataString = """{"metaData":{"id":"93351cf1-c931-4326-88f0-d10e29e71b21","format":