aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala10
2 files changed, 16 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 407cb9db6e..85944eabcf 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -79,15 +79,16 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
// Allow the user to register their own classes by setting spark.kryo.registrator
- try {
- for (regCls <- registrator) {
- logDebug("Running user registrator: " + regCls)
+ for (regCls <- registrator) {
+ logDebug("Running user registrator: " + regCls)
+ try {
val reg = Class.forName(regCls, true, classLoader).newInstance()
.asInstanceOf[KryoRegistrator]
reg.registerClasses(kryo)
+ } catch {
+ case e: Exception =>
+ throw new SparkException(s"Failed to invoke $regCls", e)
}
- } catch {
- case e: Exception => logError("Failed to run spark.kryo.registrator", e)
}
// Register Chill's classes; we do this after our ranges and the user's own classes to let
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 789b773bae..3bf9efebb3 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -207,6 +207,16 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
.fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
assert(10 + control.sum === result)
}
+
+ test("kryo with nonexistent custom registrator should fail") {
+ import org.apache.spark.{SparkConf, SparkException}
+
+ val conf = new SparkConf(false)
+ conf.set("spark.kryo.registrator", "this.class.does.not.exist")
+
+ val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance())
+ assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist"))
+ }
}
class KryoSerializerResizableOutputSuite extends FunSuite {