diff options
author | Yin Huai <yhuai@databricks.com> | 2016-04-15 17:48:41 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-15 17:48:41 -0700 |
commit | b2dfa849599843269a43e6e0f2ab8c539dfc32b6 (patch) | |
tree | 7ab85d25e84b91529b5ef7aa310154a645027c56 /sql | |
parent | 4df65184b6b865a26e4d5c99bbfd3c24ab7179dc (diff) | |
download | spark-b2dfa849599843269a43e6e0f2ab8c539dfc32b6.tar.gz spark-b2dfa849599843269a43e6e0f2ab8c539dfc32b6.tar.bz2 spark-b2dfa849599843269a43e6e0f2ab8c539dfc32b6.zip |
[SPARK-14668][SQL] Move CurrentDatabase to Catalyst
## What changes were proposed in this pull request?
This PR moves `CurrentDatabase` from the sql/hive package to sql/catalyst. It also adds a function description for `current_database`, which `DESCRIBE FUNCTION EXTENDED` reports as follows.
```
scala> sqlContext.sql("describe function extended current_database").collect.foreach(println)
[Function: current_database]
[Class: org.apache.spark.sql.catalyst.expressions.CurrentDatabase]
[Usage: current_database() - Returns the current database.]
[Extended Usage:
> SELECT current_database()]
```
## How was this patch tested?
Existing tests
Author: Yin Huai <yhuai@databricks.com>
Closes #12424 from yhuai/SPARK-14668.
Diffstat (limited to 'sql')
7 files changed, 47 insertions, 24 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index f2abf136da..028463ed4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -337,6 +337,7 @@ object FunctionRegistry { expression[SparkPartitionID]("spark_partition_id"), expression[InputFileName]("input_file_name"), expression[MonotonicallyIncreasingID]("monotonically_increasing_id"), + expression[CurrentDatabase]("current_database"), // grouping sets expression[Cube]("cube"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 4bd918ed01..113fc862c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -512,3 +512,15 @@ object XxHash64Function extends InterpretedHashFunction { XXH64.hashUnsafeBytes(base, offset, len, seed) } } + +/** + * Returns the current database of the SessionCatalog. 
+ */ +@ExpressionDescription( + usage = "_FUNC_() - Returns the current database.", + extended = "> SELECT _FUNC_()") +private[sql] case class CurrentDatabase() extends LeafExpression with Unevaluable { + override def dataType: DataType = StringType + override def foldable: Boolean = true + override def nullable: Boolean = false +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f5172b213a..bb68ef826f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases} +import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} @@ -34,7 +36,9 @@ import org.apache.spark.sql.types._ * Abstract class all optimizers should inherit of, contains the standard batches (extending * Optimizers can override this. 
*/ -abstract class Optimizer extends RuleExecutor[LogicalPlan] { +abstract class Optimizer( + conf: CatalystConf, + sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] { def batches: Seq[Batch] = { // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). @@ -43,6 +47,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { Batch("Finish Analysis", Once, EliminateSubqueryAliases, ComputeCurrentTime, + GetCurrentDatabase(sessionCatalog), DistinctAggregationRewriter) :: ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here @@ -117,7 +122,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while * specific rules go to the subclasses */ -object DefaultOptimizer extends Optimizer +object DefaultOptimizer + extends Optimizer( + EmptyConf, + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) /** * Pushes operations down into a Sample. @@ -1399,6 +1407,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { } } +/** Replaces the expression of CurrentDatabase with the current database name. */ +case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = { + plan transformAllExpressions { + case CurrentDatabase() => + Literal.create(sessionCatalog.getCurrentDatabase, StringType) + } + } +} + /** * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] beneath it and a * [[SerializeFromObject]] above it. 
If these serializations can't be eliminated, we should embed diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala index 6e5672ddc3..55af6c5d6a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule @@ -38,7 +40,10 @@ class OptimizerExtendableSuite extends SparkFunSuite { * This class represents a dummy extended optimizer that takes the batches of the * Optimizer and adds custom ones. 
*/ - class ExtendedOptimizer extends Optimizer { + class ExtendedOptimizer + extends Optimizer( + EmptyConf, + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) { // rules set to DummyRule, would not be executed anyways val myBatches: Seq[Batch] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index cbde777d98..8dfbba779d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -18,9 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.ExperimentalMethods +import org.apache.spark.sql.catalyst.CatalystConf +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer -class SparkOptimizer(experimentalMethods: ExperimentalMethods) extends Optimizer { +class SparkOptimizer( + conf: CatalystConf, + sessionCatalog: SessionCatalog, + experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) { override def batches: Seq[Batch] = super.batches :+ Batch( "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 69e3358d4e..10497e4fdf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Logical query plan optimizer. 
*/ - lazy val optimizer: Optimizer = new SparkOptimizer(experimentalMethods) + lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods) /** * Parser that extracts expressions, plans, table identifiers etc. from SQL texts. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ff93bfc4a3..7ebdad1a68 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -61,19 +61,6 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils /** - * Returns the current database of metadataHive. - */ -private[hive] case class CurrentDatabase(ctx: HiveContext) - extends LeafExpression with CodegenFallback { - override def dataType: DataType = StringType - override def foldable: Boolean = true - override def nullable: Boolean = false - override def eval(input: InternalRow): Any = { - UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase) - } -} - -/** * An instance of the Spark SQL execution engine that integrates with data stored in Hive. * Configuration for Hive is read from hive-site.xml on the classpath. * @@ -133,11 +120,6 @@ class HiveContext private[hive]( @transient protected[sql] override lazy val sessionState = new HiveSessionState(self) - // The Hive UDF current_database() is foldable, will be evaluated by optimizer, - // but the optimizer can't access the SessionState of metadataHive. - sessionState.functionRegistry.registerFunction( - "current_database", (e: Seq[Expression]) => new CurrentDatabase(self)) - /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive |