diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-10-21 21:53:09 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-10-21 21:53:09 -0700 |
commit | 6bb56faea8d238ea22c2de33db93b1b39f492b3a (patch) | |
tree | 4f172ee623cd64827b6c78d8459554cd395b1660 /core | |
parent | 856b081729057f9da31a86e4bfa0dc0013492042 (diff) | |
download | spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.tar.gz spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.tar.bz2 spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.zip |
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Author: Sandy Ryza <sandy@cloudera.com>
Closes #789 from sryza/sandy-spark-1813 and squashes the following commits:
48b05e9 [Sandy Ryza] Simplify
b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time
6a15bb7 [Sandy Ryza] Small fix
a2278c0 [Sandy Ryza] Respond to review comments
6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Diffstat (limited to 'core')
5 files changed, 119 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 605df0e929..dbbcc23305 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -18,7 +18,8 @@ package org.apache.spark import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap, LinkedHashSet} +import org.apache.spark.serializer.KryoSerializer /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } + /** + * Use Kryo serialization and register the given set of classes with Kryo. + * If called multiple times, this will append the classes from all calls together. + */ + def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { + val allClassNames = new LinkedHashSet[String]() + allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) + allClassNames ++= classes.map(_.getName) + + set("spark.kryo.classesToRegister", allClassNames.mkString(",")) + set("spark.serializer", classOf[KryoSerializer].getName) + this + } + /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { settings.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d6386f8c06..621a951c27 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,7 +53,18 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val registrator = conf.getOption("spark.kryo.registrator") + private val userRegistrator = conf.getOption("spark.kryo.registrator") + private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") + .split(',') + .filter(!_.isEmpty) + .map { className => + try { + Class.forName(className) + } catch { + case e: Exception => + throw new SparkException("Failed to load class to register with Kryo", e) + } + } def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -80,22 +91,20 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) - // Allow the user to register their own classes by setting spark.kryo.registrator - for (regCls <- registrator) { - logDebug("Running user registrator: " + regCls) - try { - val reg = Class.forName(regCls, true, classLoader).newInstance() - .asInstanceOf[KryoRegistrator] - - // Use the default classloader when calling the user registrator. - Thread.currentThread.setContextClassLoader(classLoader) - reg.registerClasses(kryo) - } catch { - case e: Exception => - throw new SparkException(s"Failed to invoke $regCls", e) - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } + try { + // Use the default classloader when calling the user registrator. + Thread.currentThread.setContextClassLoader(classLoader) + // Register classes given through spark.kryo.classesToRegister. + classesToRegister.foreach { clazz => kryo.register(clazz) } + // Allow the user to register their own classes by setting spark.kryo.registrator. + userRegistrator + .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) + .foreach { reg => reg.registerClasses(kryo) } + } catch { + case e: Exception => + throw new SparkException(s"Failed to register classes with Kryo", e) + } finally { + Thread.currentThread.setContextClassLoader(oldClassLoader) } // Register Chill's classes; we do this after our ranges and the user's own classes to let diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 3190148fb5..814e40c4f7 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1418,4 +1418,16 @@ public class JavaAPISuite implements Serializable { } } + static class Class1 {} + static class Class2 {} + + @Test + public void testRegisterKryoClasses() { + SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class }); + Assert.assertEquals( + Class1.class.getName() + "," + Class2.class.getName(), + conf.get("spark.kryo.classesToRegister")); + } + } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 87e9012622..5d018ea986 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark import org.scalatest.FunSuite +import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} +import com.esotericsoftware.kryo.Kryo class SparkConfSuite extends FunSuite with LocalSparkContext { test("loading from system properties") { @@ -133,4 +135,64 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { System.clearProperty("spark.test.a.b.c") } } + + test("register kryo classes through registerKryoClasses") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + + conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName) + + conf.registerKryoClasses(Array(classOf[Class3])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + conf.registerKryoClasses(Array(classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + serializer.newInstance().serialize(new Class3()) + } + + test("register kryo classes through registerKryoClasses and custom registrator") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + + conf.registerKryoClasses(Array(classOf[Class1])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName) + + conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + } + + test("register kryo classes through conf") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") + conf.set("spark.serializer", classOf[KryoSerializer].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new StringBuffer()) + } + +} + +class Class1 {} +class Class2 {} +class Class3 {} + +class CustomRegistrator extends KryoRegistrator { + def registerClasses(kryo: Kryo) { + kryo.register(classOf[Class2]) + } } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e1e35b688d..64ac6d2d92 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("kryo with nonexistent custom registrator should fail") { - import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.SparkException val conf = new SparkConf(false) conf.set("spark.kryo.registrator", "this.class.does.not.exist") - + val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist")) + assert(thrown.getMessage.contains("Failed to register classes with Kryo")) } test("default class loader can be set by a different thread") { |