aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2011-03-08 12:36:36 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2011-03-08 12:36:36 -0800
commit7febdfbe2952aeef5e5f0a1cdbc95df4a274fd78 (patch)
tree4d1265af744689c0d94aa3891568ecfd324cb689
parent7408230bfab0d374eb2eefd3f181e4d997c66061 (diff)
downloadspark-7febdfbe2952aeef5e5f0a1cdbc95df4a274fd78.tar.gz
spark-7febdfbe2952aeef5e5f0a1cdbc95df4a274fd78.tar.bz2
spark-7febdfbe2952aeef5e5f0a1cdbc95df4a274fd78.zip
Better reuse of buffers in Kryo serialization
-rw-r--r--core/src/main/scala/spark/KryoSerialization.scala26
1 files changed, 15 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala
index 462dee217e..63e22ae4ae 100644
--- a/core/src/main/scala/spark/KryoSerialization.scala
+++ b/core/src/main/scala/spark/KryoSerialization.scala
@@ -58,10 +58,8 @@ object ZigZag {
}
}
-class KryoSerializationStream(kryo: Kryo, out: OutputStream)
+class KryoSerializationStream(kryo: Kryo, buf: ByteBuffer, out: OutputStream)
extends SerializationStream {
- val buf = ByteBuffer.allocateDirect(1024*1024)
-
def writeObject[T](t: T) {
kryo.writeClassAndObject(buf, t)
ZigZag.writeInt(buf.position(), out)
@@ -74,10 +72,8 @@ extends SerializationStream {
def close() { out.close() }
}
-class KryoDeserializationStream(kryo: Kryo, in: InputStream)
+class KryoDeserializationStream(buf: ObjectBuffer, in: InputStream)
extends DeserializationStream {
- val buf = new ObjectBuffer(kryo, 1024*1024)
-
def readObject[T](): T = {
val len = ZigZag.readInt(in)
buf.readClassAndObject(in, len).asInstanceOf[T]
@@ -86,8 +82,8 @@ extends DeserializationStream {
def close() { in.close() }
}
-class KryoSerializer(kryo: Kryo) extends Serializer {
- val buf = new ObjectBuffer(kryo, 1024*1024)
+class KryoSerializer(strat: KryoSerialization) extends Serializer {
+ val buf = strat.threadBuf.get()
def serialize[T](t: T): Array[Byte] = {
buf.writeClassAndObject(t)
@@ -98,11 +94,11 @@ class KryoSerializer(kryo: Kryo) extends Serializer {
}
def outputStream(s: OutputStream): SerializationStream = {
- new KryoSerializationStream(kryo, s)
+ new KryoSerializationStream(strat.kryo, strat.threadByteBuf.get(), s)
}
def inputStream(s: InputStream): DeserializationStream = {
- new KryoDeserializationStream(kryo, s)
+ new KryoDeserializationStream(buf, s)
}
}
@@ -114,6 +110,14 @@ trait KryoRegistrator {
class KryoSerialization extends SerializationStrategy with Logging {
val kryo = createKryo()
+ val threadBuf = new ThreadLocal[ObjectBuffer] {
+ override def initialValue = new ObjectBuffer(kryo, 128*1024*1024)
+ }
+
+ val threadByteBuf = new ThreadLocal[ByteBuffer] {
+ override def initialValue = ByteBuffer.allocate(128*1024*1024)
+ }
+
def createKryo(): Kryo = {
val kryo = new Kryo()
@@ -158,5 +162,5 @@ class KryoSerialization extends SerializationStrategy with Logging {
kryo
}
- def newSerializer(): Serializer = new KryoSerializer(kryo)
+ def newSerializer(): Serializer = new KryoSerializer(this)
}