
Warning in pyspark: NullPointerException in RecordReader.close() #20

Open

surjikal opened this issue Jan 24, 2017 · 2 comments

surjikal (Contributor) commented Jan 24, 2017

I compressed a file (~30 MB, just for testing) using the 4mc tool:

$ 4mc data.txt 
Compressed filename will be : data.txt.4mc 
Compression: LZ4
Compressed (fast) 30288541 bytes into 2865501 bytes ==> 9.46% (Ratio=10.570)   

Then I tried to open the compressed file in (py)spark:

$ pyspark --master local[1]
>>> sc.textFile('file:///data.txt.4mc').count()
17/01/24 09:28:04 WARN org.apache.spark.rdd.HadoopRDD: Exception in RecordReader.close()
java.lang.NullPointerException
        at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
        at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
        at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:276)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
        at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1953)
        at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
4634

I got the result, but it warned me about a null pointer exception. Thought you guys might want to know!

surjikal changed the title from "NullPointerException warning when calling sc.textFile in pyspark" to "Warning in pyspark: NullPointerException in RecordReader.close()" on Jan 24, 2017
gtinjr commented Mar 20, 2023

Similar issue with pyspark. It seems the decompressor has already been destroyed/finalized by the time reset() is called, so the buffer no longer exists and Lz4Decompressor throws a NullPointerException.

23/03/20 12:12:24 INFO CodecPool: Got brand-new decompressor [.4mc]
23/03/20 12:12:24 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at com.fing.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:224)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:243)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:62)
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:73)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.org$apache$spark$executor$Executor$TaskRunner$$anonfun$$res$1(Executor.scala:412)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:419)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1359)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:430)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
23/03/20 12:12:24 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
org.apache.spark.util.TaskCompletionListenerException: null
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.org$apache$spark$executor$Executor$TaskRunner$$anonfun$$res$1(Executor.scala:412)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:419)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1359)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:430)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
23/03/20 12:13:25 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
23/03/20 12:13:25 INFO DiskBlockManager: Shutdown hook called
23/03/20 12:13:25 INFO ShutdownHookManager: Shutdown hook called
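
For illustration, here is a minimal, runnable reconstruction of that sequence. The class, field, and method names are assumptions inferred from the stack trace, not the actual hadoop-4mc source:

import java.nio.ByteBuffer;

// Hypothetical stand-in for com.fing.compression.fourmc.Lz4Decompressor.
class DecompressorSketch {
    private ByteBuffer uncompressedDirectBuf = ByteBuffer.allocateDirect(64 * 1024);

    // Called from the input stream's close(): frees the direct buffers.
    void releaseDirectBuffers() {
        uncompressedDirectBuf = null;
    }

    // Called by CodecPool.returnDecompressor() so the instance can be pooled.
    void reset() {
        uncompressedDirectBuf.clear(); // NPE if releaseDirectBuffers() ran first
    }

    public static void main(String[] args) {
        DecompressorSketch d = new DecompressorSketch();
        d.releaseDirectBuffers(); // 1. stream close releases the buffers
        d.reset();                // 2. pool return calls reset() -> NullPointerException
    }
}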

gtinjr commented Mar 23, 2023

This is a problem with the FourMcInputStream.close() method calling Lz4Decompressor.releaseDirectBuffers(): by the time reset() gets called, the buffers have already been set to null, which causes the NullPointerException. Commenting this line out in FourMcInputStream.close() fixes the issue:
//((Lz4Decompressor)decompressor).releaseDirectBuffers();
I don't know why this call is made here. I looked at a similar project, Twitter's hadoop-lzo, and it does not call releaseDirectBuffers() in its input stream's close() method. Making the call here is very destructive: the decompressor is later returned to the Hadoop CodecPool, and it sits there as a bad decompressor because its internal buffers have been destroyed.
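
A minimal sketch of what the adjusted close() could look like; the method shape is an assumption based on the stack traces above, not the actual hadoop-4mc source:

@Override
public void close() throws IOException {
    super.close();
    // Deliberately do NOT free the decompressor's direct buffers here.
    // The decompressor is returned to Hadoop's CodecPool after close(),
    // and CodecPool.returnDecompressor() calls reset(), which still needs
    // those buffers. Freeing them leaves a broken decompressor in the pool.
    // ((Lz4Decompressor) decompressor).releaseDirectBuffers();
}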

The following Scala code can be used to reproduce the issue, using Spark 2.4.4 (Scala 2.11) and Hadoop 2.7.0. It fails at val raw_df = spark.read.option("header", value = true).csv("file:///c:/lift/V1_sep_test_900.psv.4mc"): Spark uses the input stream for a partial read to build the DataFrame and infer its schema, then hands the decompressor back to the CodecPool so it can be reused by raw_df.show(5). Returning it is what fails, because reset() is called on a decompressor whose direct buffers were already released in close().

import org.apache.spark.sql.SparkSession

object Decompressor {
  def createSparkSession(): SparkSession = {
    // Backslashes must be escaped in Scala string literals.
    System.setProperty("hadoop.home.dir", "C:\\opt\\mapr\\hadoop\\hadoop-2.7.0")
    System.setProperty("SPARK_HOME", "C:\\opt\\mapr\\spark\\spark-2.4.4")
    SparkSession.builder()
      .master("local[1]")
      .appName("Decompressor - 4mc debug")
      .config("spark.some.config.option", "config-value")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession()
    // Fails here: the partial read for schema inference closes the stream,
    // which releases the decompressor's direct buffers; returning the
    // decompressor to the CodecPool then calls reset() and throws the NPE.
    val raw_df = spark.read.option("header", value = true)
      .csv("file:///c:/lift/V1_sep_test_900.psv.4mc")
    raw_df.show(5)
  }
}
