Skip to content

Commit

Permalink
add more debugging info (#575)
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db authored Sep 24, 2024
1 parent 3e3b864 commit b2ece42
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
s"responseFormat($responseFormat) for $rpc for table $table, queryId[$queryId].")
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat).")
s" != requestedFormat($responseFormat), queryId[$queryId].")
}
}

Expand Down Expand Up @@ -716,7 +716,7 @@ class DeltaSharingRestClient(
|$expectedProtocol, $expectedMetadata. Actual: version $version,
|$respondedFormat, ${lines(0)}, ${lines(1)}""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg)")
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), queryId[$queryId].")
throw new IllegalStateException(errorMsg)
}

Expand Down Expand Up @@ -1008,7 +1008,8 @@ class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed due to ${e}."
val error = s"Request to delta sharing server failed for queryId[$queryId] " +
s"due to ${e}."
logError(error)
lineBuffer += error
lineBuffer.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ case class DeltaSharingSource(
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
)
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}.")
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
Expand All @@ -174,13 +175,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server.")
logInfo(s"Got table version $serverVersion from Delta Sharing Server." +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion.")
s"$latestTableVersion, for table(id:$tableId, name:${deltaLog.table.toString})")
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
Expand Down Expand Up @@ -382,7 +384,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}).")
s"${sortedFetchedFiles.size}), for table(id:$tableId, name:${deltaLog.table.toString})")
}
}

Expand Down Expand Up @@ -456,7 +458,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
s" delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
Expand Down Expand Up @@ -514,7 +516,7 @@ case class DeltaSharingSource(
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
s"delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -1128,7 +1130,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server.")
s"from Delta Sharing Server, for table(id:$tableId, name:${deltaLog.table.toString})")
Some(version)
} else {
None
Expand Down

0 comments on commit b2ece42

Please sign in to comment.