Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes unused SPI APIs and makes connector instantiate catalogs #1588

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions partiql-cli/src/main/kotlin/org/partiql/cli/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import com.amazon.ionelement.api.loadAllElements
import org.partiql.cli.io.Format
import org.partiql.cli.pipeline.Pipeline
import org.partiql.cli.shell.Shell
import org.partiql.eval.PartiQLEngine
import org.partiql.eval.PartiQLResult
import org.partiql.plugins.memory.MemoryConnector
import org.partiql.plugins.memory.MemoryCatalog
import org.partiql.plugins.memory.MemoryTable
import org.partiql.spi.catalog.Catalog
import org.partiql.spi.catalog.Name
import org.partiql.spi.connector.Connector
import org.partiql.spi.catalog.Session
import org.partiql.spi.value.ion.IonDatum
import org.partiql.types.PType
import org.partiql.value.PartiQLValueExperimental
Expand All @@ -36,7 +36,6 @@ import picocli.CommandLine
import java.io.File
import java.io.InputStream
import java.io.SequenceInputStream
import java.time.Instant
import java.util.Collections
import java.util.Properties
import kotlin.system.exitProcess
Expand Down Expand Up @@ -185,24 +184,17 @@ internal class MainCommand : Runnable {
}
}

private fun session() = Pipeline.Session(
queryId = "cli",
userId = System.getProperty("user.name"),
currentCatalog = "default",
currentDirectory = emptyList(),
connectors = connectors(),
instant = Instant.now(),
debug = false,
mode = when (strict) {
true -> PartiQLEngine.Mode.STRICT
else -> PartiQLEngine.Mode.PERMISSIVE
}
)
private fun session() = Session.builder()
.identity(System.getProperty("user.name"))
.namespace(emptyList())
.catalog("default")
.catalogs(*catalogs().toTypedArray())
.build()

/**
* Produce the connector map for planning and execution.
*/
private fun connectors(): Map<String, Connector> {
private fun catalogs(): List<Catalog> {
if (dir != null && files != null && files!!.isNotEmpty()) {
error("Cannot specify both a database directory and a list of files.")
}
Expand All @@ -225,7 +217,7 @@ internal class MainCommand : Runnable {
} else {
ionNull()
}
val connector = MemoryConnector.builder()
val catalog = MemoryCatalog.builder()
.name("default")
.define(
MemoryTable.of(
Expand All @@ -235,9 +227,7 @@ internal class MainCommand : Runnable {
)
)
.build()
return mapOf(
"default" to connector
)
return listOf(catalog)
}

/**
Expand Down
28 changes: 1 addition & 27 deletions partiql-cli/src/main/kotlin/org/partiql/cli/pipeline/Pipeline.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,13 @@ import org.partiql.parser.PartiQLParser
import org.partiql.plan.v1.PartiQLPlan
import org.partiql.planner.PartiQLPlanner
import org.partiql.spi.catalog.Session
import org.partiql.spi.connector.Connector
import java.time.Instant

internal class Pipeline private constructor(
private val parser: PartiQLParser,
private val planner: PartiQLPlanner,
private val engine: PartiQLEngine,
) {

/**
* Combined planner and engine session.
*/
internal data class Session(
@JvmField val queryId: String,
@JvmField val userId: String,
@JvmField val currentCatalog: String,
@JvmField val currentDirectory: List<String>,
@JvmField val connectors: Map<String, Connector>,
@JvmField val instant: Instant,
@JvmField val debug: Boolean,
@JvmField val mode: PartiQLEngine.Mode,
) {

private val catalogs = connectors.values.map { it.getCatalog() }

fun planner() = org.partiql.spi.catalog.Session.builder()
.identity(userId)
.namespace(currentDirectory)
.catalog(currentCatalog)
.catalogs(*catalogs.toTypedArray())
.build()
}

/**
* TODO replace with the ResultSet equivalent?
*/
Expand All @@ -59,7 +33,7 @@ internal class Pipeline private constructor(

private fun plan(statement: Statement, session: Session): PartiQLPlan {
val callback = ProblemListener()
val result = planner.plan(statement, session.planner(), callback)
val result = planner.plan(statement, session, callback)
val errors = callback.problems.filter { it.details.severity == ProblemSeverity.ERROR }
if (errors.isNotEmpty()) {
throw RuntimeException(errors.joinToString())
Expand Down
21 changes: 10 additions & 11 deletions partiql-cli/src/main/kotlin/org/partiql/cli/shell/Shell.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.jline.utils.InfoCmp
import org.joda.time.Duration
import org.partiql.cli.pipeline.Pipeline
import org.partiql.eval.PartiQLResult
import org.partiql.spi.catalog.Session
import org.partiql.value.PartiQLValueExperimental
import org.partiql.value.io.PartiQLValueTextWriter
import java.io.Closeable
Expand Down Expand Up @@ -115,7 +116,7 @@ val donePrinting = AtomicBoolean(true)
*/
internal class Shell(
private val pipeline: Pipeline,
private val session: Pipeline.Session,
private val session: Session,
private val debug: Boolean,
) {

Expand Down Expand Up @@ -243,19 +244,17 @@ internal class Shell(
}
"info" -> {
// Print catalog information
val connector = session.connectors[session.currentCatalog]
if (connector == null) {
out.error("No connector for catalog ${session.currentCatalog}.")
continue
}
out.error("Connectors do not support listing metadata")
out.error("Catalogs do not support listing metadata")
}
"session" -> {
// Print session information
out.info("user: ${session.userId}")
out.info("mode: ${session.mode.name.lowercase()}")
out.info("catalog: ${session.currentCatalog}")
out.info("path: [${session.currentDirectory.joinToString(".")}]")
out.info("identity: ${session.getIdentity()}")
// TODO Session mode, `out.info("mode: ${session.getMode()")`
out.println()
out.info("ENVIRONMENT")
out.info("-----------")
out.info("CURRENT_CATALOG: ${session.getCatalog()}")
out.info("CURRENT_NAMESPACE: ${session.getNamespace()}")
out.println()
}
"version" -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.partiql.parser.PartiQLParser
import org.partiql.plan.v1.PartiQLPlan
import org.partiql.planner.builder.PartiQLPlannerBuilder
import org.partiql.planner.internal.SqlPlannerV1
import org.partiql.plugins.memory.MemoryConnector
import org.partiql.plugins.memory.MemoryCatalog
import org.partiql.plugins.memory.MemoryTable
import org.partiql.spi.catalog.Name
import org.partiql.spi.catalog.Session
Expand Down Expand Up @@ -1261,7 +1261,7 @@ class PartiQLEngineDefaultTest {

internal fun assert() {
val statement = parser.parse(input).root
val connector = MemoryConnector.builder()
val catalog = MemoryCatalog.builder()
.name("memory")
.apply {
globals.forEach {
Expand All @@ -1276,7 +1276,7 @@ class PartiQLEngineDefaultTest {
.build()
val session = Session.builder()
.catalog("memory")
.catalogs(connector.getCatalog())
.catalogs(catalog)
.build()
val plan = SqlPlannerV1.plan(statement, session)
val stmt = engine.prepare(plan, mode, session)
Expand Down Expand Up @@ -1353,10 +1353,10 @@ class PartiQLEngineDefaultTest {

private fun run(mode: PartiQLEngine.Mode): Pair<PartiQLValue, PartiQLPlan> {
val statement = parser.parse(input).root
val connector = MemoryConnector.builder().name("memory").build()
val catalog = MemoryCatalog.builder().name("memory").build()
val session = Session.builder()
.catalog("memory")
.catalogs(connector.getCatalog())
.catalogs(catalog)
.build()
val plan = SqlPlannerV1.plan(statement, session)
val stmt = engine.prepare(plan, mode, session)
Expand Down
22 changes: 11 additions & 11 deletions partiql-planner/src/main/kotlin/org/partiql/planner/internal/Env.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal class Env(private val session: Session) {
private val default: Catalog = catalogs.getCatalog(session.getCatalog()) ?: error("Default catalog does not exist")

/**
* Catalog lookup needs to search (3x) to handle schema-qualified and catalog-qualified use-cases.
* Catalog lookup needs to search (3x) to table schema-qualified and catalog-qualified use-cases.
*
* 1. Lookup in current catalog and namespace.
* 2. Lookup as a schema-qualified identifier.
Expand All @@ -59,38 +59,38 @@ internal class Env(private val session: Session) {
// 1. Search in current catalog and namespace
var catalog = default
var path = resolve(identifier)
var handle = catalog.getTableHandle(session, path)
var table = catalog.getTable(session, path)

// 2. Lookup as a schema-qualified identifier.
if (handle == null && identifier.hasQualifier()) {
if (table == null && identifier.hasQualifier()) {
path = identifier
handle = catalog.getTableHandle(session, path)
table = catalog.getTable(session, path)
}

// 3. Lookup as a catalog-qualified identifier
if (handle == null && identifier.hasQualifier()) {
if (table == null && identifier.hasQualifier()) {
val parts = identifier.getParts()
val head = parts.first()
val tail = parts.drop(1)
catalog = catalogs.getCatalog(head.getText(), ignoreCase = head.isRegular()) ?: return null
path = Identifier.of(tail)
handle = catalog.getTableHandle(session, path)
table = catalog.getTable(session, path)
}

// !! NOT FOUND !!
if (handle == null) {
if (table == null) {
return null
}

// Make a reference and return a global variable expression.
val refCatalog = catalog.getName()
val refName = handle.name
val refType = CompilerType(handle.table.getSchema())
val ref = Ref.Obj(refCatalog, refName, refType, handle.table)
val refName = table.getName()
val refType = CompilerType(table.getSchema())
val ref = Ref.Obj(refCatalog, refName, refType, table)

// Convert any remaining identifier parts to a path expression
val root = Rex(ref.type, rexOpVarGlobal(ref))
val tail = calculateMatched(path, handle.name)
val tail = calculateMatched(path, refName)
return if (tail.isEmpty()) root else root.toPath(tail)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ class PlanTest {
// Assert
DynamicTest.dynamicTest(displayName) {
val input = input[test.key] ?: error("no test cases")

val originalQuery = input
val normalizedQuery = test
listOf(true, false).forEach { isSignal ->
val inputPlan = pipeline.invoke(input, isSignal).plan
val outputPlan = pipeline.invoke(test, isSignal).plan
val inputPlan = pipeline.invoke(originalQuery, isSignal).plan
val outputPlan = pipeline.invoke(normalizedQuery, isSignal).plan
assertPlanEqual(inputPlan, outputPlan)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.partiql.plan.debug.PlanPrinter
import org.partiql.planner.internal.typer.CompilerType
import org.partiql.planner.internal.typer.PlanTyper.Companion.toCType
import org.partiql.planner.util.ProblemCollector
import org.partiql.plugins.memory.MemoryConnector
import org.partiql.plugins.memory.MemoryCatalog
import org.partiql.spi.catalog.Session
import org.partiql.types.BagType
import org.partiql.types.PType
Expand All @@ -25,21 +25,24 @@ internal class PlannerErrorReportingTests {
val userId = "test-user"
val queryId = "query"

val catalog = MemoryConnector
// TODO REMOVE fromStaticType
val catalog = MemoryCatalog
.builder()
.name(catalogName)
.define("missing_binding", StaticType.ANY)
.define("atomic", StaticType.INT2)
.define("collection_no_missing_atomic", BagType(StaticType.INT2))
.define("collection_contain_missing_atomic", BagType(StaticType.INT2))
.define("struct_no_missing", closedStruct(StructType.Field("f1", StaticType.INT2)))
.define("missing_binding", PType.fromStaticType(StaticType.ANY))
.define("atomic", PType.fromStaticType(StaticType.INT2))
.define("collection_no_missing_atomic", PType.fromStaticType(BagType(StaticType.INT2)))
.define("collection_contain_missing_atomic", PType.fromStaticType(BagType(StaticType.INT2)))
.define("struct_no_missing", PType.fromStaticType(closedStruct(StructType.Field("f1", StaticType.INT2))))
.define(
"struct_with_missing",
closedStruct(
StructType.Field("f1", StaticType.INT2),
PType.fromStaticType(
closedStruct(
StructType.Field("f1", StaticType.INT2),
)
)
)
.build().getCatalog()
.build()

val session = Session.builder()
.catalog(catalogName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package org.partiql.planner.internal
import org.partiql.spi.catalog.Catalog
import org.partiql.spi.catalog.Identifier
import org.partiql.spi.catalog.Name
import org.partiql.spi.catalog.Namespace
import org.partiql.spi.catalog.Session
import org.partiql.spi.catalog.Table
import org.partiql.types.PType

/**
* Basic catalog implementation used for testing; consider merging with MemoryConnector?
*
* TODO COMBINE WITH MemoryCatalog as the standard catalog implementation.
*/
public class TestCatalog private constructor(
private val name: String,
Expand All @@ -22,30 +23,12 @@ public class TestCatalog private constructor(
return null
}

override fun getTableHandle(session: Session, identifier: Identifier): Table.Handle? {
val matched = mutableListOf<String>()
override fun getTable(session: Session, identifier: Identifier): Table? {
var curr: Tree = root
for (part in identifier) {
curr = curr.get(part) ?: break
matched.add(curr.name)
}
if (curr.table == null) {
return null
}
return Table.Handle(
name = Name.of(matched),
table = curr.table!!
)
}

// TODO
override fun listTables(session: Session, namespace: Namespace): Collection<Name> {
return emptyList()
}

// TODO
override fun listNamespaces(session: Session, namespace: Namespace): Collection<Namespace> {
return emptyList()
return curr.table
}

private class Tree(
Expand Down Expand Up @@ -115,7 +98,7 @@ public class TestCatalog private constructor(
// upsert namespaces
curr = curr.getOrPut(part)
}
curr.table = Table.empty(name.getName(), schema)
curr.table = Table.empty(name, schema)
return this
}

Expand Down
Loading
Loading