Skip to content

Commit

Permalink
pagination for queryTable at snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
charlenelyu-db committed Jul 24, 2023
1 parent 228f22e commit b5170d4
Show file tree
Hide file tree
Showing 11 changed files with 523 additions and 65 deletions.
35 changes: 35 additions & 0 deletions server/src/main/protobuf/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ message QueryTableRequest {
repeated string predicateHints = 1;
optional string jsonPredicateHints = 6;
optional int64 limitHint = 2;
// The maximum number of files to return in one page. This is a hint for the server,
// and the server may not honor it. The server that supports pagination should return
// no more than this limit, but it can return fewer. If there are more files available,
// a nextPageToken will be returned to the user to retrieve the next page.
optional int32 maxFiles = 9;
// The page token used to retrieve the subsequent page.
optional string pageToken = 10;

// Only one of the three parameters can be supported in a single query.
// If none of them is specified, the query is for the latest version.
Expand Down Expand Up @@ -101,3 +108,31 @@ message PageToken {
optional string share = 2;
optional string schema = 3;
}

// Define a special class to generate the page token for pagination. It includes the information
// about where we should start to query, and checks whether the paginated request is valid.
message QueryTablePageToken {
// Id of the table being queried.
optional string id = 1;
// Only used in queryTable at snapshot, refers to the version being queried.
optional int64 version = 2;
// Only used in queryTable from starting_version and queryTableChanges.
// The table version where the querying process starts in next page, inclusive.
optional int64 starting_version = 3;
// Only used in queryTable from starting_version and queryTableChanges.
// The table version where the querying process ends in next page, inclusive.
optional int64 ending_version = 4;
// The checksum of other query parameters. This is used for validation purpose, as the user
// is expected to keep all other parameters to the request the same.
optional string checksum = 5;
// The local index of the first action in next page.
// For queryTable at snapshot, (version, starting_action_index) determines the entry point
// of the next page.
// For queryTable from starting_version and queryTableChanges,
// (starting_version, starting_action_index) determines the entry point of the next page.
optional int32 starting_action_index = 6;
// The expiration timestamp of the page token in milliseconds.
optional int64 expiration_timestamp = 7;
// The latest version of the table when the first page request is received.
optional int64 latest_version = 8;
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
timestamp = None,
startingVersion = None,
endingVersion = None,
maxFiles = None,
pageToken = None,
responseFormat = responseFormat)
streamingOutput(Some(v), responseFormat, actions)
}
Expand Down Expand Up @@ -332,6 +334,9 @@ class DeltaSharingService(serverConfig: ServerConfig) {
if (request.startingVersion.isDefined && request.startingVersion.get < 0) {
throw new DeltaSharingIllegalArgumentException("startingVersion cannot be negative.")
}
if (request.maxFiles.exists(_ <= 0)) {
throw new DeltaSharingIllegalArgumentException("maxFiles must be positive.")
}

val start = System.currentTimeMillis
val tableConfig = sharedTableManager.getTable(share, schema, table)
Expand Down Expand Up @@ -365,6 +370,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.timestamp,
request.startingVersion,
request.endingVersion,
request.maxFiles,
request.pageToken,
responseFormat = responseFormat)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ case class ServerConfig(
// Whether to evaluate user provided `jsonPredicateHints`
@BeanProperty var evaluateJsonPredicateHints: Boolean,
// The timeout of an incoming web request in seconds. Set to 0 for no timeout
@BeanProperty var requestTimeoutSeconds: Long
@BeanProperty var requestTimeoutSeconds: Long,
// The maximum page size permitted by queryTable/queryTableChanges API.
@BeanProperty var queryTablePageSizeLimit: Int,
// The TTL of the page token generated in queryTable/queryTableChanges API (in milliseconds).
@BeanProperty var queryTablePageTokenTtlMs: Int
) extends ConfigItem {
import ServerConfig._

Expand All @@ -76,7 +80,9 @@ case class ServerConfig(
stalenessAcceptable = false,
evaluatePredicateHints = false,
evaluateJsonPredicateHints = false,
requestTimeoutSeconds = 30
requestTimeoutSeconds = 30,
queryTablePageSizeLimit = 500,
queryTablePageTokenTtlMs = 259200000 // 3 days
)
}

Expand Down
14 changes: 13 additions & 1 deletion server/src/main/scala/io/delta/sharing/server/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ case class SingleAction(
cdf: AddCDCFile = null,
remove: RemoveFile = null,
metaData: Metadata = null,
protocol: Protocol = null) {
protocol: Protocol = null,
nextPageToken: NextPageToken = null) {

def unwrap: Action = {
if (file != null) {
Expand All @@ -40,6 +41,8 @@ case class SingleAction(
metaData
} else if (protocol != null) {
protocol
} else if (nextPageToken != null) {
nextPageToken
} else {
null
}
Expand Down Expand Up @@ -130,6 +133,15 @@ case class RemoveFile(
override def wrap: SingleAction = SingleAction(remove = this)
}

/**
* A token to retrieve the subsequent page of a query. The server that supports pagination will
* return a nextPageToken at the end of the response when there are more files available than
* the page size specified by the user.
*/
case class NextPageToken(token: String) extends Action {
override def wrap: SingleAction = SingleAction(nextPageToken = this)
}

object Action {
// The maximum version of the protocol that this version of Delta Standalone understands.
val maxReaderVersion = 1
Expand Down
Loading

0 comments on commit b5170d4

Please sign in to comment.