author     scwf <wangfei1@huawei.com>                 2015-01-10 14:08:04 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-01-10 14:08:04 -0800
commit     b3e86dc62476abb03b330f86a788aa19a6565317 (patch)
tree       9a3f25d6e6e4bd92d8c14a711d9bd0c1d57f38ac /sql/core
parent     693a323a70aba91e6c100dd5561d218a75b7895e (diff)
[SPARK-4861][SQL] Refactor commands in Spark SQL
Follow-up for #3712. This PR finally removes `CommandStrategy` and makes all commands follow `RunnableCommand`, so they can all be planned with `case r: RunnableCommand => ExecutedCommand(r) :: Nil`. The one exception is Hive's `DescribeCommand`, which is a special case that needs to distinguish Hive tables from temporary tables, so `HiveCommandStrategy` is kept for it.

Author: scwf <wangfei1@huawei.com>

Closes #3948 from scwf/followup-SPARK-4861 and squashes the following commits:

6b48e64 [scwf] minor style fix
2c62e9d [scwf] fix for hive module
5a7a819 [scwf] Refactory command in spark sql
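The contract the commit converges on is small enough to sketch. Below is a hypothetical command under `RunnableCommand` (the `run(SQLContext): Seq[Row]` signature is taken from `execution/commands.scala` in this tree; `NoopCommand` itself is invented purely for illustration):

```scala
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.execution.RunnableCommand

// Hypothetical command: commands do their work eagerly in run() and
// return their result rows (often empty for side-effecting commands).
case class NoopCommand(message: String) extends RunnableCommand {
  override def run(sqlContext: SQLContext): Seq[Row] = {
    println(message)  // stand-in for a real side effect against the session
    Seq.empty[Row]
  }
}
```

Because every command now carries its own `run` logic, planning any of them reduces to the single `case r: RunnableCommand => ExecutedCommand(r) :: Nil` case this commit adds to `BasicOperators`.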
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                 |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala             | 97
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala         |  2
4 files changed, 101 insertions, 21 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9962937277..6c575dd727 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -76,7 +76,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val sqlParser = {
     val fallback = new catalyst.SqlParser
-    new catalyst.SparkSQLParser(fallback(_))
+    new SparkSQLParser(fallback(_))
   }
 
   protected[sql] def parseSql(sql: String): LogicalPlan = {
@@ -329,7 +329,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (
-      CommandStrategy ::
       DataSourceStrategy ::
       TakeOrdered ::
       HashAggregation ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
new file mode 100644
index 0000000000..65358b7d4e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.{SqlLexical, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{UncacheTableCommand, CacheTableCommand, SetCommand}
+
+import scala.util.parsing.combinator.RegexParsers
+
+/**
+ * The top level Spark SQL parser. This parser recognizes syntaxes that are available for all SQL
+ * dialects supported by Spark SQL, and delegates all the other syntaxes to the `fallback` parser.
+ *
+ * @param fallback A function that parses an input string to a logical plan
+ */
+private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends AbstractSparkSQLParser {
+
+  // A parser for the key-value part of the "SET [key = [value ]]" syntax
+  private object SetCommandParser extends RegexParsers {
+    private val key: Parser[String] = "(?m)[^=]+".r
+
+    private val value: Parser[String] = "(?m).*$".r
+
+    private val output: Seq[Attribute] = Seq(AttributeReference("", StringType, nullable = false)())
+
+    private val pair: Parser[LogicalPlan] =
+      (key ~ ("=".r ~> value).?).? ^^ {
+        case None => SetCommand(None, output)
+        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+      }
+
+    def apply(input: String): LogicalPlan = parseAll(pair, input) match {
+      case Success(plan, _) => plan
+      case x => sys.error(x.toString)
+    }
+  }
+
+  protected val AS = Keyword("AS")
+  protected val CACHE = Keyword("CACHE")
+  protected val LAZY = Keyword("LAZY")
+  protected val SET = Keyword("SET")
+  protected val TABLE = Keyword("TABLE")
+  protected val UNCACHE = Keyword("UNCACHE")
+
+  protected implicit def asParser(k: Keyword): Parser[String] =
+    lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+
+  private val reservedWords: Seq[String] =
+    this
+      .getClass
+      .getMethods
+      .filter(_.getReturnType == classOf[Keyword])
+      .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+  override val lexical = new SqlLexical(reservedWords)
+
+  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | others
+
+  private lazy val cache: Parser[LogicalPlan] =
+    CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
+      case isLazy ~ tableName ~ plan =>
+        CacheTableCommand(tableName, plan.map(fallback), isLazy.isDefined)
+    }
+
+  private lazy val uncache: Parser[LogicalPlan] =
+    UNCACHE ~ TABLE ~> ident ^^ {
+      case tableName => UncacheTableCommand(tableName)
+    }
+
+  private lazy val set: Parser[LogicalPlan] =
+    SET ~> restInput ^^ {
+      case input => SetCommandParser(input)
+    }
+
+  private lazy val others: Parser[LogicalPlan] =
+    wholeInput ^^ {
+      case input => fallback(input)
+    }
+
+}
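A usage sketch of the new parser, assuming the in-tree `catalyst.SqlParser` as the fallback (the `apply(String)` entry point is inherited from `AbstractSparkSQLParser`; the result comments show the plans the grammar above is expected to produce):

```scala
import org.apache.spark.sql.catalyst.SqlParser

val fallback = new SqlParser
val parser = new SparkSQLParser(fallback(_))

// Recognized directly by this parser (keywords match case-insensitively):
parser("CACHE LAZY TABLE t AS SELECT * FROM src")  // CacheTableCommand("t", Some(plan), isLazy = true)
parser("UNCACHE TABLE t")                          // UncacheTableCommand("t")
parser("SET spark.sql.shuffle.partitions=10")      // SetCommand(Some("spark.sql.shuffle.partitions" -> Some("10")), ...)
parser("SET")                                      // SetCommand(None, ...), i.e. list all properties

// Everything else falls through `others` to the fallback parser:
parser("SELECT key FROM src")                      // whatever LogicalPlan the fallback builds
```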
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index ce878c137e..99b6611d3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -259,6 +259,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     def numPartitions = self.numPartitions
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case r: RunnableCommand => ExecutedCommand(r) :: Nil
+
       case logical.Distinct(child) =>
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
@@ -308,22 +310,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
-
-  case object CommandStrategy extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case r: RunnableCommand => ExecutedCommand(r) :: Nil
-      case logical.SetCommand(kv) =>
-        Seq(ExecutedCommand(execution.SetCommand(kv, plan.output)))
-      case logical.ExplainCommand(logicalPlan, extended) =>
-        Seq(ExecutedCommand(
-          execution.ExplainCommand(logicalPlan, plan.output, extended)))
-      case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
-        Seq(ExecutedCommand(
-          execution.CacheTableCommand(tableName, optPlan, isLazy)))
-      case logical.UncacheTableCommand(tableName) =>
-        Seq(ExecutedCommand(
-          execution.UncacheTableCommand(tableName)))
-      case _ => Nil
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index b8fa4b0199..0d765c4c92 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -113,7 +113,7 @@ case class SetCommand(
 @DeveloperApi
 case class ExplainCommand(
     logicalPlan: LogicalPlan,
-    override val output: Seq[Attribute], extended: Boolean) extends RunnableCommand {
+    override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
 
   // Run through the optimizer to generate the physical plan.
   override def run(sqlContext: SQLContext) = try {
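The only change here is a default value for `extended`, so call sites that want a plain (non-extended) explain no longer pass the flag explicitly. A hypothetical construction, with `parsed` and `output` standing in for a real logical plan and its output attributes:

```scala
// Plain EXPLAIN: the new default kicks in.
val plain = ExplainCommand(parsed, output)
// EXPLAIN EXTENDED: opt in explicitly.
val ext = ExplainCommand(parsed, output, extended = true)
```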