aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorSandeep Singh <sandeep@techaddict.me>2016-10-22 12:03:37 -0700
committerReynold Xin <rxin@databricks.com>2016-10-22 12:03:37 -0700
commitbc167a2a53f5a795d089e8a884569b1b3e2cd439 (patch)
tree2b71829d0cf5f6737686533fef5d1dccf040c950 /core/src/test/scala
parent4f1dcd3dce270268b42fbe59409790364fa5c5df (diff)
downloadspark-bc167a2a53f5a795d089e8a884569b1b3e2cd439.tar.gz
spark-bc167a2a53f5a795d089e8a884569b1b3e2cd439.tar.bz2
spark-bc167a2a53f5a795d089e8a884569b1b3e2cd439.zip
[SPARK-928][CORE] Add support for Unsafe-based serializer in Kryo
## What changes were proposed in this pull request? Now since we have migrated to Kryo-3.0.0 in https://issues.apache.org/jira/browse/SPARK-11416, we can give users the option to use an unsafe SerDe. It can be turned on by setting `spark.kryo.unsafe` to `true`. ## How was this patch tested? Ran existing tests ``` Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ basicTypes: Int unsafe:true 160 / 178 98.5 10.1 1.0X basicTypes: Long unsafe:true 210 / 218 74.9 13.4 0.8X basicTypes: Float unsafe:true 203 / 213 77.5 12.9 0.8X basicTypes: Double unsafe:true 226 / 235 69.5 14.4 0.7X Array: Int unsafe:true 1087 / 1101 14.5 69.1 0.1X Array: Long unsafe:true 2758 / 2844 5.7 175.4 0.1X Array: Float unsafe:true 1511 / 1552 10.4 96.1 0.1X Array: Double unsafe:true 2942 / 2972 5.3 187.0 0.1X Map of string->Double unsafe:true 2645 / 2739 5.9 168.2 0.1X basicTypes: Int unsafe:false 211 / 218 74.7 13.4 0.8X basicTypes: Long unsafe:false 247 / 253 63.6 15.7 0.6X basicTypes: Float unsafe:false 211 / 216 74.5 13.4 0.8X basicTypes: Double unsafe:false 227 / 233 69.2 14.4 0.7X Array: Int unsafe:false 3012 / 3032 5.2 191.5 0.1X Array: Long unsafe:false 4463 / 4515 3.5 283.8 0.0X Array: Float unsafe:false 2788 / 2868 5.6 177.2 0.1X Array: Double unsafe:false 3558 / 3752 4.4 226.2 0.0X Map of string->Double unsafe:false 2806 / 2933 5.6 178.4 0.1X ``` Author: Sandeep Singh <sandeep@techaddict.me> Author: Sandeep Singh <sandeep@origamilogic.com> Closes #12913 from techaddict/SPARK-928.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala139
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala33
3 files changed, 173 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
new file mode 100644
index 0000000000..64be966276
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.Benchmark
+
/**
 * Micro-benchmark comparing Kryo serialization round-trip throughput with
 * `spark.kryo.unsafe` enabled vs. disabled, over three shapes of data:
 * bare primitives, arrays of primitives, and String -> Double maps.
 */
class KryoBenchmark extends SparkFunSuite {
  // Shared harness: 15M "values per case" with 10 timed iterations per registered case.
  val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10)

  // Registered as `ignore` so it never runs in regular test builds; change to
  // `test` locally to re-collect the numbers recorded below.
  ignore("Benchmark Kryo Unsafe vs safe Serialization") {
    Seq(true, false).foreach(runBenchmark)
    benchmark.run()

    // scalastyle:off
    /*
      Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
      ------------------------------------------------------------------------------------------------
      basicTypes: Int with unsafe:true               151 /  170        104.2           9.6       1.0X
      basicTypes: Long with unsafe:true              175 /  191         89.8          11.1       0.9X
      basicTypes: Float with unsafe:true             177 /  184         88.8          11.3       0.9X
      basicTypes: Double with unsafe:true            193 /  216         81.4          12.3       0.8X
      Array: Int with unsafe:true                    513 /  587         30.7          32.6       0.3X
      Array: Long with unsafe:true                  1211 / 1358         13.0          77.0       0.1X
      Array: Float with unsafe:true                  890 /  964         17.7          56.6       0.2X
      Array: Double with unsafe:true                1335 / 1428         11.8          84.9       0.1X
      Map of string->Double with unsafe:true         931 /  988         16.9          59.2       0.2X
      basicTypes: Int with unsafe:false              197 /  217         79.9          12.5       0.8X
      basicTypes: Long with unsafe:false             219 /  240         71.8          13.9       0.7X
      basicTypes: Float with unsafe:false            208 /  217         75.7          13.2       0.7X
      basicTypes: Double with unsafe:false           208 /  225         75.6          13.2       0.7X
      Array: Int with unsafe:false                  2559 / 2681          6.1         162.7       0.1X
      Array: Long with unsafe:false                 3425 / 3516          4.6         217.8       0.0X
      Array: Float with unsafe:false                2025 / 2134          7.8         128.7       0.1X
      Array: Double with unsafe:false               2241 / 2358          7.0         142.5       0.1X
      Map of string->Double with unsafe:false       1044 / 1085         15.1          66.4       0.1X
    */
    // scalastyle:on
  }

  /**
   * Registers one benchmark case per datatype for the given unsafe setting.
   * Fixtures are generated eagerly (outside the timed closure); serialization
   * is the only work measured.
   */
  private def runBenchmark(useUnsafe: Boolean): Unit = {
    // Round-trips `t` through the serializer: 1 if it survives intact, else 0.
    // Summing the results keeps the JIT from eliminating the serialization work.
    def check[T: ClassTag](t: T, ser: SerializerInstance): Int = {
      if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0
    }

    // Benchmark Primitives
    val basicTypeCount = 1000000
    def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = {
      // lazy: only build the serializer if/when this case actually runs.
      lazy val ser = createSerializer(useUnsafe)
      val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen())

      benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ =>
        var sum = 0L
        var i = 0
        while (i < basicTypeCount) {
          sum += check(arrayOfBasicType(i), ser)
          i += 1
        }
        sum
      }
    }
    basicTypes("Int", Random.nextInt)
    basicTypes("Long", Random.nextLong)
    basicTypes("Float", Random.nextFloat)
    basicTypes("Double", Random.nextDouble)

    // Benchmark Array of Primitives
    val arrayCount = 10000
    def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = {
      lazy val ser = createSerializer(useUnsafe)
      // Arrays of random length in [0, arrayCount) so sizes vary across rows.
      val arrayOfArrays: Array[Array[T]] =
        Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen()))

      benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ =>
        var sum = 0L
        var i = 0
        while (i < arrayCount) {
          val arr = arrayOfArrays(i)
          sum += check(arr, ser)
          i += 1
        }
        sum
      }
    }
    basicTypeArray("Int", Random.nextInt)
    basicTypeArray("Long", Random.nextLong)
    basicTypeArray("Float", Random.nextFloat)
    basicTypeArray("Double", Random.nextDouble)

    // Benchmark Maps
    val mapsCount = 1000
    lazy val ser = createSerializer(useUnsafe)
    val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) {
      Array.fill(Random.nextInt(mapsCount)) {
        (Random.nextString(mapsCount / 10), Random.nextDouble())
      }.toMap
    }

    benchmark.addCase(s"Map of string->Double with unsafe:$useUnsafe") { _ =>
      var sum = 0L
      var i = 0
      while (i < mapsCount) {
        val map = arrayOfMaps(i)
        sum += check(map, ser)
        i += 1
      }
      sum
    }
  }

  /**
   * Builds a fresh Kryo [[SerializerInstance]] with `spark.kryo.unsafe` set to
   * the given flag and the test registrator installed.
   */
  def createSerializer(useUnsafe: Boolean): SerializerInstance = {
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
    conf.set("spark.kryo.unsafe", useUnsafe.toString)

    new KryoSerializer(conf).newInstance()
  }

}
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index bc6e98365d..5040841811 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.Utils
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ conf.set("spark.kryo.unsafe", "false")
test("SPARK-7392 configuration limits") {
val kryoBufferProperty = "spark.kryoserializer.buffer"
diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
new file mode 100644
index 0000000000..d63a45ae4a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
/**
 * Re-runs every test in [[KryoSerializerSuite]] with `spark.kryo.unsafe`
 * enabled, so the unsafe-based Kryo input/output path gets the same coverage
 * as the default (safe) path.
 */
class UnsafeKryoSerializerSuite extends KryoSerializerSuite {

  // This test suite should run all tests in KryoSerializerSuite with kryo unsafe.

  // Explicit `: Unit =` instead of deprecated procedure syntax.
  override def beforeAll(): Unit = {
    conf.set("spark.kryo.unsafe", "true")
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    // Restore the default so the shared conf does not leak into other suites.
    conf.set("spark.kryo.unsafe", "false")
    super.afterAll()
  }
}