authorTathagata Das <tathagata.das1565@gmail.com>2015-08-19 21:15:58 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-19 21:15:58 -0700
commitb762f9920f7587d3c08493c49dd2fede62110b88 (patch)
tree2f56aa8dc9de38917f54d9d46feafbc0995176d3 /streaming
parent73431d8afb41b93888d2642a1ce2d011f03fb740 (diff)
[SPARK-10128] [STREAMING] Used correct classloader to deserialize WAL data
Recovering Kinesis sequence numbers from the WAL leads to a ClassNotFoundException because the ObjectInputStream does not use the correct classloader, so the SequenceNumberRanges class (in the streaming-kinesis-asl package, added through spark-submit) cannot be found while deserializing. The solution is to use `Thread.currentThread().getContextClassLoader` while deserializing.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8328 from tdas/SPARK-10128 and squashes the following commits:

f19b1c2 [Tathagata Das] Used correct classloader to deserialize WAL data
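For context, the fix works because Java deserialization resolves classes through a classloader, and the default one may not see jars added via spark-submit. A minimal sketch of the idea (not Spark's actual Utils.deserialize implementation; the helper name below is illustrative):

import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

// Illustrative helper: deserialize bytes while resolving classes through a
// caller-supplied classloader (e.g. Thread.currentThread().getContextClassLoader),
// so classes shipped with the application (like SequenceNumberRanges) can be found.
def deserializeWithLoader[T](bytes: Array[Byte], loader: ClassLoader): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
    override def resolveClass(desc: ObjectStreamClass): Class[_] =
      Class.forName(desc.getName, false, loader)
  }
  try ois.readObject().asInstanceOf[T]
  finally ois.close()
}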
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala | 5
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 7720259a5d..53b96d51c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, Utils}
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
@@ -199,7 +199,8 @@ private[streaming] class ReceivedBlockTracker(
import scala.collection.JavaConversions._
writeAheadLog.readAll().foreach { byteBuffer =>
logTrace("Recovering record " + byteBuffer)
- Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) match {
+ Utils.deserialize[ReceivedBlockTrackerLogEvent](
+ byteBuffer.array, Thread.currentThread().getContextClassLoader) match {
case BlockAdditionEvent(receivedBlockInfo) =>
insertAddedBlock(receivedBlockInfo)
case BatchAllocationEvent(time, allocatedBlocks) =>