diff options
author | Eric Liang <ekl@databricks.com> | 2016-08-22 16:32:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-08-22 16:32:14 -0700 |
commit | 8e223ea67acf5aa730ccf688802f17f6fc10907c (patch) | |
tree | b60988fb0166c27148e44636326ed20399327d94 /core/src/main | |
parent | 71afeeea4ec8e67edc95b5d504c557c88a2598b9 (diff) | |
download | spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.gz spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.bz2 spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.zip |
[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication
## What changes were proposed in this pull request?
This is a straightforward clone of JoshRosen 's original patch. I have follow-up changes to fix block replication for repl-defined classes as well, but those appear to be flaking tests so I'm going to leave that for SPARK-17042
## How was this patch tested?
End-to-end test in ReplSuite (also more tests in DistributedSuite from the original patch).
Author: Eric Liang <ekl@databricks.com>
Closes #14311 from ericl/spark-16550.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 13 |
2 files changed, 23 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 9dc274c9fe..07caadbe40 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -68,7 +68,7 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar * loaded yet. */ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf) - private def canUseKryo(ct: ClassTag[_]): Boolean = { + def canUseKryo(ct: ClassTag[_]): Boolean = { primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag } @@ -128,8 +128,18 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar /** Serializes into a chunked byte buffer. */ def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = { + dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]]) + } + + /** Serializes into a chunked byte buffer. */ + def dataSerializeWithExplicitClassTag( + blockId: BlockId, + values: Iterator[_], + classTag: ClassTag[_]): ChunkedByteBuffer = { val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate) - dataSerializeStream(blockId, bbos, values) + val byteStream = new BufferedOutputStream(bbos) + val ser = getSerializer(classTag).newInstance() + ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() bbos.toChunkedByteBuffer } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 015e71d126..fe84652798 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -498,7 +498,8 @@ private[spark] class BlockManager( diskStore.getBytes(blockId) } else if (level.useMemory && memoryStore.contains(blockId)) { // The block was not found on disk, so serialize an in-memory copy: - serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get) + serializerManager.dataSerializeWithExplicitClassTag( + blockId, memoryStore.getValues(blockId).get, info.classTag) } else { handleLocalReadFailure(blockId) } @@ -973,8 +974,16 @@ private[spark] class BlockManager( if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis val bytesToReplicate = doGetLocalBytes(blockId, info) + // [SPARK-16550] Erase the typed classTag when using default serialization, since + // NettyBlockRpcServer crashes when deserializing repl-defined classes. + // TODO(ekl) remove this once the classloader issue on the remote end is fixed. + val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) { + scala.reflect.classTag[Any] + } else { + classTag + } try { - replicate(blockId, bytesToReplicate, level, classTag) + replicate(blockId, bytesToReplicate, level, remoteClassTag) } finally { bytesToReplicate.dispose() } |