Feat #470: Remove Qbeast Provider from the Catalog #527

Draft · wants to merge 16 commits into base: main
26 changes: 19 additions & 7 deletions core/src/main/scala/io/qbeast/core/model/QbeastOptions.scala

@@ -99,13 +99,15 @@ object QbeastOptions {
   val qbeastOptionKeys: Set[String] =
     Set(PATH, COLUMNS_TO_INDEX, CUBE_SIZE, TABLE_FORMAT, COLUMN_STATS)

+  val supportedTableFormats: Set[String] = Set("delta")
+
   /**
    * Gets the columns to index from the options
    * @param options
    *   the options passed on the dataframe
    * @return
    */
-  private def getColumnsToIndex(options: Map[String, String]): Seq[String] = {
+  def getColumnsToIndex(options: Map[String, String]): Seq[String] = {
     val encodedColumnsToIndex = options.getOrElse(
       COLUMNS_TO_INDEX, {
         throw AnalysisExceptionFactory.create(
@@ -121,16 +123,20 @@ object QbeastOptions {
    *   the options passed on the dataframe
    * @return
    */
-  private def getDesiredCubeSize(options: Map[String, String]): Int = {
+  def getDesiredCubeSize(options: Map[String, String]): Int = {
     options.get(CUBE_SIZE) match {
       case Some(value) => value.toInt
       case None => DEFAULT_CUBE_SIZE
     }
   }

-  private def getTableFormat(options: Map[String, String]): String =
+  def getTableFormat(options: Map[String, String]): String =
     options.get(TABLE_FORMAT) match {
-      case Some(value) => value
+      case Some(value) if (supportedTableFormats.contains(value)) => value
+      case Some(unsupportedValue) =>
+        throw AnalysisExceptionFactory.create(
+          s"Unsupported table format: $unsupportedValue. Supported formats are: " +
+            supportedTableFormats.mkString(", "))
       case None => DEFAULT_TABLE_FORMAT
     }

@@ -142,7 +148,7 @@ object QbeastOptions {
    *   the options passed on the dataframe
    * @return
    */
-  private def getColumnStats(options: Map[String, String]): Option[String] =
+  def getColumnStats(options: Map[String, String]): Option[String] =
     options.get(COLUMN_STATS)

   /**
@@ -162,7 +168,7 @@ object QbeastOptions {
    *   contains the name of the hook, the full class name of the hook, and optionally the argument
    *   for the hook.
    */
-  private def getHookInfo(options: Map[String, String]): Seq[HookInfo] = {
+  def getHookInfo(options: Map[String, String]): Seq[HookInfo] = {
     val hookNamePattern: Regex = s"${PRE_COMMIT_HOOKS_PREFIX.toLowerCase}.(\\w+)".r
     options
       .map {
@@ -229,7 +235,13 @@ object QbeastOptions {
   /**
    * The empty options to be used as a placeholder.
    */
-  def empty: QbeastOptions = QbeastOptions(Seq.empty, DEFAULT_CUBE_SIZE, DEFAULT_TABLE_FORMAT)
+  def empty: QbeastOptions = QbeastOptions(
+    columnsToIndex = Seq.empty[String],
+    cubeSize = DEFAULT_CUBE_SIZE,
+    tableFormat = DEFAULT_TABLE_FORMAT,
+    columnStats = None,
+    hookInfo = Nil,
+    extraOptions = Map.empty[String, String])

   def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
     new QTableID(
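Reviewer note: since these accessors go from private to public in this hunk, a minimal usage sketch (not part of the diff) of the new validation path; it assumes only what the hunk shows, namely that TABLE_FORMAT is the public option-key constant and supportedTableFormats is Set("delta"):

    import io.qbeast.core.model.QbeastOptions
    import io.qbeast.core.model.QbeastOptions.TABLE_FORMAT

    // Supported format: returned as-is.
    QbeastOptions.getTableFormat(Map(TABLE_FORMAT -> "delta")) // "delta"

    // Missing key: falls back to DEFAULT_TABLE_FORMAT, as before.
    QbeastOptions.getTableFormat(Map.empty[String, String])

    // Unsupported format: now fails fast instead of being passed through.
    // Throws: "Unsupported table format: iceberg. Supported formats are: delta"
    QbeastOptions.getTableFormat(Map(TABLE_FORMAT -> "iceberg"))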
4 changes: 4 additions & 0 deletions core/src/main/scala/io/qbeast/spark/utils/Params.scala

@@ -37,4 +37,8 @@ object MetadataConfig {
   final val blocks = "qbeast.blocks"
   final val tags = "qbeast.tags"
   final val configuration = "qbeast.configuration"
+
+  final val tableConfigurationKeys: Seq[String] =
+    Seq(revision, lastRevisionID, configuration)
+
 }
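Reviewer note: a one-line sketch (illustrative, not in the PR) of the intended downstream use of tableConfigurationKeys — requiring all table-level keys at once, exactly as hasQbeastMetadata does later in this diff:

    import io.qbeast.spark.utils.MetadataConfig

    // A table carries full Qbeast table metadata only if every
    // table-level configuration key is present in its properties.
    def hasAllTableConfigKeys(properties: Map[String, String]): Boolean =
      MetadataConfig.tableConfigurationKeys.forall(properties.contains)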
13 changes: 7 additions & 6 deletions src/main/scala/io/qbeast/catalog/QbeastCatalog.scala

@@ -21,6 +21,7 @@ import io.qbeast.internal.commands.AlterTableUnsetPropertiesQbeastCommand
 import io.qbeast.sources.v2.QbeastStagedTableImpl
 import io.qbeast.sources.v2.QbeastTableImpl
 import org.apache.hadoop.fs.Path
+import org.apache.spark.qbeast.config.DEFAULT_TABLE_FORMAT
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -95,8 +96,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
   override def loadTable(ident: Identifier): Table = {
     try {
       getSessionCatalog().loadTable(ident) match {
-        case table
-            if QbeastCatalogUtils.isQbeastProvider(table.properties().asScala.get("provider")) =>
+        case table if QbeastCatalogUtils.isQbeastTable(table.properties()) =>
           QbeastCatalogUtils.loadQbeastTable(table, tableFactory)
         case o => o
       }
@@ -105,6 +105,7 @@
           if QbeastCatalogUtils.isPathTable(ident) =>
         QbeastTableImpl(
           TableIdentifier(ident.name(), ident.namespace().headOption),
+          DEFAULT_TABLE_FORMAT,
           new Path(ident.name()),
           Map.empty,
           tableFactory = tableFactory)
@@ -125,7 +126,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       partitions: Array[Transform],
       properties: util.Map[String, String]): Table = {

-    if (QbeastCatalogUtils.isQbeastProvider(properties)) {
+    if (QbeastCatalogUtils.isQbeastTable(properties)) {
       // Create the table
       QbeastCatalogUtils.createQbeastTable(
         ident,
@@ -160,7 +161,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): StagedTable = {
-    if (QbeastCatalogUtils.isQbeastProvider(properties)) {
+    if (QbeastCatalogUtils.isQbeastTable(properties)) {
       QbeastStagedTableImpl(
         ident,
         schema,
@@ -189,7 +190,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): StagedTable = {
-    if (QbeastCatalogUtils.isQbeastProvider(properties)) {
+    if (QbeastCatalogUtils.isQbeastTable(properties)) {
       QbeastStagedTableImpl(
         ident,
         schema,
@@ -229,7 +230,7 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       schema: StructType,
       partitions: Array[Transform],
       properties: util.Map[String, String]): StagedTable = {
-    if (QbeastCatalogUtils.isQbeastProvider(properties)) {
+    if (QbeastCatalogUtils.isQbeastTable(properties)) {
       QbeastStagedTableImpl(
         ident,
         schema,
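Reviewer note: at the session level, the observable effect of switching loadTable from isQbeastProvider to isQbeastTable is sketched below; the SQL and the option-key spelling are assumptions for illustration, not taken from this PR:

    // Hypothetical session. After this PR the catalog entry stores the
    // underlying format (e.g. "delta") as provider, plus qbeast.* metadata.
    spark.sql("""
      CREATE TABLE students (id INT, name STRING)
      USING qbeast
      OPTIONS ('columnsToIndex' = 'id')
    """)

    // loadTable(ident) no longer requires provider == "qbeast"; it matches on
    // isQbeastTable(table.properties()), so the table is still wrapped through
    // QbeastCatalogUtils.loadQbeastTable even with a "delta" provider entry.
    spark.table("students").show()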
124 changes: 106 additions & 18 deletions src/main/scala/io/qbeast/catalog/QbeastCatalogUtils.scala

@@ -16,17 +16,19 @@
 package io.qbeast.catalog

 import io.qbeast.core.model.QTableID
+import io.qbeast.core.model.QbeastOptions
 import io.qbeast.sources.v2.QbeastTableImpl
+import io.qbeast.spark.utils.MetadataConfig
 import io.qbeast.table.IndexedTable
 import io.qbeast.table.IndexedTableFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
+import org.apache.spark.qbeast.config.DEFAULT_TABLE_FORMAT
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.TableSpec
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.connector.catalog.Identifier
-import org.apache.spark.sql.connector.catalog.SparkCatalogV2Util
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -49,8 +51,20 @@ object QbeastCatalogUtils extends Logging {

   val QBEAST_PROVIDER_NAME: String = "qbeast"

+  private val PROVIDER_CONF_KEY: String = "provider"
+
+  private val TABLE_FORMAT_CONF_KEY: String = QbeastOptions.TABLE_FORMAT
+
+  private val supportedProviders: Set[String] = QbeastOptions.supportedTableFormats
+
+  private val qbeastOptionConfigurationKeys: Set[String] = QbeastOptions.qbeastOptionKeys
+
+  private val qbeastTableConfigurationKeys: Set[String] =
+    MetadataConfig.tableConfigurationKeys.toSet
+
   /**
    * Checks if the provider is Qbeast
+   *
    * @param provider
    *   the provider, if any
    * @return
@@ -59,16 +73,69 @@ object QbeastCatalogUtils extends Logging {
     provider.isDefined && provider.get == QBEAST_PROVIDER_NAME
   }

-  def isQbeastProvider(tableSpec: TableSpec): Boolean = {
-    tableSpec.provider.contains(QBEAST_PROVIDER_NAME)
-  }
-
-  def isQbeastProvider(properties: Map[String, String]): Boolean = isQbeastProvider(
-    properties.get("provider"))
-
-  def isQbeastProvider(properties: util.Map[String, String]): Boolean = isQbeastProvider(
-    properties.asScala.toMap)
+  def isQbeastProvider(properties: Map[String, String]): Boolean = isQbeastProvider(
+    properties.get(PROVIDER_CONF_KEY))
+
+  /**
+   * Checks if the provider is supported by Qbeast
+   * @param provider
+   *   the provider, if any
+   * @return
+   */
+  def isSupportedProvider(provider: Option[String]): Boolean = {
+    provider.isDefined && supportedProviders.contains(provider.get)
+  }
+
+  /**
+   * Checks if the provider is supported by Qbeast
+   * @param tableSpec
+   *   the table specification
+   * @return
+   */
+  def isSupportedProvider(tableSpec: TableSpec): Boolean = {
+    isSupportedProvider(tableSpec.provider)
+  }
+
+  /**
+   * Checks if the properties contain the Qbeast Metadata
+   * @param properties
+   *   the properties of the table
+   * @return
+   */
+  def hasQbeastMetadata(properties: Map[String, String]): Boolean = {
+    // ALL table configuration keys are present OR ANY of the write options is present
+    qbeastTableConfigurationKeys.forall(properties.contains) ||
+      qbeastOptionConfigurationKeys.exists(properties.contains)
+  }
+
+  /**
+   * TODO: Check if this method is correct. (We should allow users to write as delta in a
+   * qbeast table.) Checks if the Table is formatted with Qbeast. A Table is considered Qbeast
+   * if:
+   *   - the provider is Qbeast, OR
+   *   - it contains Qbeast Metadata AND the provider is supported by Qbeast
+   * @param provider
+   *   the provider, if any
+   * @param properties
+   *   the properties of the table
+   * @return
+   */
+  def isQbeastTable(provider: Option[String], properties: Map[String, String]): Boolean = {
+    isQbeastProvider(provider) || (isSupportedProvider(provider) && hasQbeastMetadata(properties))
+  }
+
+  def isQbeastTable(properties: Map[String, String]): Boolean = {
+    val providerConf = properties.get(PROVIDER_CONF_KEY)
+    isQbeastTable(providerConf, properties)
+  }
+
+  def isQbeastTable(properties: util.Map[String, String]): Boolean = isQbeastTable(
+    properties.asScala.toMap)
+
+  def isQbeastTable(table: CatalogTable): Boolean = {
+    isQbeastTable(table.provider, table.properties)
+  }

   /**
    * Checks if an Identifier is set with a path
    * @param ident
@@ -92,15 +159,15 @@ object QbeastCatalogUtils extends Logging {
     if (isPathTable(table)) return None
     val tableExists = existingSessionCatalog.tableExists(table)
     if (tableExists) {
-      val oldTable = existingSessionCatalog.getTableMetadata(table)
-      if (oldTable.tableType == CatalogTableType.VIEW) {
+      val existingTableMetadata = existingSessionCatalog.getTableMetadata(table)
+      if (existingTableMetadata.tableType == CatalogTableType.VIEW) {
         throw AnalysisExceptionFactory.create(
           s"$table is a view. You may not write data into a view.")
       }
-      if (!isQbeastProvider(oldTable.provider)) {
+      if (!isQbeastTable(existingTableMetadata.properties)) {
         throw AnalysisExceptionFactory.create(s"$table is not a Qbeast table.")
       }
-      Some(oldTable)
+      Some(existingTableMetadata)
     } else {
       None
     }
@@ -196,6 +263,23 @@

   }

+  /**
+   * Verifies the Catalog Properties to be used in the Catalog Table
+   * @param properties
+   *   the properties to verify
+   * @param writeOptions
+   *   the write options of the operation
+   * @return
+   */
+  private def verifyAndUpdateCatalogProperties(
+      properties: Map[String, String],
+      writeOptions: Map[String, String]): Map[String, String] = {
+    // If the provider is Qbeast, replace it with the underlying table format
+    if (properties.getOrElse(PROVIDER_CONF_KEY, "") == QBEAST_PROVIDER_NAME) {
+      properties.updated(
+        PROVIDER_CONF_KEY,
+        writeOptions.getOrElse(TABLE_FORMAT_CONF_KEY, DEFAULT_TABLE_FORMAT))
+    } else properties
+  }
+
   /**
    * Creates a Table on the Catalog
    *
@@ -258,7 +342,9 @@
     // Process the parameters/options/configuration sent to the table
     val qTableID = QTableID(loc.toString)
     val indexedTable = tableFactory.getIndexedTable(qTableID)
-    val allProperties = indexedTable.verifyAndUpdateParameters(properties, dataFrame)
+    val allPropertiesVerified = indexedTable.verifyAndUpdateParameters(properties, dataFrame)
+    val allProperties = verifyAndUpdateCatalogProperties(allPropertiesVerified, writeOptions)
+    val provider = allProperties.get("provider")

     // Initialize the path option
     val storage = DataSource
@@ -280,7 +366,7 @@
       tableType = tableType,
       storage = storage,
       schema = schema,
-      provider = Some("qbeast"),
+      provider = provider,
       partitionColumnNames = Seq.empty,
       bucketSpec = None,
       properties = allProperties,
@@ -353,26 +439,28 @@
    */
   def loadQbeastTable(table: Table, tableFactory: IndexedTableFactory): Table = {

-    val prop = table.properties()
-    val columns = table.columns()
-    val schema = SparkCatalogV2Util.v2ColumnsToStructType(columns)

     table match {
       case V1TableQbeast(t) =>
         // Get the Catalog Table
         val catalogTable = t.v1Table

+        val tableIdentifier = catalogTable.identifier
+        val tableProvider = catalogTable.provider.getOrElse(QBEAST_PROVIDER_NAME)
         val path: String = if (catalogTable.tableType == CatalogTableType.EXTERNAL) {
           // If it's an EXTERNAL TABLE, we can find the path through the Storage Properties
           catalogTable.storage.locationUri.get.toString
         } else {
           // If it's a MANAGED TABLE, the location is set in the former catalogTable
           catalogTable.location.toString
         }
+        val schema = catalogTable.schema
+        val properties = catalogTable.properties

         QbeastTableImpl(
-          catalogTable.identifier,
+          tableIdentifier,
+          tableProvider,
           new Path(path),
-          prop.asScala.toMap,
+          properties,
           Some(schema),
           Some(catalogTable),
           tableFactory)
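Reviewer note: the provider rewrite is the heart of this PR ("Remove Qbeast Provider from the Catalog"), so here is a behavioral sketch of verifyAndUpdateCatalogProperties. The method is private, so the calls below are illustrative only, and the "tableFormat" key spelling for QbeastOptions.TABLE_FORMAT is an assumption:

    val props = Map("provider" -> "qbeast", "columnsToIndex" -> "id")

    // Explicit tableFormat write option: the provider entry is rewritten to it.
    // => Map("provider" -> "delta", "columnsToIndex" -> "id")
    verifyAndUpdateCatalogProperties(props, writeOptions = Map("tableFormat" -> "delta"))

    // No write option: DEFAULT_TABLE_FORMAT is stored as the provider instead.
    verifyAndUpdateCatalogProperties(props, writeOptions = Map.empty)

    // Non-qbeast providers pass through untouched.
    verifyAndUpdateCatalogProperties(Map("provider" -> "delta"), writeOptions = Map.empty)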
8 changes: 5 additions & 3 deletions src/main/scala/io/qbeast/internal/rules/SaveAsTableRule.scala

@@ -15,7 +15,7 @@
  */
 package io.qbeast.internal.rules

-import io.qbeast.catalog.QbeastCatalogUtils.isQbeastProvider
+import io.qbeast.catalog.QbeastCatalogUtils.isQbeastTable
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -55,12 +55,14 @@ class SaveAsTableRule(spark: SparkSession) extends Rule[LogicalPlan] with Loggin
     // to make sure columnsToIndex is present
     plan transformDown {
       case saveAsSelect: CreateTableAsSelect
-          if isQbeastProvider(saveAsSelect.tableSpec.provider) =>
+          if isQbeastTable(saveAsSelect.tableSpec.provider, saveAsSelect.tableSpec.properties) =>
         val tableSpec = saveAsSelect.tableSpec
         val writeOptions = saveAsSelect.writeOptions
         saveAsSelect.copy(tableSpec = createTableSpec(tableSpec, writeOptions))
       case replaceAsSelect: ReplaceTableAsSelect
-          if isQbeastProvider(replaceAsSelect.tableSpec.provider) =>
+          if isQbeastTable(
+            replaceAsSelect.tableSpec.provider,
+            replaceAsSelect.tableSpec.properties) =>
         val tableSpec = replaceAsSelect.tableSpec
         val writeOptions = replaceAsSelect.writeOptions
         replaceAsSelect.copy(tableSpec = createTableSpec(tableSpec, writeOptions))
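Reviewer note: a hypothetical pair of CTAS statements showing what the broadened guard now intercepts; before this change only tableSpec.provider == "qbeast" matched, while now a supported provider whose TableSpec carries Qbeast properties matches too. The SQL and property spellings are assumptions:

    // Matched before and after this PR: explicit qbeast provider.
    spark.sql("""
      CREATE TABLE target USING qbeast
      OPTIONS ('columnsToIndex' = 'id')
      AS SELECT * FROM source
    """)

    // Newly matched: a supported provider whose TableSpec carries Qbeast
    // properties, so SaveAsTableRule still folds the write options into the spec.
    spark.sql("""
      CREATE TABLE target2 USING delta
      TBLPROPERTIES ('columnsToIndex' = 'id')
      AS SELECT * FROM source
    """)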