author    Matei Zaharia <matei.zaharia@gmail.com>  2013-07-25 00:03:11 -0700
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-07-25 00:03:11 -0700
commit    51c24276184928fc3e4d44dc20624cb90b61ef6a (patch)
tree      9e8f9f5142e9ff77b477bfdfe350db60d3135b07
parent    c258718606a2960649dde0a4925fcf385d617c37 (diff)
parent    e56aa75de0f3c00e9942f0863c0fb8c57aab5321 (diff)
Merge pull request #732 from ryanlecompte/master
Refactor Kryo serializer support to use chill/chill-java
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala       137
-rw-r--r--  core/src/test/scala/spark/KryoSerializerSuite.scala   14
-rw-r--r--  pom.xml                                                11
-rw-r--r--  project/SparkBuild.scala                                5
4 files changed, 43 insertions, 124 deletions
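
This merge drops Spark's hand-rolled Kryo setup (thread-local Kryo/Input/Output instances, singleton serializers, and per-arity map serializers) in favor of Twitter's chill library, whose ScalaKryoInstantiator pre-registers serializers for common Scala types. A minimal sketch of the new construction path, assuming chill 0.3.0 and kryo on the classpath; the round-trip itself is illustrative and not part of the patch:

    import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
    import com.twitter.chill.ScalaKryoInstantiator

    object ChillSketch {
      def main(args: Array[String]) {
        // chill's instantiator returns a Kryo that already knows how to
        // serialize tuples, Scala collections, Option, etc., which is what
        // lets the patch below delete roughly a hundred lines of manual
        // registration code.
        val kryo = (new ScalaKryoInstantiator).newKryo()
        val output = new KryoOutput(2 * 1024 * 1024) // mirrors the 2 MB default buffer
        kryo.writeClassAndObject(output, List((1, "a"), (2, "b")))
        val input = new KryoInput(output.toBytes)
        println(kryo.readClassAndObject(input)) // List((1,a), (2,b))
      }
    }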
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index ee37da7948..eeb2993d8a 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -19,24 +19,16 @@ package spark
import java.io._
import java.nio.ByteBuffer
-import java.nio.channels.Channels
-
-import scala.collection.immutable
-import scala.collection.mutable
-
-import com.esotericsoftware.kryo._
-import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
-
+import com.twitter.chill.ScalaKryoInstantiator
import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
import spark.broadcast._
import spark.storage._
private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-
val output = new KryoOutput(outStream)
def writeObject[T](t: T): SerializationStream = {
@@ -50,7 +42,6 @@ class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends Seria
private[spark]
class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-
val input = new KryoInput(inStream)
def readObject[T](): T = {
@@ -58,7 +49,7 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
kryo.readClassAndObject(input).asInstanceOf[T]
} catch {
// DeserializationStream uses the EOF exception to indicate stopping condition.
- case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
+ case _: KryoException => throw new EOFException
}
}
@@ -69,10 +60,9 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser
}
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-
- val kryo = ks.kryo.get()
- val output = ks.output.get()
- val input = ks.input.get()
+ val kryo = ks.newKryo()
+ val output = ks.newKryoOutput()
+ val input = ks.newKryoInput()
def serialize[T](t: T): ByteBuffer = {
output.clear()
@@ -108,125 +98,51 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
* serialization.
*/
trait KryoRegistrator {
- def registerClasses(kryo: Kryo): Unit
+ def registerClasses(kryo: Kryo)
}
/**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
*/
class KryoSerializer extends spark.serializer.Serializer with Logging {
+ private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
- val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+ def newKryoOutput() = new KryoOutput(bufferSize)
- val kryo = new ThreadLocal[Kryo] {
- override def initialValue = createKryo()
- }
-
- val output = new ThreadLocal[KryoOutput] {
- override def initialValue = new KryoOutput(bufferSize)
- }
-
- val input = new ThreadLocal[KryoInput] {
- override def initialValue = new KryoInput(bufferSize)
- }
+ def newKryoInput() = new KryoInput(bufferSize)
- def createKryo(): Kryo = {
- val kryo = new KryoReflectionFactorySupport()
+ def newKryo(): Kryo = {
+ val instantiator = new ScalaKryoInstantiator
+ val kryo = instantiator.newKryo()
+ val classLoader = Thread.currentThread.getContextClassLoader
// Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq(
- // Arrays
- Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
- Array(new java.lang.Object), Array(1.toByte), Array(true), Array('c'),
- // Specialized Tuple2s
- ("", ""), ("", 1), (1, 1), (1.0, 1.0), (1L, 1L),
- (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
- // Scala collections
- List(1), mutable.ArrayBuffer(1),
- // Options and Either
- Some(1), Left(1), Right(1),
- // Higher-dimensional tuples
- (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1),
- None,
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY,
PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock("1", ByteBuffer.allocate(1)),
GetBlock("1")
)
- for (obj <- toRegister) {
- kryo.register(obj.getClass)
- }
+
+ for (obj <- toRegister) kryo.register(obj.getClass)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
- // Register some commonly used Scala singleton objects. Because these
- // are singletons, we must return the exact same local object when we
- // deserialize rather than returning a clone as FieldSerializer would.
- class SingletonSerializer[T](obj: T) extends KSerializer[T] {
- override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
- override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
- }
- kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
- kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
-
- // Register maps with a special serializer since they have complex internal structure
- class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
- extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
-
- //hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
- private final val FAKE_REFERENCE = new Object()
- override def write(
- kryo: Kryo,
- output: KryoOutput,
- obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
- val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
- output.writeInt(map.size)
- for ((k, v) <- map) {
- kryo.writeClassAndObject(output, k)
- kryo.writeClassAndObject(output, v)
- }
- }
- override def read (
- kryo: Kryo,
- input: KryoInput,
- cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
- : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
- kryo.reference(FAKE_REFERENCE)
- val size = input.readInt()
- val elems = new Array[(Any, Any)](size)
- for (i <- 0 until size) {
- val k = kryo.readClassAndObject(input)
- val v = kryo.readClassAndObject(input)
- elems(i)=(k,v)
- }
- buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
+ // Allow the user to register their own classes by setting spark.kryo.registrator
+ try {
+ Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+ logDebug("Running user registrator: " + regCls)
+ val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+ reg.registerClasses(kryo)
}
+ } catch {
+ case _: Exception => println("Failed to register spark.kryo.registrator")
}
- kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
- // TODO: add support for immutable maps too; this is more annoying because there are many
- // subclasses of immutable.Map for small maps (with <= 4 entries)
- val map1 = Map[Any, Any](1 -> 1)
- val map2 = Map[Any, Any](1 -> 1, 2 -> 2)
- val map3 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3)
- val map4 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4)
- val map5 = Map[Any, Any](1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4, 5 -> 5)
- kryo.register(map1.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map2.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map3.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map4.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- kryo.register(map5.getClass, new ScalaMapSerializer(mutable.HashMap() ++ _ toMap))
- // Allow the user to register their own classes by setting spark.kryo.registrator
- val regCls = System.getProperty("spark.kryo.registrator")
- if (regCls != null) {
- logInfo("Running user registrator: " + regCls)
- val classLoader = Thread.currentThread.getContextClassLoader
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
+ kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
@@ -235,7 +151,6 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
}
def newInstance(): SerializerInstance = {
- this.kryo.get().setClassLoader(Thread.currentThread().getContextClassLoader)
new KryoSerializerInstance(this)
}
-}
+}
\ No newline at end of file
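
User classes still hook in through the spark.kryo.registrator system property that newKryo() reads above. A hedged sketch of a user-side registrator (MyClass and MyRegistrator are hypothetical names chosen for illustration, not part of the patch):

    import com.esotericsoftware.kryo.Kryo
    import spark.KryoRegistrator

    // A hypothetical user type that Kryo should serialize efficiently.
    case class MyClass(x: Int, s: String)

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyClass])
      }
    }

    // Set before the first serializer instance is created:
    // System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)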
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index c3323dcbb3..30d2d5282b 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -18,13 +18,10 @@
package spark
import scala.collection.mutable
-import scala.collection.immutable
import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
-import SparkContext._
-
class KryoSerializerSuite extends FunSuite {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
@@ -53,6 +50,7 @@ class KryoSerializerSuite extends FunSuite {
check(Array(true, false, true))
check(Array('a', 'b', 'c'))
check(Array[Int]())
+ check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
}
test("pairs") {
@@ -103,7 +101,7 @@ class KryoSerializerSuite extends FunSuite {
}
test("custom registrator") {
- import spark.test._
+ import KryoTest._
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
val ser = (new KryoSerializer).newInstance()
@@ -123,14 +121,14 @@ class KryoSerializerSuite extends FunSuite {
val hashMap = new java.util.HashMap[String, String]
hashMap.put("foo", "bar")
check(hashMap)
-
+
System.clearProperty("spark.kryo.registrator")
}
}
-package test {
+object KryoTest {
case class CaseClass(i: Int, s: String) {}
-
+
class ClassWithNoArgConstructor {
var x: Int = 0
override def equals(other: Any) = other match {
@@ -154,4 +152,4 @@ package test {
k.register(classOf[java.util.HashMap[_, _]])
}
}
-}
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4cf9589b07..469dc64329 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,9 +193,14 @@
<version>2.4.1</version>
</dependency>
<dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
- <version>0.22</version>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill</artifactId>
+ <version>0.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-java</artifactId>
+ <version>0.3.0</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index d4d70afdd5..9920e00a67 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -170,7 +170,6 @@ object SparkBuild extends Build {
"com.ning" % "compress-lzf" % "0.8.4",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
- "de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-remote" % "2.0.5" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
@@ -181,7 +180,9 @@ object SparkBuild extends Build {
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"com.codahale.metrics" % "metrics-core" % "3.0.0",
- "com.codahale.metrics" % "metrics-jvm" % "3.0.0"
+ "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+ "com.twitter" % "chill_2.9.3" % "0.3.0",
+ "com.twitter" % "chill-java" % "0.3.0"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {