From 2981618d5aadc90c371f00a84a0bac8dcf5ad6e5 Mon Sep 17 00:00:00 2001
From: Uros Stankovic <uros.stankovic@databricks.com>
Date: Fri, 1 Mar 2024 22:04:58 +0800
Subject: [PATCH] [SPARK-47167][SQL] Add concrete class for JDBC anonymous
 relation

### What changes were proposed in this pull request?

- Introduce a new concrete class, `JDBCV1RelationFromV2Scan`, to avoid creating an anonymous class in the `JDBCScan::toV1TableScan` method

### Why are the changes needed?
Giving the relation a concrete, named class should improve our ability to log and debug information about the relation: the type is identifiable in logs and stack traces, and it carries a meaningful `toString`.
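
For illustration only, a minimal plain-Scala sketch (hypothetical names, not part of this patch) of the difference: an anonymous class is only visible under a compiler-generated name, while a concrete case class has a stable, searchable name and a customizable `toString`.

```scala
object NamingDemo extends App {
  trait Scannable { def scan(): Seq[Int] }

  // Anonymous class: compiled to a synthetic name such as "NamingDemo$$anon$1",
  // which is all that logs, stack traces, and debuggers can show for it.
  val anonymous: Scannable = new Scannable {
    override def scan(): Seq[Int] = Seq(1, 2, 3)
  }

  // Concrete case class: a stable, searchable type name plus a toString
  // tailored for log output.
  case class NamedScan(rows: Seq[Int]) extends Scannable {
    override def scan(): Seq[Int] = rows
    override def toString: String = s"NamedScan(${rows.size} rows)"
  }

  println(anonymous.getClass.getName) // e.g. NamingDemo$$anon$1
  println(NamedScan(Seq(1, 2, 3)))    // NamedScan(3 rows)
}
```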

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since the changes are very simple, no dedicated tests were added; the pre-check-in validation builds and tests are enough to show that everything works.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45259 from urosstan-db/SPARK-47167-add-descriptive-relation.

Authored-by: Uros Stankovic <uros.stankovic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
---
 .../datasources/v2/jdbc/JDBCScan.scala        | 28 ++++-----
 .../v2/jdbc/JDBCV1RelationFromV2Scan.scala    | 59 +++++++++++++++++++
 2 files changed, 71 insertions(+), 16 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCV1RelationFromV2Scan.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala
index d9ef94cee0438..75f3b04287aa4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.jdbc
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.V1Scan
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
@@ -40,20 +39,17 @@ case class JDBCScan(
   override def readSchema(): StructType = prunedSchema
 
   override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
-    new BaseRelation with TableScan {
-      override def sqlContext: SQLContext = context
-      override def schema: StructType = prunedSchema
-      override def needConversion: Boolean = relation.needConversion
-      override def buildScan(): RDD[Row] = {
-        val columnList = if (groupByColumns.isEmpty) {
-          prunedSchema.map(_.name).toArray
-        } else {
-          pushedAggregateColumn
-        }
-        relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
-          pushedLimit, sortOrders, pushedOffset)
-      }
-    }.asInstanceOf[T]
+    JDBCV1RelationFromV2Scan(
+      context,
+      prunedSchema,
+      relation,
+      pushedPredicates,
+      pushedAggregateColumn,
+      groupByColumns,
+      tableSample,
+      pushedLimit,
+      sortOrders,
+      pushedOffset).asInstanceOf[T]
   }
 
   override def description(): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCV1RelationFromV2Scan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCV1RelationFromV2Scan.scala
new file mode 100644
index 0000000000000..feb33effae236
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCV1RelationFromV2Scan.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.jdbc
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation
+import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Relation that is compatible with the V1 TableScan interface,
+ * but delegates to JDBCRelation's buildScan, which accepts all V2 pushdowns.
+ */
+case class JDBCV1RelationFromV2Scan(
+    context: SQLContext,
+    prunedSchema: StructType,
+    relation: JDBCRelation,
+    pushedPredicates: Array[Predicate],
+    pushedAggregateColumn: Array[String] = Array(),
+    groupByColumns: Option[Array[String]],
+    tableSample: Option[TableSampleInfo],
+    pushedLimit: Int,
+    sortOrders: Array[String],
+    pushedOffset: Int) extends BaseRelation with TableScan {
+  override def sqlContext: SQLContext = context
+  override def schema: StructType = prunedSchema
+  override def needConversion: Boolean = relation.needConversion
+  override def buildScan(): RDD[Row] = {
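+    // When aggregation was pushed down, scan the pushed aggregate output
+    // columns; otherwise scan exactly the pruned (required) columns.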
+    val columnList = if (groupByColumns.isEmpty) {
+      prunedSchema.map(_.name).toArray
+    } else {
+      pushedAggregateColumn
+    }
+
+    relation.buildScan(columnList, prunedSchema, pushedPredicates, groupByColumns, tableSample,
+      pushedLimit, sortOrders, pushedOffset)
+  }
+
+  override def toString: String = "JDBC v1 Relation from v2 scan"
+}