diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-13 19:03:59 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-13 19:03:59 -0800 |
commit | d4cd5debf4ac11884e4e703f482ffc17427e277f (patch) | |
tree | 5794aca161a6c8b63316b20dd3d8384715a890d2 /core | |
parent | c3816de5040e3c48e58ed4762d2f4eb606812938 (diff) | |
download | spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.tar.gz spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.tar.bz2 spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.zip |
Fix for Kryo Serializer
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 2eef6a7c10..2cf46e82b0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
 
 /**
@@ -333,7 +333,18 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       if (!eof) {
         try {
           if (objectsRead == serializerBatchSize) {
-            deserializeStream = ser.deserializeStream(compressedStream)
+            val newInputStream = deserializeStream match {
+              case stream: KryoDeserializationStream =>
+                // Kryo's serializer stores an internal buffer that pre-fetches from the underlying
+                // stream. We need to capture this buffer and feed it to the new serialization
+                // stream so that the bytes are not lost.
+                val kryoInput = stream.input
+                val remainingBytes = kryoInput.limit() - kryoInput.position()
+                val extraBuf = kryoInput.readBytes(remainingBytes)
+                new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream)
+              case _ => compressedStream
+            }
+            deserializeStream = ser.deserializeStream(newInputStream)
             objectsRead = 0
           }
           objectsRead += 1