[Docs] Add pyspark in spark-guide (lakesoul-io#414)
1. Add a PySpark code guide.
2. Add call syntax for rollback and compaction in snapshot-manage.md and spark-api-docs.md.

Signed-off-by: maosen <[email protected]>
Co-authored-by: maosen <[email protected]>
moresun and maosen authored Jan 12, 2024
1 parent cb9f60f commit bb3c958
Showing 7 changed files with 216 additions and 22 deletions.
@@ -40,7 +40,7 @@ class LakeSoulSparkSqlExtensionsParser(delegate: ParserInterface) extends Parser
lexer.addErrorListener(LakeSoulParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
val parser = new LakeSoulSqlExtensionsParser(tokenStream)
parser.addParseListener(IcebergSqlExtensionsPostProcessor)
parser.addParseListener(LakeSoulSqlExtensionsPostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(LakeSoulParseErrorListener)
try {
@@ -138,7 +138,7 @@ class LakeSoulSparkSqlExtensionsParser(delegate: ParserInterface) extends Parser
normalized.startsWith("call")
}
}
case object IcebergSqlExtensionsPostProcessor extends LakeSoulSqlExtensionsListener {
case object LakeSoulSqlExtensionsPostProcessor extends LakeSoulSqlExtensionsListener {

/** Remove the back ticks from an Identifier. */
override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {
104 changes: 95 additions & 9 deletions website/docs/01-Getting Started/02-spark-guide.mdx
@@ -22,29 +22,37 @@ LakeSoul | Spark Version
2.2.x-2.4.x | 3.3.x
2.0.x-2.1.x | 3.1.x

### Spark Shell/SQL
### Spark Shell/SQL/PySpark

Run spark-shell/spark-sql with the `LakeSoulSparkSessionExtension` sql extension.
Run spark-shell/spark-sql/pyspark with the `LakeSoulSparkSessionExtension` sql extension.
<Tabs
defaultValue="Spark SQL"
values={[
{label: 'Spark SQL', value: 'Spark SQL'},
{label: 'Scala', value: 'Scala'},
]}>
<TabItem value="Spark SQL" label="Spark SQL" default>
{label: 'PySpark', value: 'PySpark'},
]}>
<TabItem value="Spark SQL" label="Spark SQL" default>

```bash
spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-2.5.0-spark-3.3.jar
```

</TabItem>
</TabItem>

<TabItem value="Scala" label="Scala">
<TabItem value="Scala" label="Scala">

```bash
spark-shell --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-2.5.0-spark-3.3.jar
```
</TabItem>
</TabItem>
<TabItem value="PySpark" label="PySpark">

```bash
wget https://raw.githubusercontent.com/lakesoul-io/LakeSoul/main/python/lakesoul/spark/tables.py
pyspark --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --jars lakesoul-spark-2.5.0-spark-3.3.jar --py-files tables.py
```
</TabItem>

</Tabs>
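
Once the shell is up, a quick sanity check is to list namespaces through the configured catalog; since `spark.sql.defaultCatalog` is set to `lakesoul`, this plain Spark SQL command is already served by the LakeSoul catalog (shown here in PySpark; the same statement works in spark-sql and spark-shell).

```python
# Quick sanity check: with spark.sql.defaultCatalog=lakesoul, this standard
# Spark SQL command is answered by the LakeSoul catalog.
spark.sql("SHOW NAMESPACES").show()
```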

@@ -109,6 +117,15 @@ spark.sql("SHOW TABLES")
```

</TabItem>
<TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE NAMESPACE IF NOT EXISTS lakesoul_namespace")
spark.sql("USE lakesoul_namespace")
spark.sql("SHOW TABLES")
```
</TabItem>
</Tabs>


@@ -135,7 +152,13 @@ spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) U
```

</TabItem>
<TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE TABLE lakesoul_table (id BIGINT, name STRING, `date` STRING) USING lakesoul PARTITIONED BY (`date`) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_table'")
```
</TabItem>
</Tabs>

### Primary Key Table
@@ -162,7 +185,13 @@ spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, da
```

</TabItem>
<TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE TABLE lakesoul_hash_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_hash_table' TBLPROPERTIES ( 'hashPartitions'='id', 'hashBucketNum'='2')")
```
</TabItem>
</Tabs>
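
To double-check the primary key configuration, the standard `SHOW TBLPROPERTIES` command displays the `hashPartitions` and `hashBucketNum` values set above (a small illustrative check, shown in PySpark).

```python
# Inspect the table properties to confirm the primary key configuration
# ('hashPartitions' and 'hashBucketNum' from the CREATE TABLE statement above).
spark.sql("SHOW TBLPROPERTIES lakesoul_hash_table").show(truncate=False)
```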

### CDC Table
@@ -188,7 +217,13 @@ spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, dat
```

</TabItem>
<TabItem value="PySpark" label="PySpark">

```python
# python
spark.sql("CREATE TABLE lakesoul_cdc_table (id BIGINT NOT NULL, name STRING, date STRING) USING lakesoul PARTITIONED BY (date) LOCATION 'file:/tmp/lakesoul_namespace/lakesoul_cdc_table' TBLPROPERTIES('hashPartitions'='id', 'hashBucketNum'='2', 'lakesoul_cdc_change_column' = 'op')")
```
</TabItem>
</Tabs>


@@ -219,7 +254,17 @@ df.write.format("lakesoul").insertInto("lakesoul_table")
```
</TabItem>

<TabItem value="PySpark" label="PySpark">

```python
# python
from pyspark.sql.types import *
data = [(1,"Cathy","2024-01-02")]
schema = StructType([StructField("id", LongType(), False), StructField("name", StringType(), True), StructField("date", StringType(), False)])
df = spark.createDataFrame(data,schema=schema)
df.write.format("lakesoul").insertInto("lakesoul_table")
```
</TabItem>
</Tabs>
To append new data to a hash-partitioned table using Spark SQL, use `MERGE INTO`.
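
A minimal PySpark sketch of such a merge is shown below; it assumes a temporary view named `source_view` with the same columns as the primary-key table created above and uses the standard Spark SQL `MERGE INTO ... WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *` form. The view name and sample rows are illustrative.

```python
# Illustrative MERGE INTO from PySpark: upsert rows from a temporary view
# into the primary-key table created above. View name and rows are sample values.
src = spark.createDataFrame(
    [(1, "Alice", "2024-01-02"), (3, "Carol", "2024-01-03")],
    schema="id long, name string, date string")
src.createOrReplaceTempView("source_view")
spark.sql("""
    MERGE INTO lakesoul_hash_table AS t
    USING source_view AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```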
@@ -266,7 +311,19 @@ LakeSoulTable.forPath(tablePath).upsert(dfUpsert)
```
</TabItem>

<TabItem value="PySpark" label="PySpark">
```python
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
df = spark.createDataFrame([("20201101", "1", "1"), ("20201101", "2", "2"), ("20201101", "3", "3"), ("20201102", "4", "4")], schema='range string, hash string, value string')
df.write.format("lakesoul").mode("overwrite").option("rangePartitions", "range").option("hashPartitions", "hash").option("hashBucketNum", 2).save(tablePath)
dfUpsert = spark.createDataFrame([("20201111", "1", "1"), ("20201111", "2", "2"), ("20201111", "3", "3"), ("20201112", "4", "4")], schema='range string, hash string, value string')
LakeSoulTable.forPath(spark, tablePath).upsert(dfUpsert)
```
</TabItem>
</Tabs>
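
To verify the upsert, the table can be read back with the same `LakeSoulTable` helper used above; rows from the initial write and the upsert batch are merged on read.

```python
# Read the upserted table back; the initial batch and the upsert batch
# are merged on read.
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark, tablePath).toDF().show()
```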
@@ -294,7 +351,15 @@ LakeSoulTable.forPath(tablePath).updateExpr("id = 2", Seq(("name"->"'David'")).t
```
</TabItem>
<TabItem value="PySpark" label="PySpark">
```python
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).update("hash = 4", { "value":"5"})
```
</TabItem>
</Tabs>
@@ -320,7 +385,15 @@ LakeSoulTable.forPath(tablePath).delete("id = 1 or id =2")
```
</TabItem>
<TabItem value="PySpark" label="PySpark">
```python
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
LakeSoulTable.forPath(spark,tablePath).delete("hash = 4")
```
</TabItem>
</Tabs>
## Query Data
@@ -354,7 +427,20 @@ LakeSoulTable.forName(tableName).toDF
```
</TabItem>
<TabItem value="PySpark" label="PySpark">
```python
# python
from tables import LakeSoulTable
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
# query data with the LakeSoulTable API
LakeSoulTable.forPath(spark, tablePath).toDF().show()
# query data with the DataFrameReader API
spark.read.format("lakesoul").load(tablePath).show()
```
</TabItem>
</Tabs>
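
Tables registered in the catalog can also be queried with plain SQL; the example below filters `lakesoul_table` by its `date` range partition using the sample value written earlier in this guide.

```python
# SQL query against the catalog table; `date` is the range partition column
# of lakesoul_table, and '2024-01-02' is the sample value inserted earlier.
spark.sql("SELECT id, name, `date` FROM lakesoul_table WHERE `date` = '2024-01-02'").show()
```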
## Time Travel Query
4 changes: 4 additions & 0 deletions website/docs/02-Tutorials/03-snapshot-manage.md
@@ -38,6 +38,10 @@ val lakeSoulTable = LakeSoulTable.forPath(tablePath)

// Roll back the metadata and storage data of partition date='2022-01-02' to the snapshot whose timestamp is closest to, but not later than, '2022-01-01 15:15:15'
lakeSoulTable.rollbackPartition("date='2022-01-02'", "2022-01-01 15:15:15")
// the same rollback via SQL call syntax
spark.sql("call LakeSoulTable.rollback(partitionvalue=>map('date','2022-01-02'),toTime=>'2022-01-01 15:15:15',tablePath=>'" + tablePath + "')")
spark.sql("call LakeSoulTable.rollback(partitionvalue=>map('date','2022-01-02'),toTime=>'2022-01-01 15:15:15',tzoneId=>'Asia/Shanghai',tableName=>'lakesoul')")

```
The rollback operation itself creates a new snapshot version; earlier snapshots and their data are not deleted.

4 changes: 4 additions & 0 deletions website/docs/03-Usage Docs/03-spark-api-docs.md
@@ -200,6 +200,9 @@ lakeSoulTable.compaction("date='2021-01-01'")
lakeSoulTable.compaction()
// compact all partitions, but only partitions that meet the compaction conditions will actually be compacted
lakeSoulTable.compaction(false)
// Spark SQL call syntax
spark.sql("call LakeSoulTable.compaction(condition=>map('date','2021-01-01'),tablePath=>'"+tablePath+"')")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name')")

```
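
The same `call` procedure can be invoked from PySpark, since it is issued as a plain SQL statement; the path and partition value below are the sample ones used for the upsert table earlier in these docs, so adjust them to your own table.

```python
# Trigger compaction from PySpark through the same call syntax; the path and
# partition value are sample values from earlier in this guide.
tablePath = "file:/tmp/lakesoul_namespace/lakesoul_upsert_table"
spark.sql("call LakeSoulTable.compaction(condition=>map('range','20201101'),tablePath=>'" + tablePath + "')")
```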

@@ -210,6 +213,7 @@ Since version 2.0, LakeSoul supports loading a partition into Hive after compaction.
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val lakeSoulTable = LakeSoulTable.forName("lakesoul_test_table")
lakeSoulTable.compaction("date='2021-01-01'", "spark_catalog.default.hive_test_table")
spark.sql("call LakeSoulTable.compaction(tableName=>'lakesoul_table_name',hiveTableName=>'spark_catalog.default.hive_test_table',condition=>map('date','2021-01-01'))")
```

**Note** If `lakesoul` has been set as default catalog, Hive tables should be referenced with `spark_catalog` prefix.
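
For example, with `lakesoul` as the default catalog, querying the Hive table that received the compacted partition requires the fully qualified name:

```python
# The Hive table must be fully qualified with the spark_catalog prefix
# when lakesoul is the default catalog.
spark.sql("SELECT * FROM spark_catalog.default.hive_test_table").show()
```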