Log additional queryId and table id and table name (#576)
* add more debugging info

* add more debugging info
linzhou-db authored Sep 25, 2024
1 parent b2ece42 commit 1bee77b
Showing 2 changed files with 40 additions and 28 deletions.
First changed file:
@@ -45,6 +45,12 @@ import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, Unexpecte

/** An interface to fetch Delta metadata from remote server. */
trait DeltaSharingClient {

protected var dsQueryId: Option[String] = None

def getQueryId: String = {
dsQueryId.getOrElse("dsQueryIdNotSet")
}
def listAllTables(): Seq[Table]

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long
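
A minimal sketch of how a caller can use the new getQueryId accessor to correlate its own logs with the server-side query; client is assumed to be any DeltaSharingClient implementation and logTableVersion is a hypothetical helper:

    def logTableVersion(client: DeltaSharingClient, table: Table): Long = {
      val version = client.getTableVersion(table)
      // getQueryId falls back to "dsQueryIdNotSet" until the implementation sets dsQueryId.
      println(s"table version $version, dsQueryId=${client.getQueryId}")
      version
    }
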
@@ -175,8 +181,6 @@ class DeltaSharingRestClient(
// Convert the responseFormat to a Seq to be used later.
private val responseFormatSet = responseFormat.split(",").toSet

private var queryId: Option[String] = None

private lazy val client = {
val clientBuilder: HttpClientBuilder = if (sslTrustAll) {
val sslBuilder = new SSLContextBuilder()
@@ -281,10 +285,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table, queryId[$queryId].")
s"responseFormat($responseFormat) for $rpc for table $table, dsQueryId[$dsQueryId].")
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat), queryId[$queryId].")
s" != requestedFormat($responseFormat), dsQueryId[$dsQueryId].")
}
}
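
checkRespondedFormat only accepts a responded format that appears in the comma-separated responseFormat the client requested (kept in responseFormatSet above). A standalone sketch of that membership check, with "parquet,delta" as an assumed requested format:

    // Assumed requested formats; the real value comes from the responseFormat config.
    val requestedFormats = "parquet,delta".split(",").toSet
    def isAcceptable(respondedFormat: String): Boolean =
      requestedFormats.contains(respondedFormat)

    isAcceptable("delta")  // true
    isAcceptable("json")   // false -> the real client throws IllegalArgumentException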

@@ -716,7 +720,7 @@ class DeltaSharingRestClient(
|$expectedProtocol, $expectedMetadata. Actual: version $version,
|$respondedFormat, ${lines(0)}, ${lines(1)}""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), queryId[$queryId].")
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), dsQueryId[$dsQueryId].")
throw new IllegalStateException(errorMsg)
}

@@ -974,8 +978,8 @@ class DeltaSharingRestClient(
allowNoContent: Boolean = false,
fetchAsOneString: Boolean = false
): (Option[Long], Option[String], Seq[String]) = {
// Reset queryId before calling RetryUtils, and before prepareHeaders.
queryId = Some(UUID.randomUUID().toString().split('-').head)
// Reset dsQueryId before calling RetryUtils, and before prepareHeaders.
dsQueryId = Some(UUID.randomUUID().toString().split('-').head)
RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) {
val profile = profileProvider.getProfile
val response = client.execute(
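
The query id is just the first dash-separated segment of a random UUID (eight hex characters), regenerated once per logical query, before prepareHeaders and the retry loop, so all retries of the same request share one id. A minimal sketch of that generation:

    import java.util.UUID

    // Short correlation id: the first UUID segment, e.g. "3f2a9c1d".
    val dsQueryId: Option[String] = Some(UUID.randomUUID().toString.split('-').head)
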
@@ -1008,7 +1012,7 @@ }
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed for queryId[$queryId] " +
val error = s"Request to delta sharing server failed for dsQueryId[$dsQueryId] " +
s"due to ${e}."
logError(error)
lineBuffer += error
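
RetryUtils.runWithExponentialBackoff wraps the whole HTTP exchange, so a ConnectionClosedException logged with dsQueryId can be matched across retries of the same query. A generic sketch of exponential-backoff retry, not the actual RetryUtils implementation:

    // Generic sketch: retry a block with doubling delays until attempts run out.
    def runWithBackoff[T](retriesLeft: Int, delayMs: Long)(op: () => T): T = {
      try {
        op()
      } catch {
        case _: Exception if retriesLeft > 0 =>
          Thread.sleep(delayMs)
          runWithBackoff(retriesLeft - 1, delayMs * 2)(op)
      }
    }
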
@@ -1059,7 +1063,7 @@ }
}

private def getQueryIdString: String = {
s"QueryId-${queryId.getOrElse("not_set")}"
s"QueryId-${dsQueryId.getOrElse("not_set")}"
}

// The value for delta-sharing-capabilities header, semicolon separated capabilities.
Second changed file:
@@ -148,8 +148,8 @@ case class DeltaSharingSource(
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
)
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}," +
getTableInfoForLogging)
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
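
The interval used above is the larger of the configured value and the minimum constant. A small sketch of that clamp, with hypothetical values:

    val MINIMUM_TABLE_VERSION_INTERVAL_SECONDS = 30L  // hypothetical minimum
    val configured = 10L                              // hypothetical user setting
    val intervalSeconds = MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(configured)  // 30
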
@@ -167,6 +167,13 @@ case class DeltaSharingSource(
TableRefreshResult(Map.empty[String, String], None, None)
}

private lazy val getTableInfoForLogging: String =
s"for table(id:$tableId, name:${deltaLog.table.toString})"

private def getQueryIdForLogging: String = {
s", with queryId(${deltaLog.client.getQueryId})"
}

// Check the latest table version from the delta sharing server through the client.getTableVersion
// RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive
// rpcs to avoid traffic jam on the delta sharing server.
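
The two new helpers centralize the table and query-id suffixes that each log message previously inlined. A sketch of what a composed log line looks like, with hypothetical values for the table id, table name and query id:

    val tableId = "0c7e-4d3a"                // hypothetical
    val tableName = "share1.schema1.table1"  // hypothetical
    val queryId = "3f2a9c1d"                 // hypothetical

    val tableInfoForLogging = s"for table(id:$tableId, name:$tableName)"
    val queryIdForLogging = s", with queryId($queryId)"
    println("Refreshed 5 urls in sortedFetchedFiles(size: 12), " +
      tableInfoForLogging + queryIdForLogging)
    // Refreshed 5 urls in sortedFetchedFiles(size: 12),
    //   for table(id:0c7e-4d3a, name:share1.schema1.table1), with queryId(3f2a9c1d)
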
@@ -175,14 +182,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server." +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"Got table version $serverVersion from Delta Sharing Server, " +
getTableInfoForLogging)
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion, for table(id:$tableId, name:${deltaLog.table.toString})")
s"$latestTableVersion, " + getTableInfoForLogging)
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
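
The version check only issues the getTableVersion RPC when at least QUERY_TABLE_VERSION_INTERVAL_MILLIS have passed since the previous call; otherwise it reuses the cached latestTableVersion. A simplified sketch of that rate limiting, with getServerVersion standing in for deltaLog.client.getTableVersion:

    object VersionPoller {  // simplified stand-in for the source's mutable state
      private var lastGetVersionTimestamp: Long = -1L
      private var latestTableVersion: Long = 0L
      val QUERY_TABLE_VERSION_INTERVAL_MILLIS = 30000L  // hypothetical interval

      def getOrUpdate(getServerVersion: () => Long): Long = {
        val now = System.currentTimeMillis()
        if (lastGetVersionTimestamp == -1 ||
            now - lastGetVersionTimestamp >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
          latestTableVersion = getServerVersion()
          lastGetVersionTimestamp = now
        }
        latestTableVersion
      }
    }
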
@@ -248,7 +255,7 @@ case class DeltaSharingSource(
s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" +
s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " +
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
sortedFetchedFiles = Seq.empty
} else {
return
@@ -267,7 +274,7 @@ case class DeltaSharingSource(
logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" +
s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
getTableInfoForLogging
)
}

@@ -334,7 +341,7 @@ case class DeltaSharingSource(
): Unit = {
synchronized {
logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" +
s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).")
s"size: ${newIdToUrl.size}), " + getTableInfoForLogging + getQueryIdForLogging)
lastQueryTableTimestamp = queryTimestamp
minUrlExpirationTimestamp = newMinUrlExpiration
if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
@@ -384,7 +391,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}), for table(id:$tableId, name:${deltaLog.table.toString})")
s"${sortedFetchedFiles.size}), " + getTableInfoForLogging)
}
}

@@ -411,7 +418,7 @@ case class DeltaSharingSource(
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
getTableInfoForLogging
)
resetGlobalTimestamp()
if (isStartingVersion) {
@@ -458,7 +465,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
s" delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
s" delta sharing server, " + getTableInfoForLogging + getQueryIdForLogging
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
@@ -512,11 +519,13 @@ case class DeltaSharingSource(

TableRefreshResult(idToUrl, minUrlExpiration, None)
}
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
val filteredAddFiles = validateCommitAndFilterAddFiles(tableFiles)
val allAddFiles = filteredAddFiles.groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"Fetched ${tableFiles.addFiles.size} files, filtered ${filteredAddFiles.size} " +
s"in ${allAddFiles.size} versions from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
s"delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
s"delta sharing server, " + getTableInfoForLogging + getQueryIdForLogging
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
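
The reworked log line distinguishes the raw file count returned by the server (tableFiles.addFiles.size) from the filtered count and the number of distinct versions they span. A simplified sketch of that bucketing, with AddFileForCDF reduced to just a version field:

    case class AddFileForCDF(version: Long)  // simplified stand-in

    val filteredAddFiles = Seq(AddFileForCDF(3), AddFileForCDF(3), AddFileForCDF(5))
    val allAddFiles = filteredAddFiles.groupBy(_.version)
    // filteredAddFiles.size == 3 files, allAddFiles.size == 2 distinct versions

    for (v <- 3L to 5L) {
      val vAddFiles = allAddFiles.getOrElse(v, Seq.empty)
      println(s"version $v: ${vAddFiles.size} add files")  // 3 -> 2, 4 -> 0, 5 -> 1
    }
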
@@ -556,8 +565,7 @@ case class DeltaSharingSource(
fromIndex: Long,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString}).")
s"endingVersionForQuery($endingVersionForQuery), " + getTableInfoForLogging)
resetGlobalTimestamp()
val tableFiles = deltaLog.client.getCDFFiles(
deltaLog.table,
@@ -1003,7 +1011,7 @@ case class DeltaSharingSource(

override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
val endOffset = DeltaSharingSourceOffset(tableId, end)

val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if (
@@ -1030,7 +1038,7 @@ case class DeltaSharingSource(
val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get)
if (startOffset == endOffset) {
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
previousOffset = endOffset
// This happens only if we recover from a failure and `MicroBatchExecution` tries to call
// us with the previous offsets. The returned DataFrame will be dropped immediately, so we
@@ -1130,7 +1138,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server, for table(id:$tableId, name:${deltaLog.table.toString})")
s"from Delta Sharing Server, " + getTableInfoForLogging)
Some(version)
} else {
None
