From 4df65184b6b865a26e4d5c99bbfd3c24ab7179dc Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Fri, 15 Apr 2016 15:55:31 -0700
Subject: [SPARK-14620][SQL] Use/benchmark a better hash in VectorizedHashMap

## What changes were proposed in this pull request?

This PR uses a better hashing algorithm while probing the AggregateHashMap:

```java
long h = 0;
h = (h ^ (0x9e3779b9)) + key_1 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_2 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_3 + (h << 6) + (h >>> 2);
...
h = (h ^ (0x9e3779b9)) + key_n + (h << 6) + (h >>> 2);
return h;
```

Depends on: https://github.com/apache/spark/pull/12345

## How was this patch tested?

Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
codegen = F                              2417 / 2457          8.7         115.2       1.0X
codegen = T hashmap = F                  1554 / 1581         13.5          74.1       1.6X
codegen = T hashmap = T                   877 /  929         23.9          41.8       2.8X

Author: Sameer Agarwal

Closes #12379 from sameeragarwal/hash.
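To make the new mixing step easy to try outside of the generated code, here is a minimal, self-contained sketch; the `HashSketch` class, the fixed two-key arity, and the `main` driver are illustrative assumptions (the real method is emitted per-schema by `VectorizedHashMapGenerator`, one statement per grouping key):

```java
// Hypothetical standalone version of the hash described above, specialized
// to two long keys. 0x9e3779b9 is the 32-bit golden-ratio constant; written
// as an int literal it is sign-extended to a long before the XOR, exactly as
// in the generated code.
public class HashSketch {
  static long hash(long key1, long key2) {
    long h = 0;
    h = (h ^ (0x9e3779b9)) + key1 + (h << 6) + (h >>> 2);
    h = (h ^ (0x9e3779b9)) + key2 + (h << 6) + (h >>> 2);
    return h;
  }

  public static void main(String[] args) {
    // The old implementation OR-ed the keys together, so (1, 2) and (1, 3)
    // both hashed to 3; the new mix sends them to distinct values.
    System.out.println(Long.toHexString(hash(1L, 2L)));
    System.out.println(Long.toHexString(hash(1L, 3L)));
  }
}
```

The contrast with the removed code is visible in the diff below: the previous `hash` simply returned `key_1 | key_2 | ...`, which collapses many distinct key tuples onto the same bit pattern and degenerates the probe sequence.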
---
 .../aggregate/VectorizedHashMapGenerator.scala     | 48 +++++++++++-----------
 .../sql/execution/BenchmarkWholeStageCodegen.scala | 46 +++++++++++++++++++--
 2 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 395cc7ab91..dd9b2f097e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -86,28 +86,21 @@ class VectorizedHashMapGenerator(
        |   private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        |   private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
        |   private int[] buckets;
-       |   private int numBuckets;
-       |   private int maxSteps;
+       |   private int capacity = 1 << 16;
+       |   private double loadFactor = 0.5;
+       |   private int numBuckets = (int) (capacity / loadFactor);
+       |   private int maxSteps = 2;
        |   private int numRows = 0;
        |   private org.apache.spark.sql.types.StructType schema = $generatedSchema
        |   private org.apache.spark.sql.types.StructType aggregateBufferSchema =
        |     $generatedAggBufferSchema
        |
        |   public $generatedClassName() {
-       |     // TODO: These should be generated based on the schema
-       |     int DEFAULT_CAPACITY = 1 << 16;
-       |     double DEFAULT_LOAD_FACTOR = 0.25;
-       |     int DEFAULT_MAX_STEPS = 2;
-       |     assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
-       |     this.maxSteps = DEFAULT_MAX_STEPS;
-       |     numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
-       |
        |     batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-       |       org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
-       |
+       |       org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |     // TODO: Possibly generate this projection in TungstenAggregate directly
        |     aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-       |       aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+       |       aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |     for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
        |        aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
        |     }
@@ -130,9 +123,11 @@ class VectorizedHashMapGenerator(
    */
   private def generateHashFunction(): String = {
     s"""
-       |// TODO: Improve this hash function
        |private long hash($groupingKeySignature) {
-       |  return ${groupingKeys.map(_._2).mkString(" | ")};
+       |  long h = 0;
+       |  ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);")
+         .mkString("\n")}
+       |  return h;
        |}
      """.stripMargin
   }
@@ -201,15 +196,20 @@ class VectorizedHashMapGenerator(
        |    while (step < maxSteps) {
        |      // Return bucket index if it's either an empty slot or already contains the key
        |      if (buckets[idx] == -1) {
-       |        ${groupingKeys.zipWithIndex.map(k =>
-                   s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
-       |        ${bufferValues.zipWithIndex.map(k =>
-                   s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
-                   .mkString("\n")}
-       |        buckets[idx] = numRows++;
-       |        batch.setNumRows(numRows);
-       |        aggregateBufferBatch.setNumRows(numRows);
-       |        return aggregateBufferBatch.getRow(buckets[idx]);
+       |        if (numRows < capacity) {
+       |          ${groupingKeys.zipWithIndex.map(k =>
+                     s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
+       |          ${bufferValues.zipWithIndex.map(k =>
+                     s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
+                     .mkString("\n")}
+       |          buckets[idx] = numRows++;
+       |          batch.setNumRows(numRows);
+       |          aggregateBufferBatch.setNumRows(numRows);
+       |          return aggregateBufferBatch.getRow(buckets[idx]);
+       |        } else {
+       |          // No more space
+       |          return null;
+       |        }
        |      } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
        |        return aggregateBufferBatch.getRow(buckets[idx]);
        |      }
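Stripped of the Scala string interpolation, the probing logic that the hunks above generate reduces to roughly the following sketch, with plain arrays standing in for `ColumnarBatch`. `ProbeSketch`, the single-key specialization, and the bucket-index arithmetic (`h & (numBuckets - 1)` with a +1 probe step) are assumptions for illustration; the hunks do not show how `idx` is derived or advanced:

```java
// Rough sketch of the generated find-or-insert path under the assumptions
// stated above; the real code writes keys and aggregation buffers into
// ColumnarBatch rows rather than plain arrays.
class ProbeSketch {
  private final int capacity = 1 << 16;
  private final double loadFactor = 0.5;
  private final int numBuckets = (int) (capacity / loadFactor); // 1 << 17, a power of two
  private final int maxSteps = 2;
  private final int[] buckets = new int[numBuckets];            // -1 marks an empty bucket
  private final long[] keys = new long[capacity];               // single long grouping key
  private int numRows = 0;

  ProbeSketch() {
    java.util.Arrays.fill(buckets, -1);
  }

  private static long hash(long key) {
    long h = 0;
    h = (h ^ (0x9e3779b9)) + key + (h << 6) + (h >>> 2);
    return h;
  }

  // Returns the row index for key, inserting a new row if a bucket is free.
  // Returns -1 where the generated code returns null: either the batch is
  // at capacity or no bucket was found within maxSteps probes.
  int findOrInsert(long key) {
    long h = hash(key);
    int step = 0;
    int idx = (int) (h & (numBuckets - 1));
    while (step < maxSteps) {
      if (buckets[idx] == -1) {
        if (numRows < capacity) {
          keys[numRows] = key;              // stands in for batch.column(i).putLong(...)
          buckets[idx] = numRows++;
          return buckets[idx];
        } else {
          return -1;                        // no more space
        }
      } else if (keys[buckets[idx]] == key) {
        return buckets[idx];                // key already present
      }
      idx = (idx + 1) & (numBuckets - 1);   // assumed linear probe to the next bucket
      step++;
    }
    return -1;                              // gave up after maxSteps probes
  }
}
```

With `loadFactor = 0.5` the bucket array has twice as many slots as the batch can hold rows, and `maxSteps = 2` bounds every lookup to two probes, which is why hash quality matters so much here: a weak hash wastes both probes on the same crowded buckets. The second half of the patch, below, adds a benchmark over randomized keys to stress exactly this case.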
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index d23f19c480..3fb70f2eb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     */
   }
 
-  ignore("aggregate with keys") {
+  ignore("aggregate with linear keys") {
     val N = 20 << 20
 
     val benchmark = new Benchmark("Aggregate w keys", N)
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
     Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    codegen = F                              2219 / 2392          9.4         105.8       1.0X
-    codegen = T hashmap = F                  1330 / 1466         15.8          63.4       1.7X
-    codegen = T hashmap = T                   384 /  518         54.7          18.3       5.8X
+    codegen = F                              2067 / 2166         10.1          98.6       1.0X
+    codegen = T hashmap = F                  1149 / 1321         18.3          54.8       1.8X
+    codegen = T hashmap = T                   388 /  475         54.0          18.5       5.3X
     */
   }
+
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
+
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
+
+    benchmark.addCase(s"codegen = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+      f()
+    }
+
+    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+      f()
+    }
+
+    benchmark.run()
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    codegen = F                              2517 / 2608          8.3         120.0       1.0X
+    codegen = T hashmap = F                  1484 / 1560         14.1          70.8       1.7X
+    codegen = T hashmap = T                   794 /  908         26.4          37.9       3.2X
+    */
+  }
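As a sanity check on the three result tables, the derived columns follow directly from N = 20 << 20 rows and the quoted best time; a small sketch of the arithmetic (the `BenchmarkMath` class name is arbitrary):

```java
// Reproduces the derived benchmark columns from the raw best times quoted
// in the tables above. N comes from the benchmark source; bestMs is the
// best time of the "codegen = T hashmap = T" case on randomized keys.
public class BenchmarkMath {
  public static void main(String[] args) {
    long n = 20 << 20;                          // 20,971,520 rows
    double bestMs = 794.0;                      // best time, in milliseconds
    double rateM = n / (bestMs / 1000.0) / 1e6; // million rows per second
    double nsPerRow = bestMs * 1e6 / n;         // nanoseconds per row
    System.out.printf("Rate(M/s) = %.1f, Per Row(ns) = %.1f%n", rateM, nsPerRow);
    // Prints: Rate(M/s) = 26.4, Per Row(ns) = 37.9 -- matching the table
  }
}
```

The same arithmetic reproduces the headline table in the description: 2417 ms over 20,971,520 rows is 115.2 ns per row, or about 8.7 million rows per second.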