aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-10-08 22:57:38 -0700
committerReynold Xin <rxin@apache.org>2013-10-08 22:57:38 -0700
commite67d5b962a2adddc073cfc9c99be9012fbb69838 (patch)
treeb23bdee35d6ccc397811bc70540ee86a190f6839 /core
parentea34c521025d3408d44d45ab5c132fd9791794f6 (diff)
parenta8725bf8f82ffea215afe7dd6c9ea1df36618e5b (diff)
downloadspark-e67d5b962a2adddc073cfc9c99be9012fbb69838.tar.gz
spark-e67d5b962a2adddc073cfc9c99be9012fbb69838.tar.bz2
spark-e67d5b962a2adddc073cfc9c99be9012fbb69838.zip
Merge pull request #43 from mateiz/kryo-fix
Don't allocate Kryo buffers unless needed I noticed that the Kryo serializer could be slower than the Java one by 2-3x on small shuffles because it spend a lot of time initializing Kryo Input and Output objects. This is because our default buffer size for them is very large. Since the serializer is often used on streams, I made the initialization lazy for that, and used a smaller buffer (auto-managed by Kryo) for input.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 24ef204aa1..6c500bad92 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -38,8 +38,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
def newKryoOutput() = new KryoOutput(bufferSize)
- def newKryoInput() = new KryoInput(bufferSize)
-
def newKryo(): Kryo = {
val instantiator = new ScalaKryoInstantiator
val kryo = instantiator.newKryo()
@@ -118,8 +116,10 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
val kryo = ks.newKryo()
- val output = ks.newKryoOutput()
- val input = ks.newKryoInput()
+
+ // Make these lazy vals to avoid creating a buffer unless we use them
+ lazy val output = ks.newKryoOutput()
+ lazy val input = new KryoInput()
def serialize[T](t: T): ByteBuffer = {
output.clear()