Skip to content

Commit

Permalink
Delta Sharing Server to support return response in delta log format (#…
Browse files Browse the repository at this point in the history
…335)

* delta sharing server changes

* Delta Sharing Server to Support queryDeltaLog

* fix import

* fix tests

* actions work

* handle delta-sharing-capabilities header

* update header name

* update comment

* resolve comments

* fix
  • Loading branch information
linzhou-db authored Jul 13, 2023
1 parent 94c406b commit ef18d8d
Show file tree
Hide file tree
Showing 4 changed files with 893 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import com.linecorp.armeria.server.auth.AuthService
import io.delta.standalone.internal.DeltaCDFErrors
import io.delta.standalone.internal.DeltaCDFIllegalArgumentException
import io.delta.standalone.internal.DeltaDataSource
import io.delta.standalone.internal.DeltaSharedTable
import io.delta.standalone.internal.DeltaSharedTableLoader
import net.sourceforge.argparse4j.ArgumentParsers
import org.apache.commons.io.FileUtils
Expand Down Expand Up @@ -268,13 +269,32 @@ class DeltaSharingService(serverConfig: ServerConfig) {
HttpResponse.of(headers)
}

/**
 * Parses the value of the delta-sharing-capabilities request header into a key/value map.
 *
 * The header is read as a comma-separated list of `key=value` entries. The whole header is
 * lower-cased (locale-independently) before parsing, so both keys and values in the result
 * are lower case. Whitespace around keys and values is ignored, so
 * `"a=b, responseformat=delta"` yields the key `responseformat` rather than `" responseformat"`.
 *
 * Entries that are not exactly one non-empty key and one non-empty value (for example `foo`,
 * `foo=`, `=bar` or `a=b=c`) are dropped instead of being recorded under an empty key.
 *
 * @param headerString raw header value; may be null when the client did not send the header.
 * @return the parsed capabilities, or an empty map when the header is missing or has no
 *         well-formed entry. When a key is repeated, the last occurrence wins.
 */
private def getDeltaSharingCapabilitiesMap(headerString: String): Map[String, String] = {
  Option(headerString).fold(Map.empty[String, String]) { header =>
    header
      // Locale.ROOT keeps the result independent of the JVM default locale.
      .toLowerCase(java.util.Locale.ROOT)
      .split(",")
      .iterator
      .map(_.split("=").map(_.trim))
      .collect {
        case Array(key, value) if key.nonEmpty && value.nonEmpty => key -> value
      }
      .toMap
  }
}

@Get("/shares/{share}/schemas/{schema}/tables/{table}/metadata")
def getMetadata(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String): HttpResponse = processRequest {
import scala.collection.JavaConverters._
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val tableConfig = sharedTableManager.getTable(share, schema, table)
val responseFormat = getResponseFormat(capabilitiesMap)
val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = false,
predicateHints = Nil,
Expand All @@ -283,18 +303,22 @@ class DeltaSharingService(serverConfig: ServerConfig) {
version = None,
timestamp = None,
startingVersion = None,
endingVersion = None
)
streamingOutput(Some(v), actions)
endingVersion = None,
responseFormat = responseFormat)
streamingOutput(Some(v), responseFormat, actions)
}

@Post("/shares/{share}/schemas/{schema}/tables/{table}/query")
@ConsumesJson
def listFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
request: QueryTableRequest): HttpResponse = processRequest {
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val numVersionParams = Seq(request.version, request.timestamp, request.startingVersion)
.filter(_.isDefined).size
if (numVersionParams > 1) {
Expand Down Expand Up @@ -331,6 +355,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
)
}
}
val responseFormat = getResponseFormat(capabilitiesMap)
val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = true,
request.predicateHints,
Expand All @@ -339,21 +364,22 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.version,
request.timestamp,
request.startingVersion,
request.endingVersion
)
request.endingVersion,
responseFormat = responseFormat)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
s"You can only query table data since version ${tableConfig.startVersion}."
)
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(version), actions)
streamingOutput(Some(version), responseFormat, actions)
}

@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
def listCdfFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand All @@ -363,42 +389,52 @@ class DeltaSharingService(serverConfig: ServerConfig) {
@Param("endingTimestamp") @Nullable endingTimestamp: String,
@Param("includeHistoricalMetadata") @Nullable includeHistoricalMetadata: String
): HttpResponse = processRequest {
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val start = System.currentTimeMillis
val tableConfig = sharedTableManager.getTable(share, schema, table)
if (!tableConfig.cdfEnabled) {
throw new DeltaSharingIllegalArgumentException("cdf is not enabled on table " +
s"$share.$schema.$table")
}

val responseFormat = getResponseFormat(capabilitiesMap)
val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).queryCDF(
getCdfOptionsMap(
Option(startingVersion),
Option(endingVersion),
Option(startingTimestamp),
Option(endingTimestamp)
),
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false)
includeHistoricalMetadata = Try(includeHistoricalMetadata.toBoolean).getOrElse(false),
responseFormat = responseFormat
)
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(v), actions)
streamingOutput(Some(v), responseFormat, actions)
}

private def streamingOutput(version: Option[Long], actions: Seq[SingleAction]): HttpResponse = {
private def streamingOutput(
version: Option[Long],
responseFormat: String,
actions: Seq[Object]): HttpResponse = {
val headers = if (version.isDefined) {
createHeadersBuilderForTableVersion(version.get)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, s"$DELTA_SHARING_RESPONSE_FORMAT=$responseFormat")
.build()
} else {
ResponseHeaders.builder(200)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, s"$DELTA_SHARING_RESPONSE_FORMAT=$responseFormat")
.build()
}
ResponseConversionUtil.streamingFrom(
actions.asJava.stream(),
headers,
HttpHeaders.of(),
(o: SingleAction) => processRequest {
(o: Object) => processRequest {
val out = new ByteArrayOutputStream
JsonUtils.mapper.writeValue(out, o)
out.write('\n')
Expand All @@ -412,8 +448,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
object DeltaSharingService {
val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"
val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8"

val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming"
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
val DELTA_SHARING_RESPONSE_FORMAT = "responseformat"

private val parser = {
val parser = ArgumentParsers
Expand Down Expand Up @@ -533,6 +569,12 @@ object DeltaSharingService {
endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap
}

/**
 * Resolves the response format to use for a request.
 *
 * @param headerCapabilities capabilities parsed from the request header, keyed by
 *                           capability name.
 * @return the value stored under [[DELTA_SHARING_RESPONSE_FORMAT]] when present, otherwise
 *         `DeltaSharedTable.RESPONSE_FORMAT_PARQUET`.
 */
private[server] def getResponseFormat(headerCapabilities: Map[String, String]): String =
  headerCapabilities.getOrElse(
    DELTA_SHARING_RESPONSE_FORMAT,
    DeltaSharedTable.RESPONSE_FORMAT_PARQUET)

def main(args: Array[String]): Unit = {
val ns = parser.parseArgsOrFail(args)
val serverConfigPath = ns.getString("config")
Expand Down
135 changes: 135 additions & 0 deletions server/src/main/scala/io/delta/sharing/server/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ case class AddFile(
override def wrap: SingleAction = SingleAction(file = this)
}

// This was added because, when cdf support was developed in delta sharing, AddFile was used with
// the "file" key in the response json, so another action was needed for the "add" key.
case class AddFileForCDF(
url: String,
id: String,
Expand Down Expand Up @@ -135,3 +137,136 @@ object Action {
// Basically delta sharing doesn't support write for now.
val maxWriterVersion = 0
}

/**
* Actions defined in delta format, used in the response when the requested format is delta.
*
* The trait is sealed, so every implementation is declared in this file. Each implementation
* can be placed into a [[DeltaSingleAction]] envelope through [[wrap]].
*/
sealed trait DeltaAction {
/** Wraps this action in a [[DeltaSingleAction]] envelope. */
def wrap: DeltaSingleAction
}

/**
* Used to block older clients from reading the shared table when backwards
* incompatible changes are made to the protocol. Readers and writers are
* responsible for checking that they meet the minimum versions before performing
* any other operations.
*
* Since this action allows us to explicitly block older clients in the case of a
* breaking change to the protocol, clients should be tolerant of messages and
* fields that they do not understand.
*
* @param minReaderVersion the minimum version a reader has to meet (see above); the exact
*                         version numbering is not visible here - TODO confirm against the
*                         protocol definition.
*/
case class DeltaProtocol(minReaderVersion: Int) extends DeltaAction {
// Only the `protocol` field of the envelope is set; the other fields keep their null defaults.
override def wrap: DeltaSingleAction = DeltaSingleAction(protocol = this)
}

/**
* DeltaAddFile used in delta sharing protocol, copied from AddFile in delta.
* Adding 4 delta sharing related fields: id/version/timestamp/expirationTimestamp.
* If the client uses delta kernel, it should redact these fields as needed.
* - id: used to uniquely identify a file, and in idToUrl mapping for executor to get
* presigned url.
* - version/timestamp: the version and timestamp of the commit, used to generate faked delta
* log file on the client side.
* - expirationTimestamp: indicate when the presigned url is going to expire and need a
* refresh.
* Ignoring 1 field: tags.
*
* Construction fails with an IllegalArgumentException when `path` is empty (see the `require`
* in the body). `version` and `timestamp` are boxed `java.lang.Long` values that default to
* null, whereas `expirationTimestamp` is a primitive `Long` without a default and therefore
* must always be supplied.
*/
case class DeltaAddFile(
path: String,
id: String,
// Jackson: ALWAYS asks for this property to be emitted regardless of its value
// (presumably so that an empty partition map is still written - TODO confirm).
@JsonInclude(JsonInclude.Include.ALWAYS)
partitionValues: Map[String, String],
size: Long,
modificationTime: Long,
dataChange: Boolean,
// Jackson: the string is written verbatim as raw JSON instead of as a quoted string, so it is
// expected to already hold serialized JSON - TODO confirm what producers put here.
@JsonRawValue
stats: String = null,
version: java.lang.Long = null,
timestamp: java.lang.Long = null,
expirationTimestamp: Long) extends DeltaAction {
// Reject empty paths up front. NOTE(review): a null path fails here with a
// NullPointerException rather than an IllegalArgumentException.
require(path.nonEmpty)

// Only the `add` field of the envelope is set; the other fields keep their null defaults.
override def wrap: DeltaSingleAction = DeltaSingleAction(add = this)
}

/**
* DeltaRemoveFile used in delta sharing protocol, copied from RemoveFile in delta.
* Adding 4 delta sharing related fields: id/version/timestamp/expirationTimestamp.
* If the client uses delta kernel, it should redact these fields as needed.
* Ignoring 1 field: tags.
*
* Unlike [[DeltaAddFile]], `version` and `timestamp` are primitive `Long` values without
* defaults here, so they must always be supplied. `partitionValues` defaults to null and
* `size` to None.
*/
case class DeltaRemoveFile(
path: String,
id: String,
deletionTimestamp: Option[Long],
dataChange: Boolean = true,
extendedFileMetadata: Boolean = false,
partitionValues: Map[String, String] = null,
size: Option[Long] = None,
version: Long,
timestamp: Long,
expirationTimestamp: Long) extends DeltaAction {
// Only the `remove` field of the envelope is set; the other fields keep their null defaults.
override def wrap: DeltaSingleAction = DeltaSingleAction(remove = this)
}

/**
* DeltaAddCDCFile used in delta sharing protocol, copied from AddCDCFile in delta.
* Adding 4 delta sharing related fields: id/version/timestamp/expirationTimestamp.
* If the client uses delta kernel, it should redact these fields as needed.
* Ignoring 1 field: tags.
*
* All parameters are required; none has a default value.
*/
case class DeltaAddCDCFile(
path: String,
id: String,
partitionValues: Map[String, String],
size: Long,
version: Long,
timestamp: Long,
expirationTimestamp: Long) extends DeltaAction {
// Only the `cdc` field of the envelope is set; the other fields keep their null defaults.
override def wrap: DeltaSingleAction = DeltaSingleAction(cdc = this)
}

/**
* DeltaMetadata used in delta sharing protocol, copied from Metadata in delta.
* Adding 1 delta sharing related field: version.
* If the client uses delta kernel, it should redact this field as needed.
*
* Only `id` is required. `version` is a boxed `java.lang.Long` that defaults to null.
*/
case class DeltaMetadata(
id: String,
name: String = null,
description: String = null,
format: Format = Format(),
schemaString: String = null,
partitionColumns: Seq[String] = Nil,
configuration: Map[String, String] = Map.empty,
version: java.lang.Long = null,
// The default is evaluated each time an instance is built without an explicit createdTime,
// so it records the construction time of that instance.
createdTime: Option[Long] = Some(System.currentTimeMillis())) extends DeltaAction {
// Only the `metaData` field of the envelope is set; the other fields keep their null defaults.
override def wrap: DeltaSingleAction = DeltaSingleAction(metaData = this)
}

/**
 * Common envelope used when serializing delta-format actions.
 *
 * Every field defaults to null. The `wrap` method of each [[DeltaAction]] builds an envelope
 * with exactly one field populated.
 */
case class DeltaSingleAction(
    add: DeltaAddFile = null,
    remove: DeltaRemoveFile = null,
    metaData: DeltaMetadata = null,
    protocol: DeltaProtocol = null,
    cdc: DeltaAddCDCFile = null) {

  /**
   * Returns the first non-null action, checked in the order
   * add, remove, metaData, protocol, cdc, or null when every field is null.
   */
  def unwrap: DeltaAction =
    Seq[DeltaAction](add, remove, metaData, protocol, cdc).find(_ != null).orNull
}
Loading

0 comments on commit ef18d8d

Please sign in to comment.