Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
charlenelyu-db committed Jul 28, 2023
1 parent d0abd45 commit 6085c3a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ case class ServerConfig(
evaluatePredicateHints = false,
evaluateJsonPredicateHints = false,
requestTimeoutSeconds = 30,
queryTablePageSizeLimit = 500,
queryTablePageSizeLimit = 10000,
queryTablePageTokenTtlMs = 259200000 // 3 days
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem
import org.apache.hadoop.fs.s3a.S3AFileSystem
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StructType}
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

import io.delta.sharing.server.{
model,
Expand Down Expand Up @@ -736,26 +737,22 @@ class DeltaSharedTable(
}

private def decodeAndValidatePageToken(
tokenStr: String,
expectedChecksum: String): QueryTablePageToken = {
tokenStr: String,
expectedChecksum: String): QueryTablePageToken = {
val token = decodeQueryTablePageToken(tokenStr)
val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z")
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
if (token.getExpirationTimestamp < System.currentTimeMillis()) {
throw new DeltaSharingIllegalArgumentException(
s"""The next page token has already expired at
|${timestampFormat.format(token.getExpirationTimestamp)}. Please restart the
|query.""".stripMargin
"The page token has expired. Please restart the query."
)
}
if (token.getId != tableConfig.id) {
throw new DeltaSharingIllegalArgumentException(
"The table specified in the next page token does not match the table being queried."
"The table specified in the page token does not match the table being queried."
)
}
if (token.getChecksum != expectedChecksum) {
throw new DeltaSharingIllegalArgumentException(
"""Query parameter mismatch detected for the next page token query. The query parameter
"""Query parameter mismatch detected for the next page query. The query parameter
|cannot change when querying the next page results.""".stripMargin
)
}
Expand All @@ -767,7 +764,14 @@ class DeltaSharedTable(
}

private def decodeQueryTablePageToken(tokenStr: String): QueryTablePageToken = {
QueryTablePageToken.parseFrom(Base64.getUrlDecoder.decode(tokenStr))
try {
QueryTablePageToken.parseFrom(Base64.getUrlDecoder.decode(tokenStr))
} catch {
case NonFatal(_) =>
throw new DeltaSharingIllegalArgumentException(
s"Error decoding the page token: $tokenStr."
)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,70 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
assert(expectedNextPageToken == actions(3).nextPageToken)
}

integrationTest("paginated query - exceptions") {
// invalid page token
assertHttpError(
url = requestPath("/shares/share1/schemas/default/tables/table1/query"),
method = "POST",
data = Some("""{"pageToken": "randomPageToken"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "Error decoding the page token"
)

// invalid query parameters
var response = readNDJson(
requestPath("/shares/share1/schemas/default/tables/table1/query"),
Some("POST"),
Some("""{"maxFiles": 1}"""),
Some(2)
)
var lines = response.split("\n")
assert(lines.length == 4)
var nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
assert(nextPageToken != null && nextPageToken.token != null)

assertHttpError(
url = requestPath("/shares/share2/schemas/default/tables/table2/query"),
method = "POST",
data = Some(s"""{"pageToken": "${nextPageToken.token}"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "The table specified in the page token does not match the table being queried"
)
assertHttpError(
url = requestPath("/shares/share1/schemas/default/tables/table1/query"),
method = "POST",
data = Some(s"""{"limitHint": 123, "pageToken": "${nextPageToken.token}"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "Query parameter mismatch detected for the next page query"
)

// page token expired
val updatedServerConfig = serverConfig.copy(queryTablePageTokenTtlMs = 0)
server.stop().get()
server = DeltaSharingService.start(updatedServerConfig)
response = readNDJson(
requestPath("/shares/share1/schemas/default/tables/table1/query"),
Some("POST"),
Some("""{"maxFiles": 1}"""),
Some(2)
)
lines = response.split("\n")
assert(lines.length == 4)
nextPageToken = JsonUtils.fromJson[SingleAction](lines(3)).nextPageToken
assert(nextPageToken != null && nextPageToken.token != null)

assertHttpError(
url = requestPath("/shares/share1/schemas/default/tables/table1/query"),
method = "POST",
data = Some(s"""{"pageToken": "${nextPageToken.token}"}"""),
expectedErrorCode = 400,
expectedErrorMessage = "The page token has expired"
)

server.stop().get()
server = DeltaSharingService.start(serverConfig)
}

integrationTest("table3 - different data file schemas - /shares/{share}/schemas/{schema}/tables/{table}/metadata") {
val response = readNDJson(requestPath("/shares/share1/schemas/default/tables/table3/metadata"), expectedTableVersion = Some(4))
val Array(protocol, metadata) = response.split("\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ evaluatePredicateHints: false
# Whether to evaluate user provided `jsonPredicateHints`
evaluateJsonPredicateHints: false
# The maximum page size permitted by queryTable/queryTableChanges API.
queryTablePageSizeLimit: 500
queryTablePageSizeLimit: 10000
# The TTL of the page token generated in queryTable/queryTableChanges API (in milliseconds).
queryTablePageTokenTtlMs: 259200000

0 comments on commit 6085c3a

Please sign in to comment.