aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala30
-rw-r--r--docs/configuration.md16
3 files changed, 44 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index fa79b25759..e60b802a86 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -48,11 +48,12 @@ class KryoSerializer(conf: SparkConf)
with Serializable {
private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
+ private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")
- def newKryoOutput() = new KryoOutput(bufferSize)
+ def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 79280d1a06..789b773bae 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -209,6 +209,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
}
+/**
+ * Exercises the resizable Kryo output buffer: a large array must fail to serialize when
+ * `spark.kryoserializer.buffer.max.mb` equals the initial buffer size, and must succeed
+ * when the maximum is larger than the initial size.
+ */
+class KryoSerializerResizableOutputSuite extends FunSuite {
+  import org.apache.spark.SparkConf
+  import org.apache.spark.SparkContext
+  import org.apache.spark.LocalSparkContext
+  import org.apache.spark.SparkException
+
+  // Found by trial and error: this array does not serialize with a 1 MB Kryo buffer.
+  val x = (1 to 400000).toArray
+
+  /**
+   * Runs `body` against a local SparkContext that uses Kryo with a 1 MB initial buffer and
+   * the given maximum buffer size. The context is stopped in a `finally` so that it is
+   * released even when `body` throws (for example on a failed assertion).
+   *
+   * @param maxBufferMb value for `spark.kryoserializer.buffer.max.mb`
+   * @param body        test code to run with the context
+   */
+  private def withKryoContext(maxBufferMb: String)(body: SparkContext => Unit): Unit = {
+    val conf = new SparkConf(false)
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryoserializer.buffer.mb", "1")
+    conf.set("spark.kryoserializer.buffer.max.mb", maxBufferMb)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      body(sc)
+    } finally {
+      LocalSparkContext.stop(sc)
+    }
+  }
+
+  test("kryo without resizable output buffer should fail on large array") {
+    // Maximum equals the initial size, so the buffer has no room to grow.
+    withKryoContext("1") { sc =>
+      intercept[SparkException](sc.parallelize(x).collect)
+    }
+  }
+
+  test("kryo with resizable output buffer should succeed on large array") {
+    // Maximum is larger than the initial size, so the buffer may grow.
+    withKryoContext("2") { sc =>
+      assert(sc.parallelize(x).collect === x)
+    }
+  }
+}
+
object KryoTest {
case class CaseClass(i: Int, s: String) {}
diff --git a/docs/configuration.md b/docs/configuration.md
index 2e6c85cc2b..ea69057b5b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -414,10 +414,18 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.kryoserializer.buffer.mb</code></td>
<td>2</td>
<td>
- Maximum object size to allow within Kryo (the library needs to create a buffer at least as
- large as the largest single object you'll serialize). Increase this if you get a "buffer limit
- exceeded" exception inside Kryo. Note that there will be one buffer <i>per core</i> on each
- worker.
+ Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
+ <i>per core</i> on each worker. This buffer will grow up to
+ <code>spark.kryoserializer.buffer.max.mb</code> if needed.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.kryoserializer.buffer.max.mb</code></td>
+ <td>64</td>
+ <td>
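+ Maximum allowable size of the Kryo serialization buffer, in megabytes. This must be larger than
+ any object you attempt to serialize, and must be less than 2048 because the size in bytes is
+ held in a 32-bit integer. If it is smaller than <code>spark.kryoserializer.buffer.mb</code>,
+ the initial size is used as the maximum instead. Increase this if you get a "buffer limit
+ exceeded" exception inside Kryo.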
</td>
</tr>
</table>