Ezspa 718 fixed #170

Open · wants to merge 7 commits into base: main
@@ -0,0 +1,27 @@
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test DF + pandas + udf App") \
        .getOrCreate()

    def multiply_func(a, b):
        return a * b

    multiply = pandas_udf(multiply_func, returnType=LongType())

    # Plain pandas execution of the same function, for comparison.
    x = pd.Series([1, 2, 3])
    pd_result = multiply_func(x, x).to_json()
    print(pd_result)

    # Spark execution through the pandas UDF.
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df_result = df.select(multiply(col("x"), col("x")))

    df_result.explain()
    df_result.show()

    spark.stop()
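For reference, the same UDF can also be declared with the decorator form of pandas_udf and Python type hints; this is a minimal sketch, not part of this PR, and multiply_udf is a hypothetical name:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def multiply_udf(a: pd.Series, b: pd.Series) -> pd.Series:
    # Element-wise product of the two input Series.
    return a * b

# df.select(multiply_udf(col("x"), col("x"))) is equivalent to the select above.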
19 changes: 19 additions & 0 deletions examples/spark-3.2.0/apps/src/main/python/dataframe_with_arrow.py
@@ -0,0 +1,19 @@
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test DF + arrow App") \
        .getOrCreate()

    # Enable Arrow-based columnar data transfers between Spark and pandas.
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    pdf = pd.DataFrame(np.arange(20).reshape(4, 5))
    df = spark.createDataFrame(pdf)
    result_df = df.select("*")

    result_df.explain()
    print(result_df.toPandas())

    spark.stop()
@@ -0,0 +1,31 @@
from pyspark.sql import SparkSession

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test AFT Survival Regression App") \
        .getOrCreate()

    training = spark.createDataFrame([
        (1.218, 1.0, Vectors.dense(1.560, -0.605)),
        (2.949, 0.0, Vectors.dense(0.346, 2.158)),
        (3.627, 0.0, Vectors.dense(1.380, 0.231)),
        (0.273, 1.0, Vectors.dense(0.520, 1.151)),
        (4.199, 0.0, Vectors.dense(0.795, -0.226)),
    ], ["label", "censor", "features"])

    # Fit an accelerated failure time (AFT) survival regression model.
    quantileProbabilities = [0.3, 0.6]
    aft = AFTSurvivalRegression(
        quantileProbabilities=quantileProbabilities,
        quantilesCol="quantiles",
    )

    model = aft.fit(training)
    df = model.transform(training).select("prediction")
    df.explain()
    df.collect()

    spark.stop()
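If you also want to inspect the fitted parameters, the model exposes them directly; a short sketch against the same training data, not part of this PR:

print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))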
61 changes: 61 additions & 0 deletions examples/spark-3.2.0/apps/src/main/python/ml/cross_validator.py
@@ -0,0 +1,61 @@
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test Cross Validation App") \
        .getOrCreate()

    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
        (8, "e spark program", 1.0),
        (9, "a e c l", 0.0),
        (10, "spark compile", 1.0),
        (11, "hadoop software", 0.0),
    ], ["id", "text", "label"])

    # Tokenize -> hash term frequencies -> logistic regression.
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    lr = LogisticRegression(maxIter=10)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

    # 3 x 2 parameter grid over feature dimensionality and regularization.
    paramGrid = ParamGridBuilder() \
        .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .build()

    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=BinaryClassificationEvaluator(),
        numFolds=2,
    )

    cvModel = crossval.fit(training)

    test = spark.createDataFrame([
        (4, "spark i j k"),
        (5, "l m n"),
        (6, "mapreduce spark"),
        (7, "apache hadoop"),
    ], ["id", "text"])

    prediction = cvModel.transform(test)
    selected = prediction.select("prediction")
    selected.explain()
    selected.show()

    spark.stop()
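To see which grid point won, the fitted CrossValidatorModel keeps the best pipeline and the per-grid-point metrics; a minimal sketch, not part of this PR:

best_pipeline = cvModel.bestModel        # PipelineModel chosen by the grid search
best_lr = best_pipeline.stages[-1]       # fitted LogisticRegressionModel stage
print(best_lr.extractParamMap())         # includes the winning regParam
print(cvModel.avgMetrics)                # average metric for each ParamGridBuilder point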
30 changes: 30 additions & 0 deletions examples/spark-3.2.0/apps/src/main/python/ml/df.py
@@ -0,0 +1,30 @@
import sys

from pyspark.sql import SparkSession
from pyspark.mllib.stat import Statistics
from pyspark.mllib.util import MLUtils

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test Data Frame ML App") \
        .getOrCreate()

    input_path = sys.argv[1]
    df = spark.read \
        .format("libsvm") \
        .load(input_path)

    # Convert ml.linalg vectors to mllib.linalg vectors so that
    # mllib.stat.Statistics can consume them.
    features = MLUtils.convertVectorColumnsFromML(
        df, "features"
    ).select("features")

    features.explain()

    summary = Statistics.colStats(features.rdd.map(lambda r: r.features))
    print(summary.mean().tolist())

    spark.stop()

34 changes: 34 additions & 0 deletions examples/spark-3.2.0/apps/src/main/python/ml/kmeans.py
@@ -0,0 +1,34 @@
import sys

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test KMeans ML App") \
        .getOrCreate()

    dataset = spark.read.format("libsvm").load(sys.argv[1])

    # Train a k-means model.
    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(dataset)

    # Make predictions.
    predictions = model.transform(dataset)
    predictions.explain()

    # Evaluate the clustering by computing the Silhouette score.
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(silhouette))

    # Show the cluster centers.
    centers = model.clusterCenters()
    print(centers)

    spark.stop()
@@ -0,0 +1,29 @@
import sys

from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test Logistic Regression ML App") \
        .getOrCreate()

    training = spark.read.format("libsvm").load(sys.argv[1])

    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
    lrModel = lr.fit(training)

    # Pick the threshold that maximizes F-Measure on the training summary.
    trainingSummary = lrModel.summary
    fMeasure = trainingSummary.fMeasureByThreshold
    maxFMeasure = fMeasure.groupBy().max("F-Measure").select("max(F-Measure)").head()

    bestThreshold = fMeasure.where(
        fMeasure["F-Measure"] == maxFMeasure["max(F-Measure)"]
    ).select("threshold")

    bestThreshold.explain()
    bestThreshold.show()

    spark.stop()
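If the goal is to actually apply that threshold, the scalar can be pulled out of the one-row DataFrame and set back on the estimator; a sketch, not part of this PR, and it only affects subsequent fits of lr:

best = bestThreshold.head()["threshold"]   # extract the scalar threshold value
lr.setThreshold(best)                      # later fits of this estimator use the tuned threshold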
20 changes: 20 additions & 0 deletions examples/spark-3.2.0/apps/src/main/python/pandas_group_by.py
@@ -0,0 +1,20 @@
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Test Pandas App") \
        .getOrCreate()

    df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
    df2 = spark.createDataFrame([(1, 1)], ("column", "value"))

    # Cogroup the two DataFrames on "column" and add the matching
    # pandas frames element-wise.
    res = df1.groupby("column").cogroup(
        df2.groupby("column")
    ).applyInPandas(lambda l, r: l + r, df1.schema)

    res.explain()
    res.show()

    spark.stop()
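For clarity, the lambda above can be written as a named function that shows what applyInPandas hands each cogroup; a sketch building on the df1/df2 defined above, not part of this PR:

import pandas as pd

def add_frames(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Each side holds the rows of df1/df2 that share the same grouping key;
    # element-wise addition assumes the two frames line up, as in this toy case.
    return left + right

res = df1.groupby("column").cogroup(df2.groupby("column")) \
    .applyInPandas(add_frames, schema=df1.schema)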
50 changes: 50 additions & 0 deletions examples/spark-3.2.0/crs/gpu/ml/aft-survival-regression.yaml
@@ -0,0 +1,50 @@
apiVersion: "sparkoperator.hpe.com/v1beta2"
kind: SparkApplication
metadata:
  name: gpu-pyspark-aft
  namespace: sampletenant
spec:
  sparkConf:
    # Note: If you are executing the application as a K8s user that MapR can verify,
    # you do not need to specify a spark.mapr.user.secret.
    spark.mapr.user.secret: ${your secret}

    # Enable the RAPIDS plugin.
    spark.plugins: "com.nvidia.spark.SQLPlugin"
    spark.rapids.sql.enabled: "true"

    # GPU allocation and discovery settings.
    spark.task.resource.gpu.amount: "1"
    spark.executor.resource.gpu.amount: "1"
    spark.executor.resource.gpu.vendor: "nvidia.com"
    spark.executor.resource.gpu.discoveryScript: "/opt/mapr/spark/spark-3.2.0/examples/src/main/scripts/getGpusResources.sh"

  type: Python
  sparkVersion: 3.2.0
  mode: cluster
  image: gcr.io/mapr-252711/spark-3.2.0:202202161825P150-gpu
  imagePullPolicy: Always
  mainApplicationFile: "maprfs:///${path to file}/aft_survival_regression.py"
  restartPolicy:
    type: Never
  imagePullSecrets:
    - imagepull
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 3.2.0
    # Note: You do not need to specify a serviceAccount;
    # it will be auto-generated referencing the pre-existing "hpe-<namespace>".
    serviceAccount: hpe-sampletenant
  executor:
    cores: 1
    coreLimit: "1000m"
    instances: 1
    memory: "2G"
    gpu:
      name: "nvidia.com/gpu"
      quantity: 1
    labels:
      version: 3.2.0
50 changes: 50 additions & 0 deletions examples/spark-3.2.0/crs/gpu/ml/cross-validator.yaml
@@ -0,0 +1,50 @@
apiVersion: "sparkoperator.hpe.com/v1beta2"
kind: SparkApplication
metadata:
  name: gpu-pyspark-cross-validator
  namespace: sampletenant
spec:
  sparkConf:
    # Note: If you are executing the application as a K8s user that MapR can verify,
    # you do not need to specify a spark.mapr.user.secret.
    spark.mapr.user.secret: ${your secret}

    # Enable the RAPIDS plugin.
    spark.plugins: "com.nvidia.spark.SQLPlugin"
    spark.rapids.sql.enabled: "true"

    # GPU allocation and discovery settings.
    spark.task.resource.gpu.amount: "1"
    spark.executor.resource.gpu.amount: "1"
    spark.executor.resource.gpu.vendor: "nvidia.com"
    spark.executor.resource.gpu.discoveryScript: "/opt/mapr/spark/spark-3.2.0/examples/src/main/scripts/getGpusResources.sh"

  type: Python
  sparkVersion: 3.2.0
  mode: cluster
  image: gcr.io/mapr-252711/spark-3.2.0:202202161825P150-gpu
  imagePullPolicy: Always
  mainApplicationFile: "maprfs:///${path to file}/cross_validator.py"
  restartPolicy:
    type: Never
  imagePullSecrets:
    - imagepull
  driver:
    cores: 1
    coreLimit: "1000m"
    memory: "512m"
    labels:
      version: 3.2.0
    # Note: You do not need to specify a serviceAccount;
    # it will be auto-generated referencing the pre-existing "hpe-<namespace>".
    serviceAccount: hpe-sampletenant
  executor:
    cores: 1
    coreLimit: "1000m"
    instances: 1
    memory: "2G"
    gpu:
      name: "nvidia.com/gpu"
      quantity: 1
    labels:
      version: 3.2.0
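Outside the operator, for example when smoke-testing one of the Python apps locally, the same sparkConf entries in these CRs can be set on the SparkSession builder instead; a minimal sketch assuming a standalone PySpark session rather than the SparkApplication CR, with a made-up app name:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gpu-rapids-smoke-test")  # hypothetical name, not from this PR
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.enabled", "true")
    .config("spark.task.resource.gpu.amount", "1")
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.executor.resource.gpu.vendor", "nvidia.com")
    .config("spark.executor.resource.gpu.discoveryScript",
            "/opt/mapr/spark/spark-3.2.0/examples/src/main/scripts/getGpusResources.sh")
    .getOrCreate()
)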