Skip to content

Commit

Permalink
[main] Double check sortedFetchedFiles and set previousOffset (#344)
Browse files Browse the repository at this point in the history
* [main] doublecheck

* fix lint
  • Loading branch information
linzhou-db authored Jul 13, 2023
1 parent 3a2c66a commit 94c406b
Showing 1 changed file with 62 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private[sharing] case class IndexedFile(
add: AddFileForCDF,
remove: RemoveFile = null,
cdc: AddCDCFile = null,
isSnapshot: Boolean = false,
isLast: Boolean = false) {

assert(Seq(add, remove, cdc).filter(_ != null).size <= 1, "There could be at most one non-null " +
Expand Down Expand Up @@ -221,7 +222,24 @@ case class DeltaSharingSource(
fromIndex: Long,
isStartingVersion: Boolean): Unit = {
if (!sortedFetchedFiles.isEmpty) {
return
// Clean up local sortedFileIndex, re-fetch files, to ensure the correct set of files are
// returned for latestOffset and getBatch.
// We need to apply this check because the spark streaming engine assumes the DataSource is
// stateless, and sortedFetchedFiles makes DeltaSharingSource stateful.
val headFile = sortedFetchedFiles.head
if (headFile.version > fromVersion || (
headFile.version == fromVersion && headFile.index > fromIndex && fromIndex != -1) ||
(isStartingVersion != headFile.isSnapshot)) {
val lastFile = sortedFetchedFiles.last
logWarning(s"The asked file(" +
s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" +
s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " +
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
sortedFetchedFiles = Seq.empty
} else {
return
}
}

val currentLatestVersion = getOrUpdateLatestTableVersion
Expand All @@ -233,10 +251,11 @@ case class DeltaSharingSource(
// using "fromVersion + maxVersionsPerRpc - 1" because the endingVersion is inclusive.
val endingVersionForQuery = currentLatestVersion.min(fromVersion + maxVersionsPerRpc - 1)
if (endingVersionForQuery < currentLatestVersion) {
logInfo(s"Reducing ending version for delta sharing rpc of table " +
s"${deltaLog.table.toString} from currentLatestVersion" +
s"($currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc: $maxVersionsPerRpc.")
logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" +
s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
)
}

if (isStartingVersion || !options.readChangeFeed) {
Expand Down Expand Up @@ -276,12 +295,17 @@ case class DeltaSharingSource(
// a single synchronized wrap, to avoid using old urls with refreshed timestamps when a refresh
// happens after this function and before register().
private def popSortedFetchedFiles(
startVersion: Long,
startIndex: Long,
endOffset: DeltaSharingSourceOffset): (Seq[IndexedFile], Long, Option[Long]) = {
synchronized {
val fileActions = sortedFetchedFiles.takeWhile {
case IndexedFile(version, index, _, _, _, _) =>
version < endOffset.tableVersion ||
(version == endOffset.tableVersion && index <= endOffset.index)
case IndexedFile(version, index, _, _, _, _, _) =>
// Ensure (version, index) is in the range of
// [(startVersion, startIndex), (endVersion, endOffset)]
(version > startVersion || (version == startVersion && (index == -1 ||
index >= startIndex))) && (version < endOffset.tableVersion ||
(version == endOffset.tableVersion && index <= endOffset.index))
}
sortedFetchedFiles = sortedFetchedFiles.drop(fileActions.size)
(fileActions, lastQueryTableTimestamp, minUrlExpirationTimestamp)
Expand All @@ -297,7 +321,7 @@ case class DeltaSharingSource(
): Unit = {
synchronized {
logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" +
s"size: ${newIdToUrl.size}).")
s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).")
lastQueryTableTimestamp = queryTimestamp
minUrlExpirationTimestamp = newMinUrlExpiration
if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
Expand Down Expand Up @@ -342,6 +366,7 @@ case class DeltaSharingSource(
)
indexedFile.cdc.copy(url = newUrl)
},
isSnapshot = indexedFile.isSnapshot,
isLast = indexedFile.isLast
)
}
Expand Down Expand Up @@ -371,6 +396,10 @@ case class DeltaSharingSource(
fromIndex: Long,
isStartingVersion: Boolean,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
)
resetGlobalTimestamp()
if (isStartingVersion) {
// If isStartingVersion is true, it means to fetch the snapshot at the fromVersion, which may
Expand Down Expand Up @@ -418,6 +447,7 @@ case class DeltaSharingSource(
file.stats,
file.expirationTimestamp
),
isSnapshot = true,
isLast = (index + 1 == numFiles)
),
file.expirationTimestamp
Expand Down Expand Up @@ -457,11 +487,13 @@ case class DeltaSharingSource(
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
val numFiles = vAddFiles.size
appendToSortedFetchedFiles(IndexedFile(v, -1, add = null, isLast = (numFiles == 0)))
appendToSortedFetchedFiles(
IndexedFile(v, -1, add = null, isSnapshot = false, isLast = (numFiles == 0))
)
vAddFiles.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (add, index) if (v > fromVersion || (v == fromVersion && index > fromIndex)) =>
appendToSortedFetchedFiles(
IndexedFile(add.version, index, add, isLast = (index + 1 == numFiles)),
appendToSortedFetchedFiles(IndexedFile(
add.version, index, add, isSnapshot = false, isLast = (index + 1 == numFiles)),
add.expirationTimestamp
)
// For files with v <= fromVersion, skip them, otherwise an exception will be thrown.
Expand Down Expand Up @@ -489,6 +521,9 @@ case class DeltaSharingSource(
fromVersion: Long,
fromIndex: Long,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString}).")
resetGlobalTimestamp()
val tableFiles = deltaLog.client.getCDFFiles(
deltaLog.table,
Expand Down Expand Up @@ -552,6 +587,7 @@ case class DeltaSharingSource(
index,
add = null,
cdc = cdc,
isSnapshot = false,
isLast = (index + 1 == cdfFiles.size))
)
// For files with v <= fromVersion, skip them, otherwise an exception will be thrown.
Expand All @@ -569,6 +605,7 @@ case class DeltaSharingSource(
v,
index,
add,
isSnapshot = false,
isLast = (index + 1 == numFiles))
)
case (remove: RemoveFile, index) if (
Expand All @@ -578,6 +615,7 @@ case class DeltaSharingSource(
index,
add = null,
remove = remove,
isSnapshot = false,
isLast = (index + 1 == numFiles))
)
// For files with v <= fromVersion, skip them, otherwise an exception will be thrown.
Expand All @@ -590,7 +628,9 @@ case class DeltaSharingSource(
// This may happen when there's a protocol change of the table, or optimize of a table where
// there are no data files with dataChange=true, so the server won't return any files for
// the version.
appendToSortedFetchedFiles(IndexedFile(v, -1, add = null, isLast = true))
appendToSortedFetchedFiles(
IndexedFile(v, -1, add = null, isSnapshot = false, isLast = true)
)
}
}
}
Expand Down Expand Up @@ -671,7 +711,9 @@ case class DeltaSharingSource(
endOffset: DeltaSharingSourceOffset): DataFrame = {
maybeGetFileChanges(startVersion, startIndex, isStartingVersion)

val (fileActions, lastQueryTimestamp, urlExpirationTimestamp) = popSortedFetchedFiles(endOffset)
val (fileActions, lastQueryTimestamp, urlExpirationTimestamp) = popSortedFetchedFiles(
startVersion, startIndex, endOffset
)
// Proceed the offset as the files before the endOffset are processed.
previousOffset = endOffset

Expand Down Expand Up @@ -828,7 +870,7 @@ case class DeltaSharingSource(
lastIndexedFile: IndexedFile,
previousOffsetVersion: Long,
ispreviousOffsetStartingVersion: Boolean): Option[DeltaSharingSourceOffset] = {
val IndexedFile(v, i, _, _, _, isLastFileInVersion) = lastIndexedFile
val IndexedFile(v, i, _, _, _, _, isLastFileInVersion) = lastIndexedFile
assert(v >= previousOffsetVersion,
s"buildOffsetFromIndexedFile receives an invalid previousOffsetVersion: $v " +
s"(expected: >= $previousOffsetVersion), tableId: $tableId")
Expand Down Expand Up @@ -928,6 +970,8 @@ case class DeltaSharingSource(
}

override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
val endOffset = DeltaSharingSourceOffset(tableId, end)

val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if (
Expand All @@ -953,6 +997,9 @@ case class DeltaSharingSource(
} else {
val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get)
if (startOffset == endOffset) {
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
previousOffset = endOffset
// This happens only if we recover from a failure and `MicroBatchExecution` tries to call
// us with the previous offsets. The returned DataFrame will be dropped immediately, so we
// can return any DataFrame.
Expand Down

0 comments on commit 94c406b

Please sign in to comment.