Skip to content

Commit

Permalink
pagination for queryTable from startingVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
charlenelyu-db committed Jul 25, 2023
1 parent d0abd45 commit 664b554
Show file tree
Hide file tree
Showing 2 changed files with 314 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ class DeltaSharedTable(
if (startingVersion.isDefined) {
// Only read changes up to snapshot.version, and ignore changes that are committed during
// queryDataChangeSinceStartVersion.
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, responseFormat)
queryDataChangeSinceStartVersion(
startingVersion.get,
endingVersion,
maxFiles,
pageTokenOpt,
queryParamChecksum,
responseFormat
)
} else if (includeFiles) {
val ts = if (isVersionQuery) {
val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
Expand Down Expand Up @@ -530,66 +537,125 @@ class DeltaSharedTable(
private def queryDataChangeSinceStartVersion(
startingVersion: Long,
endingVersion: Option[Long],
maxFilesOpt: Option[Int],
pageTokenOpt: Option[QueryTablePageToken],
queryParamChecksum: String,
responseFormat: String
): Seq[Object] = {
var latestVersion = tableVersion
): Seq[Object] = {
// For subsequent page calls, instead of using the current latestVersion, use latestVersion in
// the pageToken (which is equal to the latestVersion when the first page call is received),
// in case the latestVersion changes after the first page call.
val latestVersion = pageTokenOpt.map(_.getLatestVersion).getOrElse(tableVersion)
if (startingVersion > latestVersion) {
throw DeltaCDFErrors.startVersionAfterLatestVersion(startingVersion, latestVersion)
}
if (endingVersion.isDefined && endingVersion.get > latestVersion) {
throw DeltaCDFErrors.endVersionAfterLatestVersion(endingVersion.get, latestVersion)
}
latestVersion = latestVersion.min(endingVersion.getOrElse(latestVersion))
// We override (start, end) in subsequent page calls because:
// - Versions that are processed in previous pages can be skipped.
// - Versions that are committed after the first page call should be ignored, especially
// when the endingVersion is not specified and resolved to latestVersion.
val start = pageTokenOpt.map(_.getStartingVersion).getOrElse(startingVersion)
val end = pageTokenOpt
.map(_.getEndingVersion)
.orElse(endingVersion)
.getOrElse(latestVersion)
.min(latestVersion)
val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
deltaLog.store,
deltaLog.logPath,
startingVersion,
latestVersion + 1,
start,
end + 1,
conf
)

// Enforce page size only when `maxFiles` is specified for backwards compatibility.
val pageSizeOpt = maxFilesOpt.map(_.min(queryTablePageSizeLimit))
val tokenGenerator = { (v: Long, idx: Int) =>
val nextPageTokenStr = encodeQueryTablePageToken(
QueryTablePageToken(
id = Some(tableConfig.id),
startingVersion = Some(v),
endingVersion = Some(end),
latestVersion = Some(latestVersion),
checksum = Some(queryParamChecksum),
startingActionIndex = Some(idx),
expirationTimestamp = Some(System.currentTimeMillis() + queryTablePageTokenTtlMs)
)
)
model.NextPageToken(nextPageTokenStr).wrap
}
var numSignedFiles = 0
val actions = ListBuffer[Object]()
deltaLog.getChanges(startingVersion, true).asScala.toSeq
.filter(_.getVersion <= latestVersion).foreach{ versionLog =>
val v = versionLog.getVersion
val versionActions = versionLog.getActions.asScala.map(x => ConversionUtils.convertActionJ(x))
val ts = timestampsByVersion.get(v).orNull
versionActions.foreach {
case a: AddFile if a.dataChange =>
actions.append(
getResponseAddFile(
a,
fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
v,
ts.getTime,
responseFormat,
true
)
)
case r: RemoveFile if r.dataChange =>
actions.append(
getResponseRemoveFile(
r,
fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
v,
ts.getTime,
responseFormat
deltaLog
.getChanges(start, true)
.asScala
.toSeq
.filter(_.getVersion <= end)
.foreach { versionLog =>
val v = versionLog.getVersion
var indexedVersionActions =
versionLog.getActions.asScala.map(x => ConversionUtils.convertActionJ(x)).zipWithIndex
val ts = timestampsByVersion.get(v).orNull
if (pageTokenOpt.exists(_.getStartingVersion == v)) {
// Skip actions that are already processed in previous pages
indexedVersionActions =
indexedVersionActions.drop(pageTokenOpt.get.getStartingActionIndex)
}
indexedVersionActions.foreach {
case (a: AddFile, idx) if a.dataChange =>
// Return early if we already have enough files in the current page
if (pageSizeOpt.contains(numSignedFiles)) {
actions.append(tokenGenerator(v, idx))
return actions.toSeq
}
actions.append(
getResponseAddFile(
a,
fileSigner.sign(absolutePath(deltaLog.dataPath, a.path)),
v,
ts.getTime,
responseFormat,
true
)
)
)
case p: Protocol =>
assertProtocolRead(p)
case m: Metadata =>
if (v > startingVersion) {
numSignedFiles += 1
case (r: RemoveFile, idx) if r.dataChange =>
// Return early if we already have enough files in the current page
if (pageSizeOpt.contains(numSignedFiles)) {
actions.append(tokenGenerator(v, idx))
return actions.toSeq
}
actions.append(
getResponseMetadata(
m,
Some(v),
getResponseRemoveFile(
r,
fileSigner.sign(absolutePath(deltaLog.dataPath, r.path)),
v,
ts.getTime,
responseFormat
)
)
}
case _ => ()
numSignedFiles += 1
case (p: Protocol, _) =>
assertProtocolRead(p)
case (m: Metadata, _) =>
if (v > startingVersion) {
actions.append(
getResponseMetadata(
m,
Some(v),
responseFormat
)
)
}
case _ => ()
}
}
// Return an empty `nextPageToken` object only when `maxFiles` is specified for
// backwards compatibility.
if (maxFilesOpt.isDefined) {
actions.append(model.NextPageToken(null).wrap)
}
actions.toSeq
}
Expand Down
Loading

0 comments on commit 664b554

Please sign in to comment.