author    Yin Huai <yhuai@databricks.com>    2016-04-15 17:48:41 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-15 17:48:41 -0700
commit    b2dfa849599843269a43e6e0f2ab8c539dfc32b6 (patch)
tree      7ab85d25e84b91529b5ef7aa310154a645027c56
parent    4df65184b6b865a26e4d5c99bbfd3c24ab7179dc (diff)
[SPARK-14668][SQL] Move CurrentDatabase to Catalyst
## What changes were proposed in this pull request?

This PR moves `CurrentDatabase` from the sql/hive package to sql/catalyst. It also adds the function description, which looks like the following.

```
scala> sqlContext.sql("describe function extended current_database").collect.foreach(println)
[Function: current_database]
[Class: org.apache.spark.sql.execution.command.CurrentDatabase]
[Usage: current_database() - Returns the current database.]
[Extended Usage:
> SELECT current_database()]
```

## How was this patch tested?

Existing tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #12424 from yhuai/SPARK-14668.
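For reference, a minimal usage sketch (hedged: assumes a Spark shell of this era with `sqlContext` in scope; the printed value depends on the session's current database):

```scala
// Minimal sketch, assuming a Spark shell with sqlContext available.
// current_database() is foldable, so the optimizer replaces the call with a
// string Literal before execution (see the GetCurrentDatabase rule below).
sqlContext.sql("SELECT current_database()").collect().foreach(println)
// e.g. [default]
```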
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala          |  1
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala                   | 12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala                | 24
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala                        |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                           |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                                | 18
7 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index f2abf136da..028463ed4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -337,6 +337,7 @@ object FunctionRegistry {
expression[SparkPartitionID]("spark_partition_id"),
expression[InputFileName]("input_file_name"),
expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
+ expression[CurrentDatabase]("current_database"),
// grouping sets
expression[Cube]("cube"),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 4bd918ed01..113fc862c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -512,3 +512,15 @@ object XxHash64Function extends InterpretedHashFunction {
XXH64.hashUnsafeBytes(base, offset, len, seed)
}
}
+
+/**
+ * Returns the current database of the SessionCatalog.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_() - Returns the current database.",
+ extended = "> SELECT _FUNC_()")
+private[sql] case class CurrentDatabase() extends LeafExpression with Unevaluable {
+ override def dataType: DataType = StringType
+ override def foldable: Boolean = true
+ override def nullable: Boolean = false
+}
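Since `CurrentDatabase` is `Unevaluable`, it must be rewritten before execution ever sees it; marking it `foldable` tells the optimizer it can be replaced by a constant. A minimal sketch of that contract (the exception behavior is an assumption based on how `Unevaluable` works in this era):

```scala
// Sketch only: CurrentDatabase() carries no value of its own.
val expr = CurrentDatabase()
// expr.eval(null)  // would throw UnsupportedOperationException: Unevaluable
// Instead, the GetCurrentDatabase optimizer rule (added below) substitutes
// the actual database name as a string Literal during "Finish Analysis".
```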
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f5172b213a..bb68ef826f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
@@ -34,7 +36,9 @@ import org.apache.spark.sql.types._
* Abstract class all optimizers should inherit from; contains the standard batches (extending
* Optimizers can override this).
*/
-abstract class Optimizer extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(
+ conf: CatalystConf,
+ sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -43,6 +47,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
ComputeCurrentTime,
+ GetCurrentDatabase(sessionCatalog),
DistinctAggregationRewriter) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
@@ -117,7 +122,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
* To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
* specific rules go to the subclasses
*/
-object DefaultOptimizer extends Optimizer
+object DefaultOptimizer
+ extends Optimizer(
+ EmptyConf,
+ new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
/**
* Pushes operations down into a Sample.
@@ -1399,6 +1407,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
}
}
+/** Replaces the expression of CurrentDatabase with the current database name. */
+case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan transformAllExpressions {
+ case CurrentDatabase() =>
+ Literal.create(sessionCatalog.getCurrentDatabase, StringType)
+ }
+ }
+}
+
/**
* Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a
* [[SerializeFromObject]] above it. If these serializations can't be eliminated, we should embed
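A hedged sketch of the new rule in action, wiring a `SessionCatalog` the same way `DefaultOptimizer` does above (the plan construction is illustrative, not from this patch):

```scala
import org.apache.spark.sql.catalyst.EmptyConf
import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentDatabase}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}

// Catalog wiring mirrors DefaultOptimizer above; a fresh SessionCatalog's
// current database is "default".
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)
val plan = Project(Seq(Alias(CurrentDatabase(), "db")()), LocalRelation())
// After the rule runs, CurrentDatabase() has become Literal("default").
val optimized = GetCurrentDatabase(catalog).apply(plan)
```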
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 6e5672ddc3..55af6c5d6a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
@@ -38,7 +40,10 @@ class OptimizerExtendableSuite extends SparkFunSuite {
* This class represents a dummy extended optimizer that takes the batches of the
* Optimizer and adds custom ones.
*/
- class ExtendedOptimizer extends Optimizer {
+ class ExtendedOptimizer
+ extends Optimizer(
+ EmptyConf,
+ new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
// rules set to DummyRule, would not be executed anyways
val myBatches: Seq[Batch] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index cbde777d98..8dfbba779d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,9 +18,14 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
-class SparkOptimizer(experimentalMethods: ExperimentalMethods) extends Optimizer {
+class SparkOptimizer(
+ conf: CatalystConf,
+ sessionCatalog: SessionCatalog,
+ experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
override def batches: Seq[Batch] = super.batches :+ Batch(
"User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
}
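For context, user rules reach the "User Provided Optimizers" batch through `ExperimentalMethods`; a hedged sketch of the user-facing side (`MyCustomRule` is hypothetical):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule, for illustration only.
object MyCustomRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Rules registered here run in the FixedPoint(100) batch added above.
sqlContext.experimental.extraOptimizations = Seq(MyCustomRule)
```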
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 69e3358d4e..10497e4fdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Logical query plan optimizer.
*/
- lazy val optimizer: Optimizer = new SparkOptimizer(experimentalMethods)
+ lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ff93bfc4a3..7ebdad1a68 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,19 +61,6 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
/**
- * Returns the current database of metadataHive.
- */
-private[hive] case class CurrentDatabase(ctx: HiveContext)
- extends LeafExpression with CodegenFallback {
- override def dataType: DataType = StringType
- override def foldable: Boolean = true
- override def nullable: Boolean = false
- override def eval(input: InternalRow): Any = {
- UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
- }
-}
-
-/**
* An instance of the Spark SQL execution engine that integrates with data stored in Hive.
* Configuration for Hive is read from hive-site.xml on the classpath.
*
@@ -133,11 +120,6 @@ class HiveContext private[hive](
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)
- // The Hive UDF current_database() is foldable, will be evaluated by optimizer,
- // but the optimizer can't access the SessionState of metadataHive.
- sessionState.functionRegistry.registerFunction(
- "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
-
/**
* When true, enables an experimental feature where metastore tables that use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan, instead of the Hive