-rw-r--r-- | sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java  | 107
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala |  45
2 files changed, 142 insertions, 10 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
new file mode 100644
index 0000000000..abe8db589d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of an append-only single-key/single value aggregate hash
+ * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates
+ * (and fall back to the `BytesToBytesMap` if a given key isn't found). This can be potentially
+ * 'codegened' in TungstenAggregate to speed up aggregates w/ key.
+ *
+ * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the
+ * key-value pairs. The index lookups in the array rely on linear probing (with a small number of
+ * maximum tries) and use an inexpensive hash function which makes it really efficient for a
+ * majority of lookups. However, using linear probing and an inexpensive hash function also makes it
+ * less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even
+ * for certain distribution of keys) and requires us to fall back on the latter for correctness.
+ */
+public class AggregateHashMap {
+  public ColumnarBatch batch;
+  public int[] buckets;
+
+  private int numBuckets;
+  private int numRows = 0;
+  private int maxSteps = 3;
+
+  private static int DEFAULT_CAPACITY = 1 << 16;
+  private static double DEFAULT_LOAD_FACTOR = 0.25;
+  private static int DEFAULT_MAX_STEPS = 3;
+
+  public AggregateHashMap(StructType schema, int capacity, double loadFactor, int maxSteps) {
+
+    // We currently only support single key-value pair that are both longs
+    assert (schema.size() == 2 && schema.fields()[0].dataType() == LongType &&
+        schema.fields()[1].dataType() == LongType);
+
+    // capacity should be a power of 2
+    assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
+
+    this.maxSteps = maxSteps;
+    numBuckets = (int) (capacity / loadFactor);
+    batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, capacity);
+    buckets = new int[numBuckets];
+    Arrays.fill(buckets, -1);
+  }
+
+  public AggregateHashMap(StructType schema) {
+    this(schema, DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_MAX_STEPS);
+  }
+
+  public int findOrInsert(long key) {
+    int idx = find(key);
+    if (idx != -1 && buckets[idx] == -1) {
+      batch.column(0).putLong(numRows, key);
+      batch.column(1).putLong(numRows, 0);
+      buckets[idx] = numRows++;
+    }
+    return idx;
+  }
+
+  public int find(long key) {
+    long h = hash(key);
+    int step = 0;
+    int idx = (int) h & (numBuckets - 1);
+    while (step < maxSteps) {
+      // Return bucket index if it's either an empty slot or already contains the key
+      if (buckets[idx] == -1) {
+        return idx;
+      } else if (equals(idx, key)) {
+        return idx;
+      }
+      idx = (idx + 1) & (numBuckets - 1);
+      step++;
+    }
+    // Didn't find it
+    return -1;
+  }
+
+  private long hash(long key) {
+    return key;
+  }
+
+  private boolean equals(int idx, long key1) {
+    return batch.column(0).getLong(buckets[idx]) == key1;
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index a16092e7d7..003d3e062e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -23,8 +23,9 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.vectorized.AggregateHashMap
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
 import org.apache.spark.unsafe.map.BytesToBytesMap
@@ -463,18 +464,42 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       }
     }
 
+    benchmark.addCase("Aggregate HashMap") { iter =>
+      var i = 0
+      val numKeys = 65536
+      val schema = new StructType()
+        .add("key", LongType)
+        .add("value", LongType)
+      val map = new AggregateHashMap(schema)
+      while (i < numKeys) {
+        val idx = map.findOrInsert(i.toLong)
+        map.batch.column(1).putLong(map.buckets(idx),
+          map.batch.column(1).getLong(map.buckets(idx)) + 1)
+        i += 1
+      }
+      var s = 0
+      i = 0
+      while (i < N) {
+        if (map.find(i % 100000) != -1) {
+          s += 1
+        }
+        i += 1
+      }
+    }
+
     /**
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
     BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    hash                                      651 /  678         80.0          12.5       1.0X
-    fast hash                                 336 /  343        155.9           6.4       1.9X
-    arrayEqual                                417 /  428        125.0           8.0       1.6X
-    Java HashMap (Long)                       145 /  168         72.2          13.8       0.8X
-    Java HashMap (two ints)                   157 /  164         66.8          15.0       0.8X
-    Java HashMap (UnsafeRow)                  538 /  573         19.5          51.3       0.2X
-    BytesToBytesMap (off Heap)               2594 / 2664         20.2          49.5       0.2X
-    BytesToBytesMap (on Heap)                2693 / 2989         19.5          51.4       0.2X
+    hash                                      112 /  116         93.2          10.7       1.0X
+    fast hash                                  65 /   69        160.9           6.2       1.7X
+    arrayEqual                                 66 /   69        159.1           6.3       1.7X
+    Java HashMap (Long)                       137 /  182         76.3          13.1       0.8X
+    Java HashMap (two ints)                   182 /  230         57.8          17.3       0.6X
+    Java HashMap (UnsafeRow)                  511 /  565         20.5          48.8       0.2X
+    BytesToBytesMap (off Heap)                481 /  515         21.8          45.9       0.2X
+    BytesToBytesMap (on Heap)                 529 /  600         19.8          50.5       0.2X
+    Aggregate HashMap                          56 /   62        187.9           5.3       2.0X
     */
     benchmark.run()
   }
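
For reference, a minimal usage sketch of the new map, outside the commit itself: it assumes Spark's sql/core internals (AggregateHashMap, ColumnarBatch) are on the classpath, and the driver class name AggregateHashMapSketch is hypothetical. It mirrors the pattern used by the benchmark case above: findOrInsert returns a bucket index, buckets[idx] holds the row in the columnar batch, and the caller updates the value column in place, falling back to BytesToBytesMap only when the probe limit is exhausted.

import org.apache.spark.sql.execution.vectorized.AggregateHashMap;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical driver class; illustrative only, not part of the commit above.
public class AggregateHashMapSketch {
  public static void main(String[] args) {
    // Single long key and single long value -- the only layout the map supports.
    StructType schema = new StructType()
        .add("key", DataTypes.LongType)
        .add("value", DataTypes.LongType);
    AggregateHashMap map = new AggregateHashMap(schema);

    // Count occurrences of each key in the columnar batch's value column.
    long[] keys = {1L, 2L, 1L, 3L, 1L};
    for (long key : keys) {
      int idx = map.findOrInsert(key);
      if (idx == -1) {
        // Probe limit (maxSteps) exhausted: a real caller would fall back to BytesToBytesMap here.
        continue;
      }
      int row = map.buckets[idx];
      map.batch.column(1).putLong(row, map.batch.column(1).getLong(row) + 1);
    }

    // Look a key back up. find() returns the bucket for the key (or an empty slot if the key
    // is absent); -1 means the key could not be reached within maxSteps probes.
    int idx = map.find(1L);
    if (idx != -1) {
      System.out.println(map.batch.column(1).getLong(map.buckets[idx]));  // prints 3
    }
  }
}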