Skip to content

Commit

Permalink
[SPARK-10128] [STREAMING] Used correct classloader to deserialize WAL…
Browse files Browse the repository at this point in the history
… data

Recovering Kinesis sequence numbers from WAL leads to classnotfoundexception because the ObjectInputStream does not use the correct classloader and the SequenceNumberRanges class (in streaming-kinesis-asl package) cannot be found (added through spark-submit) while deserializing. The solution is to use `Thread.currentThread().getContextClassLoader` while deserializing.

Author: Tathagata Das <[email protected]>

Closes apache#8328 from tdas/SPARK-10128 and squashes the following commits:

f19b1c2 [Tathagata Das] Used correct classloader to deserialize WAL data
  • Loading branch information
tdas committed Aug 20, 2015
1 parent 73431d8 commit b762f99
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.{Logging, SparkConf}

/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
Expand Down Expand Up @@ -199,7 +199,8 @@ private[streaming] class ReceivedBlockTracker(
import scala.collection.JavaConversions._
writeAheadLog.readAll().foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
Utils.deserialize[ReceivedBlockTrackerLogEvent](
byteBuffer.array, Thread.currentThread().getContextClassLoader) match {
case BlockAdditionEvent(receivedBlockInfo) =>
insertAddedBlock(receivedBlockInfo)
case BatchAllocationEvent(time, allocatedBlocks) =>
Expand Down

0 comments on commit b762f99

Please sign in to comment.