author    Kazuaki Ishizaki <ishizaki@jp.ibm.com>    2016-09-27 14:18:32 +0800
committer Wenchen Fan <wenchen@databricks.com>      2016-09-27 14:18:32 +0800
commit    85b0a157543201895557d66306b38b3ca52f2151 (patch)
tree      cfeae0712fd72172e7d0fa1c16f50df32e7777f6 /sql/core/src/test
parent    6ee28423ad1b2e6089b82af64a31d77d3552bb38 (diff)
[SPARK-15962][SQL] Introduce implementation with a dense format for UnsafeArrayData
## What changes were proposed in this pull request?

This PR introduces a more compact representation for `UnsafeArrayData`. `UnsafeArrayData` needs to accept a `null` value in each entry of an array. In the current version, it has three parts:

```
[numElements] [offsets] [values]
```

`offsets` has `numElements` entries, and an entry represents `null` if its value is negative. This increases the memory footprint and introduces an indirection for accessing each of the `values`. This PR instead uses a bitvector to represent nullability for each element, like `UnsafeRow`, and eliminates the indirection. The new `UnsafeArrayData` has four parts:

```
[numElements] [null bits] [values or offset&length] [variable length portion]
```

In the `null bits` region, we store one bit per element, which indicates whether that element is null. Its total size is ceil(numElements / 8) bytes, and it is aligned to 8-byte boundaries. In the `values or offset&length` region, we store the content of the elements. For fields that hold fixed-length primitive types, such as long, double, or int, we store the value directly in the field. For fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the base address of the array) that points to the beginning of the variable-length field, together with its length (the two are combined into a single long). Each field is word-aligned. In the `variable length portion`, each value is aligned to 8-byte boundaries.

The new format reduces the memory footprint and improves the performance of accessing each element. An example of a memory footprint comparison, for a 1024x1024-element integer array:

Size of `baseObject` for `UnsafeArrayData` without this PR: 8 + 1024x1024 + 1024x1024 = 2M bytes
Size of `baseObject` for `UnsafeArrayData` with this PR: 8 + 1024x1024/8 + 1024x1024 = 1.25M bytes

In summary, we got 1.0-2.6x performance improvements over the code before applying this PR.
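For illustration, the four-region layout described above translates into a size computation like the following Scala sketch. This is editorial, not part of the patch; the object and method names are hypothetical, and only the alignment rules stated above are assumed.

```scala
// Hypothetical sketch of the new layout's size, following the description
// above; not Spark's actual API.
object DenseArrayLayout {
  private def roundUpTo8(n: Long): Long = (n + 7) / 8 * 8

  // [numElements][null bits][values] for a fixed-length element type.
  def sizeInBytes(numElements: Long, elementSize: Int): Long = {
    val numElementsRegion = 8L                                // stored word-aligned
    val nullBitsRegion = roundUpTo8((numElements + 7) / 8)    // 1 bit per element, 8-byte aligned
    val valuesRegion = roundUpTo8(numElements * elementSize)  // values, word-aligned
    numElementsRegion + nullBitsRegion + valuesRegion
  }

  // For variable-length elements, offset and length share one long,
  // in the style of UnsafeRow: offset in the upper half, length in the lower.
  def packOffsetAndLength(offset: Int, length: Int): Long =
    (offset.toLong << 32) | (length.toLong & 0xFFFFFFFFL)
}
```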
Here are performance results of [benchmark programs](https://github.com/kiszk/spark/blob/04d2e4b6dbdc4eff43ce18b3c9b776e0129257c7/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala):

**Read UnsafeArrayData**: 1.7x and 1.6x performance improvements over the code before applying this PR

````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            430 /  436        390.0           2.6       1.0X
Double                                         456 /  485        367.8           2.7       0.9X

With SPARK-15962
Read UnsafeArrayData:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            252 /  260        666.1           1.5       1.0X
Double                                         281 /  292        597.7           1.7       0.9X
````

**Write UnsafeArrayData**: 1.0x and 1.1x performance improvements over the code before applying this PR

````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Write UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            203 /  273        103.4           9.7       1.0X
Double                                         239 /  356         87.9          11.4       0.8X

With SPARK-15962
Write UnsafeArrayData:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            196 /  249        107.0           9.3       1.0X
Double                                         227 /  367         92.3          10.8       0.9X
````

**Get primitive array from UnsafeArrayData**: 2.6x and 1.6x performance improvements over the code before applying this PR

````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Get primitive array from UnsafeArrayData: Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            207 /  217        304.2           3.3       1.0X
Double                                         257 /  363        245.2           4.1       0.8X

With SPARK-15962
Get primitive array from UnsafeArrayData: Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            151 /  198        415.8           2.4       1.0X
Double                                         214 /  394        293.6           3.4       0.7X
````

**Create UnsafeArrayData from primitive array**: 1.7x and 2.1x performance improvements over the code before applying this PR

````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
Create UnsafeArrayData from primitive array: Best/Avg Time(ms) Rate(M/s)  Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            340 /  385        185.1           5.4       1.0X
Double                                         479 /  705        131.3           7.6       0.7X

With SPARK-15962
Create UnsafeArrayData from primitive array: Best/Avg Time(ms) Rate(M/s)  Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Int                                            206 /  211        306.0           3.3       1.0X
Double                                         232 /  406        271.6           3.7       0.9X
````

1.7x and 1.4x performance improvements in [`UDTSerializationBenchmark`](https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala) over the code before applying this PR

````
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without SPARK-15962
VectorUDT de/serialization:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
serialize                                      442 /  533          0.0      441927.1       1.0X
deserialize                                    217 /  274          0.0      217087.6       2.0X

With SPARK-15962
VectorUDT de/serialization:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
serialize                                      265 /  318          0.0      265138.5       1.0X
deserialize                                    155 /  197          0.0      154611.4       1.7X
````

## How was this patch tested?

Added unit tests into `UnsafeArraySuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #13680 from kiszk/SPARK-15962.
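As a sanity check on how to read the tables above (this arithmetic is editorial, not part of the patch): Rate(M/s) is the total number of processed values, count x iters, divided by the best time, and Per Row(ns) is its inverse. For the "Read UnsafeArrayData" Int case with SPARK-15962:

```scala
// Editorial sanity check; count and iters come from the benchmark source below.
val valuesPerRun = 1024L * 1024 * 16 * 10          // count * iters in readUnsafeArray(10)
val bestTimeMs = 252.0                             // best time of the "Int" row
val rateMPerSec = valuesPerRun / bestTimeMs / 1000 // ~665.8, vs. 666.1 in the table
val perRowNs = bestTimeMs * 1e6 / valuesPerRun     // ~1.5, matching the table
```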
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala | 232
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala | 4
2 files changed, 234 insertions, 2 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
new file mode 100644
index 0000000000..6c7779b579
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter}
+import org.apache.spark.util.Benchmark
+
+/**
+ * Benchmark for measuring the performance of [[UnsafeArrayData]] operations.
+ * To run this:
+ * 1. replace ignore(...) with test(...)
+ * 2. build/sbt "sql/test-only *benchmark.UnsafeArrayDataBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
+ */
+class UnsafeArrayDataBenchmark extends BenchmarkBase {
+
+ def calculateHeaderPortionInBytes(count: Int): Int = {
+ /* 4 + 4 * count // header size of the old format, i.e. without SPARK-15962 */
+ UnsafeArrayData.calculateHeaderPortionInBytes(count)
+ }
+
+ def readUnsafeArray(iters: Int): Unit = {
+ val count = 1024 * 1024 * 16
+ val rand = new Random(42)
+
+ val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+ val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+ val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+ val readIntArray = { i: Int =>
+ var n = 0
+ while (n < iters) {
+ val len = intUnsafeArray.numElements
+ var sum = 0
+ var i = 0
+ while (i < len) {
+ sum += intUnsafeArray.getInt(i)
+ i += 1
+ }
+ n += 1
+ }
+ }
+
+ val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
+ val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+ val doubleUnsafeArray = doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+ val readDoubleArray = { i: Int =>
+ var n = 0
+ while (n < iters) {
+ val len = doubleUnsafeArray.numElements
+ var sum = 0.0
+ var i = 0
+ while (i < len) {
+ sum += doubleUnsafeArray.getDouble(i)
+ i += 1
+ }
+ n += 1
+ }
+ }
+
+ val benchmark = new Benchmark("Read UnsafeArrayData", count * iters)
+ benchmark.addCase("Int")(readIntArray)
+ benchmark.addCase("Double")(readDoubleArray)
+ benchmark.run
+ /*
+ OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
+ Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Read UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Int 252 / 260 666.1 1.5 1.0X
+ Double 281 / 292 597.7 1.7 0.9X
+ */
+ }
+
+ def writeUnsafeArray(iters: Int): Unit = {
+ val count = 1024 * 1024 * 2
+ val rand = new Random(42)
+
+ var intTotalLength: Int = 0
+ val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+ val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+ val writeIntArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += intEncoder.toRow(intPrimitiveArray).getArray(0).numElements()
+ n += 1
+ }
+ intTotalLength = len
+ }
+
+ var doubleTotalLength: Int = 0
+ val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
+ val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+ val writeDoubleArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += doubleEncoder.toRow(doublePrimitiveArray).getArray(0).numElements()
+ n += 1
+ }
+ doubleTotalLength = len
+ }
+
+ val benchmark = new Benchmark("Write UnsafeArrayData", count * iters)
+ benchmark.addCase("Int")(writeIntArray)
+ benchmark.addCase("Double")(writeDoubleArray)
+ benchmark.run
+ /*
+ OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
+ Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Write UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Int 196 / 249 107.0 9.3 1.0X
+ Double 227 / 367 92.3 10.8 0.9X
+ */
+ }
+
+ def getPrimitiveArray(iters: Int): Unit = {
+ val count = 1024 * 1024 * 12
+ val rand = new Random(42)
+
+ var intTotalLength: Int = 0
+ val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+ val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
+ val intUnsafeArray = intEncoder.toRow(intPrimitiveArray).getArray(0)
+ val readIntArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += intUnsafeArray.toIntArray.length
+ n += 1
+ }
+ intTotalLength = len
+ }
+
+ var doubleTotalLength: Int = 0
+ val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
+ val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
+ val doubleUnsafeArray = doubleEncoder.toRow(doublePrimitiveArray).getArray(0)
+ val readDoubleArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += doubleUnsafeArray.toDoubleArray.length
+ n += 1
+ }
+ doubleTotalLength = len
+ }
+
+ val benchmark = new Benchmark("Get primitive array from UnsafeArrayData", count * iters)
+ benchmark.addCase("Int")(readIntArray)
+ benchmark.addCase("Double")(readDoubleArray)
+ benchmark.run
+ /*
+ OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
+ Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Get primitive array from UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Int 151 / 198 415.8 2.4 1.0X
+ Double 214 / 394 293.6 3.4 0.7X
+ */
+ }
+
+ def putPrimitiveArray(iters: Int): Unit = {
+ val count = 1024 * 1024 * 12
+ val rand = new Random(42)
+
+ var intTotalLen: Int = 0
+ val intPrimitiveArray = Array.fill[Int](count) { rand.nextInt }
+ val createIntArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += UnsafeArrayData.fromPrimitiveArray(intPrimitiveArray).numElements
+ n += 1
+ }
+ intTotalLen = len
+ }
+
+ var doubleTotalLen: Int = 0
+ val doublePrimitiveArray = Array.fill[Double](count) { rand.nextDouble }
+ val createDoubleArray = { i: Int =>
+ var len = 0
+ var n = 0
+ while (n < iters) {
+ len += UnsafeArrayData.fromPrimitiveArray(doublePrimitiveArray).numElements
+ n += 1
+ }
+ doubleTotalLen = len
+ }
+
+ val benchmark = new Benchmark("Create UnsafeArrayData from primitive array", count * iters)
+ benchmark.addCase("Int")(createIntArray)
+ benchmark.addCase("Double")(createDoubleArray)
+ benchmark.run
+ /*
+ OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
+ Intel Xeon E3-12xx v2 (Ivy Bridge)
+ Create UnsafeArrayData from primitive array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ Int 206 / 211 306.0 3.3 1.0X
+ Double 232 / 406 271.6 3.7 0.9X
+ */
+ }
+
+ ignore("Benchmark UnsafeArrayData") {
+ readUnsafeArray(10)
+ writeUnsafeArray(10)
+ getPrimitiveArray(5)
+ putPrimitiveArray(5)
+ }
+}
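For context before the second diff: a minimal usage sketch of the API the benchmark above exercises, using only calls that appear in it (`fromPrimitiveArray`, `numElements`, `getInt`, `toIntArray`). This is editorial, not part of the patch.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// Round-trip a primitive array through UnsafeArrayData, using only the
// calls exercised by UnsafeArrayDataBenchmark above.
val unsafeArray = UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3))
assert(unsafeArray.numElements == 3)    // element count survives the conversion
assert(unsafeArray.getInt(1) == 2)      // direct, index-based element access
assert(unsafeArray.toIntArray.sameElements(Array(1, 2, 3)))  // and back again
```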
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
index 052f4cbaeb..0b93c633b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala
@@ -73,8 +73,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
- checkActualSize(ARRAY_TYPE, Array[Any](1), 16)
- checkActualSize(MAP_TYPE, Map(1 -> "a"), 29)
+ checkActualSize(ARRAY_TYPE, Array[Any](1), 8 + 8 + 8 + 8)
+ checkActualSize(MAP_TYPE, Map(1 -> "a"), 8 + (8 + 8 + 8 + 8) + (8 + 8 + 8 + 8))
checkActualSize(STRUCT_TYPE, Row("hello"), 28)
}
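A note on the new expected sizes (this decomposition is an editorial reading; the diff asserts only the totals): the sums of 8s mirror the word alignment of the new format, and the two parenthesized groups for MAP_TYPE suggest that a map is stored as two such arrays, keys and values, behind an 8-byte prefix.

```scala
// Editorial reading of the rewritten expectations; the test itself
// asserts only the totals.
val smallArraySize = 8 + 8 + 8 + 8    // Array[Any](1): four word-aligned regions, 32 bytes
val mapSize = 8 + smallArraySize * 2  // Map(1 -> "a"): prefix + key array + value array, 72 bytes
assert(mapSize == 8 + (8 + 8 + 8 + 8) + (8 + 8 + 8 + 8))
```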