author     Sandy Ryza <sandy@cloudera.com>        2014-10-21 21:53:09 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-10-21 21:53:09 -0700
commit     6bb56faea8d238ea22c2de33db93b1b39f492b3a (patch)
tree       4f172ee623cd64827b6c78d8459554cd395b1660 /core
parent     856b081729057f9da31a86e4bfa0dc0013492042 (diff)
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Author: Sandy Ryza <sandy@cloudera.com>

Closes #789 from sryza/sandy-spark-1813 and squashes the following commits:

48b05e9 [Sandy Ryza] Simplify
b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time
6a15bb7 [Sandy Ryza] Small fix
a2278c0 [Sandy Ryza] Respond to review comments
6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
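For context, a minimal sketch of how an application would use the new helper; the Point and Segment classes below are illustrative placeholders, not part of this patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical user classes that will flow through Kryo serialization.
    case class Point(x: Double, y: Double)
    case class Segment(a: Point, b: Point)

    val conf = new SparkConf()
      .setAppName("kryo-example")
      // One call switches spark.serializer to the Kryo serializer and records
      // the class names under spark.kryo.classesToRegister.
      .registerKryoClasses(Array(classOf[Point], classOf[Segment]))
    val sc = new SparkContext(conf)

Before this change, the same effect required setting spark.serializer by hand and writing a custom KryoRegistrator.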
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                       17
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala       43
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                      12
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala                  62
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala   6
5 files changed, 119 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 605df0e929..dbbcc23305 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,7 +18,8 @@
package org.apache.spark
import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, LinkedHashSet}
+import org.apache.spark.serializer.KryoSerializer
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}
+ /**
+ * Use Kryo serialization and register the given set of classes with Kryo.
+ * If called multiple times, this will append the classes from all calls together.
+ */
+ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
+ val allClassNames = new LinkedHashSet[String]()
+ allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
+ allClassNames ++= classes.map(_.getName)
+
+ set("spark.kryo.classesToRegister", allClassNames.mkString(","))
+ set("spark.serializer", classOf[KryoSerializer].getName)
+ this
+ }
+
/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
settings.remove(key)
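The LinkedHashSet above is what makes repeated calls additive: each call merges the class names already recorded in spark.kryo.classesToRegister with the new ones, deduplicating while preserving insertion order. A rough standalone illustration of that merge step (not part of the patch; the class names are placeholders):

    import scala.collection.mutable.LinkedHashSet

    // Mimics how registerKryoClasses folds new class names into the existing
    // comma-separated spark.kryo.classesToRegister value.
    def merge(existing: String, newClasses: Seq[String]): String = {
      val all = new LinkedHashSet[String]()
      all ++= existing.split(',').filter(_.nonEmpty)
      all ++= newClasses
      all.mkString(",")
    }

    val first  = merge("", Seq("a.Foo", "b.Bar"))     // "a.Foo,b.Bar"
    val second = merge(first, Seq("b.Bar", "c.Baz"))  // "a.Foo,b.Bar,c.Baz"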
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index d6386f8c06..621a951c27 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -53,7 +53,18 @@ class KryoSerializer(conf: SparkConf)
private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
- private val registrator = conf.getOption("spark.kryo.registrator")
+ private val userRegistrator = conf.getOption("spark.kryo.registrator")
+ private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
+ .split(',')
+ .filter(!_.isEmpty)
+ .map { className =>
+ try {
+ Class.forName(className)
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Failed to load class to register with Kryo", e)
+ }
+ }
def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
@@ -80,22 +91,20 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
- // Allow the user to register their own classes by setting spark.kryo.registrator
- for (regCls <- registrator) {
- logDebug("Running user registrator: " + regCls)
- try {
- val reg = Class.forName(regCls, true, classLoader).newInstance()
- .asInstanceOf[KryoRegistrator]
-
- // Use the default classloader when calling the user registrator.
- Thread.currentThread.setContextClassLoader(classLoader)
- reg.registerClasses(kryo)
- } catch {
- case e: Exception =>
- throw new SparkException(s"Failed to invoke $regCls", e)
- } finally {
- Thread.currentThread.setContextClassLoader(oldClassLoader)
- }
+ try {
+ // Use the default classloader when calling the user registrator.
+ Thread.currentThread.setContextClassLoader(classLoader)
+ // Register classes given through spark.kryo.classesToRegister.
+ classesToRegister.foreach { clazz => kryo.register(clazz) }
+ // Allow the user to register their own classes by setting spark.kryo.registrator.
+ userRegistrator
+ .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
+ .foreach { reg => reg.registerClasses(kryo) }
+ } catch {
+ case e: Exception =>
+ throw new SparkException(s"Failed to register classes with Kryo", e)
+ } finally {
+ Thread.currentThread.setContextClassLoader(oldClassLoader)
}
// Register Chill's classes; we do this after our ranges and the user's own classes to let
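Because the serializer side only reads plain configuration properties, the same registration can also be expressed without the helper, for example from spark-defaults or a manually built conf. A sketch of that property-only form (the class names are just examples):

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    val conf = new SparkConf()
      .set("spark.serializer", classOf[KryoSerializer].getName)
      // Comma-separated fully qualified class names; the serializer splits
      // this list and registers each class it can load.
      .set("spark.kryo.classesToRegister", "java.lang.StringBuffer,java.lang.StringBuilder")

A class name in this list that cannot be loaded fails fast with "Failed to load class to register with Kryo" when the KryoSerializer is constructed.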
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 3190148fb5..814e40c4f7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1418,4 +1418,16 @@ public class JavaAPISuite implements Serializable {
}
}
+ static class Class1 {}
+ static class Class2 {}
+
+ @Test
+ public void testRegisterKryoClasses() {
+ SparkConf conf = new SparkConf();
+ conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class });
+ Assert.assertEquals(
+ Class1.class.getName() + "," + Class2.class.getName(),
+ conf.get("spark.kryo.classesToRegister"));
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 87e9012622..5d018ea986 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark
import org.scalatest.FunSuite
+import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
+import com.esotericsoftware.kryo.Kryo
class SparkConfSuite extends FunSuite with LocalSparkContext {
test("loading from system properties") {
@@ -133,4 +135,64 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
System.clearProperty("spark.test.a.b.c")
}
}
+
+ test("register kryo classes through registerKryoClasses") {
+ val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+
+ conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
+ assert(conf.get("spark.kryo.classesToRegister") ===
+ classOf[Class1].getName + "," + classOf[Class2].getName)
+
+ conf.registerKryoClasses(Array(classOf[Class3]))
+ assert(conf.get("spark.kryo.classesToRegister") ===
+ classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+
+ conf.registerKryoClasses(Array(classOf[Class2]))
+ assert(conf.get("spark.kryo.classesToRegister") ===
+ classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName)
+
+ // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
+ // blow up.
+ val serializer = new KryoSerializer(conf)
+ serializer.newInstance().serialize(new Class1())
+ serializer.newInstance().serialize(new Class2())
+ serializer.newInstance().serialize(new Class3())
+ }
+
+ test("register kryo classes through registerKryoClasses and custom registrator") {
+ val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+
+ conf.registerKryoClasses(Array(classOf[Class1]))
+ assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)
+
+ conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)
+
+ // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
+ // blow up.
+ val serializer = new KryoSerializer(conf)
+ serializer.newInstance().serialize(new Class1())
+ serializer.newInstance().serialize(new Class2())
+ }
+
+ test("register kryo classes through conf") {
+ val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
+ conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
+ conf.set("spark.serializer", classOf[KryoSerializer].getName)
+
+ // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
+ // blow up.
+ val serializer = new KryoSerializer(conf)
+ serializer.newInstance().serialize(new StringBuffer())
+ }
+
+}
+
+class Class1 {}
+class Class2 {}
+class Class3 {}
+
+class CustomRegistrator extends KryoRegistrator {
+ def registerClasses(kryo: Kryo) {
+ kryo.register(classOf[Class2])
+ }
}
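As the second test above exercises, the class list and a custom registrator can be combined: classes from spark.kryo.classesToRegister are registered first, then the user registrator runs. A sketch of that combination in application code (MyRegistrator and the registered classes are illustrative):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // Illustrative registrator that adds one extra class on top of the
    // classes listed in spark.kryo.classesToRegister.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[java.lang.StringBuilder])
      }
    }

    val conf = new SparkConf()
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(classOf[java.lang.StringBuffer]))
      .set("spark.kryo.registrator", classOf[MyRegistrator].getName)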
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index e1e35b688d..64ac6d2d92 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
test("kryo with nonexistent custom registrator should fail") {
- import org.apache.spark.{SparkConf, SparkException}
+ import org.apache.spark.SparkException
val conf = new SparkConf(false)
conf.set("spark.kryo.registrator", "this.class.does.not.exist")
-
+
val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance())
- assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist"))
+ assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
}
test("default class loader can be set by a different thread") {