aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-01-04 18:49:41 -0800
committerReynold Xin <rxin@databricks.com>2016-01-04 18:49:41 -0800
commitb1a771231e20df157fb3e780287390a883c0cc6f (patch)
tree980b90de0e7b173df0fb48a42f0faac251a9ff36 /sql/hive
parent77ab49b8575d2ebd678065fa70b0343d532ab9c2 (diff)
downloadspark-b1a771231e20df157fb3e780287390a883c0cc6f.tar.gz
spark-b1a771231e20df157fb3e780287390a883c0cc6f.tar.bz2
spark-b1a771231e20df157fb3e780287390a883c0cc6f.zip
[SPARK-12480][SQL] add Hash expression that can calculate hash value for a group of expressions
just write the arguments into unsafe row and use murmur3 to calculate hash code Author: Wenchen Fan <wenchen@databricks.com> Closes #10435 from cloud-fan/hash-expr.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala3
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala24
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala3
3 files changed, 27 insertions, 3 deletions
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2b0e48dbfc..bd1a52e5f3 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -53,6 +53,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, 5)
// Enable in-memory partition pruning for testing purposes
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
+ // Use Hive hash expression instead of the native one
+ TestHive.functionRegistry.unregisterFunction("hash")
RuleExecutor.resetTime()
}
@@ -62,6 +64,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
Locale.setDefault(originalLocale)
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
+ TestHive.functionRegistry.restore()
// For debugging dump some statistics about how much time was spent in various optimizer rules.
logWarning(RuleExecutor.dumpTimeSpent())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 013fbab0a8..66d5f20d88 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -31,10 +31,13 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.spark.sql.{SQLContext, SQLConf}
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
+import org.apache.spark.sql.hive.client.ClientWrapper
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.{SparkConf, SparkContext}
@@ -451,6 +454,27 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
logError("FATAL ERROR: Failed to reset TestDB state.", e)
}
}
+
+ @transient
+ override protected[sql] lazy val functionRegistry = new TestHiveFunctionRegistry(
+ org.apache.spark.sql.catalyst.analysis.FunctionRegistry.builtin.copy(), this.executionHive)
+}
+
+private[hive] class TestHiveFunctionRegistry(fr: SimpleFunctionRegistry, client: ClientWrapper)
+ extends HiveFunctionRegistry(fr, client) {
+
+ private val removedFunctions =
+ collection.mutable.ArrayBuffer.empty[(String, (ExpressionInfo, FunctionBuilder))]
+
+ def unregisterFunction(name: String): Unit = {
+ fr.functionBuilders.remove(name).foreach(f => removedFunctions += name -> f)
+ }
+
+ def restore(): Unit = {
+ removedFunctions.foreach {
+ case (name, (info, builder)) => fr.registerFunction(name, info, builder)
+ }
+ }
}
private[hive] object TestHiveContext {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 8a5acaf3e1..acd1130f27 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -387,9 +387,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
createQueryTest("partitioned table scan",
"SELECT ds, hr, key, value FROM srcpart")
- createQueryTest("hash",
- "SELECT hash('test') FROM src LIMIT 1")
-
createQueryTest("create table as",
"""
|CREATE TABLE createdtable AS SELECT * FROM src;