author    | scwf <wangfei1@huawei.com>                | 2015-01-10 14:08:04 -0800
committer | Michael Armbrust <michael@databricks.com> | 2015-01-10 14:08:04 -0800
commit    | b3e86dc62476abb03b330f86a788aa19a6565317
tree      | 9a3f25d6e6e4bd92d8c14a711d9bd0c1d57f38ac /sql/core
parent    | 693a323a70aba91e6c100dd5561d218a75b7895e
[SPARK-4861][SQL] Refactor commands in Spark SQL
Follow-up to #3712.
This PR finally removes ```CommandStrategy``` and makes all commands implement ```RunnableCommand```, so that they can all be planned with ```case r: RunnableCommand => ExecutedCommand(r) :: Nil```.
The one exception is Hive's ```DescribeCommand```, which is a special case that needs to distinguish Hive tables from temporary tables, so ```HiveCommandStrategy``` is kept for it.
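For readers skimming the change, the pattern the PR converges on can be sketched in a few lines. The definitions below are simplified, self-contained stand-ins written for illustration (```FakeSQLContext```, the ```Seq[String]``` result type, and the toy ```UncacheTableCommand``` body are all made up here); only the single planner case mirrors the rule quoted above.

```scala
// Simplified stand-ins for Spark's LogicalPlan, SparkPlan and SQLContext.
trait LogicalPlan
trait SparkPlan { def execute(): Seq[String] }
class FakeSQLContext

// The RunnableCommand idea: a logical node that knows how to run itself.
trait RunnableCommand extends LogicalPlan {
  def run(ctx: FakeSQLContext): Seq[String]
}

// The ExecutedCommand idea: one physical wrapper shared by every command.
case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan {
  def execute(): Seq[String] = cmd.run(new FakeSQLContext)
}

// A new command only has to implement RunnableCommand...
case class UncacheTableCommand(table: String) extends RunnableCommand {
  def run(ctx: FakeSQLContext): Seq[String] = Seq(s"uncached $table")
}

// ...because the planner covers every command with this single case.
object BasicOperators {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case r: RunnableCommand => ExecutedCommand(r) :: Nil
    case _ => Nil
  }
}

object Demo extends App {
  // Planning a command yields exactly one ExecutedCommand wrapper.
  BasicOperators(UncacheTableCommand("people")).foreach(p => println(p.execute()))
}
```

The upshot is that adding a new command no longer requires touching the planner: any new ```RunnableCommand``` is picked up by that one case.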
Author: scwf <wangfei1@huawei.com>
Closes #3948 from scwf/followup-SPARK-4861 and squashes the following commits:
6b48e64 [scwf] minor style fix
2c62e9d [scwf] fix for hive module
5a7a819 [scwf] Refactory command in spark sql
Diffstat (limited to 'sql/core')
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala (new)
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
4 files changed, 101 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9962937277..6c575dd727 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -76,7 +76,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val sqlParser = {
     val fallback = new catalyst.SqlParser
-    new catalyst.SparkSQLParser(fallback(_))
+    new SparkSQLParser(fallback(_))
   }
 
   protected[sql] def parseSql(sql: String): LogicalPlan = {
@@ -329,7 +329,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
-      CommandStrategy ::
       DataSourceStrategy ::
       TakeOrdered ::
       HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
new file mode 100644
index 0000000000..65358b7d4e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.{SqlLexical, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
+
+import scala.util.parsing.combinator.RegexParsers
+
+/**
+ * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
+ * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
+ *
+ * @param fallback A function that parses an input string to a logical plan
+ */
+private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
+
+  // A parser for the key-value part of the "SET [key = [value ]]" syntax
+  private object SetCommandParser extends RegexParsers {
+    private val key: Parser[String] = "(?m)[^=]+".r
+
+    private val value: Parser[String] = "(?m).*$".r
+
+    private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)())
+
+    private val pair: Parser[LogicalPlan] =
+      (key ~ ("=".r ~> value).?).? ^^ {
+        case None => SetCommand(None, output)
+        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+      }
+
+    def apply(input: String): LogicalPlan = parseAll(pair, input) match {
+      case Success(plan, _) => plan
+      case x => sys.error(x.toString)
+    }
+  }
+
+  protected val AS = Keyword("AS")
+  protected val CACHE = Keyword("CACHE")
+  protected val LAZY = Keyword("LAZY")
+  protected val SET = Keyword("SET")
+  protected val TABLE = Keyword("TABLE")
+  protected val UNCACHE = Keyword("UNCACHE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+
+  private val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
+
+  private lazy val cache: Parser[LogicalPlan] =
+    CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
+      case isLazy ~ tableName ~ plan =>
+        CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
+    }
+
+  private lazy val uncache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident ^^ {
+      case tableName => UncacheTableCommand(tableName)
+    }
+
+  private lazy val set: Parser[LogicalPlan] =
+    SET ~> restInput ^^ {
+      case input => SetCommandParser(input)
+    }
+
+  private lazy val others: Parser[LogicalPlan] =
+    wholeInput ^^ {
+      case input => fallback(input)
+    }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce878c137e..99b6611d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -259,6 +259,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def numPartitions = self.numPartitions
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case r: RunnableCommand => ExecutedCommand(r) :: Nil
+
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
@@ -308,22 +310,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
-
-  case object CommandStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommand(r) :: Nil
-      case logical.SetCommand(kv) =>
-        Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
-      case logical.ExplainCommand(logicalPlan, extended) =>
-        Seq(ExecutedCommand(
-          execution.ExplainCommand(logicalPlan, plan.output, extended)))
-      case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
-        Seq(ExecutedCommand(
-          execution.CacheTableCommand(tableName, optPlan, isLazy)))
-      case logical.UncacheTableCommand(tableName) =>
-        Seq(ExecutedCommand(
-          execution.UncacheTableCommand(tableName)))
-      case _ => Nil
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index b8fa4b0199..0d765c4c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -113,7 +113,7 @@ case class SetCommand(
 @DeveloperApi
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
+    override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext) = try {
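A side note on the ```SetCommandParser``` added above: the key/value splitting relies only on plain Scala parser combinators, so the technique is easy to try in isolation. The following is a standalone sketch of the ```SET [key [= value]]``` handling, not Spark code; it assumes the parser-combinator library is available (bundled with the standard library in the Scala versions Spark 1.x targeted, a separate ```scala-parser-combinators``` module today), and it returns a plain tuple where the real parser builds a ```SetCommand``` logical plan.

```scala
import scala.util.parsing.combinator.RegexParsers

// Parses the text that follows the SET keyword: nothing, a bare key, or "key = value".
object SetArgsParser extends RegexParsers {
  // Everything before the first '=' is the key; the rest of the line is the value.
  private val key: Parser[String] = "(?m)[^=]+".r
  private val value: Parser[String] = "(?m).*$".r

  private val pair: Parser[(Option[String], Option[String])] =
    (key ~ ("=".r ~> value).?).? ^^ {
      case None => (None, None)                          // bare "SET"
      case Some(k ~ v) => (Some(k.trim), v.map(_.trim))  // "SET k" or "SET k = v"
    }

  def apply(input: String): (Option[String], Option[String]) =
    parseAll(pair, input) match {
      case Success(result, _) => result
      case other => sys.error(other.toString)
    }
}

object SetArgsDemo extends App {
  println(SetArgsParser(""))                                  // neither key nor value
  println(SetArgsParser("spark.sql.shuffle.partitions"))      // key only
  println(SetArgsParser("spark.sql.shuffle.partitions = 10")) // key and value
}
```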