author     Reynold Xin <rxin@databricks.com>  2015-07-28 17:03:59 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-07-28 17:03:59 -0700
commit     b7f54119f86f916481aeccc67f07e77dc2a924c7 (patch)
tree       a57ed36e55ba810eaf2b4516abdf8c4c3a5f3d6d
parent     c5ed36953f840018f603dfde94fcb4651e5246ac (diff)
[SPARK-9420][SQL] Move expressions in sql/core package to catalyst.
Since the catalyst package already depends on Spark core, we can move those expressions into catalyst and simplify the function registry. This is a followup of #7478.

Author: Reynold Xin <rxin@databricks.com>

Closes #7735 from rxin/SPARK-8003 and squashes the following commits:

2ffbdc3 [Reynold Xin] [SPARK-8003][SQL] Move expressions in sql/core package to catalyst.
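For context (not stated in the commit message itself): the user-visible effect is that SparkPartitionID is registered once in the shared builtin registry under the single-underscore name spark_partition_id, instead of being re-registered per context as spark__partition__id. A minimal usage sketch, assuming a Spark 1.5-era shell with sc and sqlContext in scope:

import sqlContext.implicits._

// The expression is now a builtin, so no per-context registration is needed.
val df = sc.parallelize(1 to 4, numSlices = 2).toDF("id")
df.registerTempTable("t")
sqlContext.sql("SELECT id, spark_partition_id() FROM t").show()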
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala  17
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala)  3
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala)  3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala)  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  13
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala  4
11 files changed, 23 insertions(+), 66 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a723e92114..a309ee35ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.analysis
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2}
import org.apache.spark.sql.catalyst.expressions._
@@ -25,7 +27,6 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
import org.apache.spark.sql.types._
-import scala.collection.mutable.ArrayBuffer
/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 9b60943a1e..372f80d4a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -161,13 +161,6 @@ object FunctionRegistry {
expression[ToDegrees]("degrees"),
expression[ToRadians]("radians"),
- // misc functions
- expression[Md5]("md5"),
- expression[Sha2]("sha2"),
- expression[Sha1]("sha1"),
- expression[Sha1]("sha"),
- expression[Crc32]("crc32"),
-
// aggregate functions
expression[Average]("avg"),
expression[Count]("count"),
@@ -229,7 +222,15 @@ object FunctionRegistry {
expression[Year]("year"),
// collection functions
- expression[Size]("size")
+ expression[Size]("size"),
+
+ // misc functions
+ expression[Crc32]("crc32"),
+ expression[Md5]("md5"),
+ expression[Sha1]("sha"),
+ expression[Sha1]("sha1"),
+ expression[Sha2]("sha2"),
+ expression[SparkPartitionID]("spark_partition_id")
)
val builtin: FunctionRegistry = {
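The expression[...] entries above come from a reflection-based helper that maps a function name to a builder for the expression class. A simplified, self-contained sketch of that idea, with stand-in types rather than Spark's actual FunctionRegistry.expression signature:

import scala.reflect.ClassTag

object RegistrySketch {
  // Stand-in: a "builder" turns the parsed argument expressions into an instance.
  type FunctionBuilder = Seq[AnyRef] => AnyRef

  // Pick a constructor of T whose arity matches the call site's argument count.
  def expression[T <: AnyRef](name: String)(implicit ct: ClassTag[T]): (String, FunctionBuilder) = {
    val builder: FunctionBuilder = { args =>
      val ctor = ct.runtimeClass.getConstructors
        .find(_.getParameterTypes.length == args.size)
        .getOrElse(sys.error(s"$name: no constructor taking ${args.size} arguments"))
      ctor.newInstance(args: _*).asInstanceOf[AnyRef]
    }
    name -> builder
  }
}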
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index eca36b3274..291b7a5bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.expressions
+package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{LongType, DataType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
index 98c8eab837..3f6480bbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala
@@ -15,11 +15,10 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.expressions
+package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{IntegerType, DataType}
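Both moved classes compute their value from TaskContext rather than from the input row, which is why they mix in Nondeterministic. A toy, standalone illustration of that dependency using the plain RDD API (not the expression classes themselves):

import org.apache.spark.{SparkContext, TaskContext}

object PartitionIdDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "partition-id-demo")
    // The same input value yields a different result depending on which
    // partition happens to evaluate it; the row alone does not determine it.
    sc.parallelize(1 to 4, numSlices = 2)
      .map(x => (x, TaskContext.get().partitionId()))
      .collect()
      .foreach(println)
    sc.stop()
  }
}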
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index b6e79ff9cc..82894822ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -15,11 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.expression
+package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
-import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID}
class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("MonotonicallyIncreasingID") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 56cd8f22e7..dbb2a09846 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,8 +31,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.errors.DialectException
@@ -142,14 +140,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
// TODO how to handle the temp function per user session?
@transient
- protected[sql] lazy val functionRegistry: FunctionRegistry = {
- val reg = FunctionRegistry.builtin
- val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
- FunctionExpression[SparkPartitionID]("spark__partition__id")
- )
- extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
- reg
- }
+ protected[sql] lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin
@transient
protected[sql] lazy val analyzer: Analyzer =
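With the hand-built extension list gone, the registry a SQLContext exposes is just the shared builtins; user functions still reach it through the public UDF API. A small sketch, assuming a Spark 1.5-era shell with sc and sqlContext in scope:

import sqlContext.implicits._

// User functions end up in the same registry the analyzer consults;
// only the hard-wired per-context extras were removed.
sqlContext.udf.register("plusOne", (x: Int) => x + 1)

val nums = sc.parallelize(1 to 3).toDF("id")
nums.registerTempTable("nums")
sqlContext.sql("SELECT plusOne(id) FROM nums").show()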
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
deleted file mode 100644
index 568b7ac2c5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-/**
- * Package containing expressions that are specific to Spark runtime.
- */
-package object expressions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 0148991512..4261a5e7cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -634,7 +634,7 @@ object functions {
* @group normal_funcs
* @since 1.4.0
*/
- def monotonicallyIncreasingId(): Column = execution.expressions.MonotonicallyIncreasingID()
+ def monotonicallyIncreasingId(): Column = MonotonicallyIncreasingID()
/**
* Return an alternative value `r` if `l` is NaN.
@@ -741,7 +741,7 @@ object functions {
* @group normal_funcs
* @since 1.4.0
*/
- def sparkPartitionId(): Column = execution.expressions.SparkPartitionID()
+ def sparkPartitionId(): Column = SparkPartitionID()
/**
* Computes the square root of the specified float value.
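Both call sites now construct the catalyst expressions directly, and usage from the DataFrame API is unchanged. A short example, again assuming a shell with sc and sqlContext (the data is illustrative):

import org.apache.spark.sql.functions.{monotonicallyIncreasingId, sparkPartitionId}
import sqlContext.implicits._

val words = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2).toDF("value")
// monotonicallyIncreasingId puts the partition id in the upper bits, so the
// generated ids are unique across partitions but not consecutive.
words.select($"value", sparkPartitionId(), monotonicallyIncreasingId()).show()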
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9b326c1635..d9c8b380ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -51,10 +51,10 @@ class UDFSuite extends QueryTest {
df.selectExpr("count(distinct a)")
}
- test("SPARK-8003 spark__partition__id") {
+ test("SPARK-8003 spark_partition_id") {
val df = Seq((1, "Tearing down the walls that divide us")).toDF("id", "saying")
df.registerTempTable("tmp_table")
- checkAnswer(ctx.sql("select spark__partition__id() from tmp_table").toDF(), Row(0))
+ checkAnswer(ctx.sql("select spark_partition_id() from tmp_table").toDF(), Row(0))
ctx.dropTempTable("tmp_table")
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8b35c1275f..110f51a305 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -38,9 +38,6 @@ import org.apache.spark.Logging
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{expression => FunctionExpression, FunctionBuilder}
-import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
-import org.apache.spark.sql.execution.expressions.SparkPartitionID
import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.SQLConf.SQLConfEntry._
import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect}
@@ -375,14 +372,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
// Note that HiveUDFs will be overridden by functions registered in this context.
@transient
- override protected[sql] lazy val functionRegistry: FunctionRegistry = {
- val reg = new HiveFunctionRegistry(FunctionRegistry.builtin)
- val extendedFunctions = List[(String, (ExpressionInfo, FunctionBuilder))](
- FunctionExpression[SparkPartitionID]("spark__partition__id")
- )
- extendedFunctions.foreach { case(name, (info, fun)) => reg.registerFunction(name, info, fun) }
- reg
- }
+ override protected[sql] lazy val functionRegistry: FunctionRegistry =
+ new HiveFunctionRegistry(FunctionRegistry.builtin)
/* An analyzer that uses the Hive metastore. */
@transient
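As the comment above notes, functions registered in this context override Hive UDFs: HiveFunctionRegistry consults the builtin registry first and falls back to Hive resolution only for unknown names. A minimal sketch of that delegation pattern with simplified stand-in types, not Spark's actual classes:

// Stand-in for a registry: name -> some runnable function.
trait Registry { def lookup(name: String): Option[Seq[Any] => Any] }

class MapRegistry(fns: Map[String, Seq[Any] => Any]) extends Registry {
  def lookup(name: String): Option[Seq[Any] => Any] = fns.get(name)
}

// The Hive side only answers for names the builtins don't claim, so
// context-registered functions shadow Hive UDFs of the same name.
class FallbackRegistry(underlying: Registry, hive: Registry) extends Registry {
  def lookup(name: String): Option[Seq[Any] => Any] =
    underlying.lookup(name).orElse(hive.lookup(name))
}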
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
index 9cea5d413c..37afc2142a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala
@@ -35,9 +35,9 @@ class UDFSuite extends QueryTest {
assert(ctx.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5)
}
- test("SPARK-8003 spark__partition__id") {
+ test("SPARK-8003 spark_partition_id") {
val df = Seq((1, "Two Fiiiiive")).toDF("id", "saying")
ctx.registerDataFrameAsTable(df, "test_table")
- checkAnswer(ctx.sql("select spark__partition__id() from test_table LIMIT 1").toDF(), Row(0))
+ checkAnswer(ctx.sql("select spark_partition_id() from test_table LIMIT 1").toDF(), Row(0))
}
}