Skip to content

Commit

Permalink
Support includeEndStreamAction for listFiles in client (#578)
Browse files Browse the repository at this point in the history
* tmp

* Client changes to support includeEndStreamAction

* fix and add test

* resolve comments

* fix scalastyle check
  • Loading branch information
linzhou-db authored Sep 29, 2024
1 parent ec4733a commit 84e1bf0
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 69 deletions.
210 changes: 151 additions & 59 deletions client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ object ConfUtils {
val USE_ASYNC_QUERY_CONF = "spark.delta.sharing.network.useAsyncQuery"
val USE_ASYNC_QUERY_DEFAULT = "false"

// Key and default for the includeEndStreamAction setting; consumed by the
// includeEndStreamAction(conf) helpers in this object. The default is kept as a string so it
// can be handed to SQLConf.getConfString as-is and parsed with toBoolean where a Boolean is
// required.
val INCLUDE_END_STREAM_ACTION_CONF = "spark.delta.sharing.query.includeEndStreamAction"
val INCLUDE_END_STREAM_ACTION_DEFAULT = "true"

val TIMEOUT_CONF = "spark.delta.sharing.network.timeout"
val TIMEOUT_DEFAULT = "320s"

Expand Down Expand Up @@ -176,6 +179,14 @@ object ConfUtils {
conf.getConfString(USE_ASYNC_QUERY_CONF, USE_ASYNC_QUERY_DEFAULT).toBoolean
}

/**
 * Reads [[INCLUDE_END_STREAM_ACTION_CONF]] from the given `Configuration`, using
 * [[INCLUDE_END_STREAM_ACTION_DEFAULT]] as the fallback passed to `getBoolean`.
 *
 * @param conf configuration to read the flag from
 * @return the configured flag, or the default when `getBoolean` falls back to it
 */
def includeEndStreamAction(conf: Configuration): Boolean = {
  val fallback = INCLUDE_END_STREAM_ACTION_DEFAULT.toBoolean
  conf.getBoolean(INCLUDE_END_STREAM_ACTION_CONF, fallback)
}

/**
 * Reads [[INCLUDE_END_STREAM_ACTION_CONF]] from the given `SQLConf`, using
 * [[INCLUDE_END_STREAM_ACTION_DEFAULT]] when the key has no value.
 *
 * @param conf SQL configuration to read the flag from
 * @return the configured flag parsed as a Boolean
 * @throws IllegalArgumentException if the configured value is not "true" or "false"
 *                                  (case-insensitive); the message names the config key and
 *                                  the rejected value
 */
def includeEndStreamAction(conf: SQLConf): Boolean = {
  val rawValue = conf.getConfString(
    INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT)
  try {
    rawValue.toBoolean
  } catch {
    // String.toBoolean only reports the bad input; rethrow with the same exception type but
    // name the key so the user can tell which setting to fix.
    case e: IllegalArgumentException =>
      throw new IllegalArgumentException(
        s"$INCLUDE_END_STREAM_ACTION_CONF must be true or false, but got: $rawValue", e)
  }
}

def timeoutInSeconds(conf: Configuration): Int = {
val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT)
toTimeInSeconds(timeoutStr, TIMEOUT_CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
var httpRequestBase = new DeltaSharingRestClient(
testProfileProvider, forStreaming = false, readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, false)
checkDeltaSharingCapabilities(httpRequestBase, "responseformat=parquet")
checkDeltaSharingCapabilities(httpRequestBase, s"${RESPONSE_FORMAT}=parquet;$DELTA_SHARING_END_STREAM_ACTION=true")

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
httpRequestBase = new DeltaSharingRestClient(
Expand All @@ -102,14 +102,15 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta;readerfeatures=$readerFeatures"
httpRequestBase, s"$RESPONSE_FORMAT=delta;$READER_FEATURES=$readerFeatures;$DELTA_SHARING_END_STREAM_ACTION=true"
)

httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
readerFeatures = readerFeatures,
includeEndStreamAction = false).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta,parquet;readerfeatures=$readerFeatures"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ class ConfUtilsSuite extends SparkFunSuite {
}.getMessage.contains(TIMEOUT_CONF)
}

test("includeEndStreamAction") {
  // Configuration overload: an unset key and a non-boolean value both resolve to true.
  assert(includeEndStreamAction(newConf()) == true)
  assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false)
  assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) == true)

  // SQLConf overload: same default, but a non-boolean value is rejected with an exception.
  assert(includeEndStreamAction(newSqlConf()) == true)
  assert(
    includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false
  )
  // The call throws before any surrounding assert could be evaluated, so invoke it directly
  // and keep the intercepted exception for inspection.
  val error = intercept[IllegalArgumentException] {
    includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random")))
  }
  // Assert on the message instead of discarding the result of `contains`; Boolean parsing
  // reports the rejected input value.
  assert(error.getMessage.contains("random"))
}

test("maxConnections") {
assert(maxConnections(newConf()) == MAX_CONNECTION_DEFAULT)
assert(maxConnections(newConf(Map(MAX_CONNECTION_CONF -> "100"))) == 100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
val responseFormatSet = getResponseFormatSet(capabilitiesMap)
val clientReaderFeaturesSet = getReaderFeatures(capabilitiesMap)
val includeEndStreamAction = getIncludeEndStreamAction(capabilitiesMap)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val queryResult = if (
request.predicateHints.isEmpty
&& request.maxFiles.isEmpty
Expand Down Expand Up @@ -611,7 +611,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
includeEndStreamAction: Boolean = false): HttpResponse = {
var capabilities = Seq[String](s"${DELTA_SHARING_RESPONSE_FORMAT}=$responseFormat")
if (includeEndStreamAction) {
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_INCLUDE_END_STREAM_ACTION=true"
capabilities = capabilities :+ s"$DELTA_SHARING_END_STREAM_ACTION=true"
}
val dsCapHeader = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER)

Expand Down Expand Up @@ -647,7 +647,7 @@ object DeltaSharingService {
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
val DELTA_SHARING_RESPONSE_FORMAT = "responseformat"
val DELTA_SHARING_CAPABILITIES_ASYNC_QUERY = "asyncquery"
val DELTA_SHARING_CAPABILITIES_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val DELTA_SHARING_END_STREAM_ACTION = "endstreamaction"
val DELTA_SHARING_READER_FEATURES = "readerfeatures"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"

Expand Down Expand Up @@ -783,9 +783,9 @@ object DeltaSharingService {
headerCapabilities.get(DELTA_SHARING_CAPABILITIES_ASYNC_QUERY).exists(_.toBoolean)
}

private[server] def getIncludeEndStreamAction(
private[server] def getRequestEndStreamAction(
headerCapabilities: Map[String, String]): Boolean = {
headerCapabilities.get(DELTA_SHARING_CAPABILITIES_INCLUDE_END_STREAM_ACTION).exists(_.toBoolean)
headerCapabilities.get(DELTA_SHARING_END_STREAM_ACTION).exists(_.toBoolean)
}

def main(args: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
deltaSharingCapabilities += s";responseformat=$responseFormat"
deltaSharingCapabilities += readerFeatures
if (includeEndStreamAction) {
deltaSharingCapabilities += s";includeendstreamaction=true"
deltaSharingCapabilities += s";endstreamaction=true"
}
connection.setRequestProperty("delta-sharing-capabilities", deltaSharingCapabilities)

Expand Down Expand Up @@ -187,7 +187,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
val responseCapabilities = connection.getHeaderField("delta-sharing-capabilities")
var expectedHeader = s"responseformat=$responseFormat"
if (includeEndStreamAction) {
expectedHeader += s";includeendstreamaction=true"
expectedHeader += s";endstreamaction=true"
}
assert(responseCapabilities == expectedHeader, s"Incorrect header: $responseCapabilities")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class TestDeltaSharingClient(
readerFeatures: String = "",
queryTablePaginationEnabled: Boolean = false,
maxFilesPerReq: Int = 10000,
includeEndStreamAction: Boolean = false,
enableAsyncQuery: Boolean = false,
asyncQueryPollIntervalMillis: Long = 1000L,
asyncQueryMaxDuration: Long = Long.MaxValue,
Expand Down

0 comments on commit 84e1bf0

Please sign in to comment.