[KYUUBI #6832] Impl Spark DSv2 YARN Connector that supports reading YARN aggregation logs #6856

Status: Open. Wants to merge 41 commits into base: master (changes shown from 34 commits).

Commits (41, all by naive-zhang):
634991c  init module kyuubi-spark-connector-yarn (Dec 9, 2024)
855c5e0  introduce DFSMiniCluster and YarnCluster for test cases (Dec 10, 2024)
7fcd74f  Merge branch 'apache:master' into yarn-catalog (Dec 10, 2024)
827eb91  init yarn.default.app table query and impl yarn catalog suite (Dec 11, 2024)
a86b79c  make YarnAppScan support BATCH_SCAN (Dec 11, 2024)
d03e568  modify reader logic in YarnAppScan (Dec 12, 2024)
133d307  get hadoop conf from SparkSession.active.SparkConf (Dec 12, 2024)
db4a774  add xml file sense (Dec 13, 2024)
f6064f1  split hdfs related xml and yarn related xml (Dec 13, 2024)
ea8be05  reformat app query test code (Dec 13, 2024)
e092859  try to read from hdfs (Dec 14, 2024)
357c0ef  use more elegant method for hdfs and yarn api (Dec 14, 2024)
d8f8184  predicates push down in app tables with equalTo(appId) and equalTo(st… (Dec 17, 2024)
ce7930a  predicates push down in app tables with equalTo(appType) (Dec 17, 2024)
c0dd20a  predicates push down in app tables with in appType or in appState (Dec 18, 2024)
ebb7a28  refactor all log related scala codes (Dec 18, 2024)
d1192d8  refactor all log related scala codes (Dec 18, 2024)
7ef1fd3  refactor all log related scala codes (Dec 18, 2024)
0faade6  add todo for modify task nums (Dec 19, 2024)
5134708  Merge branch 'apache:master' into yarn-catalog (Dec 19, 2024)
445c405  try to push down predicates for log reading (Dec 20, 2024)
94a570d  fix list tables error (Dec 20, 2024)
2efb0df  rename row_number into line_num and add file_name column in the table… (Dec 20, 2024)
15ea677  fix the case of dir which does not end with '/' (Dec 20, 2024)
531d33c  fix the case of predicate push down when query apps table with equalT… (Dec 20, 2024)
a2a2164  fix code style problem in YarnAppPartitionReader (Dec 20, 2024)
1b46b7b  fix star match in log dir (Dec 20, 2024)
c150558  fix code style problem in YarnAppPartitionReader (Dec 20, 2024)
240393c  remove fs close (Dec 20, 2024)
5f6597a  remove supports for hadoop2 (Dec 20, 2024)
f9f1fb2  fix predicates push down error when query app_logs with line_num (Dec 21, 2024)
b2ccf50  fix predicates push down error when query app_logs with line_num and … (Dec 21, 2024)
c1775ee  fix style check error (Dec 21, 2024)
21973e1  change query condition from user into host and remove local username (Dec 21, 2024)
e0f0bbf  Update extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/or… (Dec 22, 2024)
9be34a7  canonicalize some error message (Dec 22, 2024)
54e1baf  remove unnecessary abstract class BasicScanBuilder (Dec 22, 2024)
2f171e8  Update extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/or… (Dec 22, 2024)
b258a75  Update extensions/spark/kyuubi-spark-connector-yarn/pom.xml (Dec 22, 2024)
86f9476  remove unnecessary abstract class BasicScanBuilder (Dec 22, 2024)
1d4d45d  Merge remote-tracking branch 'origin/yarn-catalog' into yarn-catalog (Dec 22, 2024)
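
From the commit messages, the connector exposes an apps table and an app_logs table under a YARN catalog, with push-down for equalTo/in predicates on columns such as appId, appState, appType, and line_num. Below is a minimal usage sketch; the catalog class name, catalog alias, and exact table/column spellings are assumptions inferred from the commits, not confirmed by this excerpt of the diff.

import org.apache.spark.sql.SparkSession

// Hypothetical usage sketch; names below are inferred from commit messages.
object YarnConnectorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("yarn-connector-demo")
      // register the DSv2 catalog under the alias `yarn` (class name assumed)
      .config("spark.sql.catalog.yarn",
        "org.apache.kyuubi.spark.connector.yarn.YarnCatalog")
      .getOrCreate()

    // equality predicates on app columns can be pushed down per the commits
    spark.sql("SELECT * FROM yarn.default.apps WHERE state = 'FINISHED'").show()

    // aggregated log lines, using the line_num and file_name columns the
    // commits describe (the message column name is assumed)
    spark.sql(
      """SELECT file_name, line_num, message
        |FROM yarn.default.app_logs
        |WHERE app_id = 'application_1733000000000_0001'""".stripMargin).show()

    spark.stop()
  }
}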
275 changes: 275 additions & 0 deletions extensions/spark/kyuubi-spark-connector-yarn/pom.xml
@@ -0,0 +1,275 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.11.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>kyuubi-spark-connector-yarn_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Spark Yarn Logs Connector</name>
<url>https://kyuubi.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scalatestplus</groupId>
<artifactId>scalacheck-1-17_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-server_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-server_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
<include>org.apache.kyuubi:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

<profiles>
<profile>
<id>cross-version-test</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<configuration>
<excludeDefaultDirectories>true</excludeDefaultDirectories>
<filesets>
<fileset>
<directory>target/scala-${scala.binary.version}/classes</directory>
<includes>**/*.*</includes>
</fileset>
</filesets>
</configuration>
<executions>
<execution>
<id>clean target/scala-${scala.binary.version}/classes</id>
<goals>
<goal>clean</goal>
</goals>
<phase>process-test-classes</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
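
For illustration: with the shade configuration above, only Guava and the Kyuubi artifacts are bundled into the connector jar, and Guava classes are relocated under ${kyuubi.shade.packageName}. If that property were, say, org.apache.kyuubi.shaded, a reference to com.google.common.collect.ImmutableList would be rewritten to org.apache.kyuubi.shaded.com.google.common.collect.ImmutableList in the shaded jar. Spark, Hadoop, and Scala stay provided, so the jar remains thin and picks those up from the running Spark distribution.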
35 changes: 35 additions & 0 deletions extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/BasicScan.scala
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.yarn

import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read._

/**
 * Shared base for the connector's DSv2 scans: one object serves as
 * ScanBuilder, Scan, and Batch. The Hadoop configuration is captured eagerly
 * as a plain Map so it can be serialized to executors (Hadoop's
 * Configuration class is not Serializable).
 */
trait BasicScan
  extends ScanBuilder
  with Scan with Batch with Serializable {

  protected val hadoopConfMap: Map[String, String] = SparkSession.active.sparkContext
    .hadoopConfiguration.asScala.map(kv => (kv.getKey, kv.getValue)).toMap

  override def toBatch: Batch = this

  override def build(): Scan = this
}
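
Because the configuration travels as a plain Map, executor-side code has to rebuild a Hadoop Configuration before HDFS or YARN clients can use it. A minimal sketch of that step; the helper below is illustrative, not part of this diff.

import org.apache.hadoop.conf.Configuration

// Hypothetical helper: rebuild a Configuration from the key/value map that
// BasicScan captured on the driver. `new Configuration(false)` skips loading
// default resources, since the map already holds the effective settings.
def restoreHadoopConf(confMap: Map[String, String]): Configuration = {
  val conf = new Configuration(false)
  confMap.foreach { case (k, v) => conf.set(k, v) }
  conf
}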
40 changes: 40 additions & 0 deletions extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/BasicScanBuilder.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.yarn

import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.sources.Filter

Review comment (Member): I don't think this abstract layer is really helpful

Reply (Contributor Author): Yes, it seems unnecessary and I've removed it

trait BasicScanBuilder
  extends ScanBuilder
  with SupportsPushDownFilters with Serializable {

  protected var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Keep only EqualTo and In predicates for push-down; everything else is
    // handed back to Spark to evaluate after the scan.
    val (supportedFilter, unsupportedFilter) = filters.partition {
      case _: org.apache.spark.sql.sources.EqualTo => true
      case _: org.apache.spark.sql.sources.In => true
      case _ => false
    }
    pushed = supportedFilter
    unsupportedFilter
  }

  override def pushedFilters(): Array[Filter] = pushed
}
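
To illustrate what becomes of the two groups: the returned (unsupported) filters are re-evaluated by Spark, while the pushed EqualTo/In filters must be honored by the connector's readers. A hedged sketch of such an evaluation over application metadata; the helper name and row shape are assumptions, not from this diff.

import org.apache.spark.sql.sources.{EqualTo, Filter, In}

// Hypothetical helper: decide whether one YARN application, represented as
// a column-name -> value map, satisfies all pushed filters.
def matchesPushed(filters: Array[Filter], row: Map[String, Any]): Boolean =
  filters.forall {
    case EqualTo(attribute, value) => row.get(attribute).contains(value)
    case In(attribute, values)     => row.get(attribute).exists(values.contains)
    case _                         => true // anything else was returned to Spark
  }

// e.g. matchesPushed(Array(EqualTo("appType", "SPARK")),
//                    Map("appId" -> "application_1_0001", "appType" -> "SPARK"))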
24 changes: 24 additions & 0 deletions extensions/spark/kyuubi-spark-connector-yarn/src/main/scala/org/apache/kyuubi/spark/connector/yarn/YarnAppPartition.scala
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.yarn

import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.sources.Filter

// Serializable description of one unit of work: the Hadoop conf needed on
// the executor plus the predicates pushed down for this scan.
case class YarnAppPartition(hadoopConfMap: Map[String, String], filters: Array[Filter])
  extends InputPartition
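
A partition like this is handed to a PartitionReaderFactory on the executor. The PR's actual YarnAppPartitionReader is not shown in this excerpt; the stub below only illustrates where hadoopConfMap and filters would be consumed.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}

// Illustrative stub, not the PR's implementation.
class StubYarnAppReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[YarnAppPartition]
    new PartitionReader[InternalRow] {
      // A real reader would contact YARN / read HDFS using p.hadoopConfMap
      // and apply p.filters while producing rows; this stub yields no rows.
      override def next(): Boolean = false
      override def get(): InternalRow = throw new NoSuchElementException("empty stub")
      override def close(): Unit = ()
    }
  }
}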