From 29cfab3f1524c5690be675d24dda0a9a1806d6ff Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 6 Sep 2016 15:07:28 -0700 Subject: [SPARK-17110] Fix StreamCorruptionException in BlockManager.getRemoteValues() ## What changes were proposed in this pull request? This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, then this would be cached as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass proper class tags in order to enable the same serializer to be used during deserialization, causing Java to be inappropriately used instead of Kryo, leading to the StreamCorruptedException. We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix patch it looks like we had no tests to perform remote reads of cached data, which is why this bug was able to remain latent for so long. This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded in the `getOrElseUpdate` code path (which is used by `rdd.iterator`) ## How was this patch tested? Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed. Author: Josh Rosen Closes #14952 from JoshRosen/SPARK-17110. --- .../org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 5 +++-- .../scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index 53fccd8d5e..0b2ec29813 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -120,7 +120,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( val blockId = partition.blockId def getBlockFromBlockManager(): Option[Iterator[T]] = { - blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]]) + blockManager.get[T](blockId).map(_.data.asInstanceOf[Iterator[T]]) } def getBlockFromWriteAheadLog(): Iterator[T] = { @@ -163,7 +163,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( dataRead.rewind() } serializerManager - .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream()) + .dataDeserializeStream( + blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag) .asInstanceOf[Iterator[T]] } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index feb5c30c6a..7e665454a5 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps +import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, Matchers} @@ -163,7 +164,7 @@ class ReceivedBlockHandlerSuite val bytes = reader.read(fileSegment) reader.close() serializerManager.dataDeserializeStream( - generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList + generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList } loggedData shouldEqual data } -- cgit v1.2.3