From b5170d47e9a9a46ccacde27447bf9505b9ca5af9 Mon Sep 17 00:00:00 2001 From: Charlene Lyu Date: Sat, 22 Jul 2023 00:10:11 -0700 Subject: [PATCH] pagination for queryTable at snapshot --- server/src/main/protobuf/protocol.proto | 35 +++ .../sharing/server/DeltaSharingService.scala | 7 + .../sharing/server/config/ServerConfig.scala | 10 +- .../scala/io/delta/sharing/server/model.scala | 14 +- .../internal/DeltaSharedTableLoader.scala | 208 +++++++++++--- .../internal/JsonPredicateFilterUtils.scala | 11 +- .../internal/PartitionFilterUtils.scala | 9 +- .../server/DeltaSharingServiceSuite.scala | 266 ++++++++++++++++++ .../JsonPredicateFilterUtilsSuite.scala | 6 +- .../internal/PartitionFilterUtilsSuite.scala | 18 +- .../conf/delta-sharing-server.yaml.template | 4 + 11 files changed, 523 insertions(+), 65 deletions(-) diff --git a/server/src/main/protobuf/protocol.proto b/server/src/main/protobuf/protocol.proto index 9d31e5b30..8ec9a3de4 100644 --- a/server/src/main/protobuf/protocol.proto +++ b/server/src/main/protobuf/protocol.proto @@ -47,6 +47,13 @@ message QueryTableRequest { repeated string predicateHints = 1; optional string jsonPredicateHints = 6; optional int64 limitHint = 2; + // The maximum number of files to return in one page. This is a hint for the server, + // and the server may not honor it. The server that supports pagination should return + // no more than this limit, but it can return fewer. If there are more files available, + // a nextPageToken will be returned to the user to retrieve the next page. + optional int32 maxFiles = 9; + // The page token used to retrieve the subsequent page. + optional string pageToken = 10; // Only one of the three parameters can be supported in a single query. // If none of them is specified, the query is for the latest version. @@ -101,3 +108,31 @@ message PageToken { optional string share = 2; optional string schema = 3; } + +// Define a special class to generate the page token for pagination. It includes the information +// about where we should start to query, and checks whether the paginated request is valid. +message QueryTablePageToken { + // Id of the table being queried. + optional string id = 1; + // Only used in queryTable at snapshot, refers to the version being queried. + optional int64 version = 2; + // Only used in queryTable from starting_version and queryTableChanges. + // The table version where the querying process starts in next page, inclusive. + optional int64 starting_version = 3; + // Only used in queryTable from starting_version and queryTableChanges. + // The table version where the querying process ends in next page, inclusive. + optional int64 ending_version = 4; + // The checksum of other query parameters. This is used for validation purpose, as the user + // is expected to keep all other parameters to the request the same. + optional string checksum = 5; + // The local index of the first action in next page. + // For queryTable at snapshot, (version, starting_action_index) determines the entry point + // of the next page. + // For queryTable from starting_version and queryTableChanges, + // (starting_version, starting_action_index) determines the entry point of the next page. + optional int32 starting_action_index = 6; + // The expiration timestamp of the page token in milliseconds. + optional int64 expiration_timestamp = 7; + // The latest version of the table when the first page request is received. + optional int64 latest_version = 8; +} diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 8010eac93..d4518200d 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -304,6 +304,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { timestamp = None, startingVersion = None, endingVersion = None, + maxFiles = None, + pageToken = None, responseFormat = responseFormat) streamingOutput(Some(v), responseFormat, actions) } @@ -332,6 +334,9 @@ class DeltaSharingService(serverConfig: ServerConfig) { if (request.startingVersion.isDefined && request.startingVersion.get < 0) { throw new DeltaSharingIllegalArgumentException("startingVersion cannot be negative.") } + if (request.maxFiles.exists(_ <= 0)) { + throw new DeltaSharingIllegalArgumentException("maxFiles must be positive.") + } val start = System.currentTimeMillis val tableConfig = sharedTableManager.getTable(share, schema, table) @@ -365,6 +370,8 @@ class DeltaSharingService(serverConfig: ServerConfig) { request.timestamp, request.startingVersion, request.endingVersion, + request.maxFiles, + request.pageToken, responseFormat = responseFormat) if (version < tableConfig.startVersion) { throw new DeltaSharingIllegalArgumentException( diff --git a/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala b/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala index 13a149f19..7c38996e1 100644 --- a/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala +++ b/server/src/main/scala/io/delta/sharing/server/config/ServerConfig.scala @@ -57,7 +57,11 @@ case class ServerConfig( // Whether to evaluate user provided `jsonPredicateHints` @BeanProperty var evaluateJsonPredicateHints: Boolean, // The timeout of an incoming web request in seconds. Set to 0 for no timeout - @BeanProperty var requestTimeoutSeconds: Long + @BeanProperty var requestTimeoutSeconds: Long, + // The maximum page size permitted by queryTable/queryTableChanges API. + @BeanProperty var queryTablePageSizeLimit: Int, + // The TTL of the page token generated in queryTable/queryTableChanges API (in milliseconds). + @BeanProperty var queryTablePageTokenTtlMs: Int ) extends ConfigItem { import ServerConfig._ @@ -76,7 +80,9 @@ case class ServerConfig( stalenessAcceptable = false, evaluatePredicateHints = false, evaluateJsonPredicateHints = false, - requestTimeoutSeconds = 30 + requestTimeoutSeconds = 30, + queryTablePageSizeLimit = 500, + queryTablePageTokenTtlMs = 259200000 // 3 days ) } diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index 5d974f10d..073fd55ca 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -25,7 +25,8 @@ case class SingleAction( cdf: AddCDCFile = null, remove: RemoveFile = null, metaData: Metadata = null, - protocol: Protocol = null) { + protocol: Protocol = null, + nextPageToken: NextPageToken = null) { def unwrap: Action = { if (file != null) { @@ -40,6 +41,8 @@ case class SingleAction( metaData } else if (protocol != null) { protocol + } else if (nextPageToken != null) { + nextPageToken } else { null } @@ -130,6 +133,15 @@ case class RemoveFile( override def wrap: SingleAction = SingleAction(remove = this) } +/** + * A token to retrieve the subsequent page of a query. The server that supports pagination will + * return a nextPageToken at the end of the response when there are more files available than + * the page size specified by the user. + */ +case class NextPageToken(token: String) extends Action { + override def wrap: SingleAction = SingleAction(nextPageToken = this) +} + object Action { // The maximum version of the protocol that this version of Delta Standalone understands. val maxReaderVersion = 1 diff --git a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala index e317fb602..d01ae7873 100644 --- a/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala +++ b/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala @@ -19,6 +19,8 @@ package io.delta.standalone.internal import java.net.URI import java.nio.charset.StandardCharsets.UTF_8 +import java.text.SimpleDateFormat +import java.util.{Base64, TimeZone} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -31,6 +33,7 @@ import io.delta.standalone.DeltaLog import io.delta.standalone.internal.actions.{AddCDCFile, AddFile, Metadata, Protocol, RemoveFile} import io.delta.standalone.internal.exception.DeltaErrors import io.delta.standalone.internal.util.ConversionUtils +import org.apache.commons.codec.digest.DigestUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.azure.NativeAzureFileSystem @@ -52,6 +55,8 @@ import io.delta.sharing.server.{ WasbFileSigner } import io.delta.sharing.server.config.{ServerConfig, TableConfig} +import io.delta.sharing.server.protocol.QueryTablePageToken +import io.delta.sharing.server.util.JsonUtils /** * A class to load Delta tables from `TableConfig`. It also caches the loaded tables internally @@ -68,25 +73,46 @@ class DeltaSharedTableLoader(serverConfig: ServerConfig) { def loadTable(tableConfig: TableConfig): DeltaSharedTable = { try { val deltaSharedTable = - deltaSharedTableCache.get(tableConfig.location, () => { - new DeltaSharedTable( - tableConfig, - serverConfig.preSignedUrlTimeoutSeconds, - serverConfig.evaluatePredicateHints, - serverConfig.evaluateJsonPredicateHints) - }) + deltaSharedTableCache.get( + tableConfig.location, + () => { + new DeltaSharedTable( + tableConfig, + serverConfig.preSignedUrlTimeoutSeconds, + serverConfig.evaluatePredicateHints, + serverConfig.evaluateJsonPredicateHints, + serverConfig.queryTablePageSizeLimit, + serverConfig.queryTablePageTokenTtlMs + ) + } + ) if (!serverConfig.stalenessAcceptable) { deltaSharedTable.update() } deltaSharedTable - } - catch { + } catch { case CausedBy(e: DeltaSharingUnsupportedOperationException) => throw e case e: Throwable => throw e } } } +/** + * A util class stores all query parameters. Used to compute the checksum in the page token for + * query validation. + */ +private case class QueryParamChecksum( + version: Option[Long], + timestamp: Option[String], + startingVersion: Option[Long], + startingTimestamp: Option[String], + endingVersion: Option[Long], + endingTimestamp: Option[String], + predicateHints: Seq[String], + jsonPredicateHints: Option[String], + limitHint: Option[Long], + includeHistoricalMetadata: Option[Boolean]) + /** * A table class that wraps `DeltaLog` to provide the methods used by the server. */ @@ -94,7 +120,9 @@ class DeltaSharedTable( tableConfig: TableConfig, preSignedUrlTimeoutSeconds: Long, evaluatePredicateHints: Boolean, - evaluateJsonPredicateHints: Boolean) { + evaluateJsonPredicateHints: Boolean, + queryTablePageSizeLimit: Int, + queryTablePageTokenTtlMs: Int) { private val conf = withClassLoader { new Configuration() @@ -345,6 +373,7 @@ class DeltaSharedTable( } } + // scalastyle:off argcount def query( includeFiles: Boolean, predicateHints: Seq[String], @@ -354,28 +383,55 @@ class DeltaSharedTable( timestamp: Option[String], startingVersion: Option[Long], endingVersion: Option[Long], + maxFiles: Option[Int], + pageToken: Option[String], responseFormat: String): (Long, Seq[Object]) = withClassLoader { + // scalastyle:on argcount // TODO Support `limitHint` if (Seq(version, timestamp, startingVersion).filter(_.isDefined).size >= 2) { throw new DeltaSharingIllegalArgumentException( ErrorStrings.multipleParametersSetErrorMsg(Seq("version", "timestamp", "startingVersion")) ) } - val snapshot = if (version.orElse(startingVersion).isDefined) { - deltaLog.getSnapshotForVersionAsOf(version.orElse(startingVersion).get) - } else if (timestamp.isDefined) { - val ts = DeltaSharingHistoryManager.getTimestamp("timestamp", timestamp.get) - try { - deltaLog.getSnapshotForTimestampAsOf(ts.getTime()) - } catch { - // Convert to DeltaSharingIllegalArgumentException to return 4xx instead of 5xx error code - // Only convert known exceptions around timestamp too late or too early - case e: IllegalArgumentException => - throw new DeltaSharingIllegalArgumentException(e.getMessage) + // Validate pageToken if it's specified + lazy val queryParamChecksum = computeChecksum( + QueryParamChecksum( + version = version, + timestamp = timestamp, + startingVersion = startingVersion, + startingTimestamp = None, + endingVersion = endingVersion, + endingTimestamp = None, + predicateHints = predicateHints, + jsonPredicateHints = jsonPredicateHints, + limitHint = limitHint, + includeHistoricalMetadata = None + ) + ) + val pageTokenOpt = pageToken.map(decodeAndValidatePageToken(_, queryParamChecksum)) + // For queryTable at snapshot, override version in subsequent page calls using the version + // in the pageToken to make sure we're querying the same version across pages. Especially + // when the first page is querying the latest snapshot, table changes that are committed + // after the first page call should be ignored. + val versionFromPageToken = pageTokenOpt.flatMap(_.version) + val snapshot = + if (versionFromPageToken.orElse(version).orElse(startingVersion).isDefined) { + deltaLog.getSnapshotForVersionAsOf( + versionFromPageToken.orElse(version).orElse(startingVersion).get + ) + } else if (timestamp.isDefined) { + val ts = DeltaSharingHistoryManager.getTimestamp("timestamp", timestamp.get) + try { + deltaLog.getSnapshotForTimestampAsOf(ts.getTime()) + } catch { + // Convert to DeltaSharingIllegalArgumentException to return 4xx instead of 5xx error code + // Only convert known exceptions around timestamp too late or too early + case e: IllegalArgumentException => + throw new DeltaSharingIllegalArgumentException(e.getMessage) + } + } else { + deltaLog.snapshot } - } else { - deltaLog.snapshot - } // TODO Open the `state` field in Delta Standalone library. val stateMethod = snapshot.getClass.getMethod("state") val state = stateMethod.invoke(snapshot).asInstanceOf[SnapshotImpl.State] @@ -402,35 +458,66 @@ class DeltaSharedTable( } else { None } - - var selectedFiles = state.activeFiles.toSeq - var filteredFiles = + // Enforce page size only when `maxFiles` is specified for backwards compatibility. + val pageSizeOpt = maxFiles.map(_.min(queryTablePageSizeLimit)) + var nextPageTokenStr: String = null + + // Skip files that are already processed in previous pages + val selectedIndexedFiles = state.activeFiles.toSeq.zipWithIndex + .drop(pageTokenOpt.map(_.getStartingActionIndex).getOrElse(0)) + // Select files that satisfy partition and predicate hints + var filteredIndexedFiles = if (evaluateJsonPredicateHints && snapshot.metadataScala.partitionColumns.nonEmpty) { - JsonPredicateFilterUtils.evaluatePredicate(jsonPredicateHints, selectedFiles) + JsonPredicateFilterUtils.evaluatePredicate(jsonPredicateHints, selectedIndexedFiles) } else { - selectedFiles + selectedIndexedFiles } - filteredFiles = + filteredIndexedFiles = if (evaluatePredicateHints && snapshot.metadataScala.partitionColumns.nonEmpty) { PartitionFilterUtils.evaluatePredicate( snapshot.metadataScala.schemaString, snapshot.metadataScala.partitionColumns, predicateHints, - filteredFiles + filteredIndexedFiles ) } else { - filteredFiles + filteredIndexedFiles } - filteredFiles.map { addFile => - val cloudPath = absolutePath(deltaLog.dataPath, addFile.path) - val signedUrl = fileSigner.sign(cloudPath) - getResponseAddFile( - addFile, - signedUrl, - if (isVersionQuery) { snapshot.version } else null, - if (isVersionQuery) { ts.get } else null, - responseFormat + // If number of valid files is greater than page size, generate nextPageToken and + // drop additional files. + if (pageSizeOpt.exists(_ < filteredIndexedFiles.length)) { + nextPageTokenStr = encodeQueryTablePageToken( + QueryTablePageToken( + id = Some(tableConfig.id), + version = Some(snapshot.version), + checksum = Some(queryParamChecksum), + startingActionIndex = Some(filteredIndexedFiles(pageSizeOpt.get)._2), + expirationTimestamp = Some(System.currentTimeMillis() + queryTablePageTokenTtlMs) + ) ) + filteredIndexedFiles = filteredIndexedFiles.take(pageSizeOpt.get) + } + val filteredFiles = filteredIndexedFiles.map { + case (addFile, _) => + val cloudPath = absolutePath(deltaLog.dataPath, addFile.path) + val signedUrl = fileSigner.sign(cloudPath) + getResponseAddFile( + addFile, + signedUrl, + if (isVersionQuery) { snapshot.version } else null, + if (isVersionQuery) { ts.get } else null, + responseFormat + ) + } + // Return `nextPageToken` object only when `maxFiles` is specified for backwards + // compatibility. If this is the last page, an empty `nextPageToken` object will + // be returned to explicitly indicate that there are no more pages. + filteredFiles ++ { + if (maxFiles.isDefined) { + Seq(model.NextPageToken(nextPageTokenStr).wrap) + } else { + Nil + } } } else { Nil @@ -643,6 +730,45 @@ class DeltaSharedTable( new Path(path, p) } } + + private def computeChecksum(queryParamChecksum: QueryParamChecksum): String = { + DigestUtils.sha256Hex(JsonUtils.toJson(queryParamChecksum)) + } + + private def decodeAndValidatePageToken( + tokenStr: String, + expectedChecksum: String): QueryTablePageToken = { + val token = decodeQueryTablePageToken(tokenStr) + val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z") + timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) + if (token.getExpirationTimestamp < System.currentTimeMillis()) { + throw new DeltaSharingIllegalArgumentException( + s"""The next page token has already expired at + |${timestampFormat.format(token.getExpirationTimestamp)}. Please restart the + |query.""".stripMargin + ) + } + if (token.getId != tableConfig.id) { + throw new DeltaSharingIllegalArgumentException( + "The table specified in the next page token does not match the table being queried." + ) + } + if (token.getChecksum != expectedChecksum) { + throw new DeltaSharingIllegalArgumentException( + """Query parameter mismatch detected for the next page token query. The query parameter + |cannot change when querying the next page results.""".stripMargin + ) + } + token + } + + private def encodeQueryTablePageToken(token: QueryTablePageToken): String = { + Base64.getUrlEncoder.encodeToString(token.toByteArray) + } + + private def decodeQueryTablePageToken(tokenStr: String): QueryTablePageToken = { + QueryTablePageToken.parseFrom(Base64.getUrlDecoder.decode(tokenStr)) + } } object DeltaSharedTable { diff --git a/server/src/main/scala/io/delta/standalone/internal/JsonPredicateFilterUtils.scala b/server/src/main/scala/io/delta/standalone/internal/JsonPredicateFilterUtils.scala index fb068c995..f90520960 100644 --- a/server/src/main/scala/io/delta/standalone/internal/JsonPredicateFilterUtils.scala +++ b/server/src/main/scala/io/delta/standalone/internal/JsonPredicateFilterUtils.scala @@ -37,14 +37,15 @@ object JsonPredicateFilterUtils { // Returns the add files that match json predicates. def evaluatePredicate( jsonPredicateHints: Option[String], - addFiles: Seq[AddFile]): Seq[AddFile] = { + addFiles: Seq[(AddFile, Int)]): Seq[(AddFile, Int)] = { if (!jsonPredicateHints.isDefined) { return addFiles } val op = maybeCreateJsonPredicateOp(jsonPredicateHints) - addFiles.filter(addFile => { - matchJsonPredicate(op, addFile.partitionValues) - }) + addFiles.filter { + case (addFile, _) => + matchJsonPredicate(op, addFile.partitionValues) + } } // Creates a json predicate op from the specified jsonPredicateHints. @@ -58,7 +59,7 @@ object JsonPredicateFilterUtils { if (opJson.size > jsonPredicateHintsSizeLimit) { throw new IllegalArgumentException( "The jsonPredicateHints size is " + opJson.size + - " which exceeds the limit of " + jsonPredicateHintsSizeLimit + " which exceeds the limit of " + jsonPredicateHintsSizeLimit ) } val op = JsonUtils.fromJson[BaseOp](opJson) diff --git a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala index a2b34040b..e8af1f70b 100644 --- a/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala +++ b/server/src/main/scala/io/delta/standalone/internal/PartitionFilterUtils.scala @@ -38,7 +38,7 @@ object PartitionFilterUtils { schemaString: String, partitionColumns: Seq[String], partitionFilters: Seq[String], - addFiles: Seq[AddFile]): Seq[AddFile] = { + addFiles: Seq[(AddFile, Int)]): Seq[(AddFile, Int)] = { try { val tableSchema = DataType.fromJson(schemaString).asInstanceOf[StructType] val partitionSchema = new StructType(partitionColumns.map(c => tableSchema(c)).toArray) @@ -58,9 +58,10 @@ object PartitionFilterUtils { } else { val predicate = InterpretedPredicate.create(exprs.reduce(And), attrs) predicate.initialize(0) - addFiles.filter { addFile => - val converter = CatalystTypeConverters.createToCatalystConverter(addSchema) - predicate.eval(converter(addFile).asInstanceOf[InternalRow]) + addFiles.filter { + case (addFile, _) => + val converter = CatalystTypeConverters.createToCatalystConverter(addSchema) + predicate.eval(converter(addFile).asInstanceOf[InternalRow]) } } } catch { diff --git a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala index 2f81c2e4c..67707c680 100644 --- a/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala +++ b/server/src/test/scala/io/delta/sharing/server/DeltaSharingServiceSuite.scala @@ -621,6 +621,122 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { } } + integrationTest("table1 - non partitioned - paginated query") { + Seq(RESPONSE_FORMAT_PARQUET, RESPONSE_FORMAT_DELTA).foreach { responseFormat => + var response = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some("""{"maxFiles": 1}"""), + Some(2), + responseFormat + ) + var lines = response.split("\n") + assert(lines.length == 4) + val protocol = lines(0) + val metadata = lines(1) + val files = ArrayBuffer[String]() + files.append(lines(2)) + var nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken + var numPages = 1 + while (nextPageToken != null && nextPageToken.token != null) { + numPages += 1 + response = readNDJson( + requestPath("/shares/share1/schemas/default/tables/table1/query"), + Some("POST"), + Some(s"""{"maxFiles": 1, "pageToken": "${nextPageToken.token}"}"""), + Some(2), + responseFormat + ) + lines = response.split("\n") + assert(lines.length == 4) + assert(protocol == lines(0)) + assert(metadata == lines(1)) + files.append(lines(2)) + nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken + } + assert(numPages == 2) + + if (responseFormat == RESPONSE_FORMAT_DELTA) { + val expectedProtocol = DeltaProtocol(minReaderVersion = 1).wrap + assert(expectedProtocol == JsonUtils.fromJson[DeltaSingleAction](protocol)) + val expectedMetadata = DeltaMetadata( + id = "ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", + format = Format(), + schemaString = + """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", + partitionColumns = Seq.empty, + createdTime = Some(1619591469476L) + ).wrap + assert(expectedMetadata == JsonUtils.fromJson[DeltaSingleAction](metadata)) + val actualFiles = files.map(f => JsonUtils.fromJson[DeltaSingleAction](f).add) + val expectedFiles = Seq( + DeltaAddFile( + path = actualFiles(0).path, + expirationTimestamp = actualFiles(0).expirationTimestamp, + id = "061cb3683a467066995f8cdaabd8667d", + partitionValues = Map.empty, + size = 781, + modificationTime = 1619591543000L, + dataChange = false, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ), + DeltaAddFile( + path = actualFiles(1).path, + expirationTimestamp = actualFiles(1).expirationTimestamp, + id = "e268cbf70dbaa6143e7e9fa3e2d3b00e", + partitionValues = Map.empty, + size = 781, + modificationTime = 1619591525000L, + dataChange = false, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ) + ) + assert(actualFiles.count(_.expirationTimestamp > System.currentTimeMillis()) == 2) + assert(expectedFiles == actualFiles.toList) + verifyPreSignedUrl(actualFiles(0).path, 781) + verifyPreSignedUrl(actualFiles(1).path, 781) + } else { + val expectedProtocol = Protocol(minReaderVersion = 1).wrap + assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) + val expectedMetadata = Metadata( + id = "ed96aa41-1d81-4b7f-8fb5-846878b4b0cf", + format = Format(), + schemaString = + """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", + partitionColumns = Nil + ).wrap + assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) + val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file) + val expectedFiles = Seq( + AddFile( + url = actualFiles(0).url, + expirationTimestamp = actualFiles(0).expirationTimestamp, + id = "061cb3683a467066995f8cdaabd8667d", + partitionValues = Map.empty, + size = 781, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:22.421Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ), + AddFile( + url = actualFiles(1).url, + expirationTimestamp = actualFiles(1).expirationTimestamp, + id = "e268cbf70dbaa6143e7e9fa3e2d3b00e", + partitionValues = Map.empty, + size = 781, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"maxValues":{"eventTime":"2021-04-28T06:32:02.070Z","date":"2021-04-28"},"nullCount":{"eventTime":0,"date":0}}""" + ) + ) + assert(actualFiles.count(_.expirationTimestamp != null) == 2) + assert(expectedFiles == actualFiles.toList) + verifyPreSignedUrl(actualFiles(0).url, 781) + verifyPreSignedUrl(actualFiles(1).url, 781) + } + } + } + integrationTest("table2 - partitioned - /shares/{share}/schemas/{schema}/tables/{table}/metadata") { val response = readNDJson(requestPath("/shares/share2/schemas/default/tables/table2/metadata"), expectedTableVersion = Some(2)) val Array(protocol, metadata) = response.split("\n") @@ -714,6 +830,94 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { verifyPreSignedUrl(actualFiles(1).url, 573) } + integrationTest("table2 - partitioned - paginated query") { + var body = + """ + |{ + | "predicateHints": [ + | "date = CAST('2021-04-28' AS DATE)" + | ], + | "maxFiles": 1 + |} + |""".stripMargin + var response = readNDJson( + requestPath("/shares/share2/schemas/default/tables/table2/query"), + Some("POST"), + Some(body), + Some(2) + ) + var lines = response.split("\n") + assert(lines.length == 4) + val protocol = lines(0) + val metadata = lines(1) + val files = ArrayBuffer[String]() + files.append(lines(2)) + var nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken + var numPages = 1 + while (nextPageToken != null && nextPageToken.token != null) { + numPages += 1 + body = + s""" + |{ + | "predicateHints": [ + | "date = CAST('2021-04-28' AS DATE)" + | ], + | "maxFiles": 1, + | "pageToken": "${nextPageToken.token}" + |} + |""".stripMargin + response = readNDJson( + requestPath("/shares/share2/schemas/default/tables/table2/query"), + Some("POST"), + Some(body), + Some(2) + ) + lines = response.split("\n") + assert(lines.length == 4) + assert(protocol == lines(0)) + assert(metadata == lines(1)) + files.append(lines(2)) + nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken + } + assert(numPages == 2) + + val expectedProtocol = Protocol(minReaderVersion = 1).wrap + assert(expectedProtocol == JsonUtils.fromJson[SingleAction](protocol)) + val expectedMetadata = Metadata( + id = "f8d5c169-3d01-4ca3-ad9e-7dc3355aedb2", + format = Format(), + schemaString = + """{"type":"struct","fields":[{"name":"eventTime","type":"timestamp","nullable":true,"metadata":{}},{"name":"date","type":"date","nullable":true,"metadata":{}}]}""", + partitionColumns = Seq("date") + ).wrap + assert(expectedMetadata == JsonUtils.fromJson[SingleAction](metadata)) + val actualFiles = files.map(f => JsonUtils.fromJson[SingleAction](f).file) + val expectedFiles = Seq( + AddFile( + url = actualFiles(0).url, + expirationTimestamp = actualFiles(0).expirationTimestamp, + id = "9f1a49539c5cffe1ea7f9e055d5c003c", + partitionValues = Map("date" -> "2021-04-28"), + size = 573, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T23:33:57.955Z"},"maxValues":{"eventTime":"2021-04-28T23:33:57.955Z"},"nullCount":{"eventTime":0}}""" + ), + AddFile( + url = actualFiles(1).url, + expirationTimestamp = actualFiles(1).expirationTimestamp, + id = "cd2209b32f5ed5305922dd50f5908a75", + partitionValues = Map("date" -> "2021-04-28"), + size = 573, + stats = + """{"numRecords":1,"minValues":{"eventTime":"2021-04-28T23:33:48.719Z"},"maxValues":{"eventTime":"2021-04-28T23:33:48.719Z"},"nullCount":{"eventTime":0}}""" + ) + ) + assert(actualFiles.count(_.expirationTimestamp != null) == 2) + assert(expectedFiles == actualFiles.toList) + verifyPreSignedUrl(actualFiles(0).url, 573) + verifyPreSignedUrl(actualFiles(1).url, 573) + } + integrationTest("jsonPredicateTest") { // A test function that applies specified predicate hints on cdf_table_with_partition // table which has two files with dates (2020-01-01, 2020-02-02) @@ -774,6 +978,51 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { testPredicateHints(hints3, Seq("2020-01-01", "2020-02-02")) } + integrationTest("paginated query with jsonPredicates") { + // cdf_table_with_partition has two files with dates (2020-01-01, 2020-02-02) + val hints = + """{"op":"and","children":[ + | {"op":"not","children":[ + | {"op":"isNull","children":[ + | {"op":"column","name":"birthday","valueType":"date"}]}]}, + | {"op":"equal","children":[ + | {"op":"column","name":"birthday","valueType":"date"}, + | {"op":"literal","value":"2020-01-01","valueType":"date"}]} + |]}""".stripMargin.replaceAll("\n", "").replaceAll(" ", "") + val response = readNDJson( + requestPath("/shares/share8/schemas/default/tables/cdf_table_with_partition/query"), + Some("POST"), + Some(JsonUtils.toJson(Map("jsonPredicateHints" -> hints, "maxFiles" -> 1))), + Some(3) + ) + val actions = response.split("\n").map(JsonUtils.fromJson[SingleAction](_)) + assert(actions.length == 4) + val expectedProtocol = Protocol(minReaderVersion = 1) + assert(expectedProtocol == actions(0).protocol) + val expectedMetadata = Metadata( + id = "e21eb083-6976-4159-90f2-ad88d06b7c7f", + format = Format(), + schemaString = + """{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]}""", + configuration = Map("enableChangeDataFeed" -> "true"), + partitionColumns = Seq("birthday") + ) + assert(expectedMetadata == actions(1).metaData) + val actualFile = actions(2).file + val expectedAddFile = AddFile( + url = actualFile.url, + expirationTimestamp = actualFile.expirationTimestamp, + id = "a04d61f17541fac1f9b5df5b8d26fff8", + partitionValues = Map("birthday" -> "2020-01-01"), + size = 791, + stats = + """{"numRecords":1,"minValues":{"name":"1","age":1},"maxValues":{"name":"1","age":1},"nullCount":{"name":0,"age":0}}""" + ) + assert(expectedAddFile == actualFile) + val expectedNextPageToken = NextPageToken(token = null) + assert(expectedNextPageToken == actions(3).nextPageToken) + } + integrationTest("table3 - different data file schemas - /shares/{share}/schemas/{schema}/tables/{table}/metadata") { val response = readNDJson(requestPath("/shares/share1/schemas/default/tables/table3/metadata"), expectedTableVersion = Some(4)) val Array(protocol, metadata) = response.split("\n") @@ -1952,6 +2201,23 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll { ) } + integrationTest("invalid 'maxFiles' value") { + assertHttpError( + url = requestPath("/shares/share1/schemas/default/tables/table1/query"), + method = "POST", + data = Some("""{"maxFiles": 0}"""), + expectedErrorCode = 400, + expectedErrorMessage = "maxFiles must be positive" + ) + assertHttpError( + url = requestPath("/shares/share1/schemas/default/tables/table1/query"), + method = "POST", + data = Some("""{"maxFiles": 3000000000}"""), + expectedErrorCode = 400, + expectedErrorMessage = "Not an int32 value" + ) + } + integrationTest("wrong 'maxResults' type") { assertHttpError( url = requestPath("/shares?maxResults=string"), diff --git a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala index 94c1c4291..dfceb4e2b 100644 --- a/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/JsonPredicateFilterUtilsSuite.scala @@ -28,7 +28,7 @@ class JsonPredicateFilterUtilsSuite extends FunSuite { test("evaluatePredicate") { val add1 = AddFile("foo1", Map("c2" -> "0"), 1, 1, true) val add2 = AddFile("foo2", Map("c2" -> "1"), 1, 1, true) - val addFiles = add1 :: add2 :: Nil + val addFiles = Seq(add1, add2).zipWithIndex val hints1 = """{"op":"and","children":[ @@ -39,7 +39,7 @@ class JsonPredicateFilterUtilsSuite extends FunSuite { | {"op":"column","name":"c2","valueType":"int"}, | {"op":"literal","value":"0","valueType":"int"}]} |]}""".stripMargin.replaceAll("\n", "").replaceAll(" ", "") - assert(add1 :: Nil == evaluatePredicate(Some(hints1), addFiles)) + assert(Seq(addFiles(0)) == evaluatePredicate(Some(hints1), addFiles)) val hints2 = """{"op":"and","children":[ @@ -50,7 +50,7 @@ class JsonPredicateFilterUtilsSuite extends FunSuite { | {"op":"column","name":"c2","valueType":"int"}, | {"op":"literal","value":"1","valueType":"int"}]} |]}""".stripMargin.replaceAll("\n", "").replaceAll(" ", "") - assert(add2 :: Nil == evaluatePredicate(Some(hints2), addFiles)) + assert(Seq(addFiles(1)) == evaluatePredicate(Some(hints2), addFiles)) val hints3 = """{"op":"and","children":[ diff --git a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala index 13dec780e..bd3399b9d 100644 --- a/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala +++ b/server/src/test/scala/io/delta/standalone/internal/PartitionFilterUtilsSuite.scala @@ -28,15 +28,15 @@ class PartitionFilterUtilsSuite extends FunSuite { val schema = StructType.fromDDL("c1 INT, c2 INT").json val add1 = AddFile("foo1", Map("c2" -> "0"), 1, 1, true) val add2 = AddFile("foo2", Map("c2" -> "1"), 1, 1, true) - val addFiles = add1 :: add2 :: Nil - assert(add1 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 = 0" :: Nil, addFiles)) - assert(add2 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 = 1" :: Nil, addFiles)) - assert(add2 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 > 0" :: Nil, addFiles)) - assert(add1 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 < 1" :: Nil, addFiles)) - assert(add2 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 >= 1" :: Nil, addFiles)) - assert(add1 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 <= 0" :: Nil, addFiles)) - assert(add2 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 <> 0" :: Nil, addFiles)) - assert(add1 :: Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 <> 1" :: Nil, addFiles)) + val addFiles = Seq(add1, add2).zipWithIndex + assert(Seq(addFiles(0)) == evaluatePredicate(schema, "c2" :: Nil, "c2 = 0" :: Nil, addFiles)) + assert(Seq(addFiles(1)) == evaluatePredicate(schema, "c2" :: Nil, "c2 = 1" :: Nil, addFiles)) + assert(Seq(addFiles(1)) == evaluatePredicate(schema, "c2" :: Nil, "c2 > 0" :: Nil, addFiles)) + assert(Seq(addFiles(0)) == evaluatePredicate(schema, "c2" :: Nil, "c2 < 1" :: Nil, addFiles)) + assert(Seq(addFiles(1)) == evaluatePredicate(schema, "c2" :: Nil, "c2 >= 1" :: Nil, addFiles)) + assert(Seq(addFiles(0)) == evaluatePredicate(schema, "c2" :: Nil, "c2 <= 0" :: Nil, addFiles)) + assert(Seq(addFiles(1)) == evaluatePredicate(schema, "c2" :: Nil, "c2 <> 0" :: Nil, addFiles)) + assert(Seq(addFiles(0)) == evaluatePredicate(schema, "c2" :: Nil, "c2 <> 1" :: Nil, addFiles)) assert(Nil == evaluatePredicate(schema, "c2" :: Nil, "c2 is null" :: Nil, addFiles)) assert(addFiles == evaluatePredicate(schema, "c2" :: Nil, "c2 is not null" :: Nil, addFiles)) assert(addFiles == evaluatePredicate(schema, "c2" :: Nil, "c2 is not null" :: Nil, addFiles)) diff --git a/server/src/universal/conf/delta-sharing-server.yaml.template b/server/src/universal/conf/delta-sharing-server.yaml.template index 8ed0557a2..b4a707b2b 100644 --- a/server/src/universal/conf/delta-sharing-server.yaml.template +++ b/server/src/universal/conf/delta-sharing-server.yaml.template @@ -57,3 +57,7 @@ stalenessAcceptable: false evaluatePredicateHints: false # Whether to evaluate user provided `jsonPredicateHints` evaluateJsonPredicateHints: false +# The maximum page size permitted by queryTable/queryTableChanges API. +queryTablePageSizeLimit: 500 +# The TTL of the page token generated in queryTable/queryTableChanges API (in milliseconds). +queryTablePageTokenTtlMs: 259200000