Skip to content

Commit

Permalink
Merge branch 'main' of github.com:delta-io/delta-sharing into SC-135281
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Jun 28, 2023
2 parents 22793d1 + c0370ca commit 327dcc0
Show file tree
Hide file tree
Showing 31 changed files with 234 additions and 52 deletions.
31 changes: 28 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ lazy val commonSettings = Seq(

lazy val root = (project in file(".")).aggregate(spark, server)

lazy val spark = (project in file("spark")) settings(
name := "delta-sharing-spark",
crossScalaVersions := Seq(scala212, scala213),
lazy val client = (project in file("client")) settings(
name := "delta-sharing-client",
commonSettings,
scalaStyleSettings,
releaseSettings,
Expand All @@ -57,6 +56,32 @@ lazy val spark = (project in file("spark")) settings(
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.scalatest" %% "scalatest" % "3.2.3" % "test"
),
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "io" / "delta" / "sharing" / "client" / "package.scala"
IO.write(file,
s"""package io.delta.sharing
|
|package object client {
| val VERSION = "${version.value}"
|}
|""".stripMargin)
Seq(file)
}
)

lazy val spark = (project in file("spark")) dependsOn(client) settings(
name := "delta-sharing-spark",
crossScalaVersions := Seq(scala212, scala213),
commonSettings,
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",
"org.scalatest" %% "scalatest" % "3.2.3" % "test"
),
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "io" / "delta" / "sharing" / "spark" / "package.scala"
IO.write(file,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.delta.sharing.client.DeltaSharingFileSystem
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.net.{URL, URLEncoder}
Expand All @@ -39,11 +39,11 @@ import org.apache.http.impl.client.{HttpClientBuilder, HttpClients}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import io.delta.sharing.spark.model._
import io.delta.sharing.spark.util.{JsonUtils, RetryUtils, UnexpectedHttpStatus}
import io.delta.sharing.client.model._
import io.delta.sharing.client.util.{JsonUtils, RetryUtils, UnexpectedHttpStatus}

/** An interface to fetch Delta metadata from remote server. */
private[sharing] trait DeltaSharingClient {
trait DeltaSharingClient {
def listAllTables(): Seq[Table]

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long
Expand Down Expand Up @@ -93,7 +93,7 @@ private[sharing] case class ListAllTablesResponse(
nextPageToken: Option[String]) extends PaginationResponse

/** A REST client to fetch Delta metadata from remote server. */
private[spark] class DeltaSharingRestClient(
class DeltaSharingRestClient(
profileProvider: DeltaSharingProfileProvider,
timeoutInSeconds: Int = 120,
numRetries: Int = 10,
Expand Down Expand Up @@ -393,7 +393,8 @@ private[spark] class DeltaSharingRestClient(
}
}

private[spark] def prepareHeaders(httpRequest: HttpRequestBase): HttpRequestBase = {
// TODO: [linzhou] mark this as private once tests are migrated.
def prepareHeaders(httpRequest: HttpRequestBase): HttpRequestBase = {
val customeHeaders = profileProvider.getCustomHeaders
if (customeHeaders.contains(HttpHeaders.AUTHORIZATION)
|| customeHeaders.contains(HttpHeaders.USER_AGENT)) {
Expand Down Expand Up @@ -503,7 +504,7 @@ private[spark] class DeltaSharingRestClient(
}
}

private[spark] object DeltaSharingRestClient extends Logging {
object DeltaSharingRestClient extends Logging {
val CURRENT = 1

val SPARK_STRUCTURED_STREAMING = "Delta-Sharing-SparkStructuredStreaming"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.net.{URI, URLDecoder, URLEncoder}
import java.util.concurrent.TimeUnit
Expand All @@ -28,8 +28,8 @@ import org.apache.spark.SparkEnv
import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher}
import org.apache.spark.network.util.JavaUtils

import io.delta.sharing.spark.model.FileAction
import io.delta.sharing.spark.util.{ConfUtils, RetryUtils}
import io.delta.sharing.client.model.FileAction
import io.delta.sharing.client.util.{ConfUtils, RetryUtils}

/** Read-only file system for delta paths. */
private[sharing] class DeltaSharingFileSystem extends FileSystem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.nio.charset.StandardCharsets.UTF_8

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import io.delta.sharing.spark.util.JsonUtils
import io.delta.sharing.client.util.JsonUtils

case class DeltaSharingProfile(
shareCredentialsVersion: Option[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.io.{ByteArrayInputStream, EOFException}
import java.net.URI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.sharing.spark
package io.delta.sharing.client

import java.io.{EOFException, InputStream, IOException}
import java.nio.charset.StandardCharsets.UTF_8
Expand All @@ -51,7 +51,7 @@ import org.apache.http.conn.EofSensorInputStream
import org.apache.spark.delta.sharing.PreSignedUrlFetcher
import org.apache.spark.internal.Logging

import io.delta.sharing.spark.util.{RetryUtils, UnexpectedHttpStatus}
import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus}

/**
* This is a special input stream to provide random access over HTTP. This class requires the server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark.model
package io.delta.sharing.client.model

import com.fasterxml.jackson.annotation.JsonInclude
import org.apache.spark.sql.types.{DataType, LongType, StringType}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark.util
package io.delta.sharing.client.util

import java.util.concurrent.TimeUnit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark.util
package io.delta.sharing.client.util

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark.util
package io.delta.sharing.client.util

import java.io.{InterruptedIOException, IOException}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.util.{RpcUtils, ThreadUtils}

import io.delta.sharing.spark.DeltaSharingProfileProvider
import io.delta.sharing.client.DeltaSharingProfileProvider

/**
* @param expiration the expiration time of the pre signed urls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite

import io.delta.sharing.spark.model.{AddCDCFile, AddFile, AddFileForCDF, FileAction, RemoveFile}
import io.delta.sharing.client.model.{AddCDCFile, AddFile, AddFileForCDF, FileAction, RemoveFile}

class DeltaSharingFileSystemSuite extends SparkFunSuite {
import DeltaSharingFileSystem._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.sharing.client

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.sys.process._
import scala.util.Try

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkFunSuite
import org.scalatest.BeforeAndAfterAll

/**
 * Mixin for suites that need a running `TestDeltaSharingServer`.
 *
 * When [[shouldRunIntegrationTest]] is true, `beforeAll` writes a temporary profile file pointing
 * at `https://localhost:TEST_PORT/delta-sharing`, launches the test server through `build/sbt` in
 * a child process, and waits up to 120 seconds for the server to print its listening address.
 * `afterAll` tears the server down and removes the temporary files. When the flag is false no
 * server is started and tests registered through [[integrationTest]] are canceled via `assume`.
 */
trait DeltaSharingIntegrationTest extends SparkFunSuite with BeforeAndAfterAll {

  /**
   * Whether the integration tests should run: true only when all three of the
   * `AWS_ACCESS_KEY_ID`, `AZURE_TEST_ACCOUNT_KEY` and `GOOGLE_APPLICATION_CREDENTIALS`
   * environment variables are set to non-empty values.
   */
  def shouldRunIntegrationTest: Boolean = {
    sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) &&
    sys.env.get("AZURE_TEST_ACCOUNT_KEY").exists(_.length > 0) &&
    sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").exists(_.length > 0)
  }

  // Handle of the sbt child process; written by the launcher thread and read by the test thread,
  // hence @volatile. `null` means "not started" or "already exited".
  @volatile private var process: Process = _
  // File whose path is passed to the server on its command line; `afterAll` reads a pid from it.
  @volatile private var pidFile: File = _
  // Profile file handed to `testProfileProvider`; only created when integration tests run.
  var testProfileFile: File = _

  val TEST_PORT = 12345

  /**
   * Starts the test server (if integration tests are enabled) and blocks until it reports that
   * it is listening, failing the suite if it does not start within 120 seconds or exits early.
   */
  override def beforeAll(): Unit = {
    super.beforeAll()
    if (shouldRunIntegrationTest) {
      pidFile = Files.createTempFile("delta-sharing-server", ".pid").toFile
      testProfileFile = Files.createTempFile("delta-test", ".share").toFile
      FileUtils.writeStringToFile(testProfileFile,
        s"""{
           | "shareCredentialsVersion": 1,
           | "endpoint": "https://localhost:$TEST_PORT/delta-sharing",
           | "bearerToken": "dapi5e3574ec767ca1548ae5bbed1a2dc04d"
           |}""".stripMargin, UTF_8)

      val startLatch = new CountDownLatch(1)
      new Thread("Run TestDeltaSharingServer") {
        setDaemon(true)

        override def run(): Unit = {
          val processLogger = ProcessLogger { stdout =>
            // scalastyle:off println
            println(stdout)
            // scalastyle:on println
            // The server is considered up once its output mentions the listening address.
            if (stdout.contains(s"https://127.0.0.1:$TEST_PORT/")) {
              startLatch.countDown()
            }
          }
          process =
            Seq(
              "/bin/bash",
              "-c",
              s"cd .. && build/sbt 'server / Test / runMain " +
                s"io.delta.sharing.server.TestDeltaSharingServer ${pidFile.getCanonicalPath}'")
              .run(processLogger)
          // Blocks until the child process exits.
          process.exitValue()
          // Clearing the handle and releasing the latch lets the waiting test thread detect that
          // the process exited (it checks `process == null` after the latch opens).
          process = null
          startLatch.countDown()
        }
      }.start()
      try {
        assert(startLatch.await(120, TimeUnit.SECONDS), "the server didn't start in 120 seconds")
        if (process == null) {
          fail("the process exited with an error")
        }
      } catch {
        // Catch-all is intentional: clean up the child process, then rethrow unchanged.
        case e: Throwable =>
          if (process != null) {
            process.destroy()
            process = null
          }
          throw e
      }
    }
  }

  /**
   * Stops the test server (if one was started), deletes the temporary files, and always
   * delegates to `super.afterAll()` so the parent suite's teardown runs even when the
   * integration tests were skipped, mirroring the unconditional `super.beforeAll()` call.
   */
  override def afterAll(): Unit = {
    try {
      if (shouldRunIntegrationTest) {
        org.apache.hadoop.fs.FileSystem.closeAll()
        if (process != null) {
          process.destroy()
          process = null
        }
        if (pidFile != null) {
          // Read with an explicit charset (the profile file is written with UTF_8 as well) and
          // trim so that surrounding whitespace does not prevent the pid from parsing.
          val pidText = FileUtils.readFileToString(pidFile, UTF_8).trim
          // A missing or malformed pid is ignored on purpose: cleanup is best-effort.
          Try(pidText.toLong).foreach { serverPid =>
            // scalastyle:off println
            println(s"Killing $serverPid")
            // scalastyle:on println
            s"kill -9 $serverPid".!
          }
          pidFile.delete()
        }
        if (testProfileFile != null) {
          testProfileFile.delete()
        }
      }
    } finally {
      super.afterAll()
    }
  }

  /**
   * Builds a profile provider backed by `testProfileFile`. That file is only created in
   * `beforeAll` when [[shouldRunIntegrationTest]] is true.
   */
  def testProfileProvider: DeltaSharingProfileProvider = {
    new DeltaSharingFileProfileProvider(new Configuration, testProfileFile.getCanonicalPath)
  }

  /**
   * Registers a test that is canceled (via `assume`) unless [[shouldRunIntegrationTest]] is true.
   *
   * @param testName name under which the test is registered
   * @param func     test body, evaluated only when integration tests are enabled
   */
  def integrationTest(testName: String)(func: => Unit): Unit = {
    test(testName) {
      assume(shouldRunIntegrationTest)
      func
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

package io.delta.sharing.spark
package io.delta.sharing.client

import java.sql.Timestamp

import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpGet

import io.delta.sharing.spark.model.{
import io.delta.sharing.client.model.{
AddCDCFile,
AddFile,
AddFileForCDF,
Expand All @@ -31,7 +31,7 @@ import io.delta.sharing.spark.model.{
RemoveFile,
Table
}
import io.delta.sharing.spark.util.UnexpectedHttpStatus
import io.delta.sharing.client.util.UnexpectedHttpStatus

// scalastyle:off maxLineLength
class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
Expand Down
Loading

0 comments on commit 327dcc0

Please sign in to comment.