diff options
author | Zhang, Liye <liye.zhang@intel.com> | 2015-05-08 09:10:58 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-05-08 09:10:58 +0100 |
commit | c2f0821aad3b82dcd327e914c9b297e92526649d (patch) | |
tree | dfed47cffa8aa569d19328cacd8fcf75aec67d9f /core | |
parent | f496bf3c539a873ffdf3aa803847ef7b50135bd7 (diff) | |
download | spark-c2f0821aad3b82dcd327e914c9b297e92526649d.tar.gz spark-c2f0821aad3b82dcd327e914c9b297e92526649d.tar.bz2 spark-c2f0821aad3b82dcd327e914c9b297e92526649d.zip |
[SPARK-7392] [CORE] bugfix: Kryo buffer size cannot be larger than 2M
Author: Zhang, Liye <liye.zhang@intel.com>
Closes #5934 from liyezhang556520/kryoBufSize and squashes the following commits:
5707e04 [Zhang, Liye] fix import order
8693288 [Zhang, Liye] replace multiplier with ByteUnit methods
9bf93e9 [Zhang, Liye] add tests
d91e5ed [Zhang, Liye] change kb to mb
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 11 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 30 |
2 files changed, 36 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index f9f78852f0..64ba27f34d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -32,6 +32,7 @@ import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} +import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ import org.apache.spark.util.BoundedPriorityQueue @@ -51,18 +52,18 @@ class KryoSerializer(conf: SparkConf) private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") - if (bufferSizeKb >= 2048) { + if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + - s"2048 mb, got: + $bufferSizeKb mb.") + s"2048 mb, got: + ${ByteUnit.KiB.toMiB(bufferSizeKb)} mb.") } - private val bufferSize = (bufferSizeKb * 1024).toInt + private val bufferSize = ByteUnit.KiB.toBytes(bufferSizeKb).toInt val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt - if (maxBufferSizeMb >= 2048) { + if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " + s"2048 mb, got: + $maxBufferSizeMb mb.") } - private val maxBufferSize = maxBufferSizeMb * 1024 * 1024 + private val maxBufferSize = ByteUnit.MiB.toBytes(maxBufferSizeMb).toInt private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 778a7eee73..c7369de24b 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -32,6 +32,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) + test("configuration limits") { + val conf1 = conf.clone() + val kryoBufferProperty = "spark.kryoserializer.buffer" + val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max" + conf1.set(kryoBufferProperty, "64k") + conf1.set(kryoBufferMaxProperty, "64m") + new KryoSerializer(conf1).newInstance() + // 2048m = 2097152k + conf1.set(kryoBufferProperty, "2097151k") + conf1.set(kryoBufferMaxProperty, "64m") + // should not throw exception when kryoBufferMaxProperty < kryoBufferProperty + new KryoSerializer(conf1).newInstance() + conf1.set(kryoBufferMaxProperty, "2097151k") + new KryoSerializer(conf1).newInstance() + val conf2 = conf.clone() + conf2.set(kryoBufferProperty, "2048m") + val thrown1 = intercept[IllegalArgumentException](new KryoSerializer(conf2).newInstance()) + assert(thrown1.getMessage.contains(kryoBufferProperty)) + val conf3 = conf.clone() + conf3.set(kryoBufferMaxProperty, "2048m") + val thrown2 = intercept[IllegalArgumentException](new KryoSerializer(conf3).newInstance()) + assert(thrown2.getMessage.contains(kryoBufferMaxProperty)) + val conf4 = conf.clone() + conf4.set(kryoBufferProperty, "2g") + conf4.set(kryoBufferMaxProperty, "3g") + val thrown3 = intercept[IllegalArgumentException](new KryoSerializer(conf4).newInstance()) + assert(thrown3.getMessage.contains(kryoBufferProperty)) + assert(!thrown3.getMessage.contains(kryoBufferMaxProperty)) + } + test("basic types") { val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { |