diff --git a/Examples/PySparkExample.ipynb b/Examples/PySparkExample.ipynb
new file mode 100644
index 00000000..b4267d0c
--- /dev/null
+++ b/Examples/PySparkExample.ipynb
@@ -0,0 +1,243 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "497761f5",
+   "metadata": {},
+   "source": [
+    "# Spark-HPCC Connector: HPCC Systems Platform and Spark Connectivity\n",
+    "\n",
+    "This example demonstrates how to use the Spark-HPCC Connector to read data from and write data to HPCC Systems clusters, and provides basic setup information for the connector.\n",
+    "\n",
+    "## Spark-HPCC Connector Installation:\n",
+    "\n",
+    "---\n",
+    "\n",
+    "The Spark-HPCC Connector jar and its dependencies need to be made available to all Spark worker nodes and to the Spark driver application. This can be done either by adding the Spark-HPCC Connector jar to the classpath on every node in the Spark cluster and to the classpath of the Spark driver, or by using the ```--jars``` option when executing spark-submit or pyspark.\n",
+    "\n",
+    "Download the Spark-HPCC jar with dependencies from Maven Central: https://mvnrepository.com/artifact/org.hpccsystems/spark-hpcc\n",
+    "\n",
+    "### Example of using the ```--jars``` option:\n",
+    "```\n",
+    "pyspark --jars spark-hpcc-9.2.2-1-jar-with-dependencies.jar\n",
+    "```\n",
+    "\n",
+    "### Adding the Spark-HPCC jar to the classpath\n",
+    "The Spark-HPCC jar can also be added to the classpath through various means depending on the configuration of your Spark cluster. More information about updating the classpath can be found in the Spark documentation: https://spark.apache.org/docs/latest/configuration.html"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "eb1182be",
+   "metadata": {},
+   "source": [
+    "# Creating a test dataset\n",
+    "\n",
+    "The following code creates a DataFrame with two columns, ```key``` and ```fill```, that will be used to demonstrate the read and write functionality of the Spark-HPCC Connector.\n",
+    "\n",
+    "---"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 18,
+   "id": "7103a826",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import SparkSession\n",
+    "import random\n",
+    "# pyspark shells provide a SparkSession named spark automatically; getOrCreate() also covers standalone kernels\n",
+    "spark = SparkSession.builder.getOrCreate()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "id": "44c6d7e4",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+----------+\n",
+      "|key|      fill|\n",
+      "+---+----------+\n",
+      "|  0|2093710133|\n",
+      "|  1|8298950336|\n",
+      "|  2|8184283203|\n",
+      "|  3|7991449698|\n",
+      "|  4|2230822419|\n",
+      "|  5|6088498312|\n",
+      "|  6|2125960683|\n",
+      "|  7|9243328381|\n",
+      "|  8|6184681638|\n",
+      "|  9|6103701586|\n",
+      "| 10|1113644174|\n",
+      "| 11|6422865225|\n",
+      "| 12|3522318506|\n",
+      "| 13|5734827156|\n",
+      "| 14|7946567617|\n",
+      "| 15|6700616122|\n",
+      "| 16|5306580724|\n",
+      "| 17|9696286149|\n",
+      "| 18|4157652341|\n",
+      "| 19|3429216958|\n",
+      "+---+----------+\n",
+      "only showing top 20 rows\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "data = [(i, int(1e10 * random.random())) for i in range(1000)]\n",
+    "df = spark.createDataFrame(data, [\"key\", \"fill\"])\n",
+    "df.show()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "2668405b",
+   "metadata": {},
+   "source": [
+    "# Writing Data to HPCC Systems\n",
+    "\n",
+    "---\n",
+    "\n",
+    "A Spark DataFrame can be written to an HPCC Systems cluster using the Spark DataSource API; an equivalent option-chained form of the same write is shown after the example below.\n",
+    "- **Mode**: The Spark SaveMode. The Spark-HPCC Connector supports: *[ErrorIfExists, Ignore, Overwrite]*\n",
+    "  - Defaults to ErrorIfExists\n",
+    "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
+    "- **Username / Password**: Credentials for an HPCC Systems cluster user; these can be empty or null if security isn't enabled on the target cluster.\n",
+    "- **Cluster**: The name of the underlying Thor cluster storage plane. This varies with the target HPCC Systems cluster configuration, but defaults to \"mythor\" on bare-metal systems and \"data\" on containerized systems.\n",
+    "- **Path**: The file path for the dataset within the HPCC Systems cluster.\n",
+    "- **Compression**: The compression algorithm to use when writing the file to the HPCC Systems cluster.\n",
+    "  - Options: *[default, none, lz4, flz, lzw]*\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "id": "05ba80cb",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.write.save(format=\"hpcc\",\n",
+    "              mode=\"overwrite\",\n",
+    "              host=\"http://127.0.0.1:8010\",\n",
+    "              username=\"\",\n",
+    "              password=\"\",\n",
+    "              cluster=\"mythor\",\n",
+    "              path=\"spark::test::dataset\",\n",
+    "              compression=\"default\")"
+   ]
+  },
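+  {
+   "cell_type": "markdown",
+   "id": "a0b1c2d3",
+   "metadata": {},
+   "source": [
+    "The cell below is a minimal sketch of the same write expressed through Spark's generic ```DataFrameWriter``` option chaining. It assumes only standard Spark behavior: each named option is passed through to the data source, and the argument to ```save``` becomes the ```path``` option.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b4c5d6e7",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Equivalent write via option chaining; each option is forwarded to the hpcc data source.\n",
+    "# The username/password options are omitted because security is disabled in this example.\n",
+    "df.write.format(\"hpcc\") \\\n",
+    "    .mode(\"overwrite\") \\\n",
+    "    .option(\"host\", \"http://127.0.0.1:8010\") \\\n",
+    "    .option(\"cluster\", \"mythor\") \\\n",
+    "    .option(\"compression\", \"default\") \\\n",
+    "    .save(\"spark::test::dataset\")"
+   ]
+  },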
+  {
+   "cell_type": "markdown",
+   "id": "1c4d4c9f",
+   "metadata": {},
+   "source": [
+    "# Reading Data from HPCC Systems\n",
+    "\n",
+    "---\n",
+    "\n",
+    "A dataset within an HPCC Systems cluster can be read via the Spark DataSource API; an optional sanity check on the record limit follows the output below.\n",
+    "\n",
+    "- **Host**: The URL of an ESP running on the target HPCC Systems cluster.\n",
+    "- **Username / Password**: Credentials for an HPCC Systems cluster user; these can be empty or null if security isn't enabled on the target cluster.\n",
+    "- **Cluster**: The name of the underlying Thor cluster storage plane. This varies with the target HPCC Systems cluster configuration, but defaults to \"mythor\" on bare-metal systems and \"data\" on containerized systems.\n",
+    "- **Path**: The file path for the dataset within the HPCC Systems cluster.\n",
+    "- **limitPerFilePart**: *Optional.* A limit on the number of records read from each file part / partition of the HPCC Systems dataset.\n",
+    "- **projectList**: *Optional.* The columns to read from the HPCC Systems dataset.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 21,
+   "id": "e8d49d8f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "readDf = spark.read.load(format=\"hpcc\",\n",
+    "                         host=\"http://127.0.0.1:8010\",\n",
+    "                         username=\"\",\n",
+    "                         password=\"\",\n",
+    "                         cluster=\"mythor\",\n",
+    "                         path=\"spark::test::dataset\",\n",
+    "                         limitPerFilePart=100,\n",
+    "                         projectList=\"key, fill\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 22,
+   "id": "c16a758c",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+----------+\n",
+      "|key|      fill|\n",
+      "+---+----------+\n",
+      "|  0|2093710133|\n",
+      "|  1|8298950336|\n",
+      "|  2|8184283203|\n",
+      "|  3|7991449698|\n",
+      "|  4|2230822419|\n",
+      "|  5|6088498312|\n",
+      "|  6|2125960683|\n",
+      "|  7|9243328381|\n",
+      "|  8|6184681638|\n",
+      "|  9|6103701586|\n",
+      "| 10|1113644174|\n",
+      "| 11|6422865225|\n",
+      "| 12|3522318506|\n",
+      "| 13|5734827156|\n",
+      "| 14|7946567617|\n",
+      "| 15|6700616122|\n",
+      "| 16|5306580724|\n",
+      "| 17|9696286149|\n",
+      "| 18|4157652341|\n",
+      "| 19|3429216958|\n",
+      "+---+----------+\n",
+      "only showing top 20 rows\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "readDf.show()"
+   ]
+  },
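+  {
+   "cell_type": "markdown",
+   "id": "c8d9e0f1",
+   "metadata": {},
+   "source": [
+    "As an optional sanity check on ```limitPerFilePart```, the sketch below uses only standard Spark APIs and assumes, per the option description above, that each Spark partition corresponds to one HPCC file part: the total row count should not exceed the limit multiplied by the number of partitions.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d2e3f4a5",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# limitPerFilePart caps the records read from each file part / partition,\n",
+    "# so the total row count is bounded by 100 * the number of partitions.\n",
+    "parts = readDf.rdd.getNumPartitions()\n",
+    "rows = readDf.count()\n",
+    "assert rows <= 100 * parts\n",
+    "print(\"partitions:\", parts, \"rows:\", rows)"
+   ]
+  },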
"name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/Examples/pom.xml b/Examples/pom.xml deleted file mode 100644 index 44e71c9c..00000000 --- a/Examples/pom.xml +++ /dev/null @@ -1,33 +0,0 @@ - - 4.0.0 - org.hpccsystems - Spark_Examples - 0.0.1-SNAPSHOT - hpcc_spark_examples - HPCC Spark Examples - - src - - - maven-compiler-plugin - 3.7.0 - - 1.8 - 1.8 - - - - - - - org.apache.spark - spark-core_2.10 - [2.2.2,) - - - org.apache.spark - spark-mllib_2.10 - [2.2.2,) - - - diff --git a/Examples/src/main/ecl/IrisDS.ecl b/Examples/src/main/ecl/IrisDS.ecl deleted file mode 100644 index b1155038..00000000 --- a/Examples/src/main/ecl/IrisDS.ecl +++ /dev/null @@ -1,176 +0,0 @@ -/*############################################################################## - HPCC SYSTEMS software Copyright (C) 2021 HPCC Systems®. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -############################################################################## */ - -dsRecord := RECORD - REAL a1 ; - REAL a2 ; - REAL a3 ; - REAL a4 ; - REAL class; -END; - -EXPORT irisDS := DATASET([ -{5.1,3.5,1.4,0.2,0}, -{4.9,3.0,1.4,0.2,0}, -{4.7,3.2,1.3,0.2,0}, -{4.6,3.1,1.5,0.2,0}, -{5.0,3.6,1.4,0.2,0}, -{5.4,3.9,1.7,0.4,0}, -{4.6,3.4,1.4,0.3,0}, -{5.0,3.4,1.5,0.2,0}, -{4.4,2.9,1.4,0.2,0}, -{4.9,3.1,1.5,0.1,0}, -{5.4,3.7,1.5,0.2,0}, -{4.8,3.4,1.6,0.2,0}, -{4.8,3.0,1.4,0.1,0}, -{4.3,3.0,1.1,0.1,0}, -{5.8,4.0,1.2,0.2,0}, -{5.7,4.4,1.5,0.4,0}, -{5.4,3.9,1.3,0.4,0}, -{5.1,3.5,1.4,0.3,0}, -{5.7,3.8,1.7,0.3,0}, -{5.1,3.8,1.5,0.3,0}, -{5.4,3.4,1.7,0.2,0}, -{5.1,3.7,1.5,0.4,0}, -{4.6,3.6,1.0,0.2,0}, -{5.1,3.3,1.7,0.5,0}, -{4.8,3.4,1.9,0.2,0}, -{5.0,3.0,1.6,0.2,0}, -{5.0,3.4,1.6,0.4,0}, -{5.2,3.5,1.5,0.2,0}, -{5.2,3.4,1.4,0.2,0}, -{4.7,3.2,1.6,0.2,0}, -{4.8,3.1,1.6,0.2,0}, -{5.4,3.4,1.5,0.4,0}, -{5.2,4.1,1.5,0.1,0}, -{5.5,4.2,1.4,0.2,0}, -{4.9,3.1,1.5,0.1,0}, -{5.0,3.2,1.2,0.2,0}, -{5.5,3.5,1.3,0.2,0}, -{4.9,3.1,1.5,0.1,0}, -{4.4,3.0,1.3,0.2,0}, -{5.1,3.4,1.5,0.2,0}, -{5.0,3.5,1.3,0.3,0}, -{4.5,2.3,1.3,0.3,0}, -{4.4,3.2,1.3,0.2,0}, -{5.0,3.5,1.6,0.6,0}, -{5.1,3.8,1.9,0.4,0}, -{4.8,3.0,1.4,0.3,0}, -{5.1,3.8,1.6,0.2,0}, -{4.6,3.2,1.4,0.2,0}, -{5.3,3.7,1.5,0.2,0}, -{5.0,3.3,1.4,0.2,0}, -{7.0,3.2,4.7,1.4,1}, -{6.4,3.2,4.5,1.5,1}, -{6.9,3.1,4.9,1.5,1}, -{5.5,2.3,4.0,1.3,1}, -{6.5,2.8,4.6,1.5,1}, -{5.7,2.8,4.5,1.3,1}, -{6.3,3.3,4.7,1.6,1}, -{4.9,2.4,3.3,1.0,1}, -{6.6,2.9,4.6,1.3,1}, -{5.2,2.7,3.9,1.4,1}, -{5.0,2.0,3.5,1.0,1}, -{5.9,3.0,4.2,1.5,1}, -{6.0,2.2,4.0,1.0,1}, -{6.1,2.9,4.7,1.4,1}, -{5.6,2.9,3.6,1.3,1}, -{6.7,3.1,4.4,1.4,1}, -{5.6,3.0,4.5,1.5,1}, -{5.8,2.7,4.1,1.0,1}, -{6.2,2.2,4.5,1.5,1}, -{5.6,2.5,3.9,1.1,1}, -{5.9,3.2,4.8,1.8,1}, -{6.1,2.8,4.0,1.3,1}, -{6.3,2.5,4.9,1.5,1}, -{6.1,2.8,4.7,1.2,1}, -{6.4,2.9,4.3,1.3,1}, -{6.6,3.0,4.4,1.4,1}, -{6.8,2.8,4.8,1.4,1}, 
-{6.7,3.0,5.0,1.7,1}, -{6.0,2.9,4.5,1.5,1}, -{5.7,2.6,3.5,1.0,1}, -{5.5,2.4,3.8,1.1,1}, -{5.5,2.4,3.7,1.0,1}, -{5.8,2.7,3.9,1.2,1}, -{6.0,2.7,5.1,1.6,1}, -{5.4,3.0,4.5,1.5,1}, -{6.0,3.4,4.5,1.6,1}, -{6.7,3.1,4.7,1.5,1}, -{6.3,2.3,4.4,1.3,1}, -{5.6,3.0,4.1,1.3,1}, -{5.5,2.5,4.0,1.3,1}, -{5.5,2.6,4.4,1.2,1}, -{6.1,3.0,4.6,1.4,1}, -{5.8,2.6,4.0,1.2,1}, -{5.0,2.3,3.3,1.0,1}, -{5.6,2.7,4.2,1.3,1}, -{5.7,3.0,4.2,1.2,1}, -{5.7,2.9,4.2,1.3,1}, -{6.2,2.9,4.3,1.3,1}, -{5.1,2.5,3.0,1.1,1}, -{5.7,2.8,4.1,1.3,1}, -{6.3,3.3,6.0,2.5,2}, -{5.8,2.7,5.1,1.9,2}, -{7.1,3.0,5.9,2.1,2}, -{6.3,2.9,5.6,1.8,2}, -{6.5,3.0,5.8,2.2,2}, -{7.6,3.0,6.6,2.1,2}, -{4.9,2.5,4.5,1.7,2}, -{7.3,2.9,6.3,1.8,2}, -{6.7,2.5,5.8,1.8,2}, -{7.2,3.6,6.1,2.5,2}, -{6.5,3.2,5.1,2.0,2}, -{6.4,2.7,5.3,1.9,2}, -{6.8,3.0,5.5,2.1,2}, -{5.7,2.5,5.0,2.0,2}, -{5.8,2.8,5.1,2.4,2}, -{6.4,3.2,5.3,2.3,2}, -{6.5,3.0,5.5,1.8,2}, -{7.7,3.8,6.7,2.2,2}, -{7.7,2.6,6.9,2.3,2}, -{6.0,2.2,5.0,1.5,2}, -{6.9,3.2,5.7,2.3,2}, -{5.6,2.8,4.9,2.0,2}, -{7.7,2.8,6.7,2.0,2}, -{6.3,2.7,4.9,1.8,2}, -{6.7,3.3,5.7,2.1,2}, -{7.2,3.2,6.0,1.8,2}, -{6.2,2.8,4.8,1.8,2}, -{6.1,3.0,4.9,1.8,2}, -{6.4,2.8,5.6,2.1,2}, -{7.2,3.0,5.8,1.6,2}, -{7.4,2.8,6.1,1.9,2}, -{7.9,3.8,6.4,2.0,2}, -{6.4,2.8,5.6,2.2,2}, -{6.3,2.8,5.1,1.5,2}, -{6.1,2.6,5.6,1.4,2}, -{7.7,3.0,6.1,2.3,2}, -{6.3,3.4,5.6,2.4,2}, -{6.4,3.1,5.5,1.8,2}, -{6.0,3.0,4.8,1.8,2}, -{6.9,3.1,5.4,2.1,2}, -{6.7,3.1,5.6,2.4,2}, -{6.9,3.1,5.1,2.3,2}, -{5.8,2.7,5.1,1.9,2}, -{6.8,3.2,5.9,2.3,2}, -{6.7,3.3,5.7,2.5,2}, -{6.7,3.0,5.2,2.3,2}, -{6.3,2.5,5.0,1.9,2}, -{6.5,3.0,5.2,2.0,2}, -{6.2,3.4,5.4,2.3,2}, -{5.9,3.0,5.1,1.8,2}], dsRecord); - -dsname := '~samples::iris_sample'; - -OUTPUT(irisDS,,dsname,OVERWRITE); \ No newline at end of file diff --git a/Examples/src/main/scala/org/hpccsystesm/spark_examples/Dataframe_Iris_LR.scala b/Examples/src/main/scala/org/hpccsystesm/spark_examples/Dataframe_Iris_LR.scala deleted file mode 100644 index 236bd29c..00000000 --- a/Examples/src/main/scala/org/hpccsystesm/spark_examples/Dataframe_Iris_LR.scala +++ /dev/null @@ -1,57 +0,0 @@ -package main.scala.org.hpccsystesm.spark_examples -import org.hpccsystems.spark.HpccFile -import org.hpccsystems.spark.thor.RemapInfo -import org.apache.spark.sql.Dataset -import org.apache.spark.ml.feature.VectorAssembler -import org.apache.spark.ml.classification.LogisticRegression -import org.apache.spark.mllib.evaluation.MulticlassMetrics -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession -import scala.io.StdIn - -object Dataframe_Iris_LR { - def main(args: Array[String]) { - val hpcc_jar = StdIn.readLine("Full path of Spark-HPCC Jar: ") - val japi_jar = StdIn.readLine("Full path of JAPI Jar: ") - val jar_list = Array(hpcc_jar, japi_jar) - // Spark setup - val conf = new SparkConf().setAppName("Iris_Spark_HPCC") - conf.setMaster("local[2]") - val sparkHome = StdIn.readLine("Full path to Spark home: ") - conf.setSparkHome(sparkHome) - conf.setJars(jar_list) - val spark = SparkSession.builder().config(conf).getOrCreate() - val hpcc_protocol = StdIn.readLine("protocol: ") - val hpcc_ip = StdIn.readLine("ESP IP: ") - val hpcc_port = StdIn.readLine("port: ") - val hpcc_file = StdIn.readLine("File name: ") - val user = StdIn.readLine("user: ") - val pword = StdIn.readLine("password: ") - val nodes = StdIn.readLine("nodes: ") - val base_ip = StdIn.readLine("base ip: ") - - val espconn = new Connection(hpcc_protocol, hpcc_ip, hpcc_port); - espconn.setUserName(user); - espconn.setPassword(pword); - - val hpcc = new HpccFile(hpcc_file, espconn); - - 
if (!nodes.equals("") && !base_ip.equals("")) - hpcc.setClusterRemapInfo(new RemapInfo(Integer.parseInt(nodes), base_ip)); - - val my_df = hpcc.getDataframe(spark) - val assembler = new VectorAssembler() - assembler.setInputCols(Array("petal_length","petal_width", "sepal_length", "sepal_width")) - assembler.setOutputCol("features") - val iris_fv = assembler.transform(my_df).withColumnRenamed("class", "label") - val lr = new LogisticRegression() - val iris_model = lr.fit(iris_fv) - val with_preds = iris_model.transform(iris_fv) - val predictionAndLabel = with_preds.rdd.map( - r => (r.getDouble(r.fieldIndex("prediction")), - r.getDouble(r.fieldIndex("label")))) - val metrics = new MulticlassMetrics(predictionAndLabel) - println("Confusion matrix:") - println(metrics.confusionMatrix) - } -} diff --git a/Examples/src/main/scala/org/hpccsystesm/spark_examples/Iris_LR.scala b/Examples/src/main/scala/org/hpccsystesm/spark_examples/Iris_LR.scala deleted file mode 100644 index 681d4df3..00000000 --- a/Examples/src/main/scala/org/hpccsystesm/spark_examples/Iris_LR.scala +++ /dev/null @@ -1,60 +0,0 @@ -package org.hpccsystesm.spark_examples - -import org.hpccsystems.spark.HpccFile -import org.hpccsystems.spark.HpccRDD -import org.hpccsystems.spark.thor.RemapInfo -import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS -import org.apache.spark.mllib.evaluation.MulticlassMetrics -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext -import scala.io.StdIn - -object Iris_LR { - def main(args: Array[String]) { - val hpcc_jar = StdIn.readLine("Full path of Spark-HPCC Jar: ") - val japi_jar = StdIn.readLine("Full path of JAPI Jar: ") - val jar_list = Array(hpcc_jar, japi_jar) - // Spark setup - val conf = new SparkConf().setAppName("Iris_Spark_HPCC") - conf.setMaster("local[2]") - val sparkHome = StdIn.readLine("Full path to Spark home: ") - conf.setSparkHome(sparkHome) - conf.setJars(jar_list) - val sc = new SparkContext(conf) - val hpcc_protocol = StdIn.readLine("protocol: ") - val hpcc_ip = StdIn.readLine("ESP IP: ") - val hpcc_port = StdIn.readLine("port: ") - val hpcc_file = StdIn.readLine("File name: ") - val user = StdIn.readLine("user: ") - val pword = StdIn.readLine("password: ") - val nodes = StdIn.readLine("nodes: ") - val base_ip = StdIn.readLine("base ip: ") - - val espconn = new Connection(hpcc_protocol, hpcc_ip, hpcc_port); - espconn.setUserName(user); - espconn.setPassword(pword); - - val hpcc = new HpccFile(hpcc_file, espconn); - - if (!nodes.equals("") && !base_ip.equals("")) - hpcc.setClusterRemapInfo(new RemapInfo(Integer.parseInt(nodes), base_ip)); - - val myRDD = hpcc.getRDD(sc) - val names = new Array[String](4) - names(0) = "petal_length" - names(1) = "petal_width" - names(2) = "sepal_length" - names(3) = "sepal_width" - val lpRDD = myRDD.makeMLLibLabeledPoint("class", names) - val lr = new LogisticRegressionWithLBFGS().setNumClasses(3) - val iris_model = lr.run(lpRDD) - val predictionAndLabel = lpRDD.map {case LabeledPoint(label, features) => - val prediction = iris_model.predict(features) - (prediction, label) - } - val metrics = new MulticlassMetrics(predictionAndLabel) - println("Confusion matrix:") - println(metrics.confusionMatrix) - } -}