Skip to content

Commit

Permalink
HPCC4J-520 Update Spark-HPCC Connector examples
Browse files Browse the repository at this point in the history
- Removed outdated examples
- Added PySpark example Juptyer Notebooks example

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Jul 17, 2023
1 parent fa278e5 commit 04c28d3
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 326 deletions.
243 changes: 243 additions & 0 deletions Examples/PySparkExample.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "497761f5",
"metadata": {},
"source": [
"# Spark-HPCC Connector for HPCC Systems Platform and Spark Connectivity\n",
"\n",
"This example demonstrates how to use the Spark-HPCC Connector to read and write data from / to HPCC Systems clusters, as well as providing basic setup information for the Spark-HPCC connector.\n",
"\n",
"## Spark-HPCC Connector Installation:\n",
"\n",
"---\n",
"\n",
"The Spark-HPCC Connector jar and its dependencies need to be made available to all Spark worker nodes and the Spark driver application. This can be done by adding the Spark-HPCC connector jar to the classpath on every node in the Spark cluster and to the classpath for the Spark driver, or by using the ```--jars``` option when executing spark-submit or pyspark.\n",
"\n",
"Download the Spark-HPCC jar with dependencies from Maven Central: https://mvnrepository.com/artifact/org.hpccsystems/spark-hpcc\n",
"\n",
"### Example of using the jars option:\n",
"```\n",
"pyspark --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar\n",
"```\n",
"\n",
"### Adding Spark-HPCC jar to classpath\n",
"The Spark-HPCC jar can also be added to the classpath through various means depending on the configuration of your Spark cluster, more information about updating the classpath can be found within the Spark documentation: https://spark.apache.org/docs/latest/configuration.html"
]
},
{
"cell_type": "markdown",
"id": "eb1182be",
"metadata": {},
"source": [
"# Creating a test dataset\n",
"\n",
"The following code will create a dataframe with two columns, key and fill, that will be used to demonstrate the reading and writing functionality of the Spark-HPCC connector.\n",
"\n",
"---"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "7103a826",
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"import random"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "44c6d7e4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+----------+\n",
"|key| fill|\n",
"+---+----------+\n",
"| 0|2093710133|\n",
"| 1|8298950336|\n",
"| 2|8184283203|\n",
"| 3|7991449698|\n",
"| 4|2230822419|\n",
"| 5|6088498312|\n",
"| 6|2125960683|\n",
"| 7|9243328381|\n",
"| 8|6184681638|\n",
"| 9|6103701586|\n",
"| 10|1113644174|\n",
"| 11|6422865225|\n",
"| 12|3522318506|\n",
"| 13|5734827156|\n",
"| 14|7946567617|\n",
"| 15|6700616122|\n",
"| 16|5306580724|\n",
"| 17|9696286149|\n",
"| 18|4157652341|\n",
"| 19|3429216958|\n",
"+---+----------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"data = [(i, int(1e10 * random.random())) for i in range(1000)]\n",
"df = spark.createDataFrame(data, [\"key\", \"fill\"])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "2668405b",
"metadata": {},
"source": [
"# Writing Data to HPCC Systems\n",
"\n",
"---\n",
"\n",
"A Spark Dataframe can be written to HPCC using the Spark DataSource API.\n",
"- **Mode**: This is the Spark SaveMode, the Spark-HPCC Connector supports: *[ErrorIfExists, Ignore, Overwrite]*\n",
" - Defaults to ErrorIfExists\n",
"- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
"- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n",
"- **Cluster**: The name of the underlying Thor cluster storage plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n",
"- **Path**: The file path for the dataset within the HPCC Systems cluster\n",
"- **Compression**: The compression algorithm to use when writing the file to the HPCC Systems cluster.\n",
" - Options: *[default, none, lz4, flz, lzw]*\n"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "05ba80cb",
"metadata": {},
"outputs": [],
"source": [
"df.write.save(format=\"hpcc\",\n",
" mode=\"overwrite\",\n",
" host=\"http://127.0.0.1:8010\",\n",
" username=\"\",\n",
" password=\"\",\n",
" cluster=\"mythor\",\n",
" path=\"spark::test::dataset\",\n",
" compression=\"default\")"
]
},
{
"cell_type": "markdown",
"id": "1c4d4c9f",
"metadata": {},
"source": [
"# Reading Data from HPCC Systems\n",
"\n",
"---\n",
"\n",
"A dataset from within an HPCC Systems cluster can be read via the Spark Datasource API.\n",
"\n",
"- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
"- **Username / Password**: Credentials for an HPCC Systems cluster user, can be empty or null if security isn't enabled on the target cluster.\n",
"- **Cluster**: The name of the underlying Thor cluster storage plane, this will change based on the target HPCC Systems cluster configuration, but will default to \"mythor\" on bare-metal and \"data\" on containerized systems.\n",
"- **Path**: The file path for the dataset within the HPCC Systems cluster\n",
"- **limitPerFilePart**: *Optional* Limit on the number of records to be read per file part / partition within the HPCC Systems dataset.\n",
"- **projectList**: *Optional* The columns that should be read from the HPCC Systems dataset.\n"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "e8d49d8f",
"metadata": {},
"outputs": [],
"source": [
"readDf = spark.read.load(format=\"hpcc\",\n",
" host=\"http://127.0.0.1:8010\",\n",
" username=\"\",\n",
" password=\"\",\n",
" cluster=\"mythor\",\n",
" path=\"spark::test::dataset\",\n",
" limitPerFilePart=100,\n",
" projectList=\"key, fill\")"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "c16a758c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+----------+\n",
"|key| fill|\n",
"+---+----------+\n",
"| 0|2093710133|\n",
"| 1|8298950336|\n",
"| 2|8184283203|\n",
"| 3|7991449698|\n",
"| 4|2230822419|\n",
"| 5|6088498312|\n",
"| 6|2125960683|\n",
"| 7|9243328381|\n",
"| 8|6184681638|\n",
"| 9|6103701586|\n",
"| 10|1113644174|\n",
"| 11|6422865225|\n",
"| 12|3522318506|\n",
"| 13|5734827156|\n",
"| 14|7946567617|\n",
"| 15|6700616122|\n",
"| 16|5306580724|\n",
"| 17|9696286149|\n",
"| 18|4157652341|\n",
"| 19|3429216958|\n",
"+---+----------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"readDf.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6186315a",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
33 changes: 0 additions & 33 deletions Examples/pom.xml

This file was deleted.

Loading

0 comments on commit 04c28d3

Please sign in to comment.