diff --git a/trino-example/.gitattributes b/trino-example/.gitattributes new file mode 100644 index 0000000..b2b5f57 --- /dev/null +++ b/trino-example/.gitattributes @@ -0,0 +1 @@ +trino-410-views.tar.gz filter=lfs diff=lfs merge=lfs -text diff --git a/trino-example/.gitignore b/trino-example/.gitignore new file mode 100644 index 0000000..076ec38 --- /dev/null +++ b/trino-example/.gitignore @@ -0,0 +1,52 @@ +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* +replay_pid* + +.gradle +**/build/ +!src/**/build/ + +# Ignore Gradle GUI config +gradle-app.setting + +# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored) +!gradle-wrapper.jar + +# Avoid ignore Gradle wrappper properties +!gradle-wrapper.properties + +# Cache of project +.gradletasknamecache + +# Eclipse Gradle plugin generated files +# Eclipse Core +.project +# JDT-specific (Eclipse Java Development Tools) +.classpath + +# MacOS +.DS_Store + +# IDEA +.idea diff --git a/trino-example/README.md b/trino-example/README.md new file mode 100644 index 0000000..0612ce9 --- /dev/null +++ b/trino-example/README.md @@ -0,0 +1,34 @@ + + +# Iceberg Trino Demo + +This demo extends the Iceberg demo setup to include Trino. + +Clone this repository, change into the `flink-example` directory, and start up the docker environment. 
+```sh +git clone git@github.com:tabular-io/docker-spark-iceberg.git + +cd trino-example + +docker load < trino-410-views.tar.gz + +docker compose up +``` + diff --git a/trino-example/docker-compose.yml b/trino-example/docker-compose.yml new file mode 100644 index 0000000..5896712 --- /dev/null +++ b/trino-example/docker-compose.yml @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+--- +version: "3" + +services: + trino-iceberg: + image: 'trino:iceberg-view-support-amd64' + container_name: trino-iceberg + ports: + - 8080:8080 + volumes: + - ./etc:/etc/trino + networks: + iceberg_net: + + spark-iceberg: + image: tabulario/spark-iceberg + container_name: spark-iceberg + build: ../spark + networks: + iceberg_net: + depends_on: + - rest + - minio + volumes: + - ./warehouse:/home/iceberg/warehouse + - ./notebooks:/home/iceberg/notebooks/notebooks + - ./trino:/usr/local/bin/trino + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + ports: + - 8888:8888 + - 8081:8080 + - 10000:10000 + - 10001:10001 + rest: + image: tabulario/iceberg-rest + container_name: iceberg-rest + networks: + iceberg_net: + ports: + - 8181:8181 + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + - CATALOG_WAREHOUSE=s3://warehouse/ + - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO + - CATALOG_S3_ENDPOINT=http://minio:9000 + minio: + image: minio/minio + container_name: minio + environment: + - MINIO_ROOT_USER=admin + - MINIO_ROOT_PASSWORD=password + - MINIO_DOMAIN=minio + networks: + iceberg_net: + aliases: + - warehouse.minio + ports: + - 9001:9001 + - 9000:9000 + command: ["server", "/data", "--console-address", ":9001"] + mc: + depends_on: + - minio + image: minio/mc + container_name: mc + networks: + iceberg_net: + environment: + - AWS_ACCESS_KEY_ID=admin + - AWS_SECRET_ACCESS_KEY=password + - AWS_REGION=us-east-1 + entrypoint: > + /bin/sh -c " + until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' 
&& sleep 1; done; + /usr/bin/mc rm -r --force minio/warehouse; + /usr/bin/mc mb minio/warehouse; + /usr/bin/mc policy set public minio/warehouse; + tail -f /dev/null + " +networks: + iceberg_net: diff --git a/trino-example/etc/catalog/iceberg.properties b/trino-example/etc/catalog/iceberg.properties new file mode 100644 index 0000000..2732bba --- /dev/null +++ b/trino-example/etc/catalog/iceberg.properties @@ -0,0 +1,10 @@ +connector.name=iceberg +iceberg.catalog.type=rest +iceberg.rest-catalog.uri=http://rest:8181 + +fs.native-s3.enabled=true +s3.endpoint=http://minio:9000 +s3.aws-access-key=admin +s3.aws-secret-key=password +s3.path-style-access=true +s3.region=us-east-1 diff --git a/trino-example/etc/catalog/tpcds.properties b/trino-example/etc/catalog/tpcds.properties new file mode 100644 index 0000000..366f0a5 --- /dev/null +++ b/trino-example/etc/catalog/tpcds.properties @@ -0,0 +1,2 @@ +connector.name=tpcds +tpcds.splits-per-node=4 diff --git a/trino-example/etc/catalog/tpch.properties b/trino-example/etc/catalog/tpch.properties new file mode 100644 index 0000000..599f5ec --- /dev/null +++ b/trino-example/etc/catalog/tpch.properties @@ -0,0 +1,2 @@ +connector.name=tpch +tpch.splits-per-node=4 diff --git a/trino-example/etc/config.properties b/trino-example/etc/config.properties new file mode 100644 index 0000000..2b4522a --- /dev/null +++ b/trino-example/etc/config.properties @@ -0,0 +1,6 @@ +#single node install config +coordinator=true +node-scheduler.include-coordinator=true +http-server.http.port=8080 +discovery-server.enabled=true +discovery.uri=http://localhost:8080 diff --git a/trino-example/etc/jvm.config b/trino-example/etc/jvm.config new file mode 100644 index 0000000..7e1de37 --- /dev/null +++ b/trino-example/etc/jvm.config @@ -0,0 +1,20 @@ +-server +-Xmx1G +-XX:InitialRAMPercentage=80 +-XX:MaxRAMPercentage=80 +-XX:G1HeapRegionSize=32M +-XX:+ExplicitGCInvokesConcurrent +-XX:+ExitOnOutOfMemoryError +-XX:+HeapDumpOnOutOfMemoryError 
+-XX:-OmitStackTraceInFastThrow +-XX:ReservedCodeCacheSize=512M +-XX:PerMethodRecompilationCutoff=10000 +-XX:PerBytecodeRecompilationCutoff=10000 +-Djdk.attach.allowAttachSelf=true +-Djdk.nio.maxCachedBufferSize=2000000 +-Dfile.encoding=UTF-8 +# Reduce starvation of threads by GClocker, recommend to set about the number of cpu cores (JDK-8192647) +-XX:+UnlockDiagnosticVMOptions +-XX:GCLockerRetryAllocationCount=32 +# Allow loading dynamic agent used by JOL +-XX:+EnableDynamicAgentLoading diff --git a/trino-example/etc/log.properties b/trino-example/etc/log.properties new file mode 100644 index 0000000..abee45e --- /dev/null +++ b/trino-example/etc/log.properties @@ -0,0 +1,2 @@ +# Enable verbose logging from Trino +#io.trino=DEBUG diff --git a/trino-example/etc/node.properties b/trino-example/etc/node.properties new file mode 100644 index 0000000..aaf7398 --- /dev/null +++ b/trino-example/etc/node.properties @@ -0,0 +1,3 @@ +node.environment=docker +node.data-dir=/data/trino +plugin.dir=/usr/lib/trino/plugin diff --git a/trino-example/notebooks/Iceberg Views - Spark and Trino Interoperability.ipynb b/trino-example/notebooks/Iceberg Views - Spark and Trino Interoperability.ipynb new file mode 100644 index 0000000..2274b19 --- /dev/null +++ b/trino-example/notebooks/Iceberg Views - Spark and Trino Interoperability.ipynb @@ -0,0 +1,916 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "38b77222-0bc8-445d-8a88-dea253126e0a", + "metadata": {}, + "source": [ + "\"spark-logo\"\n", + "\n", + "First, lets set up our tables using pySpark to do a teardown to avoid conflicts and recreate the `nyc.taxis` table to store data from the NYC Taxi and Limousine Commission Record Data. We will initially load a month of trip data and do a simple query to verify the data was correctly written to the table." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6a5c8206", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "spark = SparkSession.builder.appName(\"Jupyter\").getOrCreate()\n", + "\n", + "spark" + ] + }, + { + "cell_type": "markdown", + "id": "6f9a9f41", + "metadata": {}, + "source": [ + "## Load One Month of NYC Taxi/Limousine Trip Data\n", + "\n", + "This notebook uses the New York City Taxi and Limousine Commission Trip Record Data available on the AWS Open Data Registry. This contains data of trips taken by taxis and for-hire vehicles in New York City. This data is stored in an iceberg table called `taxis`." + ] + }, + { + "cell_type": "markdown", + "id": "747bee98", + "metadata": {}, + "source": [ + "To be able to rerun the notebook several times, let's drop the table and the views if they exist to start fresh." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "930682ce", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "CREATE DATABASE IF NOT EXISTS nyc.taxis;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ce8ee544-274e-474f-b9f2-06a409bebf72", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DROP TABLE IF EXISTS nyc.taxis;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "66d0ff00-95aa-477d-bc11-166600ac8318", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DROP VIEW IF EXISTS nyc.long_distances;" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7cf0bbcd-6509-46bd-b531-dd76a9600fcf", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DROP VIEW IF EXISTS nyc.negative_amounts;" + ] + }, + { + "cell_type": "markdown", + "id": "5816de2e", + "metadata": {}, + "source": [ + "## Create the `nyc.taxis` table in Spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f918310a", + "metadata": {}, + "outputs": [], + 
"source": [ + "%%sql\n", + "CREATE TABLE\n", + " nyc.taxis (\n", + " VendorID BIGINT,\n", + " tpep_pickup_datetime TIMESTAMP,\n", + " tpep_dropoff_datetime TIMESTAMP,\n", + " passenger_count DOUBLE,\n", + " trip_distance DOUBLE,\n", + " RatecodeID DOUBLE,\n", + " store_and_fwd_flag string,\n", + " PULocationID BIGINT,\n", + " DOLocationID BIGINT,\n", + " payment_type BIGINT,\n", + " fare_amount DOUBLE,\n", + " extra DOUBLE,\n", + " mta_tax DOUBLE,\n", + " tip_amount DOUBLE,\n", + " tolls_amount DOUBLE,\n", + " improvement_surcharge DOUBLE,\n", + " total_amount DOUBLE,\n", + " congestion_surcharge DOUBLE,\n", + " airport_fee DOUBLE\n", + " ) USING iceberg PARTITIONED BY (days (tpep_pickup_datetime))" + ] + }, + { + "cell_type": "markdown", + "id": "fcba103e", + "metadata": {}, + "source": [ + "## Write a month of trip data to `nyc.taxis`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c37ca92", + "metadata": {}, + "outputs": [], + "source": [ + "df = spark.read.parquet(\"/home/iceberg/data/yellow_tripdata_2022-01.parquet\")\n", + "df.writeTo(\"nyc.taxis\").append()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a69152aa", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SELECT\n", + " *\n", + "FROM\n", + " nyc.taxis\n", + "LIMIT\n", + " 10" + ] + }, + { + "cell_type": "markdown", + "id": "e4822c22-9a1b-4111-86f7-b23e067b9738", + "metadata": {}, + "source": [ + "\"iceberg-rest-logo\"\n", + "
{REST:Catalog}
\n", + "\n", + "## View the Tables in the catalog 👀\n", + "\n", + "Now that we've created a table, let's look at the table metadata in the REST catalog to have a quick refresher at the table metadata highlights." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c5a1defa-245c-42d9-9702-09c35bef2ab4", + "metadata": {}, + "outputs": [], + "source": [ + "from json import loads, dumps\n", + "from IPython.display import JSON\n", + "\n", + "#call the rest api using curl and asign back to IPython SList\n", + "taxis_table_meta=!curl -s http://rest:8181/v1/namespaces/nyc/tables/taxis\n", + "\n", + "#parse table metadata payload into Python dictionary and print payload\n", + "table_meta_dict=loads(taxis_table_meta.spstr)\n", + "print(dumps(table_meta_dict, indent=2)[0:600], \"\\n ...\", \"\\n}\")\n", + "#print(dumps(table_meta_dict, indent=2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afb4cd97-5fc6-4096-b793-d9bd9c19721e", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "metadata_location=table_meta_dict[\"metadata-location\"]\n", + "metadata=table_meta_dict[\"metadata\"]\n", + "\n", + "\n", + "#display summary table metadata in an interactive json view\n", + "JSON(\n", + " {\n", + " \"metadata-location\": metadata_location,\n", + " \"location\": metadata[\"location\"],\n", + " \"table-uuid\": metadata[\"table-uuid\"],\n", + " \"schemas\":[{\n", + " \"schema-id\": schema[\"schema-id\"],\n", + " \"fields\": \", \".join([\n", + " \"::\".join([ \n", + " str(field[\"id\"]),\n", + " field[\"name\"],\n", + " field[\"type\"],\n", + " \"required\" if field[\"required\"] else \"nullable\"\n", + " ]) \n", + " for field in schema['fields']\n", + " ]),\n", + " \"current-schema\": \"✳️\" if schema[\"schema-id\"] == metadata[\"current-schema-id\"] else \"false\"\n", + " } for schema in metadata[\"schemas\"]],\n", + " \"last-sequence-number\": 
metadata[\"last-sequence-number\"],\n", + " \"snapshots\":[ {\n", + " \"sequence-number\": snapshot[\"sequence-number\"],\n", + " \"snapshot-id\": snapshot[\"snapshot-id\"],\n", + " \"summary\": str(snapshot['summary']),\n", + " \"manifest-list\": snapshot['manifest-list'],\n", + " \"schema-id\": snapshot['schema-id'],\n", + " \"refs\": str([\"{}::{}\".format(name, ref[\"type\"]) for name, ref in metadata[\"refs\"].items() if ref[\"snapshot-id\"]==snapshot[\"snapshot-id\"]]),\n", + " \"current-snapshot\": \"✳️\" if snapshot[\"snapshot-id\"] == metadata[\"current-snapshot-id\"] else \"false\"\n", + " } for snapshot in metadata[\"snapshots\"]],\n", + " \"snapshot-log\": [str(log) for log in metadata[\"snapshot-log\"]],\n", + " \"metadata-log\": [str(log) for log in metadata[\"metadata-log\"]],\n", + " \"statistics\": str(metadata[\"statistics\"]),\n", + " \"partition-statistics\": str(metadata[\"partition-statistics\"])\n", + " },\n", + " root='.table-metadata',\n", + " expanded=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "6602918c-1080-4a9b-ae34-fc909365c01e", + "metadata": {}, + "source": [ + "## Query `nyc.taxis` from Trino" + ] + }, + { + "cell_type": "markdown", + "id": "1655ebdd-e169-4591-8fbc-cbe96b6ff9a0", + "metadata": {}, + "source": [ + "\"trino-logo\"\n", + "\n", + "To verify the shared table representation of Iceberg is also read correctly by Trino, let's run the same original query ran in spark, as well as, a query that pulls out specific fields and makes assumptions about types based on the schema we saw above. This is nothing new, just important to understand what the Iceberg table metadata offers us with interoperability." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01bc8f3a-4ffa-4db1-b410-9d910a4114dd", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080/iceberg' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"\\\n", + "SELECT \\\n", + " * \\\n", + "FROM \\\n", + " nyc.taxis \\\n", + "LIMIT 10\") 2> /dev/null\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "36a8c13d-bb13-4567-83f1-5b5514bb52cc", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"\\\n", + "SELECT \\\n", + " vendorid, \\\n", + " format_datetime(tpep_pickup_datetime, 'YYYY-DDD HH:mm') AS pickup, /* Trino timestamp type */\\\n", + " format_datetime(tpep_dropoff_datetime, 'YYYY-DDD HH:mm') AS dropoff, /* Trino timestamp type */\\\n", + " CAST(passenger_count AS INT) AS num_riders, /* Trino Numeric type */\\\n", + " trip_distance AS distance, \\\n", + " PULocationID AS pickup_loc_id, \\\n", + " DOLocationID AS dropoff_loc_id, \\\n", + " payment_type AS pay_type, \\\n", + " fare_amount AS base, \\\n", + " format('%.2f%%', mta_tax) AS tax, /* Trino DOUBLE type */\\\n", + " tip_amount AS tip, \\\n", + " total_amount AS total \\\n", + "FROM \\\n", + " iceberg.nyc.taxis \\\n", + "LIMIT 10\") 2> /dev/null\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "markdown", + "id": "fd854d56-33d5-46a5-b552-869479b8e188", + "metadata": {}, + "source": [ + "\"spark-logo\"\n", + "\n", + "## Create a view\n", + "\n", + "Let's 
create an Iceberg view to look at the longest distances travelled and the total amount of the trips." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8fade1a3", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "CREATE VIEW\n", + " nyc.long_distances (\n", + " vendor_id COMMENT 'Vendor ID',\n", + " pickup_date,\n", + " dropoff_date,\n", + " distance COMMENT 'Trip Distance',\n", + " total COMMENT 'Total amount'\n", + " ) AS\n", + "SELECT\n", + " VendorID,\n", + " tpep_pickup_datetime,\n", + " tpep_dropoff_datetime,\n", + " trip_distance,\n", + " total_amount\n", + "FROM\n", + " nyc.taxis\n", + "ORDER BY\n", + " trip_distance" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cfee5d8f-f862-4aa3-a096-8ff9ea66ba26", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SELECT\n", + " *\n", + "FROM\n", + " nyc.long_distances\n", + "LIMIT\n", + " 10" + ] + }, + { + "cell_type": "markdown", + "id": "6fce6bb4", + "metadata": {}, + "source": [ + "## Update View to order results differently\n", + "\n", + "The output isn't as helpful as imagined, so let's update the view and change the order of columns and the ordering of the results." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "74c10267-d65b-4650-ab92-02a978f5872a", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "CREATE\n", + "OR REPLACE VIEW nyc.long_distances (\n", + " distance COMMENT 'Trip Distance',\n", + " total COMMENT 'Total amount',\n", + " vendor_id COMMENT 'Vendor ID',\n", + " pickup_date,\n", + " dropoff_date\n", + ") AS\n", + "SELECT\n", + " trip_distance,\n", + " total_amount,\n", + " VendorID,\n", + " tpep_pickup_datetime,\n", + " tpep_dropoff_datetime\n", + "FROM\n", + " nyc.taxis\n", + "WHERE\n", + " trip_distance > 35\n", + "ORDER BY\n", + " total_amount,\n", + " trip_distance" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0e764a28-297f-4c8d-87dc-45ae63380d6e", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SELECT\n", + " *\n", + "FROM\n", + " nyc.long_distances\n", + "LIMIT \n", + " 10" + ] + }, + { + "cell_type": "markdown", + "id": "7170744c-4567-4733-a584-328961ec4722", + "metadata": {}, + "source": [ + "\"iceberg-rest-logo\"\n", + "
{REST:Catalog}
\n", + "\n", + "## View the Views in the catalog 👀\n", + "\n", + "Now that we've both created and replaced a view, let's look at the view metadata in the REST catalog and compare it with table metadata we saw before. It's important to notice both the differences and similarities of what data is represented." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "290ca154-5f08-4cd2-b133-91214e2ad1ff", + "metadata": {}, + "outputs": [], + "source": [ + "from json import loads, dumps\n", + "from IPython.display import JSON\n", + "\n", + "#call the rest api using curl and asign back to IPython SList\n", + "view_meta=!curl -s http://rest:8181/v1/namespaces/nyc/views/long_distances\n", + "\n", + "#parse table metadata payload into Python dictionary and print payload\n", + "view_meta_dict=loads(view_meta.spstr)\n", + "print(dumps(view_meta_dict, indent=2)[0:600], \"\\n ...\", \"\\n}\")\n", + "#print(dumps(view_meta_dict, indent=2))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5a584ae9-c3f3-43c5-8e71-4d04f7c737c5", + "metadata": {}, + "outputs": [], + "source": [ + "#get basic view metadata\n", + "metadata_location=view_meta_dict[\"metadata-location\"]\n", + "metadata=view_meta_dict[\"metadata\"]\n", + "\n", + "current_schema_id = next(version for version in metadata[\"versions\"] if version[\"version-id\"] == metadata[\"current-version-id\"])[\"schema-id\"]\n", + "\n", + "#display summary table metadata in an interactive json view\n", + "JSON(\n", + " {\n", + " \"metadata-location\": metadata_location,\n", + " \"location\": metadata[\"location\"],\n", + " \"view_uuid\": metadata[\"view-uuid\"],\n", + " \"schemas\":[{\n", + " \"schema-id\": schema[\"schema-id\"],\n", + " \"fields\": \", \".join([\n", + " \"::\".join([ \n", + " str(field[\"id\"]),\n", + " field[\"name\"],\n", + " field[\"type\"],\n", + " \"required\" if field[\"required\"] else \"nullable\"\n", + " ]) \n", + " for field in schema['fields']\n", + " ]),\n", + " 
\"current-schema\": \"✳️\" if schema[\"schema-id\"] == current_schema_id else \"false\"\n", + " } for schema in metadata[\"schemas\"]],\n", + " \"versions\":[ {\n", + " \"version-id\": version[\"version-id\"],\n", + " \"summary\": str(version['summary']),\n", + " \"default-namespace\": version['default-namespace'],\n", + " \"schema-id\": version['schema-id'],\n", + " \"representations\": version['representations'],\n", + " \"current-version\": \"✳️\" if version[\"version-id\"] == metadata[\"current-version-id\"] else \"false\"\n", + " } for version in metadata[\"versions\"]],\n", + " \"version-log\": [str(log) for log in metadata[\"version-log\"]]\n", + " },\n", + " root='.view-metadata',\n", + " expanded=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "4625a901-22ab-45d2-9dc8-6a7a118a7ca3", + "metadata": {}, + "source": [ + "\"trino-logo\"\n", + "\n", + "## Query `nyc.long_distances` view from Trino" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bc87f642-18b1-4387-b0e2-adf10154de49", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080/iceberg' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"\\\n", + "SELECT \\\n", + " * \\\n", + "FROM \\\n", + " nyc.long_distances \\\n", + "LIMIT 10\")\n", + "\n", + "print('\\n'.join([line for line in trino_out.l if line.startswith('Query')]))" + ] + }, + { + "cell_type": "markdown", + "id": "451e0599-8c33-473e-b55d-6c970e6d57ef", + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "Although this seems like a bug, this is actually intentional based on the current discussion of a [the Trino Pull Request](https://github.com/trinodb/trino/pull/19818#discussion_r1400212612). 
This is a great point to discuss the meat of the differences between table and view representations.\n", + "\n", + "\n", + "## Fundamental difference between Iceberg Tables and Views\n", + "\n", + "### Table And View similarities\n", + "The [Iceberg Table Spec](https://iceberg.apache.org/spec/) and the [Iceberg View Spec](https://iceberg.apache.org/view-spec/) can be confusing to parse modeling differences when dealing directly with views and tables in SQL. This is because views are designed to provide the same experience as a table in SQL. Let's first talk about what properties and traits are shared between representations.\n", + "\n", + "* Both views and tables are associated with a schema, containing column ids, names, types, and if the column is nullable.\n", + "* They both store the **warehouse** and **namespace** to address the table/view in SQL.\n", + "* They both track schema changes over time.\n", + "* They both hold information that tells them where to find data, but what they store and how that information is used to find data is where these implementations diverge.\n", + "* Both use Iceberg's [optimistic concurrency](https://iceberg.apache.org/spec/#optimistic-concurrency) model with a catalog, updating a common [metadata location](https://iceberg.apache.org/view-spec/#metadata-location). This ensures that changes to views inherit the same guarantees provided to evolving the view definition as tables do.\n", + "\n", + "### Table Spec\n", + "Iceberg table metadata contain the information needed for any compute engine to correctly and performantly retrieve the data from storage. 
The metadata contains an abstract internal representation of the table metadata.\n", + "\n", + "* The actual location of the data files on disk and columnar ranges of each file to avoid reading files from storage that do not contain relevant information.\n", + "* The snapshots of committed data to a table over time.\n", + "* The partitioning schema over time to enable skipping the reads of irrelevant partitions of a table.\n", + "* The ordering of the data on disk.\n", + "\n", + "### View Spec\n", + "Iceberg views enable storing multiple dialects for the same view, though currently there is no mechanism in Iceberg or the spec that ensures multiple views are represented. A view in Iceberg holds the view query in the SQL dialect of the defining compute engine using these abstractions:\n", + "\n", + "* The [view representation](https://iceberg.apache.org/view-spec/#representations) contains the raw query stored in `sql`, the query syntax `dialect`, and the `type` of representation (currently limited to `\"sql\"`). It's worth noting that although not supported by any direct procedure, Iceberg views store multiple representations of the view in different dialects, which will enable different compute engines to access the most appropriate view for them and either manage the transpiling themselves, or run it natively if it's a direct match.\n", + "* As opposed to being explicitly defined as with the `CREATE TABLE` statement, a view's schema is defined from the result set of the query. In some cases, the `CREATE VIEW` statement will allow you to add a schema to explicitly cast all fields to the same type.\n", + "* A [view version](https://iceberg.apache.org/view-spec/#versions) is the combination of the current schema, the current view representation, and a few more fields. 
Similar to snapshots, view versions have a log field that details when they've been updated.\n", + "\n", + "## Table vs View Guarantees\n", + "\n", + "Using schemas, both ensure that the data types adhere to the contract which leads to better data quality with this type checking. The catch is that views are tightly coupled to the compute engines' query syntax, the state of the compute engine, and typing systems to infer the correct schema. Unlike Postgres views which utilize a uniform typing system on its own tables where it can validate type, the view behavior when reading Iceberg views with a SQL dialect outside of your own is up to the query engine based on how it implements error handling around corrupted or unsure states. We'll show below how Spark enables reading view dialects outside of Spark. Both of these approaches have their tradeoffs.\n", + "\n", + "Perhaps this doesn't concern you as you'll get an error message in most cases. There are more nefarious silent failures that can take place where view models lack full visibility of the schema and physical locations of tables. This issue is not Iceberg specific, but derives from using interoperable engines to storage layers.\n" + ] + }, + { + "cell_type": "markdown", + "id": "a3113859-da63-460a-a338-3c327da23068", + "metadata": {}, + "source": [ + "# Listing and describing views" + ] + }, + { + "cell_type": "markdown", + "id": "35bf8f88-a493-42c0-b7a9-3941f8ebf4c8", + "metadata": {}, + "source": [ + "## Create a `nyc.negative_amounts` view, this time in Trino\n", + "It appears that there are trips with negative total amounts. 
Let's display these results in a separate view" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aa47bf43-2460-4990-88df-6040897c3386", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080' \\\n", + "--catalog='iceberg'\\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"\\\n", + "CREATE OR REPLACE VIEW nyc.negative_amounts AS\\\n", + "SELECT\\\n", + " total_amount,\\\n", + " trip_distance,\\\n", + " VendorID,\\\n", + " tpep_pickup_datetime,\\\n", + " tpep_dropoff_datetime\\\n", + "FROM\\\n", + " nyc.taxis\\\n", + "WHERE\\\n", + " total_amount < 0\\\n", + "ORDER BY\\\n", + " total_amount\\\n", + "\") 2> /dev/null\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "markdown", + "id": "51661fd9-c859-492a-89e8-d585442eb9f4", + "metadata": {}, + "source": [ + "We should be able to query this view in Trino since Trino created this dialect. Let's first show the views in Trino. Trino does [not currently support a way to filter views versus tables](https://github.com/trinodb/trino/issues/2999#issuecomment-1127533014), so to verify which views will show, let's run a `SHOW TABLES` command, followed by a simple query of the new Trino view." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "70a2f57c-75b2-46a0-b421-bc58e2d58d0a", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080/iceberg' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"SHOW TABLES IN nyc\") 2> /dev/null\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "698c7409-b711-4bbb-a1d4-2cc3530bb2b9", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080/iceberg' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"\\\n", + "SELECT \\\n", + " * \\\n", + "FROM \\\n", + " nyc.negative_amounts \\\n", + "LIMIT 10\") 2> /dev/null\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5f824f50-c2d3-4de9-bc9f-eff2206b9551", + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.display import Markdown, display\n", + "\n", + "# Run using the Trino CLI since %%sql magic is from pySpark and only connects to Spark\n", + "trino_out=!(trino --server='http://trino-iceberg:8080/iceberg' \\\n", + "--output-format='MARKDOWN' \\\n", + "--execute=\"SHOW CREATE VIEW nyc.negative_amounts\")\n", + "\n", + "display(Markdown(trino_out.nlstr))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5a606415-a077-4ffc-8f56-48d75883aa0f", + "metadata": {}, + "outputs": [], + "source": [ + "from json import loads, dumps\n", + "from IPython.display import JSON\n", + "\n", + "#call the rest api using curl and asign back to IPython SList\n", + "view_meta=!curl -s 
http://rest:8181/v1/namespaces/nyc/views/negative_amounts\n", + "\n", + "#parse view metadata payload into Python dictionary and print payload\n", + "view_meta_dict=loads(view_meta.spstr)\n", + "#print(dumps(view_meta_dict, indent=2)[0:600], \"\\n ...\", \"\\n}\")\n", + "print(dumps(view_meta_dict, indent=2))" + ] + }, + { + "cell_type": "markdown", + "id": "65deb074", + "metadata": {}, + "source": [ + "# Listing and describing views" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bab64f90", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SHOW VIEWS IN nyc" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7eec58d7-e589-4386-a549-1db970950e0f", + "metadata": {}, + "outputs": [], + "source": [ + "spark.catalog.setCurrentCatalog(\"demo\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d6c6a306-1752-4a6d-9213-d5b615110b1d", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SELECT\n", + " *\n", + "FROM\n", + " nyc.negative_amounts\n", + "LIMIT 10" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "dfc2dcb3-4717-4730-94b6-18b9a239cf74", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DESCRIBE nyc.long_distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7653ef78-f419-462b-915b-0cbd9f62d473", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DESCRIBE EXTENDED nyc.long_distances" + ] + }, + { + "cell_type": "markdown", + "id": "b11e64c9", + "metadata": {}, + "source": [ + "# Displaying the CREATE statement of a view" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1c4a942c", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SHOW\n", + "CREATE TABLE\n", + " nyc.long_distances" + ] + }, + { + "cell_type": "markdown", + "id": "42f6e042-00ce-4277-bf9a-16931f898d7b", + "metadata": {}, + "source": [ + "# Altering and displaying properties of a view\n", 
+ "\n", + "This will add a new property and also update the comment of the view. \n", + "The comment will be shown when describing the view.\n", + "The end of this section will also remove a property from the view." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fa823a4c-ede3-40d7-906e-27818070fa9b", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SHOW TBLPROPERTIES nyc.long_distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9c3b2fb4-4db9-408f-a36d-84970108dd5b", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "ALTER VIEW nyc.long_distances\n", + "SET\n", + " TBLPROPERTIES (\n", + " 'key1' = 'val1',\n", + " 'key2' = 'val2',\n", + " 'comment' = 'This is a view comment'\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2a8dd804-d222-44e2-92b9-2069868e206a", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SHOW TBLPROPERTIES nyc.long_distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1950bd5d-a5fc-4ee1-a4a9-1242261232f0", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "DESCRIBE EXTENDED nyc.long_distances" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "88781989-0967-4349-b435-ad193c9697e7", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "ALTER VIEW nyc.long_distances UNSET TBLPROPERTIES ('key1')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e6864599-b4fa-4525-8304-f1cb3ee7144a", + "metadata": {}, + "outputs": [], + "source": [ + "%%sql\n", + "SHOW TBLPROPERTIES nyc.long_distances" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + 
"pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/trino-example/trino b/trino-example/trino new file mode 100755 index 0000000..f402a8d Binary files /dev/null and b/trino-example/trino differ diff --git a/trino-example/trino-410-views.tar.gz b/trino-example/trino-410-views.tar.gz new file mode 100644 index 0000000..ae7100a --- /dev/null +++ b/trino-example/trino-410-views.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:a7f72737f1f80320be62dfa4c74fd350f1c0bd0e5cd923f8cff931b45b46ee04 +size 1064017792