aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZhang, Liye <liye.zhang@intel.com>2015-05-08 09:10:58 +0100
committerSean Owen <sowen@cloudera.com>2015-05-08 09:10:58 +0100
commitc2f0821aad3b82dcd327e914c9b297e92526649d (patch)
treedfed47cffa8aa569d19328cacd8fcf75aec67d9f
parentf496bf3c539a873ffdf3aa803847ef7b50135bd7 (diff)
downloadspark-c2f0821aad3b82dcd327e914c9b297e92526649d.tar.gz
spark-c2f0821aad3b82dcd327e914c9b297e92526649d.tar.bz2
spark-c2f0821aad3b82dcd327e914c9b297e92526649d.zip
[SPARK-7392] [CORE] bugfix: Kryo buffer size cannot be larger than 2M
Author: Zhang, Liye <liye.zhang@intel.com> Closes #5934 from liyezhang556520/kryoBufSize and squashes the following commits: 5707e04 [Zhang, Liye] fix import order 8693288 [Zhang, Liye] replace multiplier with ByteUnit methods 9bf93e9 [Zhang, Liye] add tests d91e5ed [Zhang, Liye] change kb to mb
-rw-r--r--core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala30
2 files changed, 36 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index f9f78852f0..64ba27f34d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -32,6 +32,7 @@ import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
+import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.BoundedPriorityQueue
@@ -51,18 +52,18 @@ class KryoSerializer(conf: SparkConf)
private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k")
- if (bufferSizeKb >= 2048) {
+ if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " +
- s"2048 mb, got: + $bufferSizeKb mb.")
+ s"2048 mb, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.")
}
- private val bufferSize = (bufferSizeKb * 1024).toInt
+ private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
- if (maxBufferSizeMb >= 2048) {
+ if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) {
throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " +
s"2048 mb, got: + $maxBufferSizeMb mb.")
}
- private val maxBufferSize = maxBufferSizeMb * 1024 * 1024
+ private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 778a7eee73..c7369de24b 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -32,6 +32,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ test("configuration limits") {
+ val conf1 = conf.clone()
+ val kryoBufferProperty = "spark.kryoserializer.buffer"
+ val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max"
+ conf1.set(kryoBufferProperty, "64k")
+ conf1.set(kryoBufferMaxProperty, "64m")
+ new KryoSerializer(conf1).newInstance()
+ // 2048m = 2097152k
+ conf1.set(kryoBufferProperty, "2097151k")
+ conf1.set(kryoBufferMaxProperty, "64m")
+ // should not throw exception when kryoBufferMaxProperty < kryoBufferProperty
+ new KryoSerializer(conf1).newInstance()
+ conf1.set(kryoBufferMaxProperty, "2097151k")
+ new KryoSerializer(conf1).newInstance()
+ val conf2 = conf.clone()
+ conf2.set(kryoBufferProperty, "2048m")
+ val thrown1 = intercept[IllegalArgumentException](new KryoSerializer(conf2).newInstance())
+ assert(thrown1.getMessage.contains(kryoBufferProperty))
+ val conf3 = conf.clone()
+ conf3.set(kryoBufferMaxProperty, "2048m")
+ val thrown2 = intercept[IllegalArgumentException](new KryoSerializer(conf3).newInstance())
+ assert(thrown2.getMessage.contains(kryoBufferMaxProperty))
+ val conf4 = conf.clone()
+ conf4.set(kryoBufferProperty, "2g")
+ conf4.set(kryoBufferMaxProperty, "3g")
+ val thrown3 = intercept[IllegalArgumentException](new KryoSerializer(conf4).newInstance())
+ assert(thrown3.getMessage.contains(kryoBufferProperty))
+ assert(!thrown3.getMessage.contains(kryoBufferMaxProperty))
+ }
+
test("basic types") {
val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {