ci(pyspark): restructure curl invocation to name the file correctly
cpcloud committed Sep 18, 2024
1 parent 6ffd26c commit 82a44f8
Showing 6 changed files with 93 additions and 49 deletions.
1 change: 1 addition & 0 deletions .env
@@ -5,3 +5,4 @@ PGPASSWORD="postgres"
MYSQL_PWD="ibis"
MSSQL_SA_PASSWORD="1bis_Testing!"
DRUID_URL="druid://localhost:8082/druid/v2/sql"
SPARK_ICEBERG_JAR="iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
12 changes: 11 additions & 1 deletion .github/workflows/ibis-backends.yml
@@ -723,7 +723,17 @@ jobs:
- name: install iceberg
shell: bash
if: matrix.pyspark-version == '3.5'
run: pushd "$(poetry run python -c "import pyspark; print(pyspark.__file__.rsplit('/', 1)[0])")/jars" && curl -LO https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar
run: |
set -euo pipefail
base="iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
path="$(poetry run python -c "import pyspark; print(pyspark.__file__.rsplit('/', 1)[0])")/jars"
url="https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/${base}"
pushd "${path}"
curl -Lo "${base}" "${url}"
echo "SPARK_ICEBERG_JAR=${base}" >> "$GITHUB_ENV"
- name: run tests
run: just ci-check -m pyspark
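
Why the curl invocation needed restructuring: `curl -LO` derives the output filename from the URL, and with Maven Central's `remotecontent?filepath=...` endpoint the download ends up saved under a name like `remotecontent` (or `remotecontent?filepath=...`, depending on the curl version) rather than under the jar's own name. The new step names the file explicitly with `-o "${base}"` and exports that name via `$GITHUB_ENV` so the test fixture can read it. A minimal sketch of the naming pitfall, where `derive_name` is an illustrative helper that only approximates name-from-URL behavior:

    from urllib.parse import urlsplit

    URL = (
        "https://search.maven.org/remotecontent"
        "?filepath=org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/"
        "iceberg-spark-runtime-3.5_2.12-1.5.2.jar"
    )

    def derive_name(url: str) -> str:
        # Approximate how a name is inferred from a URL: take the last segment
        # of the path and ignore the query string.
        return urlsplit(url).path.rsplit("/", 1)[-1]

    # The inferred name is "remotecontent", not the jar Spark should load,
    # which is why the workflow now passes an explicit output name to curl.
    assert derive_name(URL) == "remotecontent"
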
111 changes: 71 additions & 40 deletions ibis/backends/pyspark/__init__.py
@@ -11,13 +11,6 @@
import sqlglot.expressions as sge
from packaging.version import parse as vparse
from pyspark import SparkConf

try:
from pyspark.errors.exceptions.base import AnalysisException # PySpark 3.5+
except ImportError:
from pyspark.sql.utils import AnalysisException # PySpark 3.3


from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, DoubleType, LongType, StringType

@@ -38,9 +31,9 @@
from ibis.util import deprecated

try:
from pyspark.errors import ParseException as PySparkParseException
from pyspark.errors import AnalysisException, ParseException
except ImportError:
from pyspark.sql.utils import ParseException as PySparkParseException
from pyspark.sql.utils import AnalysisException, ParseException

if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
@@ -53,8 +46,9 @@

from ibis.expr.api import Watermark

PYSPARK_LT_34 = vparse(pyspark.__version__) < vparse("3.4")
PYSPARK_LT_35 = vparse(pyspark.__version__) < vparse("3.5")
PYSPARK_VERSION = vparse(pyspark.__version__)
PYSPARK_LT_34 = PYSPARK_VERSION < vparse("3.4")
PYSPARK_LT_35 = PYSPARK_VERSION < vparse("3.5")
ConnectionMode = Literal["streaming", "batch"]


@@ -279,55 +273,89 @@ def _active_catalog_database(self, catalog: str | None, db: str | None):
#
# We attempt to use the Unity-specific Spark SQL to set CATALOG and DATABASE
# and if that causes a parser exception we fall back to using the catalog API.
v = self.compiler.v
quoted = self.compiler.quoted
dialect = self.dialect
catalog_api = self._session.catalog

try:
if catalog is not None:
catalog_sql = sge.Use(
kind=v.CATALOG, this=sg.to_identifier(catalog, quoted=quoted)
).sql(dialect)

try:
catalog_sql = sg.to_identifier(catalog).sql(self.dialect)
self.raw_sql(f"USE CATALOG {catalog_sql}")
except PySparkParseException:
self._session.catalog.setCurrentCatalog(catalog)
self.raw_sql(catalog_sql)
except ParseException:
catalog_api.setCurrentCatalog(catalog)

db_sql = sge.Use(
kind=v.DATABASE, this=sg.to_identifier(db, quoted=quoted)
).sql(dialect)

try:
db_sql = sg.to_identifier(db).sql(self.dialect)
self.raw_sql(f"USE DATABASE {db_sql}")
except PySparkParseException:
self._session.catalog.setCurrentDatabase(db)
self.raw_sql(db_sql)
except ParseException:
catalog_api.setCurrentDatabase(db)
yield
finally:
if catalog is not None:
catalog_sql = sge.Use(
kind=v.CATALOG,
this=sg.to_identifier(current_catalog, quoted=quoted),
).sql(dialect)
try:
catalog_sql = sg.to_identifier(current_catalog).sql(self.dialect)
self.raw_sql(f"USE CATALOG {catalog_sql}")
except PySparkParseException:
self._session.catalog.setCurrentCatalog(current_catalog)
self.raw_sql(catalog_sql)
except ParseException:
catalog_api.setCurrentCatalog(current_catalog)

db_sql = sge.Use(
kind=v.DATABASE, this=sg.to_identifier(current_db, quoted=quoted)
).sql(dialect)

try:
db_sql = sg.to_identifier(current_db).sql(self.dialect)
self.raw_sql(f"USE DATABASE {db_sql}")
except PySparkParseException:
self._session.catalog.setCurrentDatabase(current_db)
self.raw_sql(db_sql)
except ParseException:
catalog_api.setCurrentDatabase(current_db)
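
The rewritten bodies above boil down to one pattern: build the USE CATALOG / USE DATABASE statement with sqlglot, try to execute it, and fall back to the PySpark catalog API when the session's SQL parser rejects the statement (the non-Unity case described in the comment). A standalone sketch of that pattern, assuming PySpark >= 3.4 and a `session` bound to an active `SparkSession`; the `use_catalog` helper is illustrative, not part of the backend:

    import sqlglot as sg
    import sqlglot.expressions as sge

    try:
        from pyspark.errors import ParseException
    except ImportError:  # PySpark < 3.4
        from pyspark.sql.utils import ParseException

    def use_catalog(session, name: str) -> None:
        # Prefer the SQL route so Unity-style catalogs are honored...
        stmt = sge.Use(
            kind=sge.var("CATALOG"), this=sg.to_identifier(name, quoted=True)
        ).sql("spark")
        try:
            session.sql(stmt)
        except ParseException:
            # ...and fall back to the catalog API when USE CATALOG is unsupported.
            session.catalog.setCurrentCatalog(name)
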

@contextlib.contextmanager
def _active_catalog(self, name: str | None):
if name is None or PYSPARK_LT_34:
yield
return

prev_catalog = self.current_catalog
prev_database = self.current_database

v = self.compiler.v
quoted = self.compiler.quoted
dialect = self.dialect

catalog_sql = sge.Use(
kind=v.CATALOG, this=sg.to_identifier(name, quoted=quoted)
).sql(dialect)
catalog_api = self._session.catalog

try:
try:
catalog_sql = sg.to_identifier(name).sql(self.dialect)
self.raw_sql(f"USE CATALOG {catalog_sql};")
except PySparkParseException:
self._session.catalog.setCurrentCatalog(name)
self.raw_sql(catalog_sql)
except ParseException:
catalog_api.setCurrentCatalog(name)
yield
finally:
catalog_sql = sge.Use(
kind=v.CATALOG, this=sg.to_identifier(prev_catalog, quoted=quoted)
).sql(dialect)
db_sql = sge.Use(
kind=v.DATABASE, this=sg.to_identifier(prev_database, quoted=quoted)
).sql(dialect)

try:
catalog_sql = sg.to_identifier(prev_catalog).sql(self.dialect)
db_sql = sg.to_identifier(prev_database).sql(self.dialect)
self.raw_sql(f"USE CATALOG {catalog_sql};")
self.raw_sql(f"USE DATABASE {db_sql};")
except PySparkParseException:
self._session.catalog.setCurrentCatalog(prev_catalog)
self._session.catalog.setCurrentDatabase(prev_database)
self.raw_sql(catalog_sql)
self.raw_sql(db_sql)
except ParseException:
catalog_api.setCurrentCatalog(prev_catalog)
catalog_api.setCurrentDatabase(prev_database)

def list_catalogs(self, like: str | None = None) -> list[str]:
catalogs = [res.catalog for res in self._session.sql("SHOW CATALOGS").collect()]
@@ -491,7 +519,7 @@ def create_database(
sql = sge.Create(
kind="DATABASE",
exist=force,
this=sg.to_identifier(name),
this=sg.to_identifier(name, quoted=self.compiler.quoted),
properties=properties,
)
with self._active_catalog(catalog):
@@ -515,7 +543,10 @@ def drop_database(
"""
sql = sge.Drop(
kind="DATABASE", exist=force, this=sg.to_identifier(name), cascade=force
kind="DATABASE",
exist=force,
this=sg.to_identifier(name, quoted=self.compiler.quoted),
cascade=force,
)
with self._active_catalog(catalog):
with self._safe_raw_sql(sql):
5 changes: 4 additions & 1 deletion ibis/backends/pyspark/tests/conftest.py
@@ -170,12 +170,15 @@ def connect(*, tmpdir, worker_id, **kw):
.config("spark.sql.execution.arrow.pyspark.enabled", False)
.config("spark.sql.streaming.schemaInference", True)
)

config = (
config.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.config(
"spark.jars.packages",
f"org.apache.iceberg:{os.environ['SPARK_ICEBERG_JAR']}",
)
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "icehouse")
6 changes: 2 additions & 4 deletions ibis/backends/pyspark/tests/test_client.py
@@ -6,8 +6,7 @@


@pytest.mark.xfail_version(pyspark=["pyspark<3.4"], reason="no catalog support")
def test_catalog_db_args(con, monkeypatch):
monkeypatch.setattr(ibis.options, "default_backend", con)
def test_catalog_db_args(con):
t = ibis.memtable({"epoch": [1712848119, 1712848121, 1712848155]})

assert con.current_catalog == "spark_catalog"
@@ -40,8 +39,7 @@ def test_catalog_db_args(con, monkeypatch):
assert con.current_database == "ibis_testing"


def test_create_table_no_catalog(con, monkeypatch):
monkeypatch.setattr(ibis.options, "default_backend", con)
def test_create_table_no_catalog(con):
t = ibis.memtable({"epoch": [1712848119, 1712848121, 1712848155]})

assert con.current_database != "default"
7 changes: 4 additions & 3 deletions poetry-overrides.nix
@@ -1,11 +1,12 @@
final: prev: {
pyspark = prev.pyspark.overridePythonAttrs (attrs:
let
icebergJarUrl = "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar";
baseName = "iceberg-spark-runtime-3.5_2.12-1.5.2.jar";
icebergJarUrl = "https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/${baseName}";
icebergJar = final.pkgs.fetchurl {
name = "iceberg-spark-runtime-3.5_2.12-1.5.2.jar";
name = baseName;
url = icebergJarUrl;
sha256 = "12v1704h0bq3qr2fci0mckg9171lyr8v6983wpa83k06v1w4pv1a";
sha256 = "sha256-KuxLeNgGzIHU5QMls1H2NJyQ3mQVROZExgMvAAk4YYs=";
};
in
{