Commit

Spark 3.5.0 support on raydp (oap-project#395)
* spark 3.5.0

* spark 3.5.0

* revert the workflow test, as Python 3.7 and 3.8 are not compatible
raviranak authored Dec 15, 2023
1 parent 386ea3c commit b7bf060
Showing 11 changed files with 357 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@ __pycache__/
build/
dist/
*.egg-info/
*.eggs/

dev/.tmp_dir/
target/
1 change: 1 addition & 0 deletions core/pom.xml
@@ -17,6 +17,7 @@
<spark322.version>3.2.2</spark322.version>
<spark330.version>3.3.0</spark330.version>
<spark340.version>3.4.0</spark340.version>
<spark350.version>3.5.0</spark350.version>
<snappy.version>1.1.10.4</snappy.version>
<netty.version>4.1.94.Final</netty.version>
<commons.text.version>1.10.0</commons.text.version>
1 change: 1 addition & 0 deletions core/shims/pom.xml
@@ -20,6 +20,7 @@
<module>spark322</module>
<module>spark330</module>
<module>spark340</module>
<module>spark350</module>
</modules>

<properties>
99 changes: 99 additions & 0 deletions core/shims/spark350/pom.xml
@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.intel</groupId>
    <artifactId>raydp-shims</artifactId>
    <version>1.7.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>raydp-shims-spark350</artifactId>
  <name>RayDP Shims for Spark 3.5.0</name>
  <packaging>jar</packaging>

  <properties>
    <scala.version>2.12.15</scala.version>
    <jackson.version>2.13.5</jackson.version>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scalastyle</groupId>
        <artifactId>scalastyle-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile-first</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
  </build>

  <dependencies>
    <dependency>
      <groupId>com.intel</groupId>
      <artifactId>raydp-shims-common</artifactId>
      <version>${project.version}</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${spark350.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark350.version}</version>
      <scope>provided</scope>
      <exclusions>
        <exclusion>
          <groupId>org.xerial.snappy</groupId>
          <artifactId>snappy-java</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-handler</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
      <version>${snappy.version}</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler</artifactId>
      <version>${netty.version}</version>
    </dependency>
  </dependencies>
</project>
1 change: 1 addition & 0 deletions core/shims/spark350/src/main/resources/META-INF/services/com.intel.raydp.shims.SparkShimProvider
@@ -0,0 +1 @@
com.intel.raydp.shims.spark350.SparkShimProvider
36 changes: 36 additions & 0 deletions core/shims/spark350/src/main/scala/…/SparkShimProvider.scala
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.raydp.shims.spark350

import com.intel.raydp.shims.{SparkShims, SparkShimDescriptor}

object SparkShimProvider {
  val SPARK350_DESCRIPTOR = SparkShimDescriptor(3, 5, 0)
  val DESCRIPTOR_STRINGS = Seq(s"$SPARK350_DESCRIPTOR")
  val DESCRIPTOR = SPARK350_DESCRIPTOR
}

class SparkShimProvider extends com.intel.raydp.shims.SparkShimProvider {
  def createShim: SparkShims = {
    new Spark350Shims()
  }

  def matches(version: String): Boolean = {
    SparkShimProvider.DESCRIPTOR_STRINGS.contains(version)
  }
}
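
The one-line file under META-INF/services above is what makes this provider discoverable: java.util.ServiceLoader instantiates every registered com.intel.raydp.shims.SparkShimProvider, and the running Spark version string is matched against DESCRIPTOR_STRINGS. A minimal sketch of that resolution step, assuming a hypothetical loader object (raydp's actual loader lives in raydp-shims-common and may differ):

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.SPARK_VERSION

import com.intel.raydp.shims.{SparkShims, SparkShimProvider}

// Hypothetical loader: iterate the providers registered under
// META-INF/services and pick the one whose descriptor matches the
// Spark version on the classpath (e.g. "3.5.0").
object ShimLoaderSketch {
  def loadShims(): SparkShims = {
    val providers = ServiceLoader.load(classOf[SparkShimProvider]).asScala
    providers.find(_.matches(SPARK_VERSION)) match {
      case Some(provider) => provider.createShim
      case None =>
        throw new IllegalStateException(s"No shim provider matches Spark $SPARK_VERSION")
    }
  }
}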
47 changes: 47 additions & 0 deletions core/shims/spark350/src/main/scala/…/SparkShims.scala
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.raydp.shims.spark350

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.executor.RayDPExecutorBackendFactory
import org.apache.spark.executor.spark350._
import org.apache.spark.spark350.TaskContextUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.spark350.SparkSqlUtils

import com.intel.raydp.shims.{ShimDescriptor, SparkShims}

class Spark350Shims extends SparkShims {
  override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR

  override def toDataFrame(
      rdd: JavaRDD[Array[Byte]],
      schema: String,
      session: SparkSession): DataFrame = {
    SparkSqlUtils.toDataFrame(rdd, schema, session)
  }

  override def getExecutorBackendFactory(): RayDPExecutorBackendFactory = {
    new RayDPSpark350ExecutorBackendFactory()
  }

  override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
    TaskContextUtils.getDummyTaskContext(partitionId, env)
  }
}
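
The overrides above trace the surface of the SparkShims trait that each version module implements. Inferred from this file alone (the real trait is defined in raydp-shims-common and may declare additional members), its shape is roughly:

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.executor.RayDPExecutorBackendFactory
import org.apache.spark.sql.{DataFrame, SparkSession}

import com.intel.raydp.shims.ShimDescriptor

// Inferred sketch only: every Spark-version shim provides these hooks so the
// rest of RayDP never references version-specific Spark internals directly.
trait SparkShims {
  def getShimDescriptor: ShimDescriptor
  def toDataFrame(rdd: JavaRDD[Array[Byte]], schema: String, session: SparkSession): DataFrame
  def getExecutorBackendFactory(): RayDPExecutorBackendFactory
  def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext
}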
30 changes: 30 additions & 0 deletions core/shims/spark350/src/main/scala/…/TaskContextUtils.scala
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.spark350

import java.util.Properties

import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.memory.TaskMemoryManager

object TaskContextUtils {
  def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
    // The positional Int arguments map onto Spark 3.5's TaskContextImpl
    // constructor: stageId = 0, stageAttemptNumber = 0, partitionId,
    // taskAttemptId = -1024 (a placeholder value), attemptNumber = 0,
    // and numPartitions = 0.
    new TaskContextImpl(0, 0, partitionId, -1024, 0, 0,
      new TaskMemoryManager(env.memoryManager, 0), new Properties(), env.metricsSystem)
  }
}
50 changes: 50 additions & 0 deletions core/shims/spark350/src/main/scala/…/RayCoarseGrainedExecutorBackend.scala
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import java.net.URL

import org.apache.spark.SparkEnv
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv

class RayCoarseGrainedExecutorBackend(
    rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFileOpt: Option[String],
    resourceProfile: ResourceProfile)
  extends CoarseGrainedExecutorBackend(
    rpcEnv,
    driverUrl,
    executorId,
    bindAddress,
    hostname,
    cores,
    env,
    resourcesFileOpt,
    resourceProfile) {

  // The superclass constructor in this Spark line no longer accepts a user
  // classpath (note it is absent from the super call above), so the subclass
  // keeps the parameter and surfaces it through the getter instead.
  override def getUserClassPath: Seq[URL] = userClassPath
}
52 changes: 52 additions & 0 deletions core/shims/spark350/src/main/scala/…/RayDPSpark350ExecutorBackendFactory.scala
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor.spark350

import java.net.URL

import org.apache.spark.SparkEnv
import org.apache.spark.executor._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv

class RayDPSpark350ExecutorBackendFactory
  extends RayDPExecutorBackendFactory {
  override def createExecutorBackend(
      rpcEnv: RpcEnv,
      driverUrl: String,
      executorId: String,
      bindAddress: String,
      hostname: String,
      cores: Int,
      userClassPath: Seq[URL],
      env: SparkEnv,
      resourcesFileOpt: Option[String],
      resourceProfile: ResourceProfile): CoarseGrainedExecutorBackend = {
    new RayCoarseGrainedExecutorBackend(
      rpcEnv,
      driverUrl,
      executorId,
      bindAddress,
      hostname,
      cores,
      userClassPath,
      env,
      resourcesFileOpt,
      resourceProfile)
  }
}
39 changes: 39 additions & 0 deletions core/shims/spark350/src/main/scala/…/SparkSqlUtils.scala
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.spark350

import org.apache.spark.TaskContext
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types._

object SparkSqlUtils {
  def toDataFrame(
      arrowBatchRDD: JavaRDD[Array[Byte]],
      schemaString: String,
      session: SparkSession): DataFrame = {
    val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
    val timeZoneId = session.sessionState.conf.sessionLocalTimeZone
    val rdd = arrowBatchRDD.rdd.mapPartitions { iter =>
      val context = TaskContext.get()
      // Spark 3.5's ArrowConverters.fromBatchIterator takes an extra boolean
      // (errorOnDuplicatedFieldNames); passing false keeps the lenient
      // behavior of the earlier shims.
      ArrowConverters.fromBatchIterator(iter, schema, timeZoneId, false, context)
    }
    session.internalCreateDataFrame(rdd.setName("arrow"), schema)
  }
}
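
For orientation, a hedged usage sketch (the names and values here are illustrative, not part of the commit): the schema travels as the JSON form of a StructType, and each RDD element is one serialized Arrow record batch, e.g. as produced on the Ray side before being handed to Spark.

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Illustrative only: wiring serialized Arrow batches back into a DataFrame.
object ToDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[1]")
      .appName("toDataFrame-sketch")
      .getOrCreate()

    // One Array[Byte] per Arrow record batch; in RayDP these come from the
    // Ray object store. Empty here just to keep the sketch self-contained.
    val arrowBatches: JavaRDD[Array[Byte]] =
      new JavaRDD(session.sparkContext.emptyRDD[Array[Byte]])

    // The schema string is the JSON representation of a StructType.
    val schemaJson = StructType(Seq(StructField("id", LongType))).json

    val df = SparkSqlUtils.toDataFrame(arrowBatches, schemaJson, session)
    df.printSchema()
  }
}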
