structured streaming #120
As a general rule, you can follow the official documentation with a few context-specific modifications. I don't have a proper setup to test it myself, but I'd start with this doc. According to it, you first need to add:

spark = SparkSession.builder()
    .appName("...")
    .master("spark://ip:7077")
    .config("spark.jars", "/path/,/path/to/another.jar")
    .getOrCreate()

Then just follow the examples on the linked page (Python or Scala API):

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Please let me know if it worked for you.
Thanks for your response. I followed your suggestion but I am still getting the same error. This is what I did. I downloaded spark-sql-kafka into a folder; let's call it spark-jars. Then I updated the Spark session as follows:

spark = SparkSession.builder.appName("Main").master("spark://IP:7077").config("spark.jars", "/absolute/path/to/spark-jars/spark-sql-kafka-0-10_2.12-3.5.2.jar").getOrCreate()

Next, I added the streaming code as follows:

stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "st-streaming-session").load()

But I am still getting the following exception:

Note that for kafka.bootstrap.servers, I tried the domain name, the IP address and localhost. All three are throwing the same exception. What am I doing wrong?
Can you post the full stacktrace? Edit: or even better, the full log or the pieces of it related to sending JARs to the workers.
Below is the full stack trace of the error I get:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.
    at org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1567)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:645)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:158)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)

and below is the latest log from the master for my latest run:

24/09/24 05:59:24 INFO Master: Start scheduling for app app-20240924055923-0004 with rpId: 0
24/09/24 06:02:27 INFO Master: 196.216.167.103:36600 got disassociated, removing it.
24/09/24 06:02:27 INFO Master: COMPSCSRV04.nust.na:33649 got disassociated, removing it.
24/09/24 06:02:27 INFO Master: Removing app app-20240924055923-0004
24/09/24 06:02:27 WARN Master: Got status update for unknown executor app-20240924055923-0004/0
24/09/24 06:02:27 WARN Master: Got status update for unknown executor app-20240924055923-0004/1
24/09/24 06:04:53 INFO Master: Registering app Main
24/09/24 06:04:53 INFO Master: Registered app Main with ID app-20240924060453-0005
24/09/24 06:04:53 INFO Master: Start scheduling for app app-20240924060453-0005 with rpId: 0
24/09/24 06:04:53 INFO Master: Launching executor app-20240924060453-0005/0 on worker worker-20240923111146-196.216.167.102-45805
24/09/24 06:04:53 INFO Master: Launching executor app-20240924060453-0005/1 on worker worker-20240923111223-196.216.167.105-36589
24/09/24 06:04:54 INFO Master: Start scheduling for app app-20240924060453-0005 with rpId: 0
24/09/24 06:04:54 INFO Master: Start scheduling for app app-20240924060453-0005 with rpId: 0
Stack trace:

geterror() @ core.jl:544
_jcall(::JavaCall.JavaObject{Symbol("org.apache.spark.sql.streaming.DataStreamReader")}, ::Ptr{Nothing}, ::Type, ::Tuple{}; callmethod::typeof(JavaCall.JNI.CallObjectMethodA)) @ core.jl:482
_jcall(::JavaCall.JavaObject{Symbol("org.apache.spark.sql.streaming.DataStreamReader")}, ::Ptr{Nothing}, ::Type, ::Tuple{}) @ core.jl:475
jcall(::JavaCall.JavaObject{Symbol("org.apache.spark.sql.streaming.DataStreamReader")}, ::String, ::Type, ::Tuple{}) @ core.jl:371
load @ streaming.jl:54
DotChainer @ chainable.jl:13
For some reason, the added jar file is not propagated to your workers. Debugging this issue via GitHub issues isn't easy, but as a shot in the dark let's try to add the Kafka connector as a package instead of a plain jar:

spark = SparkSession.builder()
    .appName("...")
    .master("spark://ip:7077")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1')  # <-- this line changed
    .getOrCreate()
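For reference, the same idea in Spark.jl's chained Julia syntax (as used elsewhere in this thread) might look like the sketch below; the package coordinates are an assumption and must match your Spark/Scala versions (e.g. _2.12 and 3.5.2 for the setup described here).

using Spark

# Sketch only: fetch the Kafka connector (and its transitive dependencies) from Maven
# instead of pointing spark.jars at a local file. Adjust the coordinates to your versions.
spark = SparkSession.builder.appName("Main").master("spark://IP:7077").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2").getOrCreate()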
It keeps throwing the same error. Suspecting that the added jar was not propagated, I created the same folder on each worker node and added the jar there, but the error persists. I am just confused now.
What's your cluster config, e.g. cluster manager, remote or local workers, etc.? Usually, Spark creates project directories on the workers dynamically, so putting jar files into some directory on the workers beforehand doesn't take effect.
I am running Spark in standalone mode on a 3-node cluster. I have one master and two workers. Each Spark instance is on a different node. The Kafka instance is running on the same node as the master. That's pretty much it.
I made a few changes. I reverted the session back to spark.jars as follows:

spark = SparkSession.builder.appName("SoftwareTools").config("spark.jars", "/path/to/spark-sql-kafka-0-10_2.12-3.5.2.jar").master("spark://IP:7077").getOrCreate()

I also upgraded the version of Scala to 2.12.2 to match the version of the jar file. But I am now getting the error below. Any hint?

Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaSourceProvider$
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
It looks like the Kafka jar is now being picked up, but the library versions still mismatch. Let's try to align them. In your local installation of Spark.jl, find a file called
to this:
Then in the Julia console type: ] build or
Once the build is complete (and successful), try to test your code again.
I made the suggested change, but my code is in a Pluto notebook. Do you think it will rebuild automatically if I restart the notebook? Spark is not available in the global scope in Julia.
After following the suggestion above and successfully building the Spark package, I am now observing two types of error.

1 - When I use the

Exception in thread "main" java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.KafkaSourceProvider$
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)

2 - When I revert back to the configuration using

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:599)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 10 more
I added the Kafka clients package and it finally worked. Thanks for your assistance on that. PS: This is the error I get now: UndefVarError: `selectExpr` not defined
May I please have a concrete example of how to read from and write to the Kafka stream based on the Spark.jl implementation? I can't find any example in the package docs. I did the following but it is not working:

rec_evts = stream_df.select(Column("key"), Column("value"))
query = rec_evts.writeStream().outputMode("complete").format("console").start()
query.awaitTermination()

Thanks
Great! Could you please add a bit more detail on which actions helped you resolve the issue? E.g. what exact package did you add, and how (jars, packages, manually)? This way others will be able to resolve similar issues more quickly. Thanks in advance!
Can you post the error you are getting? I haven't tested the Spark/Kafka integration thoroughly, but I don't immediately see any red flags in your code.
Sorry, I forgot to address this. You can also control Spark and Scala versions using:

ENV["BUILD_SPARK_VERSION"] = "3.5.2"
ENV["BUILD_SCALA_VERSION"] = "2.12.2"
] build Spark

I guess this approach is easier to maintain in a notebook.
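If you are in Pluto or another environment where the Pkg REPL mode is awkward, the same rebuild can be triggered programmatically; this is only a sketch using the standard Pkg API with the version numbers from the comment above, and the Julia session still needs to be restarted afterwards so the new jars are picked up.

# Set the desired versions before rebuilding Spark.jl (variable names as given above).
ENV["BUILD_SPARK_VERSION"] = "3.5.2"
ENV["BUILD_SCALA_VERSION"] = "2.12.2"

using Pkg
Pkg.build("Spark")   # programmatic equivalent of `] build Spark`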
Thanks for that. Could you also provide documentation on how to read and write a stream using the library? I've struggled to get it to access data.
As I mentioned above, I don't have a proper environment to test it myself, but your code looks correct to me. If you could post the error you encounter, I may be able to suggest a fix.
I've updated the session as follows:

spark = SparkSession.builder.appName("SoftwareTools").master("spark://196.216.167.103:7077").config("spark.jars","/home/sysdev/spark-3.5.2-bin-hadoop3/jars/commons-pool2-2.11.1.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-sql-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-streaming_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/kafka-clients-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-token-provider-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-tags-2_12-3.5.2.jar").getOrCreate()

Then when I do

stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "st-streaming-session").option("includeHeaders", "true").option("startingOffsets", "earliest").load()
rec_evts = stream_df.select(Column("key"), Column("value"))
query = rec_evts.writeStream().outputMode("append").format("console").start().awaitTermination()

I get the following error:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$
    at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.(KafkaBatchPartitionReader.scala:53)
    at org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:41)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:84)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:441)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

What am I missing?
Is it normal that the print to console
I have an idea what may have gone wrong. Could you please try ] add Spark#streaming-fixes and then run your code again?
It still doesn't print anything. I get the following printout:

Batch: 0
+-----+

Although it does do a micro-batch execution and prints out the details below:

24/10/01 17:55:43 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "f5550c88-a331-4f4d-bef6-a8059ce42b03",
  "runId" : "323c29a5-5be2-4099-bf3c-bbe70c9484ba",
  "name" : null,
  "timestamp" : "2024-10-01T15:55:38.385Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 2754,
    "commitOffsets" : 71,
    "getBatch" : 42,
    "latestOffset" : 1334,
    "queryPlanning" : 693,
    "triggerExecution" : 5014,
    "walCommit" : 44
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[st-streaming-session]]",
    "startOffset" : null,
    "endOffset" : { "st-streaming-session" : { "0" : 15 } },
    "latestOffset" : { "st-streaming-session" : { "0" : 15 } },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@2169e18e",
    "numOutputRows" : 0
  }
}
Can you show the complete Julia and Scala code? Spark.jl is nothing more than a shallow wrapper around the Spark API. For instance, the function format is defined as:

function format(writer::DataStreamWriter, fmt::String)
    jwriter = jcall(writer.jwriter, "format", JDataStreamWriter, (JString,), fmt)
    return DataStreamWriter(jwriter)
end

Here

jcall(writer.jwriter, "format", JDataStreamWriter, (JString,), fmt)
#     ^               ^         ^                  ^           ^
#     java-object     fn-name   return-type        arg-types   arg-val

is thus just a JNI call to the underlying Java method.

Yet, some Julia wrappers may be broken - as I said earlier, I never had a chance to extensively test streaming. If there's really no difference between the Scala and Julia versions, you can call the Spark API directly, e.g.:

using JavaCall
using Spark
import Spark: JDataStreamWriter

...
jwriter = rec_evts.writeStream().jwriter
jwriter = jcall(jwriter, "format", JDataStreamWriter, (JString,), fmt)
jwriter = jcall(jwriter, "outputMode", JDataStreamWriter, (JString,), m)
...
Please find below both the Julia and the Scala versions. The Scala version shows the messages in spark-shell.

1 - Julia

using Spark
spark = SparkSession.builder.appName("SoftwareTools").master("spark://IP:7077").config("saprk.jars", "/home/sysdev/spark-3.5.2-bin-hadoop3/jars/commons-pool2-2.12.0.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-streaming_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-sql-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/kafka-clients-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-token-provider-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-tags-2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-streaming_2.12-3.5.2.jar").getOrCreate()
stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "st-streaming-session").option("includeHeaders", "true").option("startingOffsets", "latest").load()
rec_evts = stream_df.select(Column("value"))
rec_evts.writeStream().format("console").outputMode("append").start().awaitTermination()

2 - Scala

import org.apache.spark.sql.SparkSession
import org.apache.commons.pool2.impl._
val sp_session = SparkSession.builder().appName("Test App").master("spark://IP:7077").config("spark.jars","/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-sql-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/kafka-clients-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-token-provider-kafka-0-10_2.12-3.5.2.jar,/home/sysdev/spark-3.5.2-bin-hadoop3/jars/spark-tags-2_12-3.5.2.jar").getOrCreate()
val stream_df = sp_session.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092").option("subscribe","st-streaming-session").option("startingOffsets", "earliest").load()
val values = stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String,String)]
values.writeStream.outputMode("append").format("console").start().awaitErmination()
You use the latest offset in the Julia version and the earliest in the Scala version. Is that intentional? How many messages do you add to the topic during the test?
I've actually tried both offsets, but I get no output.
Here I'm trying to set up a dev environment to check this issue, but today I've run out of time. I'll try to allocate more time in the coming days.
By the way, what version of Java are you running?
openjdk version "1.8.0_412"
How did you manage to fix this issue after all? I'm getting a similar error in both the Julia and the Scala versions.
I downloaded the spark-token-provider-kafka-0-10_2.12-3.5.2.jar, commons-pool2-2.12.0.jar and spark-tags-2.12-3.5.2.jar jars.
Please tell me, based on the JWriter example, how do I start writing to the console? I am a bit lost on that part.
It should be something like this:

import Spark: JDataStreamWriter, JStreamingQuery

rec_evts = ... # same as previously
jwriter = rec_evts.writeStream().jwriter
jwriter = jcall(jwriter, "format", JDataStreamWriter, (JString,), "console")
jwriter = jcall(jwriter, "outputMode", JDataStreamWriter, (JString,), "append")
jquery = jcall(jwriter, "start", JStreamingQuery, ())
jcall(jquery, "awaitTermination", Nothing, ())

If it works, then the problem is in the Spark.jl wrappers. If it doesn't, there may be a misconfiguration on the Apache Spark side.
It didn't work for me in a container, so I'm digging deeper.
Any luck with this so far?
Unfortunately, no. Apparently, years of not using Spark have washed away my experience setting up the environment. I will give it another try next week. Meanwhile, did you have a chance to test
The jcall-based example is still not printing. An almost identical example in Scala is working, though.
So the problem must be in Spark's setup. One possible issue is a conflict between the versions of Julia / JavaCall / Java / Scala / Spark. In my docker env, I noticed that some other pretty basic functions don't work either. Could you please run this simple code and tell me if it works?

using JavaCall
JavaCall.init()
listmethods(JString)

For me (Julia 1.10.5, JavaCall 0.8.0, OpenJDK 1.8.0_422) this gives:

Error calling Java: java.lang.NoSuchMethodError: forName

which is very unexpected. Do you experience it as well?
I just tried it in the REPL but got no error; I got a list of methods. I also checked the versions of Julia, JavaCall and the JDK. Same as yours.

java.lang.String toString()
int hashCode()
int compareTo(java.lang.Object)
int compareTo(java.lang.String)
int indexOf(java.lang.String, int)
int indexOf(int)
int indexOf(java.lang.String)
int indexOf(int, int)
java.lang.String valueOf(char)
java.lang.String valueOf(java.lang.Object)
java.lang.String valueOf(boolean)
java.lang.String valueOf(char[], int, int)
java.lang.String valueOf(char[])
java.lang.String valueOf(double)
java.lang.String valueOf(float)
...
Perhaps it's due to the underlying OSX. I will try it on a Linux VM a bit later.
Okay! I am using CentOS 7.
Spark was not designed to be extended from outside. A good example of it is
In the main branch, there's also a bug: we override the provided "spark.jars" with our own
By the way, moving to Linux indeed fixed the issue with JavaCall.
I will look into that later this week. I shall get back to you.
@dfdx is there a way I can contact you directly by email? I'd like to suggest some ideas regarding a PR. |
@joques Please check out the GitHub notifications for a comment with my email (I've deleted it from the issue itself).
Hi all, how do I get Spark.jl to read a stream from and write to Kafka? I need help finding documentation on that.
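For anyone landing on this issue with the same question: the thread above never reaches a confirmed end-to-end example, but piecing together the snippets that did make progress, an untested sketch looks roughly like the following. The jar paths, master IP and topic name are placeholders, and the jcall-based writer is the low-level workaround suggested above in case the high-level wrappers are incomplete.

using Spark, JavaCall
import Spark: JDataStreamWriter, JStreamingQuery

# Jars that came up in this thread as needed for the Kafka source; adjust paths and versions.
jars = join([
    "/path/to/spark-sql-kafka-0-10_2.12-3.5.2.jar",
    "/path/to/kafka-clients-3.5.2.jar",
    "/path/to/spark-token-provider-kafka-0-10_2.12-3.5.2.jar",
    "/path/to/commons-pool2-2.12.0.jar",
    "/path/to/spark-tags_2.12-3.5.2.jar"
], ",")

spark = SparkSession.builder.appName("KafkaStreamingDemo").master("spark://IP:7077").config("spark.jars", jars).getOrCreate()

# Read a Kafka topic as a streaming DataFrame.
stream_df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "my-topic").option("startingOffsets", "earliest").load()
rec_evts = stream_df.select(Column("key"), Column("value"))

# Write to the console sink via raw JNI calls, mirroring the example given earlier in the thread.
jwriter = rec_evts.writeStream().jwriter
jwriter = jcall(jwriter, "format", JDataStreamWriter, (JString,), "console")
jwriter = jcall(jwriter, "outputMode", JDataStreamWriter, (JString,), "append")
jquery = jcall(jwriter, "start", JStreamingQuery, ())
jcall(jquery, "awaitTermination", Nothing, ())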