aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorJoseph Batchik <josephbatchik@gmail.com>2015-07-28 14:39:25 -0700
committerReynold Xin <rxin@databricks.com>2015-07-28 14:39:25 -0700
commitb88b868eb378bdb7459978842b5572a0b498f412 (patch)
tree4396c86c3a5fe95e1cb8b0325cd3f7e95e01aee1 /sql/hive
parent8d5bb5283c3cc9180ef34b05be4a715d83073b1e (diff)
downloadspark-b88b868eb378bdb7459978842b5572a0b498f412.tar.gz
spark-b88b868eb378bdb7459978842b5572a0b498f412.tar.bz2
spark-b88b868eb378bdb7459978842b5572a0b498f412.zip
[SPARK-8003][SQL] Added virtual column support to Spark
Added virtual column support by adding a new resolution rule to the query analyzer. Additional virtual columns can be added by adding case expressions to [the new rule](https://github.com/JDrit/spark/blob/virt_columns/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L1026) and by modifying the [logical plan](https://github.com/JDrit/spark/blob/virt_columns/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L216) to resolve them. This also solves [SPARK-8003](https://issues.apache.org/jira/browse/SPARK-8003). This allows you to perform queries such as: ```sql select spark__partition__id, count(*) as c from table group by spark__partition__id; ``` Author: Joseph Batchik <josephbatchik@gmail.com> Author: JD <jd@csh.rit.edu> Closes #7478 from JDrit/virt_columns and squashes the following commits: 7932bf0 [Joseph Batchik] adding spark__partition__id to hive as well f8a9c6c [Joseph Batchik] merging in master e49da48 [JD] fixes for @rxin's suggestions 60e120b [JD] fixing test in merge 4bf8554 [JD] merging in master c68bc0f [Joseph Batchik] Adding function register ability to SQLContext and adding a function for spark__partition__id()
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala9
2 files changed, 19 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 110f51a305..8b35c1275f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,6 +38,9 @@ import org.apache.spark.Logging
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.spark.sql.execution.expressions.SparkPartitionID
import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.SQLConf.SQLConfEntry._
import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect}
@@ -372,8 +375,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
- override protected[sql] lazy val functionRegistry: FunctionRegistry =
- new HiveFunctionRegistry(FunctionRegistry.builtin)
+ override protected[sql] lazy val functionRegistry: FunctionRegistry = {
+ val reg = new HiveFunctionRegistry(FunctionRegistry.builtin)
+ val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
+ FunctionExpression[SparkPartitionID]("spark__partition__id")
+ )
+ extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
+ reg
+ }
/* An analyzer that uses the Hive metastore. */
@transient
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 4056dee777..9cea5d413c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -17,13 +17,14 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{Row, QueryTest}
case class FunctionResult(f1: String, f2: String)
class UDFSuite extends QueryTest {
private lazy val ctx = org.apache.spark.sql.hive.test.TestHive
+ import ctx.implicits._
test("UDF case insensitive") {
ctx.udf.register("random0", () => { Math.random() })
@@ -33,4 +34,10 @@ class UDFSuite extends QueryTest {
assert(ctx.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0)
assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
}
+
+ test("SPARK-8003 spark__partition__id") {
+ val df = Seq((1, "Two Fiiiiive")).toDF("id", "saying")
+ ctx.registerDataFrameAsTable(df, "test_table")
+ checkAnswer(ctx.sql("select spark__partition__id() from test_table LIMIT 1").toDF(), Row(0))
+ }
}