diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-01-04 18:49:41 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-04 18:49:41 -0800 |
commit | b1a771231e20df157fb3e780287390a883c0cc6f (patch) | |
tree | 980b90de0e7b173df0fb48a42f0faac251a9ff36 /sql/hive | |
parent | 77ab49b8575d2ebd678065fa70b0343d532ab9c2 (diff) | |
download | spark-b1a771231e20df157fb3e780287390a883c0cc6f.tar.gz spark-b1a771231e20df157fb3e780287390a883c0cc6f.tar.bz2 spark-b1a771231e20df157fb3e780287390a883c0cc6f.zip |
[SPARK-12480][SQL] add Hash expression that can calculate hash value for a group of expressions
just write the arguments into unsafe row and use murmur3 to calculate hash code
Author: Wenchen Fan <wenchen@databricks.com>
Closes #10435 from cloud-fan/hash-expr.
Diffstat (limited to 'sql/hive')
3 files changed, 27 insertions, 3 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 2b0e48dbfc..bd1a52e5f3 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -53,6 +53,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, 5) // Enable in-memory partition pruning for testing purposes TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) + // Use Hive hash expression instead of the native one + TestHive.functionRegistry.unregisterFunction("hash") RuleExecutor.resetTime() } @@ -62,6 +64,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { Locale.setDefault(originalLocale) TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + TestHive.functionRegistry.restore() // For debugging dump some statistics about how much time was spent in various optimizer rules. logWarning(RuleExecutor.dumpTimeSpent()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 013fbab0a8..66d5f20d88 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -31,10 +31,13 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.sql.{SQLContext, SQLConf} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.execution.HiveNativeCommand +import org.apache.spark.sql.hive.client.ClientWrapper import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.{SparkConf, SparkContext} @@ -451,6 +454,27 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { logError("FATAL ERROR: Failed to reset TestDB state.", e) } } + + @transient + override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry( + org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive) +} + +private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: ClientWrapper) + extends HiveFunctionRegistry(fr, client) { + + private val removedFunctions = + collection.mutable.ArrayBuffer.empty[(String, (ExpressionInfo, FunctionBuilder))] + + def unregisterFunction(name: String): Unit = { + fr.functionBuilders.remove(name).foreach(f => removedFunctions += name -> f) + } + + def restore(): Unit = { + removedFunctions.foreach { + case (name, (info, builder)) => fr.registerFunction(name, info, builder) + } + } } private[hive] object TestHiveContext { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 8a5acaf3e1..acd1130f27 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -387,9 +387,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { createQueryTest("partitioned table scan", "SELECT ds, hr, key, value FROM srcpart") - createQueryTest("hash", - "SELECT hash('test') FROM src LIMIT 1") - createQueryTest("create table as", """ |CREATE TABLE createdtable AS SELECT * FROM src; |