Skip to content

Commit

Permalink
pagination for queryTableChanges (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
charlenelyu-db authored Jul 31, 2023
1 parent 2b39066 commit 7a5523c
Show file tree
Hide file tree
Showing 4 changed files with 517 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class DeltaSharingServiceExceptionHandler extends ExceptionHandlerFunction {
"errorCode" -> ErrorCode.MALFORMED_REQUEST,
"message" -> cause.getMessage)))
case _: NumberFormatException =>
// `maxResults` is not an int.
// `maxResults`/`maxFiles` is not an int.
HttpResponse.of(
HttpStatus.BAD_REQUEST,
MediaType.JSON_UTF_8,
Expand Down Expand Up @@ -383,6 +383,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
streamingOutput(Some(version), responseFormat, actions)
}

// scalastyle:off argcount
@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
def listCdfFiles(
Expand All @@ -394,8 +395,14 @@ class DeltaSharingService(serverConfig: ServerConfig) {
@Param("endingVersion") @Nullable endingVersion: String,
@Param("startingTimestamp") @Nullable startingTimestamp: String,
@Param("endingTimestamp") @Nullable endingTimestamp: String,
@Param("includeHistoricalMetadata") @Nullable includeHistoricalMetadata: String
@Param("includeHistoricalMetadata") @Nullable includeHistoricalMetadata: String,
@Param("maxFiles") @Nullable maxFiles: java.lang.Integer,
@Param("pageToken") @Nullable pageToken: String
): HttpResponse = processRequest {
// scalastyle:on argcount
if (maxFiles != null && maxFiles <= 0) {
throw new DeltaSharingIllegalArgumentException("maxFiles must be positive.")
}
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
Expand All @@ -415,6 +422,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
Option(endingTimestamp)
),
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false),
Option(maxFiles).map(_.toInt),
Option(pageToken),
responseFormat = responseFormat
)
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,22 +663,43 @@ class DeltaSharedTable(
def queryCDF(
cdfOptions: Map[String, String],
includeHistoricalMetadata: Boolean = false,
maxFiles: Option[Int],
pageToken: Option[String],
responseFormat: String = DeltaSharedTable.RESPONSE_FORMAT_PARQUET
): (Long, Seq[Object]) = withClassLoader {
val actions = ListBuffer[Object]()
// Step 1: validate pageToken if it's specified
lazy val queryParamChecksum = computeChecksum(
QueryParamChecksum(
version = None,
timestamp = None,
startingVersion = cdfOptions.get(DeltaDataSource.CDF_START_VERSION_KEY).map(_.toLong),
startingTimestamp = cdfOptions.get(DeltaDataSource.CDF_START_TIMESTAMP_KEY),
endingVersion = cdfOptions.get(DeltaDataSource.CDF_END_VERSION_KEY).map(_.toLong),
endingTimestamp = cdfOptions.get(DeltaDataSource.CDF_END_TIMESTAMP_KEY),
predicateHints = Nil,
jsonPredicateHints = None,
limitHint = None,
includeHistoricalMetadata = Some(includeHistoricalMetadata)
)
)
val pageTokenOpt = pageToken.map(decodeAndValidatePageToken(_, queryParamChecksum))

// First: validate cdf options are greater than startVersion
// Step 2: validate cdfOptions
val cdcReader = new DeltaSharingCDCReader(deltaLog, conf)
val latestVersion = tableVersion
// For subsequent page calls, instead of using the current latestVersion, use latestVersion in
// the pageToken (which is equal to the latestVersion when the first page call is received),
// in case the latestVersion changes after the first page call.
val latestVersion = pageTokenOpt.map(_.getLatestVersion).getOrElse(tableVersion)
val (start, end) = cdcReader.validateCdfOptions(
cdfOptions, latestVersion, tableConfig.startVersion)

// Second: get Protocol and Metadata
// Step 3: get Protocol and Metadata
val snapshot = if (includeHistoricalMetadata) {
deltaLog.getSnapshotForVersionAsOf(start)
} else {
deltaLog.snapshot
deltaLog.getSnapshotForVersionAsOf(latestVersion)
}
val actions = ListBuffer[Object]()
actions.append(getResponseProtocol(snapshot.protocolScala, responseFormat))
actions.append(
getResponseMetadata(
Expand All @@ -688,71 +709,108 @@ class DeltaSharedTable(
)
)

// Third: get files
val (changeFiles, addFiles, removeFiles, metadatas) = cdcReader.queryCDF(
start, end, latestVersion, includeHistoricalMetadata)
// If includeHistoricalMetadata is not true, metadatas will be empty.
metadatas.foreach { cdcDataSpec =>
cdcDataSpec.actions.foreach { action =>
val metadata = action.asInstanceOf[Metadata]
actions.append(
getResponseMetadata(
metadata,
Some(cdcDataSpec.version),
responseFormat
)
// Step 4: get files
// For backwards compatibility, enforce a page size only when `maxFiles` is specified.
// The requested size is capped at `queryTablePageSizeLimit`.
val pageSizeOpt = maxFiles.map(_.min(queryTablePageSizeLimit))
val tokenGenerator = { (v: Long, idx: Int) =>
val nextPageTokenStr = encodeQueryTablePageToken(
QueryTablePageToken(
id = Some(tableConfig.id),
startingVersion = Some(v),
endingVersion = Some(pageTokenOpt.map(_.getEndingVersion).getOrElse(end)),
latestVersion = Some(latestVersion),
checksum = Some(queryParamChecksum),
startingActionIndex = Some(idx),
expirationTimestamp = Some(System.currentTimeMillis() + queryTablePageTokenTtlMs)
)
}
)
model.NextPageToken(nextPageTokenStr).wrap
}
changeFiles.foreach { cdcDataSpec =>
cdcDataSpec.actions.foreach { action =>
val addCDCFile = action.asInstanceOf[AddCDCFile]
val cloudPath = absolutePath(deltaLog.dataPath, addCDCFile.path)
val signedUrl = fileSigner.sign(cloudPath)
actions.append(
getResponseAddCDCFile(
addCDCFile,
signedUrl,
cdcDataSpec.version,
cdcDataSpec.timestamp.getTime,
responseFormat
)
)
var numSignedFiles = 0
// When a page token is present, use its (start, end) instead of the original request's because:
// - Versions that were processed in previous pages can be skipped.
// - Versions that are committed after the first page call should be ignored, especially
// when the endingVersion is not specified and resolves to latestVersion.
val changes = cdcReader.queryCDF(
pageTokenOpt.map(_.getStartingVersion).getOrElse(start),
pageTokenOpt.map(_.getEndingVersion).getOrElse(end).min(latestVersion),
latestVersion,
includeHistoricalMetadata
)
changes.foreach { cdcDataSpec =>
val v = cdcDataSpec.version
val ts = cdcDataSpec.timestamp
var indexedActions = cdcDataSpec.actions.zipWithIndex
if (pageTokenOpt.exists(_.getStartingVersion == v)) {
// Skip actions that are already processed in previous pages
indexedActions = indexedActions.drop(pageTokenOpt.get.getStartingActionIndex)
}
}
addFiles.foreach { cdcDataSpec =>
cdcDataSpec.actions.foreach { action =>
val addFile = action.asInstanceOf[AddFile]
val cloudPath = absolutePath(deltaLog.dataPath, addFile.path)
val signedUrl = fileSigner.sign(cloudPath)
actions.append(
getResponseAddFile(
addFile,
signedUrl,
cdcDataSpec.version,
cdcDataSpec.timestamp.getTime,
responseFormat,
returnAddFileForCDF = true
indexedActions.foreach {
case (m: Metadata, _) =>
actions.append(
getResponseMetadata(
m,
Some(v),
responseFormat
)
)
)
}
}
removeFiles.foreach { cdcDataSpec =>
cdcDataSpec.actions.foreach { action =>
val removeFile = action.asInstanceOf[RemoveFile]
val cloudPath = absolutePath(deltaLog.dataPath, removeFile.path)
val signedUrl = fileSigner.sign(cloudPath)
actions.append(
getResponseRemoveFile(
removeFile,
signedUrl,
cdcDataSpec.version,
cdcDataSpec.timestamp.getTime,
responseFormat
case (c: AddCDCFile, idx) =>
// Return early if we already have enough files in the current page
if (pageSizeOpt.contains(numSignedFiles)) {
actions.append(tokenGenerator(v, idx))
return start -> actions.toSeq
}
actions.append(
getResponseAddCDCFile(
c,
fileSigner.sign(absolutePath(deltaLog.dataPath, c.path)),
v,
ts.getTime,
responseFormat
)
)
)
numSignedFiles += 1
case (a: AddFile, idx) =>
// Return early if we already have enough files in the current page
if (pageSizeOpt.contains(numSignedFiles)) {
actions.append(tokenGenerator(v, idx))
return start -> actions.toSeq
}
actions.append(
getResponseAddFile(
a,
fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
v,
ts.getTime,
responseFormat,
returnAddFileForCDF = true
)
)
numSignedFiles += 1
case (r: RemoveFile, idx) =>
// Return early if we already have enough files in the current page
if (pageSizeOpt.contains(numSignedFiles)) {
actions.append(tokenGenerator(v, idx))
return start -> actions.toSeq
}
actions.append(
getResponseRemoveFile(
r,
fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
v,
ts.getTime,
responseFormat
)
)
numSignedFiles += 1
case _ => ()
}
}
// For backwards compatibility, append an empty `nextPageToken` object only when `maxFiles`
// is specified.
if (maxFiles.isDefined) {
actions.append(model.NextPageToken(null).wrap)
}
start -> actions.toSeq
}

Expand Down
Loading

0 comments on commit 7a5523c

Please sign in to comment.