Collecting a dataframe does not work on a Spark cluster deployed by the radanalytics Operator #350
Comments
first off, thank you for such a detailed issue @vemonet. unfortunately, i do not have much in the way of advice as i am not using these projects actively. in the past, when i have seen issues like this, the radanalytics website tutorials were a good source of inspiration; if you haven't seen them, you might investigate those. although i don't think we have a dataframes example specifically, you will see several different application deployment styles and many of them are using pySpark. hope that helps =)

edit: just an afterthought, you might need to expose some sort of service on your driver application. it's possible that the collected results are not returning properly to that process. although, i would expect to see something in the logs if that were the case.
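As a hedged illustration of that last suggestion: in standalone mode the executors open connections back to the driver process, and by default those callback ports are random. A sketch of pinning them so that a Kubernetes Service in front of the notebook/driver pod could expose them (the service name jupyter-driver and the port numbers are assumptions; spark-cluster is the cluster service name used later in this thread):

```python
from pyspark.sql import SparkSession

# Pin the ports the executors use to call back into the driver process and
# advertise a hostname the worker pods can resolve (e.g. a Kubernetes Service
# in front of the notebook pod). Names and ports below are illustrative.
spark = (
    SparkSession.builder
    .master("spark://spark-cluster:7077")
    .config("spark.driver.host", "jupyter-driver")   # assumed Service name for the driver pod
    .config("spark.driver.port", "29413")            # executor -> driver RPC
    .config("spark.blockManager.port", "29414")      # executor -> driver block transfers
    .getOrCreate()
)
```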
Thanks a lot for the pointers @elmiko! I tried the piece of code to compute Pi from https://github.com/radanalyticsio/tutorial-sparkpi-python-flask/blob/master/app.py#L11 but it's also getting stuck as soon as Spark is used (
Thanks, that could be something to check! I am using a container based on the

The problem is that it is not possible to find an example of a driver application image from the examples: https://github.com/radanalyticsio/tutorial-sparkpi-python-flask

oc new-app --template oshinko-python-spark-build-dc \
  -p APPLICATION_NAME=sparkpi \
  -p GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-python-flask.git

If I search more about this I find more instructions: https://radanalytics.io/howdoi/use-python3-with-spark, but there are no links to the source code of the Dockerfile or OpenShift template used; it only pushes me to install one more CLI tool. So I am not sure where I should look to understand what is actually needed for a driver application deployment.

It's quite frustrating, because I have some experience with Dockerfiles, writing YAML files for Kubernetes, and building OpenShift templates and Helm charts that work on different OpenShift and Kubernetes clusters (even if I am not a "sysadmin" or "devops" per se, bash commands and YAML files are easy to read and debug by most developers, so they should not be hidden that much imo!)

Any idea how/where I can find the source for those deployments (Dockerfile and OpenShift template YAML file)? Thanks a lot!
@elmiko I was looking into the official Spark documentation for Kubernetes and just realized that what you call the "driver application" is actually the "master" Spark node! https://spark.apache.org/docs/latest/running-on-kubernetes.html

I thought you were talking about the container from which we run the Python code that accesses the "master/driver" Spark (in our case a JupyterLab container). The thing is that the
**Logs after starting the Spark cluster**

Here are the logs we get from the master when deploying a basic cluster with 1 master and 4 workers, before doing any operation on this cluster:
It seems that the master loses connections to the workers:

22/05/04 10:44:06 INFO Master: Removing worker worker-20220504083319-10.131.6.37-45597 on 10.131.6.37:45597
22/05/04 10:44:06 INFO Master: Telling app of lost worker: worker-20220504083319-10.131.6.37-45597
22/05/04 10:44:06 INFO Master: 10.128.2.30:37842 got disassociated, removing it.

And the logs for a worker:

Starting worker, will connect to: spark://spark-cluster:7077
Waiting for spark master to be available ...
Waiting for spark master to be available ...
Waiting for spark master to be available ...
Waiting for spark master to be available ...
22/05/04 08:33:19 INFO Worker: Started daemon with process name: 11@spark-cluster-w-sdpx9
22/05/04 08:33:19 INFO SignalUtils: Registered signal handler for TERM
22/05/04 08:33:19 INFO SignalUtils: Registered signal handler for HUP
22/05/04 08:33:19 INFO SignalUtils: Registered signal handler for INT
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark-distro/spark-3.0.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/05/04 08:33:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/04 08:33:19 INFO SecurityManager: Changing view acls to: 185
22/05/04 08:33:19 INFO SecurityManager: Changing modify acls to: 185
22/05/04 08:33:19 INFO SecurityManager: Changing view acls groups to:
22/05/04 08:33:19 INFO SecurityManager: Changing modify acls groups to:
22/05/04 08:33:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185); groups with view permissions: Set(); users with modify permissions: Set(185); groups with modify permissions: Set()
22/05/04 08:33:20 INFO Utils: Successfully started service 'sparkWorker' on port 45355.
22/05/04 08:33:20 INFO Worker: Starting Spark worker 10.129.3.77:45355 with 1 cores, 502.6 GiB RAM
22/05/04 08:33:20 INFO Worker: Running Spark version 3.0.1
22/05/04 08:33:20 INFO Worker: Spark home: /opt/spark
22/05/04 08:33:20 INFO ResourceUtils: ==============================================================
22/05/04 08:33:20 INFO ResourceUtils: Resources for spark.worker:
22/05/04 08:33:20 INFO ResourceUtils: ==============================================================
22/05/04 08:33:20 INFO log: Logging initialized @2386ms to org.sparkproject.jetty.util.log.Slf4jLog
22/05/04 08:33:20 INFO Server: jetty-9.4.z-SNAPSHOT; built: 2019-04-29T20:42:08.989Z; git: e1bc35120a6617ee3df052294e433f3a25ce7097; jvm 11.0.8+10-LTS
22/05/04 08:33:20 INFO Server: Started @2469ms
22/05/04 08:33:20 INFO AbstractConnector: Started ServerConnector@15cfb4c9{HTTP/1.1,[http/1.1]}{0.0.0.0:8081}
22/05/04 08:33:20 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5a7f3cb8{/logPage,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3c8383fa{/logPage/json,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1ca124a0{/,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@615b2a8e{/json,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@35a9bab3{/static,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@46372bf3{/log,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://spark-cluster-w-sdpx9:8081
22/05/04 08:33:20 INFO Worker: Connecting to master spark-cluster:7077...
22/05/04 08:33:20 INFO TransportClientFactory: Successfully created connection to spark-cluster/172.30.227.230:7077 after 48 ms (0 ms spent in bootstraps)
22/05/04 08:33:20 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7b2e445f{/metrics/json,null,AVAILABLE,@Spark}
22/05/04 08:33:20 INFO Worker: Successfully registered with master spark://10.128.5.178:7077
22/05/04 08:33:20 INFO Worker: WorkerWebUI is available at //proxy/worker-20220504083320-10.129.3.77-45355

Nothing noticeable for the worker, everything seems right.

**Logs after running a basic Pi compute function on Spark**

Here is the code, run from a Jupyter notebook in the same project as the Spark cluster, which is exposed on a service named spark-cluster:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://spark-cluster:7077").getOrCreate()
scale = 1
n = 100000 * scale
def f(_):
    from random import random
    x = random()
    y = random()
    return 1 if x ** 2 + y ** 2 <= 1 else 0

print("Start spark computation")
count = spark.sparkContext.parallelize(
    range(1, n + 1), scale).map(f).reduce(lambda x, y: x + y)
print("Spark computation done")
spark.stop()
pi = 4.0 * count / n
print(pi)

Master:

22/05/04 10:49:53 INFO Master: Removing executor app-20220504104812-0000/165 because it is EXITED
22/05/04 10:49:53 INFO Master: Launching executor app-20220504104812-0000/169 on worker worker-20220504083320-10.129.3.77-45355
22/05/04 10:49:54 INFO Master: Removing executor app-20220504104812-0000/166 because it is EXITED
22/05/04 10:49:54 INFO Master: Launching executor app-20220504104812-0000/170 on worker worker-20220504083320-10.131.0.53-36173
22/05/04 10:49:54 INFO Master: Removing executor app-20220504104812-0000/167 because it is EXITED
22/05/04 10:49:54 INFO Master: Launching executor app-20220504104812-0000/171 on worker worker-20220504083320-10.129.5.153-43871
22/05/04 10:49:55 INFO Master: Removing executor app-20220504104812-0000/168 because it is EXITED

Worker:

22/05/04 10:48:53 INFO Worker: Executor app-20220504104812-0000/66 finished with state EXITED message Command exited with code 1 exitStatus 1
22/05/04 10:48:53 INFO ExternalShuffleBlockResolver: Clean up non-shuffle and non-RDD files associated with the finished executor 66
22/05/04 10:48:53 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20220504104812-0000, execId=66)
22/05/04 10:48:53 INFO Worker: Asked to launch executor app-20220504104812-0000/70 for pyspark-shell
22/05/04 10:48:53 INFO SecurityManager: Changing view acls to: 185
22/05/04 10:48:53 INFO SecurityManager: Changing modify acls to: 185
22/05/04 10:48:53 INFO SecurityManager: Changing view acls groups to:
22/05/04 10:48:53 INFO SecurityManager: Changing modify acls groups to:
22/05/04 10:48:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185); groups with view permissions: Set(); users with modify permissions: Set(185); groups with modify permissions: Set()
22/05/04 10:48:53 INFO ExecutorRunner: Launch command: "/usr/lib/jvm/java-11-openjdk-11.0.8.10-0.el8_2.x86_64/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=35163" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@jupyterhub-nb-vemonet:35163" "--executor-id" "70" "--hostname" "10.129.3.77" "--cores" "1" "--app-id" "app-20220504104812-0000" "--worker-url" "spark://Worker@10.129.3.77:45355"

**Discussion**

From the logs it seems like the master has trouble reaching the workers. But what can we do about it? All of those connections, and the creation of the required services, should be handled by the operator. Because if the operator is not creating the required Kubernetes objects/permissions to connect the master to the workers on our OKD cluster, then there is no reason it would magically create those objects on other Kubernetes clusters.

Should we look into rewriting the code for the spark-operator? I see that it is currently in Java (https://github.com/radanalyticsio/spark-operator/tree/master/spark-operator), which is not a good programming language for writing operators (or anything that needs to be secure and maintainable).
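As a side note, one way to check which Services the operator actually created in the namespace is to list them from inside the cluster; a sketch using the kubernetes Python client (the namespace name is an assumption):

```python
from kubernetes import client, config

# Assumes this runs inside a pod in the same cluster (e.g. the notebook pod);
# use config.load_kube_config() when running from a local machine instead.
config.load_incluster_config()
v1 = client.CoreV1Api()

namespace = "my-spark-project"  # illustrative namespace name
for svc in v1.list_namespaced_service(namespace).items:
    ports = [(p.port, p.target_port) for p in (svc.spec.ports or [])]
    print(svc.metadata.name, svc.spec.selector, ports)
```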
Hi, I haven't tried spark 3.x with the operator, because I am also not active on the project, but I think the thing here is that you are trying to run your local application (that behaves like a driver) against a spark deployed in k8s. This is not going to work. You need to deploy everything to kubernetes. I can give you some links where I was able to make it work with pySpark running in Jupyter (no OpenDataHub):
Another place in the code where the collect is called and works is the CI: https://travis-ci.org/github/radanalyticsio/spark-operator/jobs/766277393#L3530

You were also complaining about not being able to find the Docker images: there are none, the image is produced as the result of an s2i build. The code is in this repo: https://github.com/radanalyticsio/openshift-spark and I agree that it's over-complicated and we should have advocated for a much simpler approach with plain Dockerfiles right in the operator's git repo.

edit: actually I found this: https://github.com/radanalyticsio/openshift-spark/blob/master/openshift-spark-build/Dockerfile I think these are generated by some tool called gl. I don't have bandwidth to help you more.
+1 to what @jkremser is saying. also, just to be clear:
this is only partially correct and what Jirka is saying is an important distinction. the "master" and the "driver" are 2 separate things. it is a valid topology to create a master node, and then attach a driver to it. this is the model that the radanalytics tooling expects. by running the "driver" inside the "master" you conjoin those 2 components. this can add complexity to the network topology.
i would examine it more thoroughly to look at how it could be customized or re-written. i also do not have the bandwidth to address this problem more deeply. when we experienced these issues in the past, i needed to manually add services to all the workers so that i could expose their ports to the master node. this is why, as Jirka suggests, it's better to run these workloads inside of openshift. you might find some inspiration in this blog post i wrote a few years ago: https://notes.elmiko.dev/2018/08/05/attaching-notebooks-with-radanalytics.html
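What elmiko describes (manually adding a Service per worker to expose its ports) could also be scripted; a hedged sketch with the kubernetes Python client, where the namespace, service name, label selector, and port are all assumptions that would need to match the actual worker pods:

```python
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
v1 = client.CoreV1Api()

namespace = "my-spark-project"                           # illustrative
worker_selector = {"radanalytics.io/podType": "worker"}  # hypothetical label on the worker pods

# Headless Service so each selected worker pod gets its own DNS record; the
# port must match what the worker actually listens on (the logs above show
# 45355 for one worker, but each worker may bind a different random port).
service = client.V1Service(
    metadata=client.V1ObjectMeta(name="spark-worker-ports"),
    spec=client.V1ServiceSpec(
        selector=worker_selector,
        cluster_ip="None",
        ports=[client.V1ServicePort(name="worker-rpc", port=45355, target_port=45355)],
    ),
)
v1.create_namespaced_service(namespace=namespace, body=service)
```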
Thanks a lot @elmiko and @jkremser, that clarifies a lot of things already! I currently have everything deployed to Kubernetes in the same namespace: the Spark master/workers and a Jupyter notebook pod from where I try to run the pyspark code to compute pi.

I looked further and the job is properly starting on the Spark master, but it is getting an error about connecting to the notebook (deployed with JupyterHub from OpenDataHub), as @elmiko anticipated.

First I was getting this error:

Caused by: java.io.IOException: Failed to connect to jupyterhub-nb-vemonet:40411
Caused by: java.net.UnknownHostException: jupyterhub-nb-vemonet

So I created a Service for the notebook pod. And now I am getting a different error:

Caused by: java.io.IOException: Failed to connect to jupyterhub-nb-vemonet/172.30.30.154:37865
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: jupyterhub-nb-vemonet/172.30.30.154:37865
Caused by: java.net.NoRouteToHostException: No route to host

Complete error stack trace:

Spark Executor Command: "/usr/lib/jvm/java-11-openjdk-11.0.8.10-0.el8_2.x86_64/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=39637" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@jupyterhub-nb-vemonet:39637" "--executor-id" "0" "--hostname" "10.129.5.153" "--cores" "1" "--app-id" "app-20220505150749-0004" "--worker-url" "spark://Worker@10.129.5.153:33649"
========================================
22/05/05 15:07:51 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 169661@spark-cluster-w-zdrgn
22/05/05 15:07:51 INFO SignalUtils: Registered signal handler for TERM
22/05/05 15:07:51 INFO SignalUtils: Registered signal handler for HUP
22/05/05 15:07:51 INFO SignalUtils: Registered signal handler for INT
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark-distro/spark-3.0.1-bin-hadoop3.2/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/05/05 15:07:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/05 15:07:51 INFO SecurityManager: Changing view acls to: 185,jovyan
22/05/05 15:07:51 INFO SecurityManager: Changing modify acls to: 185,jovyan
22/05/05 15:07:51 INFO SecurityManager: Changing view acls groups to:
22/05/05 15:07:51 INFO SecurityManager: Changing modify acls groups to:
22/05/05 15:07:51 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, jovyan); groups with view permissions: Set(); users with modify permissions: Set(185, jovyan); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:283)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:272)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$3(CoarseGrainedExecutorBackend.scala:303)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$1(CoarseGrainedExecutorBackend.scala:301)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more
Caused by: java.io.IOException: Failed to connect to jupyterhub-nb-vemonet/172.30.30.154:39637
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.netty.channel.AbstractChannel$AnnotatedNoRouteToHostException: No route to host: jupyterhub-nb-vemonet/172.30.30.154:39637
Caused by: java.net.NoRouteToHostException: No route to host
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)

If I kill the job and run another one, the error is similar but with a slightly different port. I am not sure what is needed there; I will check if I can find something, maybe trying to run the Python as a script with

Thanks a lot for the help! I'll keep updating this issue if I make advancements.
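One way to confirm what the executors are being told to connect back to (and why the port differs on every run) is to read the driver host and port from the running context; a small sketch, assuming the session can be created as in the earlier snippets:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://spark-cluster:7077").getOrCreate()
conf = spark.sparkContext.getConf()

# These values end up in the executors' "--driver-url". Unless pinned
# explicitly, spark.driver.port is picked at random for each application,
# which matches the port changing between runs; spark.blockManager.port
# stays unset (random) unless configured.
print("driver host:       ", conf.get("spark.driver.host", "unset"))
print("driver port:       ", conf.get("spark.driver.port", "unset"))
print("block manager port:", conf.get("spark.blockManager.port", "unset"))
spark.stop()
```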
Check also #252 (comment), maybe it's the same/similar issue.
Description:
I deployed the radanalytics/spark-operator on OKD 4.6 (using OpenDataHub, you can find the full ODH manifests we are using here: https://github.com/MaastrichtU-IDS/odh-manifests)
From this spark-operator I started a Spark cluster (1 master, 10 workers, no limit)
When I try to simply create and collect a simple dataframe on this Spark cluster, creating works, but collecting gets stuck.
Creating runs in about 3s:
Collecting just gets stuck:
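For reference, a minimal sketch of the kind of create and collect calls being described (the rows and column names are illustrative, not taken from this issue):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://spark-cluster:7077").getOrCreate()

# Creating the dataframe returns quickly because it is lazy / driver-side only.
df = spark.createDataFrame(
    [("alice", 1), ("bob", 2), ("carol", 3)],  # illustrative rows
    ["name", "value"],
)

# Collecting forces the executors to send results back to the driver process,
# which is the step that hangs if the executors cannot reach the driver.
print(df.collect())
```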
I made sure to use the exact same Spark version (3.0.1) everywhere (spark cluster images, spark local executable, pyspark version: everything is 3.0.1)
The logs displayed by the Master and Worker nodes do not seem to contain any interesting information:
Workers spam this exact code block every 3 seconds (just changing the IDs a bit). The words are English and can be read, but the sentences they produce do not give any relevant information on what's happening:
The Master gives even less information:
I tried different numbers of cores/limits when deploying the Spark cluster, but not a single Spark cluster deployed with the Radanalytics operator ever managed to .collect() anything. But we always manage to connect to the Spark cluster and somehow create a dataframe on it (at least it seems so, I am not sure if the dataframe is really created).

Steps to reproduce:
The operator is installed from the alpha channel I guess; as usual with Operators there is not an easy way to share which version we use (not sure why it was designed this way, it really makes reproducibility hard to achieve; anyway, who cares about reproducibility in computer science?), but you can check the OpenDataHub subscription: https://github.com/MaastrichtU-IDS/odh-manifests/blob/dsri/radanalyticsio/spark/cluster/base/subscription.yaml

Questions:
Why does .collect() get stuck when using Spark clusters deployed using the radanalytics operator?

Is there an example showing how to .collect() a Dataframe on a radanalytics Spark cluster? (Maybe the problem comes from the testing Python code and not from the cluster, but we could not find an example provided to test whether the Spark cluster works as expected.)

Does anyone have any idea what it could be due to? @elmiko
Is there anyone here who actually used a Spark cluster deployed by the radanalytics operator to run some real pySpark computations? Would be interested to see the code to get some inspiration!