aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Patrick Wendell <pwendell@gmail.com> 2014-01-13 19:03:59 -0800
committer Patrick Wendell <pwendell@gmail.com> 2014-01-13 19:03:59 -0800
commit d4cd5debf4ac11884e4e703f482ffc17427e277f (patch)
tree 5794aca161a6c8b63316b20dd3d8384715a890d2 /core
parent c3816de5040e3c48e58ed4762d2f4eb606812938 (diff)
download spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.tar.gz
spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.tar.bz2
spark-d4cd5debf4ac11884e4e703f482ffc17427e277f.zip
Fix for Kryo Serializer
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala 15
1 files changed, 13 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 2eef6a7c10..2cf46e82b0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
/**
@@ -333,7 +333,18 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (!eof) {
try {
if (objectsRead == serializerBatchSize) {
- deserializeStream = ser.deserializeStream(compressedStream)
+ val newInputStream = deserializeStream match {
+ case stream: KryoDeserializationStream =>
+ // Kryo's deserialization stream keeps an internal buffer that pre-fetches from the
+ // underlying stream. We need to capture the unread bytes in this buffer and feed them
+ // to the new deserialization stream so that they are not lost.
+ val kryoInput = stream.input
+ val remainingBytes = kryoInput.limit() - kryoInput.position()
+ val extraBuf = kryoInput.readBytes(remainingBytes)
+ new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream)
+ case _ => compressedStream
+ }
+ deserializeStream = ser.deserializeStream(newInputStream)
objectsRead = 0
}
objectsRead += 1