Paginate DeltaSharedTable.queryDataChangeSinceStartVersion via maxFiles/pageToken, and add
integration tests that walk the pages of startingVersion queries.

diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala
index d01ae7873..f8baa690b 100644
--- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala
+++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala
@@ -444,7 +444,14 @@ class DeltaSharedTable(
       if (startingVersion.isDefined) {
         // Only read changes up to snapshot.version, and ignore changes that are committed during
         // queryDataChangeSinceStartVersion.
-        queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, responseFormat)
+        queryDataChangeSinceStartVersion(
+          startingVersion.get,
+          endingVersion,
+          maxFiles,
+          pageTokenOpt,
+          queryParamChecksum,
+          responseFormat
+        )
       } else if (includeFiles) {
         val ts = if (isVersionQuery) {
           val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
@@ -530,66 +537,125 @@ class DeltaSharedTable(
   private def queryDataChangeSinceStartVersion(
       startingVersion: Long,
       endingVersion: Option[Long],
+      maxFilesOpt: Option[Int],
+      pageTokenOpt: Option[QueryTablePageToken],
+      queryParamChecksum: String,
       responseFormat: String
-  ): Seq[Object] = {
-    var latestVersion = tableVersion
+  ): Seq[Object] = {
+    // For subsequent page calls, instead of using the current latestVersion, use latestVersion in
+    // the pageToken (which is equal to the latestVersion when the first page call is received),
+    // in case the latestVersion changes after the first page call.
+    val latestVersion = pageTokenOpt.map(_.getLatestVersion).getOrElse(tableVersion)
     if (startingVersion > latestVersion) {
       throw DeltaCDFErrors.startVersionAfterLatestVersion(startingVersion, latestVersion)
     }
     if (endingVersion.isDefined && endingVersion.get > latestVersion) {
       throw DeltaCDFErrors.endVersionAfterLatestVersion(endingVersion.get, latestVersion)
     }
-    latestVersion = latestVersion.min(endingVersion.getOrElse(latestVersion))
+    // We override (start, end) in subsequent page calls because:
+    // - Versions that are processed in previous pages can be skipped.
+    // - Versions that are committed after the first page call should be ignored, especially
+    //   when the endingVersion is not specified and resolved to latestVersion.
+    val start = pageTokenOpt.map(_.getStartingVersion).getOrElse(startingVersion)
+    val end = pageTokenOpt
+      .map(_.getEndingVersion)
+      .orElse(endingVersion)
+      .getOrElse(latestVersion)
+      .min(latestVersion)
     val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
       deltaLog.store,
       deltaLog.logPath,
-      startingVersion,
-      latestVersion + 1,
+      start,
+      end + 1,
       conf
     )
+    // Enforce page size only when `maxFiles` is specified for backwards compatibility.
+    val pageSizeOpt = maxFilesOpt.map(_.min(queryTablePageSizeLimit))
+    val tokenGenerator = { (v: Long, idx: Int) => // next page resumes at action idx of version v
+      val nextPageTokenStr = encodeQueryTablePageToken(
+        QueryTablePageToken(
+          id = Some(tableConfig.id),
+          startingVersion = Some(v),
+          endingVersion = Some(end),
+          latestVersion = Some(latestVersion),
+          checksum = Some(queryParamChecksum),
+          startingActionIndex = Some(idx),
+          expirationTimestamp = Some(System.currentTimeMillis() + queryTablePageTokenTtlMs)
+        )
+      )
+      model.NextPageToken(nextPageTokenStr).wrap
+    }
+    var numSignedFiles = 0 // add/remove files emitted into the current page so far
 
     val actions = ListBuffer[Object]()
-    deltaLog.getChanges(startingVersion, true).asScala.toSeq
-      .filter(_.getVersion <= latestVersion).foreach{ versionLog =>
-      val v = versionLog.getVersion
-      val versionActions = versionLog.getActions.asScala.map(x => ConversionUtils.convertActionJ(x))
-      val ts = timestampsByVersion.get(v).orNull
-      versionActions.foreach {
-        case a: AddFile if a.dataChange =>
-          actions.append(
-            getResponseAddFile(
-              a,
-              fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
-              v,
-              ts.getTime,
-              responseFormat,
-              true
-            )
-          )
-        case r: RemoveFile if r.dataChange =>
-          actions.append(
-            getResponseRemoveFile(
-              r,
-              fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
-              v,
-              ts.getTime,
-              responseFormat
+    deltaLog
+      .getChanges(start, true)
+      .asScala
+      .toSeq
+      .filter(_.getVersion <= end)
+      .foreach { versionLog =>
+        val v = versionLog.getVersion
+        var indexedVersionActions = // (action, position within this commit) pairs
+          versionLog.getActions.asScala.map(x => ConversionUtils.convertActionJ(x)).zipWithIndex
+        val ts = timestampsByVersion.get(v).orNull
+        if (pageTokenOpt.exists(_.getStartingVersion == v)) {
+          // Skip actions that are already processed in previous pages
+          indexedVersionActions =
+            indexedVersionActions.drop(pageTokenOpt.get.getStartingActionIndex)
+        }
+        indexedVersionActions.foreach {
+          case (a: AddFile, idx) if a.dataChange =>
+            // Return early if we already have enough files in the current page
+            if (pageSizeOpt.contains(numSignedFiles)) {
+              actions.append(tokenGenerator(v, idx))
+              return actions.toSeq // non-local return: exits the method from inside the lambdas
+            }
+            actions.append(
+              getResponseAddFile(
+                a,
+                fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
+                v,
+                ts.getTime,
+                responseFormat,
+                true
+              )
             )
-          )
-        case p: Protocol =>
-          assertProtocolRead(p)
-        case m: Metadata =>
-          if (v > startingVersion) {
+            numSignedFiles += 1
+          case (r: RemoveFile, idx) if r.dataChange =>
+            // Return early if we already have enough files in the current page
+            if (pageSizeOpt.contains(numSignedFiles)) {
+              actions.append(tokenGenerator(v, idx))
+              return actions.toSeq // non-local return, same as the AddFile case
+            }
             actions.append(
-              getResponseMetadata(
-                m,
-                Some(v),
+              getResponseRemoveFile(
+                r,
+                fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
+                v,
+                ts.getTime,
                 responseFormat
               )
             )
-          }
-        case _ => ()
+            numSignedFiles += 1
+          case (p: Protocol, _) =>
+            assertProtocolRead(p)
+          case (m: Metadata, _) =>
+            if (v > startingVersion) { // presumably the caller sends startingVersion's metadata
+              actions.append(
+                getResponseMetadata(
+                  m,
+                  Some(v),
+                  responseFormat
+                )
+              )
+            }
+          case _ => ()
+        }
       }
+    // Return an empty `nextPageToken` object only when `maxFiles` is specified for
+    // backwards compatibility.
+    if (maxFilesOpt.isDefined) {
+      actions.append(model.NextPageToken(null).wrap) // null token: this was the last page
     }
     actions.toSeq
   }
diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala
index 67707c680..935b886c3 100644
--- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala
+++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala
@@ -1483,6 +1483,159 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  integrationTest("streaming_table_with_optimize - paginated query with startingVersion") {
+    Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat =>
+      // version 6: 1 REMOVE + 1 ADD
+      var response = readNDJson(
+        requestPath("/shares/share8/schemas/default/tables/streaming_table_with_optimize/query"),
+        Some("POST"),
+        Some("""{"startingVersion": 6, "maxFiles": 1}"""),
+        Some(6),
+        responseFormat
+      )
+      var lines = response.split("\n")
+      assert(lines.length == 4)
+      val protocol = lines(0)
+      val metadata = lines(1)
+      val files = ArrayBuffer[String]()
+      files.append(lines(2))
+      var nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
+      var numPages = 1
+      while (nextPageToken != null && nextPageToken.token != null) {
+        numPages += 1
+        response = readNDJson(
+          requestPath("/shares/share8/schemas/default/tables/streaming_table_with_optimize/query"),
+          Some("POST"),
+          Some(s"""{"startingVersion": 6, "maxFiles": 1, "pageToken": "${nextPageToken.token}"}"""),
+          Some(6),
+          responseFormat
+        )
+        lines = response.split("\n")
+        assert(lines.length == 4)
+        assert(protocol == lines(0))
+        assert(metadata == lines(1))
+        files.append(lines(2))
+        nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
+      }
+      assert(numPages == 2)
+
+      if (responseFormat == RESPONSE_FORMAT_DELTA) {
+        val expectedProtocol = DeltaProtocol(minReaderVersion = 1).wrap
+        assert(expectedProtocol == JsonUtils.fromJson[DeltaSingleAction](protocol))
+        val expectedMetadata = DeltaMetadata(
+          id = "4929d09e-b085-4d22-a95e-7416fb2f78ab",
+          format = Format(),
+          schemaString =
+            """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""",
+          configuration = Map("delta.enableChangeDataFeed" -> "true"),
+          partitionColumns = Seq.empty,
+          createdTime = Some(1664325322573L),
+          version = 6
+        ).wrap
+        assert(expectedMetadata == JsonUtils.fromJson[DeltaSingleAction](metadata))
+      } else {
+        val expectedProtocol = Protocol(minReaderVersion = 1).wrap
+        assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol))
+        val expectedMetadata = Metadata(
+          id = "4929d09e-b085-4d22-a95e-7416fb2f78ab",
+          format = Format(),
+          schemaString =
+            """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""",
+          configuration = Map("enableChangeDataFeed" -> "true"),
+          partitionColumns = Nil,
+          version = 6
+        ).wrap
+        assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata))
+      }
+      assert(files.size == 2)
+      verifyRemove(
+        files(0),
+        size = 1283,
+        partitionValues = Map.empty,
+        version = 6,
+        timestamp = 1664325549000L,
+        responseFormat
+      )
+      verifyAddFile(
+        files(1),
+        size = 1247,
+        stats =
+          """{"numRecords":1,"minValues":{"name":"3","age":3,"birthday":"2020-01-01"},"maxValues":{"name":"3","age":3,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0,"_change_type":1}}""",
+        partitionValues = Map.empty,
+        version = 6,
+        timestamp = 1664325549000L,
+        responseFormat
+      )
+    }
+  }
+
+  integrationTest("streaming_table_with_optimize - paginated query with startingVersion and endingVersion") {
+    // version 2: Add
+    // version 3: Add
+    var response = readNDJson(
+      requestPath("/shares/share8/schemas/default/tables/streaming_table_with_optimize/query"),
+      Some("POST"),
+      Some("""{"startingVersion": 2, "endingVersion": 3, "maxFiles": 1}"""),
+      Some(2)
+    )
+    var lines = response.split("\n")
+    assert(lines.length == 4)
+    val protocol = lines(0)
+    val metadata = lines(1)
+    val files = ArrayBuffer[String]()
+    files.append(lines(2))
+    var nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
+    var numPages = 1
+    while (nextPageToken != null && nextPageToken.token != null) {
+      numPages += 1
+      response = readNDJson(
+        requestPath("/shares/share8/schemas/default/tables/streaming_table_with_optimize/query"),
+        Some("POST"),
+        Some(s"""{"startingVersion": 2, "endingVersion": 3, "maxFiles": 1, "pageToken": "${nextPageToken.token}"}"""),
+        Some(2)
+      )
+      lines = response.split("\n")
+      assert(lines.length == 4)
+      assert(protocol == lines(0))
+      assert(metadata == lines(1))
+      files.append(lines(2))
+      nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
+    }
+    assert(numPages == 2)
+
+    val expectedProtocol = Protocol(minReaderVersion = 1).wrap
+    assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol))
+    val expectedMetadata = Metadata(
+      id = "4929d09e-b085-4d22-a95e-7416fb2f78ab",
+      format = Format(),
+      schemaString =
+        """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""",
+      configuration = Map("enableChangeDataFeed" -> "true"),
+      partitionColumns = Nil,
+      version = 2
+    ).wrap
+    assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata))
+    assert(files.length == 2)
+    verifyAddFile(
+      files(0),
+      size = 1030,
+      stats =
+        """{"numRecords":1,"minValues":{"name":"2","age":2,"birthday":"2020-01-01"},"maxValues":{"name":"2","age":2,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""",
+      partitionValues = Map.empty,
+      version = 2,
+      timestamp = 1664325372000L
+    )
+    verifyAddFile(
+      files(1),
+      size = 1030,
+      stats =
+        """{"numRecords":1,"minValues":{"name":"3","age":3,"birthday":"2020-01-01"},"maxValues":{"name":"3","age":3,"birthday":"2020-01-01"},"nullCount":{"name":0,"age":0,"birthday":0}}""",
+      partitionValues = Map.empty,
+      version = 3,
+      timestamp = 1664325375000L
+    )
+  }
+
   integrationTest("streaming_table_metadata_protocol - startingVersion 0 success") {
     val p =
       s"""
@@ -1526,6 +1679,60 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
     assert(actions(5).add != null)
   }
 
+  integrationTest("streaming_table_metadata_protocol - paginated query") {
+    // version 0: CREATE TABLE, protocol/metadata
+    // version 1: INSERT
+    // version 2: ALTER TABLE, metadata
+    // version 3: ALTER TABLE, metadata
+    // version 4: INSERT
+    val expectedProtocol = Protocol(minReaderVersion = 1)
+    val expectedMetadata = Metadata(
+      id = "eaca659e-28ac-4c68-8c72-0c96205c8160",
+      format = Format(),
+      schemaString =
+        """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""",
+      partitionColumns = Nil,
+      version = 0
+    )
+
+    // Page 1
+    var response = readNDJson(
+      requestPath("/shares/share8/schemas/default/tables/streaming_table_metadata_protocol/query"),
+      Some("POST"),
+      Some("""{"startingVersion": 0, "maxFiles": 1}"""),
+      Some(0)
+    )
+    var actions = response.split("\n").map(JsonUtils.fromJson[SingleAction](_))
+    assert(actions.length == 6)
+    assert(expectedProtocol == actions(0).protocol)
+    assert(expectedMetadata == actions(1).metaData)
+    assert(actions(2).add != null)
+    // Check metadata for version 2.
+    assert(expectedMetadata.copy(configuration = Map("enableChangeDataFeed" -> "true"), version = 2) == actions(3).metaData)
+    // Check metadata for version 3.
+    assert(expectedMetadata.copy(configuration = Map.empty, version = 3) == actions(4).metaData)
+    var nextPageToken = actions(5).nextPageToken
+    assert(nextPageToken != null)
+    assert(nextPageToken.token != null)
+
+    // Page 2
+    response = readNDJson(
+      requestPath("/shares/share8/schemas/default/tables/streaming_table_metadata_protocol/query"),
+      Some("POST"),
+      Some(s"""{"startingVersion": 0, "maxFiles": 1, "pageToken": "${nextPageToken.token}"}"""),
+      Some(0)
+    )
+    actions = response.split("\n").map(JsonUtils.fromJson[SingleAction](_))
+    assert(actions.length == 4)
+    assert(expectedProtocol == actions(0).protocol)
+    assert(expectedMetadata == actions(1).metaData)
+    assert(actions(2).add != null)
+    // Check this is the last page (token is empty)
+    nextPageToken = actions(3).nextPageToken
+    assert(nextPageToken != null)
+    assert(nextPageToken.token == null)
+  }
+
   integrationTest("streaming_table_metadata_protocol - startingVersion 2 success") {
     val p =
       s"""