author     Eric Liang <ekl@databricks.com>      2016-08-22 16:32:14 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-08-22 16:32:14 -0700
commit     8e223ea67acf5aa730ccf688802f17f6fc10907c (patch)
tree       b60988fb0166c27148e44636326ed20399327d94 /core/src/main
parent     71afeeea4ec8e67edc95b5d504c557c88a2598b9 (diff)
[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication
## What changes were proposed in this pull request?

This is a straightforward clone of JoshRosen's original patch. I have follow-up changes to fix block replication for repl-defined classes as well, but those appear to be causing flaky tests, so I'm going to leave that for SPARK-17042.

## How was this patch tested?

End-to-end test in ReplSuite (also more tests in DistributedSuite from the original patch).

Author: Eric Liang <ekl@databricks.com>

Closes #14311 from ericl/spark-16550.
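A minimal repro sketch of the scenario the ReplSuite end-to-end test exercises (not taken from the patch; the case class Foo and the shell's SparkContext `sc` are assumptions, as in a spark-shell session): caching an RDD of a REPL-defined class with a replicated storage level previously failed when the remote block manager tried to deserialize it.

import org.apache.spark.storage.StorageLevel

case class Foo(i: Int)                          // defined only in the REPL / spark-shell
val rdd = sc.parallelize(1 to 100, 10).map(x => Foo(x))
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)     // storage level with replication = 2
rdd.count()                                     // previously triggered the remote deserialization failure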
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala  14
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala           13
2 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 9dc274c9fe..07caadbe40 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -68,7 +68,7 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
* loaded yet. */
private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
- private def canUseKryo(ct: ClassTag[_]): Boolean = {
+ def canUseKryo(ct: ClassTag[_]): Boolean = {
primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}
@@ -128,8 +128,18 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
/** Serializes into a chunked byte buffer. */
def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
+ dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+ }
+
+ /** Serializes into a chunked byte buffer. */
+ def dataSerializeWithExplicitClassTag(
+ blockId: BlockId,
+ values: Iterator[_],
+ classTag: ClassTag[_]): ChunkedByteBuffer = {
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
- dataSerializeStream(blockId, bbos, values)
+ val byteStream = new BufferedOutputStream(bbos)
+ val ser = getSerializer(classTag).newInstance()
+ ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
bbos.toChunkedByteBuffer
}
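A minimal usage sketch (not part of the patch) of the new dataSerializeWithExplicitClassTag method, assuming the caller holds a SerializerManager and the class tag recorded for the block (for example BlockInfo.classTag); passing the tag explicitly keeps the serializer choice consistent with how the block was originally stored:

import scala.reflect.ClassTag
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.storage.BlockId
import org.apache.spark.util.io.ChunkedByteBuffer

// Hypothetical helper (sketch only; SerializerManager is private[spark], so this
// assumes code living inside the org.apache.spark package).
def serializeWithRecordedTag(
    serializerManager: SerializerManager,
    blockId: BlockId,
    values: Iterator[_],
    recordedTag: ClassTag[_]): ChunkedByteBuffer = {
  // Unlike dataSerialize[T], the class tag is supplied explicitly instead of
  // being captured implicitly at the call site.
  serializerManager.dataSerializeWithExplicitClassTag(blockId, values, recordedTag)
}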
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 015e71d126..fe84652798 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -498,7 +498,8 @@ private[spark] class BlockManager(
diskStore.getBytes(blockId)
} else if (level.useMemory && memoryStore.contains(blockId)) {
// The block was not found on disk, so serialize an in-memory copy:
- serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
+ serializerManager.dataSerializeWithExplicitClassTag(
+ blockId, memoryStore.getValues(blockId).get, info.classTag)
} else {
handleLocalReadFailure(blockId)
}
@@ -973,8 +974,16 @@ private[spark] class BlockManager(
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
val bytesToReplicate = doGetLocalBytes(blockId, info)
+ // [SPARK-16550] Erase the typed classTag when using default serialization, since
+ // NettyBlockRpcServer crashes when deserializing repl-defined classes.
+ // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
+ val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
+ scala.reflect.classTag[Any]
+ } else {
+ classTag
+ }
try {
- replicate(blockId, bytesToReplicate, level, classTag)
+ replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
bytesToReplicate.dispose()
}
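A standalone sketch (not in the patch) of the [SPARK-16550] erasure rule above: if the block's class tag is not one canUseKryo accepts (primitive types, primitive arrays, or String), the replicated copy is sent with ClassTag[Any], so the receiving NettyBlockRpcServer falls back to default serialization rather than resolving a possibly REPL-defined class.

import scala.reflect.{classTag, ClassTag}
import org.apache.spark.serializer.SerializerManager

// Hypothetical helper (sketch only; assumes code inside the org.apache.spark
// package, since SerializerManager is private[spark]): choose the class tag
// attached to a block when replicating it to another executor.
def tagForReplication(serializerManager: SerializerManager, ct: ClassTag[_]): ClassTag[_] =
  if (serializerManager.canUseKryo(ct)) ct else classTag[Any]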