diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-04-08 13:52:28 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-08 13:52:28 -0700 |
commit | f8c9beca38f1f396eb3220b23db6d77112a50293 (patch) | |
tree | aa5a5e8867443c98fb07aa373f2a0900ee2cd0bc /streaming | |
parent | 02757535b58069ce8258108d89d8172a53c358e5 (diff) | |
download | spark-f8c9beca38f1f396eb3220b23db6d77112a50293.tar.gz spark-f8c9beca38f1f396eb3220b23db6d77112a50293.tar.bz2 spark-f8c9beca38f1f396eb3220b23db6d77112a50293.zip |
[SPARK-14394][SQL] Generate AggregateHashMap class for LongTypes during TungstenAggregate codegen
## What changes were proposed in this pull request?
This PR adds support for generating the `AggregateHashMap` class in `TungstenAggregate` if the aggregate group by keys/value are of `LongType`. Note that currently this generated aggregate map is not actually used.
NB: This currently only supports `LongType` keys/values (please see `isAggregateHashMapSupported` in `TungstenAggregate`) and will be generalized to other data types in a subsequent PR.
## How was this patch tested?
Manually inspected the generated code. This is what the generated map looks like for 2 keys:
```java
/* 068 */ public class agg_GeneratedAggregateHashMap {
/* 069 */   private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
/* 070 */   private int[] buckets;       // bucket -> row index in `batch`, or -1 if empty
/* 071 */   private int numBuckets;      // must be a power of two: index math uses `& (numBuckets - 1)`
/* 072 */   private int maxSteps;        // linear-probe limit before giving up
/* 073 */   private int numRows = 0;
/* 074 */   private org.apache.spark.sql.types.StructType schema =
/* 075 */   new org.apache.spark.sql.types.StructType()
/* 076 */   .add("k1", org.apache.spark.sql.types.DataTypes.LongType)
/* 077 */   .add("k2", org.apache.spark.sql.types.DataTypes.LongType)
/* 078 */   .add("sum", org.apache.spark.sql.types.DataTypes.LongType);
/* 079 */
/* 080 */   public agg_GeneratedAggregateHashMap(int capacity, double loadFactor, int maxSteps) {
/* 081 */     assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
/* 082 */     this.maxSteps = maxSteps;
/* 083 */     // NOTE(review): only guaranteed to be a power of two when loadFactor is a power-of-two
/* 084 */     // reciprocal (e.g. 0.25); otherwise the `& (numBuckets - 1)` masking below is wrong.
/* 085 */     numBuckets = (int) (capacity / loadFactor);
/* 086 */     batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
/* 087 */       org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
/* 088 */     buckets = new int[numBuckets];
/* 089 */     java.util.Arrays.fill(buckets, -1);
/* 090 */   }
/* 091 */
/* 092 */   public agg_GeneratedAggregateHashMap() {
/* 093 */     // Delegate to the primary constructor. The previous code did
/* 094 */     // `new agg_GeneratedAggregateHashMap(1 << 16, 0.25, 5);`, which built and discarded
/* 095 */     // a separate instance and left THIS object uninitialized (null batch/buckets).
/* 096 */     this(1 << 16, 0.25, 5);
/* 097 */   }
/* 098 */
/* 099 */   // Returns the row for (agg_key, agg_key1), inserting a zero-initialized row if absent,
/* 100 */   // or null when maxSteps probes found neither the key nor an empty slot.
/* 101 */   // NOTE(review): no check that numRows < capacity before inserting — confirm callers
/* 102 */   // bound the number of distinct keys, or the batch can be overrun.
/* 103 */   public org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row findOrInsert(long agg_key, long agg_key1) {
/* 104 */     long h = hash(agg_key, agg_key1);
/* 105 */     int step = 0;
/* 106 */     int idx = (int) h & (numBuckets - 1);
/* 107 */     while (step < maxSteps) {
/* 108 */       // Return bucket index if it's either an empty slot or already contains the key
/* 109 */       if (buckets[idx] == -1) {
/* 110 */         batch.column(0).putLong(numRows, agg_key);
/* 111 */         batch.column(1).putLong(numRows, agg_key1);
/* 112 */         batch.column(2).putLong(numRows, 0);
/* 113 */         buckets[idx] = numRows++;
/* 114 */         return batch.getRow(buckets[idx]);
/* 115 */       } else if (equals(idx, agg_key, agg_key1)) {
/* 116 */         return batch.getRow(buckets[idx]);
/* 117 */       }
/* 118 */       idx = (idx + 1) & (numBuckets - 1);  // linear probing, wrap via power-of-two mask
/* 119 */       step++;
/* 120 */     }
/* 121 */     // Didn't find it
/* 122 */     return null;
/* 123 */   }
/* 124 */
/* 125 */   private boolean equals(int idx, long agg_key, long agg_key1) {
/* 126 */     return batch.column(0).getLong(buckets[idx]) == agg_key && batch.column(1).getLong(buckets[idx]) == agg_key1;
/* 127 */   }
/* 128 */
/* 129 */   // TODO: Improve this Hash Function (plain XOR collides whenever agg_key == agg_key1)
/* 130 */   private long hash(long agg_key, long agg_key1) {
/* 131 */     return agg_key ^ agg_key1;
/* 132 */   }
/* 133 */
/* 134 */ }
```
Author: Sameer Agarwal <sameer@databricks.com>
Closes #12161 from sameeragarwal/tungsten-aggregate.
Diffstat (limited to 'streaming')
0 files changed, 0 insertions, 0 deletions