aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLev Khomich <levkhomich@gmail.com>2015-03-10 10:55:42 +0000
committerSean Owen <sowen@cloudera.com>2015-03-10 10:55:42 +0000
commitc4c4b07bf61cab01d92fde4f902d8c06abdce240 (patch)
tree96a4fdb10e564ef955a445e20a3f526674701149
parent9a0272fbb322042788f14e9cd99e2db86b456225 (diff)
downloadspark-c4c4b07bf61cab01d92fde4f902d8c06abdce240.tar.gz
spark-c4c4b07bf61cab01d92fde4f902d8c06abdce240.tar.bz2
spark-c4c4b07bf61cab01d92fde4f902d8c06abdce240.zip
[SPARK-6087][CORE] Provide actionable exception if Kryo buffer is not large enough
A simple try-catch wrapping KryoException to be more informative. Author: Lev Khomich <levkhomich@gmail.com> Closes #4947 from levkhomich/master and squashes the following commits: 0f7a947 [Lev Khomich] [SPARK-6087][CORE] Provide actionable exception if Kryo buffer is not large enough
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala14
2 files changed, 21 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 9ce64d41fb..dc7aa99738 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -158,7 +158,13 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
- kryo.writeClassAndObject(output, t)
+ try {
+ kryo.writeClassAndObject(output, t)
+ } catch {
+ case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
+ throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
+ "increase spark.kryoserializer.buffer.max.mb value.")
+ }
ByteBuffer.wrap(output.toBytes)
}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 523d898207..6198df84fa 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -261,6 +261,20 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
}
}
+
+ test("serialization buffer overflow reporting") {
+ import org.apache.spark.SparkException
+ val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb"
+
+ val largeObject = (1 to 1000000).toArray
+
+ val conf = new SparkConf(false)
+ conf.set(kryoBufferMaxProperty, "1")
+
+ val ser = new KryoSerializer(conf).newInstance()
+ val thrown = intercept[SparkException](ser.serialize(largeObject))
+ assert(thrown.getMessage.contains(kryoBufferMaxProperty))
+ }
}