From 3edcc40223c8af12f64c2286420dc77817ab770e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Mar 2016 15:24:38 -0800 Subject: [SPARK-13632][SQL] Move commands.scala to command package ## What changes were proposed in this pull request? This patch simply moves things to a new package in an effort to reduce the size of the diff in #11048. Currently the new package only has one file, but in the future we'll add many new commands in SPARK-13139. ## How was this patch tested? Jenkins. Author: Andrew Or Closes #11482 from andrewor14/commands-package. --- .../scala/org/apache/spark/sql/DataFrame.scala | 3 +- .../scala/org/apache/spark/sql/SQLContext.scala | 1 + .../org/apache/spark/sql/execution/SparkQl.scala | 1 + .../spark/sql/execution/SparkStrategies.scala | 2 +- .../spark/sql/execution/command/commands.scala | 423 +++++++++++++++++++++ .../org/apache/spark/sql/execution/commands.scala | 421 -------------------- .../execution/datasources/DataSourceStrategy.scala | 5 +- .../datasources/InsertIntoDataSource.scala | 2 +- .../datasources/InsertIntoHadoopFsRelation.scala | 3 +- .../spark/sql/execution/datasources/ddl.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../SparkExecuteStatementOperation.scala | 2 +- .../org/apache/spark/sql/hive/HiveContext.scala | 1 + .../org/apache/spark/sql/hive/HiveStrategies.scala | 3 +- .../sql/hive/execution/CreateTableAsSelect.scala | 2 +- .../sql/hive/execution/CreateViewAsSelect.scala | 2 +- .../hive/execution/DescribeHiveTableCommand.scala | 2 +- .../sql/hive/execution/HiveNativeCommand.scala | 2 +- .../apache/spark/sql/hive/execution/commands.scala | 2 +- .../org/apache/spark/sql/hive/test/TestHive.scala | 2 +- .../sql/hive/execution/HiveComparisonTest.scala | 2 +- .../org/apache/spark/sql/hive/parquetSuites.scala | 3 +- 22 files changed, 449 insertions(+), 439 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 844bc0fdd1..339e61e572 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -36,8 +36,9 @@ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression -import org.apache.spark.sql.execution.{ExplainCommand, FileRelation, LogicalRDD, Queryable, +import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index cb4a6397b2..39dad16e40 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, 
Range} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.{SessionState, SQLConf} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala index 8c41d80bf1..bc690f6634 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.types.StructType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0255103b63..debd04aa95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution -import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescribeCommand, _} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.internal.SQLConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala new file mode 100644 index 0000000000..877456ca48 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import java.util.NoSuchElementException + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + + +/** + * A logical command that is executed for its side-effects. `RunnableCommand`s are + * wrapped in `ExecutedCommand` during execution. + */ +private[sql] trait RunnableCommand extends LogicalPlan with logical.Command { + override def output: Seq[Attribute] = Seq.empty + override def children: Seq[LogicalPlan] = Seq.empty + def run(sqlContext: SQLContext): Seq[Row] +} + +/** + * A physical operator that executes the run method of a `RunnableCommand` and + * saves the result to prevent multiple executions. + */ +private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { + /** + * A concrete command should override this lazy field to wrap up any side effects caused by the + * command or any other computation that should be evaluated exactly once. The value of this field + * can be used as the contents of the corresponding RDD generated from the physical plan of this + * command. + * + * The `execute()` method of all the physical command classes should reference `sideEffectResult` + * so that the command can be executed eagerly right after the command query is created. + */ + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val converter = CatalystTypeConverters.createToCatalystConverter(schema) + cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow]) + } + + override def output: Seq[Attribute] = cmd.output + + override def children: Seq[SparkPlan] = Nil + + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + + override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray + + protected override def doExecute(): RDD[InternalRow] = { + sqlContext.sparkContext.parallelize(sideEffectResult, 1) + } + + override def argString: String = cmd.toString +} + + +case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging { + + private def keyValueOutput: Seq[Attribute] = { + val schema = StructType( + StructField("key", StringType, false) :: + StructField("value", StringType, false) :: Nil) + schema.toAttributes + } + + private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match { + // Configures the deprecated "mapred.reduce.tasks" property. + case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + if (value.toInt < 1) { + val msg = + s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + + "determining the number of reducers is not supported." 
+ throw new IllegalArgumentException(msg) + } else { + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) + } + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " + + s"External sort will continue to be used.") + Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true")) + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " + + s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " + + s"continue to be true.") + Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true")) + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " + + s"will be ignored. Tungsten will continue to be used.") + Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " + + s"will be ignored. Codegen will continue to be used.") + Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true")) + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " + + s"will be ignored. Unsafe mode will continue to be used.") + Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true")) + } + (keyValueOutput, runFunc) + + case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " + + s"will be ignored. Sort merge join will continue to be used.") + Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true")) + } + (keyValueOutput, runFunc) + + // Configures a single property. + case Some((key, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.setConf(key, value) + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) + + // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + case None => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq + } + (keyValueOutput, runFunc) + + // Queries all properties along with their default values and docs that are defined in the + // SQLConf of the sqlContext. + case Some(("-v", None)) => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) => + Row(key, defaultValue, doc) + } + } + val schema = StructType( + StructField("key", StringType, false) :: + StructField("default", StringType, false) :: + StructField("meaning", StringType, false) :: Nil) + (schema.toAttributes, runFunc) + + // Queries the deprecated "mapred.reduce.tasks" property. 
+ case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString)) + } + (keyValueOutput, runFunc) + + // Queries a single property. + case Some((key, None)) => + val runFunc = (sqlContext: SQLContext) => { + val value = + try sqlContext.getConf(key) catch { + case _: NoSuchElementException => "" + } + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) + } + + override val output: Seq[Attribute] = _output + + override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext) + +} + +/** + * An explain command for users to see how a command will be executed. + * + * Note that this command takes in a logical plan and runs the optimizer on it + * (but does NOT actually execute the plan). + */ +case class ExplainCommand( + logicalPlan: LogicalPlan, + override val output: Seq[Attribute] = + Seq(AttributeReference("plan", StringType, nullable = true)()), + extended: Boolean = false) + extends RunnableCommand { + + // Run through the optimizer to generate the physical plan. + override def run(sqlContext: SQLContext): Seq[Row] = try { + // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. + val queryExecution = sqlContext.executePlan(logicalPlan) + val outputString = if (extended) queryExecution.toString else queryExecution.simpleString + + outputString.split("\n").map(Row(_)) + } catch { case cause: TreeNodeException[_] => + ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) + } +} + + +case class CacheTableCommand( + tableName: String, + plan: Option[LogicalPlan], + isLazy: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + plan.foreach { logicalPlan => + sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) + } + sqlContext.cacheTable(tableName) + + if (!isLazy) { + // Performs eager caching + sqlContext.table(tableName).count() + } + + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + + +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.table(tableName).unpersist(blocking = false) + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + +/** + * Clear all cached data from the in-memory cache. + */ +case object ClearCacheCommand extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.clearCache() + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + + +case class DescribeCommand( + child: SparkPlan, + override val output: Seq[Attribute], + isExtended: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + child.schema.fields.map { field => + val cmtKey = "comment" + val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" + Row(field.name, field.dataType.simpleString, comment) + } + } +} + +/** + * A command for users to get tables in the given database. + * If a databaseName is not given, the current database will be used.
+ * The syntax of using this command in SQL is: + * {{{ + * SHOW TABLES [IN databaseName] + * }}} + */ +case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand { + + // The result of SHOW TABLES has two columns, tableName and isTemporary. + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("tableName", StringType, false) :: + StructField("isTemporary", BooleanType, false) :: Nil) + + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Since we need to return a Seq of rows, we will call getTables directly + // instead of calling tables in sqlContext. + val rows = sqlContext.catalog.getTables(databaseName).map { + case (tableName, isTemporary) => Row(tableName, isTemporary) + } + + rows + } +} + +/** + * A command for users to list all of the registered functions. + * The syntax of using this command in SQL is: + * {{{ + * SHOW FUNCTIONS + * }}} + * TODO: currently we simply ignore the db + */ +case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("function", StringType, nullable = false) :: Nil) + + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = pattern match { + case Some(p) => + try { + val regex = java.util.regex.Pattern.compile(p) + sqlContext.functionRegistry.listFunction().filter(regex.matcher(_).matches()).map(Row(_)) + } catch { + // the user-provided regex may fail to compile; return empty rows in that case. + case _: Throwable => Seq.empty[Row] + } + case None => + sqlContext.functionRegistry.listFunction().map(Row(_)) + } +} + +/** + * A command for users to get the usage of a registered function. + * The syntax of using this command in SQL is + * {{{ + * DESCRIBE FUNCTION [EXTENDED] upper; + * }}} + */ +case class DescribeFunction( + functionName: String, + isExtended: Boolean) extends RunnableCommand { + + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("function_desc", StringType, nullable = false) :: Nil) + + schema.toAttributes + } + + private def replaceFunctionName(usage: String, functionName: String): String = { + if (usage == null) { + "To be added."
+ } else { + usage.replaceAll("_FUNC_", functionName) + } + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.functionRegistry.lookupFunction(functionName) match { + case Some(info) => + val result = + Row(s"Function: ${info.getName}") :: + Row(s"Class: ${info.getClassName}") :: + Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil + + if (isExtended) { + result :+ Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") + } else { + result + } + + case None => Seq(Row(s"Function: $functionName not found.")) + } + } +} + +case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.setCurrentDatabase(databaseName) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala deleted file mode 100644 index 5574645741..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ /dev/null @@ -1,421 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import java.util.NoSuchElementException - -import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types._ - -/** - * A logical command that is executed for its side-effects. `RunnableCommand`s are - * wrapped in `ExecutedCommand` during execution. - */ -private[sql] trait RunnableCommand extends LogicalPlan with logical.Command { - override def output: Seq[Attribute] = Seq.empty - override def children: Seq[LogicalPlan] = Seq.empty - def run(sqlContext: SQLContext): Seq[Row] -} - -/** - * A physical operator that executes the run method of a `RunnableCommand` and - * saves the result to prevent multiple executions. - */ -private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan { - /** - * A concrete command should override this lazy field to wrap up any side effects caused by the - * command or any other computation that should be evaluated exactly once. The value of this field - * can be used as the contents of the corresponding RDD generated from the physical plan of this - * command. 
- * - * The `execute()` method of all the physical command classes should reference `sideEffectResult` - * so that the command can be executed eagerly right after the command query is created. - */ - protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { - val converter = CatalystTypeConverters.createToCatalystConverter(schema) - cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow]) - } - - override def output: Seq[Attribute] = cmd.output - - override def children: Seq[SparkPlan] = Nil - - override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray - - override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray - - protected override def doExecute(): RDD[InternalRow] = { - sqlContext.sparkContext.parallelize(sideEffectResult, 1) - } - - override def argString: String = cmd.toString -} - - -case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging { - - private def keyValueOutput: Seq[Attribute] = { - val schema = StructType( - StructField("key", StringType, false) :: - StructField("value", StringType, false) :: Nil) - schema.toAttributes - } - - private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match { - // Configures the deprecated "mapred.reduce.tasks" property. - case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") - if (value.toInt < 1) { - val msg = - s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + - "determining the number of reducers is not supported." - throw new IllegalArgumentException(msg) - } else { - sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) - Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) - } - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " + - s"External sort will continue to be used.") - Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " + - s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " + - s"continue to be true.") - Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " + - s"will be ignored. Tungsten will continue to be used.") - Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " + - s"will be ignored. 
Codegen will continue to be used.") - Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " + - s"will be ignored. Unsafe mode will continue to be used.") - Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " + - s"will be ignored. Sort merge join will continue to be used.") - Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true")) - } - (keyValueOutput, runFunc) - - // Configures a single property. - case Some((key, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.setConf(key, value) - Seq(Row(key, value)) - } - (keyValueOutput, runFunc) - - // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) - // Queries all key-value pairs that are set in the SQLConf of the sqlContext. - case None => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq - } - (keyValueOutput, runFunc) - - // Queries all properties along with their default values and docs that are defined in the - // SQLConf of the sqlContext. - case Some(("-v", None)) => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) => - Row(key, defaultValue, doc) - } - } - val schema = StructType( - StructField("key", StringType, false) :: - StructField("default", StringType, false) :: - StructField("meaning", StringType, false) :: Nil) - (schema.toAttributes, runFunc) - - // Queries the deprecated "mapred.reduce.tasks" property. - case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") - Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString)) - } - (keyValueOutput, runFunc) - - // Queries a single property. - case Some((key, None)) => - val runFunc = (sqlContext: SQLContext) => { - val value = - try sqlContext.getConf(key) catch { - case _: NoSuchElementException => "" - } - Seq(Row(key, value)) - } - (keyValueOutput, runFunc) - } - - override val output: Seq[Attribute] = _output - - override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext) - -} - -/** - * An explain command for users to see how a command will be executed. - * - * Note that this command takes in a logical plan, runs the optimizer on the logical plan - * (but do NOT actually execute it). - */ -case class ExplainCommand( - logicalPlan: LogicalPlan, - override val output: Seq[Attribute] = - Seq(AttributeReference("plan", StringType, nullable = true)()), - extended: Boolean = false) - extends RunnableCommand { - - // Run through the optimizer to generate the physical plan. - override def run(sqlContext: SQLContext): Seq[Row] = try { - // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. 
- val queryExecution = sqlContext.executePlan(logicalPlan) - val outputString = if (extended) queryExecution.toString else queryExecution.simpleString - - outputString.split("\n").map(Row(_)) - } catch { case cause: TreeNodeException[_] => - ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) - } -} - - -case class CacheTableCommand( - tableName: String, - plan: Option[LogicalPlan], - isLazy: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - plan.foreach { logicalPlan => - sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) - } - sqlContext.cacheTable(tableName) - - if (!isLazy) { - // Performs eager caching - sqlContext.table(tableName).count() - } - - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -case class UncacheTableCommand(tableName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.table(tableName).unpersist(blocking = false) - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - -/** - * Clear all cached data from the in-memory cache. - */ -case object ClearCacheCommand extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.clearCache() - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -case class DescribeCommand( - child: SparkPlan, - override val output: Seq[Attribute], - isExtended: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - child.schema.fields.map { field => - val cmtKey = "comment" - val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" - Row(field.name, field.dataType.simpleString, comment) - } - } -} - -/** - * A command for users to get tables in the given database. - * If a databaseName is not given, the current database will be used. - * The syntax of using this command in SQL is: - * {{{ - * SHOW TABLES [IN databaseName] - * }}} - */ -case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand { - - // The result of SHOW TABLES has two columns, tableName and isTemporary. - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("tableName", StringType, false) :: - StructField("isTemporary", BooleanType, false) :: Nil) - - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Since we need to return a Seq of rows, we will call getTables directly - // instead of calling tables in sqlContext. - val rows = sqlContext.catalog.getTables(databaseName).map { - case (tableName, isTemporary) => Row(tableName, isTemporary) - } - - rows - } -} - -/** - * A command for users to list all of the registered functions. 
- * The syntax of using this command in SQL is: - * {{{ - * SHOW FUNCTIONS - * }}} - * TODO currently we are simply ignore the db - */ -case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = pattern match { - case Some(p) => - try { - val regex = java.util.regex.Pattern.compile(p) - sqlContext.functionRegistry.listFunction().filter(regex.matcher(_).matches()).map(Row(_)) - } catch { - // probably will failed in the regex that user provided, then returns empty row. - case _: Throwable => Seq.empty[Row] - } - case None => - sqlContext.functionRegistry.listFunction().map(Row(_)) - } -} - -/** - * A command for users to get the usage of a registered function. - * The syntax of using this command in SQL is - * {{{ - * DESCRIBE FUNCTION [EXTENDED] upper; - * }}} - */ -case class DescribeFunction( - functionName: String, - isExtended: Boolean) extends RunnableCommand { - - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function_desc", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - private def replaceFunctionName(usage: String, functionName: String): String = { - if (usage == null) { - "To be added." - } else { - usage.replaceAll("_FUNC_", functionName) - } - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.functionRegistry.lookupFunction(functionName) match { - case Some(info) => - val result = - Row(s"Function: ${info.getName}") :: - Row(s"Class: ${info.getClassName}") :: - Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil - - if (isExtended) { - result :+ Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") - } else { - result - } - - case None => Seq(Row(s"Function: $functionName not found.")) - } - } -} - -case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.catalog.setCurrentDatabase(databaseName) - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c24967abeb..ceb35107bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @@ -146,12 +147,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), part, query, overwrite, false) if part.isEmpty => - execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil + 
ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil case i @ logical.InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil + ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala index 3b7dc2e8d0..abb1628590 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.sources.InsertableRelation diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index c8b020d55a..d4cc20b06f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -28,7 +28,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution} +import org.apache.spark.sql.execution.SQLExecution +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index a141b58d3d..fb9618804d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.types._ /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 30a5e2ea4a..4358c7c76d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -24,10 +24,10 @@ import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} -import org.apache.spark.sql.execution.ExplainCommand import 
org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD import org.apache.spark.sql.sources._ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 458d4f2c3c..d0d94da78f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -34,7 +34,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow} -import org.apache.spark.sql.execution.SetCommand +import org.apache.spark.sql.execution.command.SetCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a9295d31c0..69669d79be 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.{ExecutedCommand, SetCommand} import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 8883e370dc..a19dc21638 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -22,7 +22,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.command.{DescribeCommand => RunnableDescribeCommand, _} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand} import org.apache.spark.sql.hive.execution._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 3f81c99c41..44f579fbb7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -21,7 +21,7 @@ import 
org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation} /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 2914d03749..83d057f7e4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable} import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.{ HiveContext, HiveMetastoreTypes, SQLBuilder} /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index dfa5a982b1..57293fce97 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.MetastoreRelation /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala index 381fb61160..9bb971992d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveNativeCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.StringType diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 246b52a3b0..cc32548112 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ -import 
org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.{BucketSpec, LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.sources._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a7eca46d19..4633a09c7e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.CacheTableCommand +import org.apache.spark.sql.execution.command.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index af4c44e578..afb816211e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.{ExplainCommand, SetCommand} +import org.apache.spark.sql.execution.command.{ExplainCommand, SetCommand} import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable, SQLBuilder} import org.apache.spark.sql.hive.test.TestHive diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index d81c566822..e5077376a3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql._ -import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} +import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.command.ExecutedCommand import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.hive.execution.HiveTableScan -- cgit v1.2.3
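For code outside the new package, the visible effect of this patch is the import-path change repeated across the hunks above: org.apache.spark.sql.execution.RunnableCommand becomes org.apache.spark.sql.execution.command.RunnableCommand, and likewise for ExecutedCommand, ExplainCommand, SetCommand, and CacheTableCommand. As a rough sketch of what a command looks like under the new layout (SetFlagCommand is a hypothetical name; RunnableCommand, its run signature, and SQLContext.setConf are taken from the code in this patch):

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical side-effect-only command; not part of this patch.
// It inherits the empty `output` and `children` defaults from RunnableCommand.
// At planning time it gets wrapped in ExecutedCommand, whose cached
// sideEffectResult ensures run() is evaluated exactly once.
case class SetFlagCommand(key: String, value: String) extends RunnableCommand {
  override def run(sqlContext: SQLContext): Seq[Row] = {
    sqlContext.setConf(key, value)
    Seq.empty[Row]
  }
}

Planning is unchanged by the move: strategies still wrap such commands in ExecutedCommand, as the DataSourceStrategy hunk above shows.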