author    Takeshi YAMAMURO <linguin.m.s@gmail.com>    2016-02-25 20:17:48 -0800
committer Reynold Xin <rxin@databricks.com>           2016-02-25 20:17:48 -0800
commit    1b39fafa75a162f183824ff2daa61d73b05ebc83 (patch)
tree      cbf89db45aaab350cae604ec7179623f5c076a0e /sql
parent    633d63a48ad98754dc7c56f9ac150fc2aa4e42c5 (diff)
[SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark
This PR adds benchmark code for `Encoder#compress()`. It also replaces the existing benchmark results with new ones, because the output format of `Benchmark` has changed.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #11236 from maropu/CompressionSpike.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala | 282
1 file changed, 193 insertions(+), 89 deletions(-)
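
The core of the patch: the old decode-only `runBenchmark` helper is split into `runEncodeBenchmark` and `runDecodeBenchmark`, and the shared setup (gathering compressibility stats, computing the compression ratio, allocating the output buffer) is factored into `prepareEncodeInternal`. The self-contained sketch below illustrates the same prepare-once, then time-encode-and-decode-separately pattern with a toy run-length codec; every name in it is hypothetical, and it does not use Spark's `CompressionScheme` or `Benchmark` classes.

import java.nio.ByteBuffer

object MiniCompressionBenchmark {

  // Toy run-length codec over bytes; it only stands in for a CompressionScheme's
  // encoder/decoder pair and bears no relation to Spark's actual implementations.
  def compress(in: ByteBuffer, out: ByteBuffer): ByteBuffer = {
    in.rewind()
    while (in.hasRemaining) {
      val b = in.get()
      var run = 1
      while (in.hasRemaining && in.get(in.position()) == b && run < 255) {
        in.get()
        run += 1
      }
      out.put(run.toByte).put(b)
    }
    out.flip()
    out
  }

  def decompress(in: ByteBuffer, out: ByteBuffer): Unit = {
    in.rewind()
    while (in.hasRemaining) {
      val run = in.get() & 0xff
      val b = in.get()
      var i = 0
      while (i < run) { out.put(b); i += 1 }
    }
    out.flip()
  }

  // Minimal timing helper; org.apache.spark.util.Benchmark plays this role in the patch.
  private def time(label: String)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    println(f"$label%-8s ${(System.nanoTime() - start) / 1e6}%.2f ms")
  }

  def main(args: Array[String]): Unit = {
    val count = 65536
    val iters = 100
    val input = ByteBuffer.allocate(count)
    for (i <- 0 until count) input.put(i, (i / 1024).toByte) // long runs, so RLE compresses well

    val compressed = ByteBuffer.allocate(2 * count) // worst case: two output bytes per input byte
    val decoded = ByteBuffer.allocate(count)

    // Prepare once, then time the encode and decode paths separately, mirroring
    // prepareEncodeInternal / runEncodeBenchmark / runDecodeBenchmark in the patch.
    time("encode") {
      for (_ <- 0 until iters) { compressed.clear(); compress(input, compressed) }
    }
    time("decode") {
      for (_ <- 0 until iters) { decoded.clear(); decompress(compressed, decoded) }
    }
  }
}

Timing the two passes separately is what lets each scheme report encode and decode rates independently; the old `runBenchmark` compressed the input once and timed only the decode loop.
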
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index 95eb5cf912..0000a5d1ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -17,19 +17,13 @@
package org.apache.spark.sql.execution.columnar.compression
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
import org.apache.commons.lang3.RandomStringUtils
import org.apache.commons.math3.distribution.LogNormalDistribution
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
-import org.apache.spark.sql.execution.columnar.BOOLEAN
-import org.apache.spark.sql.execution.columnar.INT
-import org.apache.spark.sql.execution.columnar.LONG
-import org.apache.spark.sql.execution.columnar.NativeColumnType
-import org.apache.spark.sql.execution.columnar.SHORT
-import org.apache.spark.sql.execution.columnar.STRING
+import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
import org.apache.spark.sql.types.AtomicType
import org.apache.spark.util.Benchmark
import org.apache.spark.util.Utils._
@@ -53,35 +47,70 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
() => rng.sample
}
- private[this] def runBenchmark[T <: AtomicType](
+ private[this] def prepareEncodeInternal[T <: AtomicType](
+ count: Int,
+ tpe: NativeColumnType[T],
+ supportedScheme: CompressionScheme,
+ input: ByteBuffer): ((ByteBuffer, ByteBuffer) => ByteBuffer, Double, ByteBuffer) = {
+ assert(supportedScheme.supports(tpe))
+
+ def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
+ val encoder = supportedScheme.encoder(tpe)
+ for (i <- 0 until count) {
+ encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
+ }
+ input.rewind()
+
+ val compressedSize = if (encoder.compressedSize == 0) {
+ input.remaining()
+ } else {
+ encoder.compressedSize
+ }
+
+ (encoder.compress, encoder.compressionRatio, allocateLocal(4 + compressedSize))
+ }
+
+ private[this] def runEncodeBenchmark[T <: AtomicType](
name: String,
iters: Int,
count: Int,
tpe: NativeColumnType[T],
input: ByteBuffer): Unit = {
-
val benchmark = new Benchmark(name, iters * count)
schemes.filter(_.supports(tpe)).map { scheme =>
- def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
- val encoder = scheme.encoder(tpe)
- for (i <- 0 until count) {
- encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
- }
- input.rewind()
+ val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, tpe, scheme, input)
+ val label = s"${getFormattedClassName(scheme)}(${compressionRatio.formatted("%.3f")})"
- val label = s"${getFormattedClassName(scheme)}(${encoder.compressionRatio.formatted("%.3f")})"
benchmark.addCase(label)({ i: Int =>
- val compressedSize = if (encoder.compressedSize == 0) {
- input.remaining()
- } else {
- encoder.compressedSize
+ for (n <- 0L until iters) {
+ compressFunc(input, buf)
+ input.rewind()
+ buf.rewind()
}
+ })
+ }
+
+ benchmark.run()
+ }
- val buf = allocateLocal(4 + compressedSize)
+ private[this] def runDecodeBenchmark[T <: AtomicType](
+ name: String,
+ iters: Int,
+ count: Int,
+ tpe: NativeColumnType[T],
+ input: ByteBuffer): Unit = {
+ val benchmark = new Benchmark(name, iters * count)
+
+ schemes.filter(_.supports(tpe)).map { scheme =>
+ val (compressFunc, _, buf) = prepareEncodeInternal(count, tpe, scheme, input)
+ val compressedBuf = compressFunc(input, buf)
+ val label = s"${getFormattedClassName(scheme)}"
+
+ input.rewind()
+
+ benchmark.addCase(label)({ i: Int =>
val rowBuf = new GenericMutableRow(1)
- val compressedBuf = encoder.compress(input, buf)
- input.rewind()
for (n <- 0L until iters) {
compressedBuf.rewind.position(4)
@@ -96,16 +125,10 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
benchmark.run()
}
- def bitDecode(iters: Int): Unit = {
+ def bitEncodingBenchmark(iters: Int): Unit = {
val count = 65536
val testData = allocateLocal(count * BOOLEAN.defaultSize)
- // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // BOOLEAN Decode: Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 124.98 536.96 1.00 X
- // RunLengthEncoding(2.494) 631.37 106.29 0.20 X
- // BooleanBitSet(0.125) 1200.36 55.91 0.10 X
val g = {
val rng = genLowerSkewData()
() => (rng().toInt % 2).toByte
@@ -113,110 +136,176 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
for (i <- 0 until count) {
testData.put(i * BOOLEAN.defaultSize, g())
}
- runBenchmark("BOOLEAN Decode", iters, count, BOOLEAN, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // BOOLEAN Encode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 3 / 4 19300.2 0.1 1.0X
+ // RunLengthEncoding(2.491) 923 / 939 72.7 13.8 0.0X
+ // BooleanBitSet(0.125) 359 / 363 187.1 5.3 0.0X
+ runEncodeBenchmark("BOOLEAN Encode", iters, count, BOOLEAN, testData)
+
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // BOOLEAN Decode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 129 / 136 519.8 1.9 1.0X
+ // RunLengthEncoding 613 / 623 109.4 9.1 0.2X
+ // BooleanBitSet 1196 / 1222 56.1 17.8 0.1X
+ runDecodeBenchmark("BOOLEAN Decode", iters, count, BOOLEAN, testData)
}
- def shortDecode(iters: Int): Unit = {
+ def shortEncodingBenchmark(iters: Int): Unit = {
val count = 65536
val testData = allocateLocal(count * SHORT.defaultSize)
- // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // SHORT Decode (Lower Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 376.87 178.07 1.00 X
- // RunLengthEncoding(1.498) 831.59 80.70 0.45 X
val g1 = genLowerSkewData()
for (i <- 0 until count) {
testData.putShort(i * SHORT.defaultSize, g1().toShort)
}
- runBenchmark("SHORT Decode (Lower Skew)", iters, count, SHORT, testData)
// Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // SHORT Decode (Higher Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 426.83 157.23 1.00 X
- // RunLengthEncoding(1.996) 845.56 79.37 0.50 X
+ // SHORT Encode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 6 / 7 10971.4 0.1 1.0X
+ // RunLengthEncoding(1.510) 1526 / 1542 44.0 22.7 0.0X
+ runEncodeBenchmark("SHORT Encode (Lower Skew)", iters, count, SHORT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // SHORT Decode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 811 / 837 82.8 12.1 1.0X
+ // RunLengthEncoding 1219 / 1266 55.1 18.2 0.7X
+ runDecodeBenchmark("SHORT Decode (Lower Skew)", iters, count, SHORT, testData)
+
val g2 = genHigherSkewData()
for (i <- 0 until count) {
testData.putShort(i * SHORT.defaultSize, g2().toShort)
}
- runBenchmark("SHORT Decode (Higher Skew)", iters, count, SHORT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // SHORT Encode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 7 / 7 10112.4 0.1 1.0X
+ // RunLengthEncoding(2.009) 1623 / 1661 41.4 24.2 0.0X
+ runEncodeBenchmark("SHORT Encode (Higher Skew)", iters, count, SHORT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // SHORT Decode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 818 / 827 82.0 12.2 1.0X
+ // RunLengthEncoding 1202 / 1237 55.8 17.9 0.7X
+ runDecodeBenchmark("SHORT Decode (Higher Skew)", iters, count, SHORT, testData)
}
- def intDecode(iters: Int): Unit = {
+ def intEncodingBenchmark(iters: Int): Unit = {
val count = 65536
val testData = allocateLocal(count * INT.defaultSize)
- // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // INT Decode(Lower Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 325.16 206.39 1.00 X
- // RunLengthEncoding(0.997) 1219.44 55.03 0.27 X
- // DictionaryEncoding(0.500) 955.51 70.23 0.34 X
- // IntDelta(0.250) 1146.02 58.56 0.28 X
val g1 = genLowerSkewData()
for (i <- 0 until count) {
testData.putInt(i * INT.defaultSize, g1().toInt)
}
- runBenchmark("INT Decode(Lower Skew)", iters, count, INT, testData)
// Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // INT Decode(Higher Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 1133.45 59.21 1.00 X
- // RunLengthEncoding(1.334) 1399.00 47.97 0.81 X
- // DictionaryEncoding(0.501) 1032.87 64.97 1.10 X
- // IntDelta(0.250) 948.02 70.79 1.20 X
+ // INT Encode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 18 / 19 3716.4 0.3 1.0X
+ // RunLengthEncoding(1.001) 1992 / 2056 33.7 29.7 0.0X
+ // DictionaryEncoding(0.500) 723 / 739 92.8 10.8 0.0X
+ // IntDelta(0.250) 368 / 377 182.2 5.5 0.0X
+ runEncodeBenchmark("INT Encode (Lower Skew)", iters, count, INT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // INT Decode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 821 / 845 81.8 12.2 1.0X
+ // RunLengthEncoding 1246 / 1256 53.9 18.6 0.7X
+ // DictionaryEncoding 757 / 766 88.6 11.3 1.1X
+ // IntDelta 680 / 689 98.7 10.1 1.2X
+ runDecodeBenchmark("INT Decode (Lower Skew)", iters, count, INT, testData)
+
val g2 = genHigherSkewData()
for (i <- 0 until count) {
testData.putInt(i * INT.defaultSize, g2().toInt)
}
- runBenchmark("INT Decode(Higher Skew)", iters, count, INT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // INT Encode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 17 / 19 3888.4 0.3 1.0X
+ // RunLengthEncoding(1.339) 2127 / 2148 31.5 31.7 0.0X
+ // DictionaryEncoding(0.501) 960 / 972 69.9 14.3 0.0X
+ // IntDelta(0.250) 362 / 366 185.5 5.4 0.0X
+ runEncodeBenchmark("INT Encode (Higher Skew)", iters, count, INT, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // INT Decode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 838 / 884 80.1 12.5 1.0X
+ // RunLengthEncoding 1287 / 1311 52.1 19.2 0.7X
+ // DictionaryEncoding 844 / 859 79.5 12.6 1.0X
+ // IntDelta 764 / 784 87.8 11.4 1.1X
+ runDecodeBenchmark("INT Decode (Higher Skew)", iters, count, INT, testData)
}
- def longDecode(iters: Int): Unit = {
+ def longEncodingBenchmark(iters: Int): Unit = {
val count = 65536
val testData = allocateLocal(count * LONG.defaultSize)
- // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // LONG Decode(Lower Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 1101.07 60.95 1.00 X
- // RunLengthEncoding(0.756) 1372.57 48.89 0.80 X
- // DictionaryEncoding(0.250) 947.80 70.81 1.16 X
- // LongDelta(0.125) 721.51 93.01 1.53 X
val g1 = genLowerSkewData()
for (i <- 0 until count) {
testData.putLong(i * LONG.defaultSize, g1().toLong)
}
- runBenchmark("LONG Decode(Lower Skew)", iters, count, LONG, testData)
// Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // LONG Decode(Higher Skew): Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 986.71 68.01 1.00 X
- // RunLengthEncoding(1.013) 1348.69 49.76 0.73 X
- // DictionaryEncoding(0.251) 865.48 77.54 1.14 X
- // LongDelta(0.125) 816.90 82.15 1.21 X
+ // LONG Encode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 37 / 38 1804.8 0.6 1.0X
+ // RunLengthEncoding(0.748) 2065 / 2094 32.5 30.8 0.0X
+ // DictionaryEncoding(0.250) 950 / 962 70.6 14.2 0.0X
+ // LongDelta(0.125) 475 / 482 141.2 7.1 0.1X
+ runEncodeBenchmark("LONG Encode (Lower Skew)", iters, count, LONG, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // LONG Decode (Lower Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 888 / 894 75.5 13.2 1.0X
+ // RunLengthEncoding 1301 / 1311 51.6 19.4 0.7X
+ // DictionaryEncoding 887 / 904 75.7 13.2 1.0X
+ // LongDelta 693 / 735 96.8 10.3 1.3X
+ runDecodeBenchmark("LONG Decode (Lower Skew)", iters, count, LONG, testData)
+
val g2 = genHigherSkewData()
for (i <- 0 until count) {
testData.putLong(i * LONG.defaultSize, g2().toLong)
}
- runBenchmark("LONG Decode(Higher Skew)", iters, count, LONG, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // LONG Encode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 34 / 35 1963.9 0.5 1.0X
+ // RunLengthEncoding(0.999) 2260 / 3021 29.7 33.7 0.0X
+ // DictionaryEncoding(0.251) 1270 / 1438 52.8 18.9 0.0X
+ // LongDelta(0.125) 496 / 509 135.3 7.4 0.1X
+ runEncodeBenchmark("LONG Encode (Higher Skew)", iters, count, LONG, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // LONG Decode (Higher Skew): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 965 / 1494 69.5 14.4 1.0X
+ // RunLengthEncoding 1350 / 1378 49.7 20.1 0.7X
+ // DictionaryEncoding 892 / 924 75.2 13.3 1.1X
+ // LongDelta 817 / 847 82.2 12.2 1.2X
+ runDecodeBenchmark("LONG Decode (Higher Skew)", iters, count, LONG, testData)
}
- def stringDecode(iters: Int): Unit = {
+ def stringEncodingBenchmark(iters: Int): Unit = {
val count = 65536
val strLen = 8
val tableSize = 16
val testData = allocateLocal(count * (4 + strLen))
- // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
- // STRING Decode: Avg Time(ms) Avg Rate(M/s) Relative Rate
- // -------------------------------------------------------------------------------
- // PassThrough(1.000) 2277.05 29.47 1.00 X
- // RunLengthEncoding(0.893) 2624.35 25.57 0.87 X
- // DictionaryEncoding(0.167) 2672.28 25.11 0.85 X
val g = {
val dataTable = (0 until tableSize).map(_ => RandomStringUtils.randomAlphabetic(strLen))
val rng = genHigherSkewData()
@@ -227,14 +316,29 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
testData.put(g().getBytes)
}
testData.rewind()
- runBenchmark("STRING Decode", iters, count, STRING, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // STRING Encode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough(1.000) 56 / 57 1197.9 0.8 1.0X
+ // RunLengthEncoding(0.893) 4892 / 4937 13.7 72.9 0.0X
+ // DictionaryEncoding(0.167) 2968 / 2992 22.6 44.2 0.0X
+ runEncodeBenchmark("STRING Encode", iters, count, STRING, testData)
+
+ // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+ // STRING Decode: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ // -------------------------------------------------------------------------------------------
+ // PassThrough 2422 / 2449 27.7 36.1 1.0X
+ // RunLengthEncoding 2885 / 3018 23.3 43.0 0.8X
+ // DictionaryEncoding 2716 / 2752 24.7 40.5 0.9X
+ runDecodeBenchmark("STRING Decode", iters, count, STRING, testData)
}
def main(args: Array[String]): Unit = {
- bitDecode(1024)
- shortDecode(1024)
- intDecode(1024)
- longDecode(1024)
- stringDecode(1024)
+ bitEncodingBenchmark(1024)
+ shortEncodingBenchmark(1024)
+ intEncodingBenchmark(1024)
+ longEncodingBenchmark(1024)
+ stringEncodingBenchmark(1024)
}
}