diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-22 01:31:13 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-22 01:31:13 -0700 |
commit | 284b15d2fbff7c0c3ffe8737838071d366ea5742 (patch) | |
tree | cad712bbf0674f8b44895f2af61098d7f81b57c3 /sql/hive/src/main/scala/org/apache | |
parent | 80127935df06a829b734cafc2447aa1f3df40288 (diff) | |
download | spark-284b15d2fbff7c0c3ffe8737838071d366ea5742.tar.gz spark-284b15d2fbff7c0c3ffe8737838071d366ea5742.tar.bz2 spark-284b15d2fbff7c0c3ffe8737838071d366ea5742.zip |
[SPARK-14826][SQL] Remove HiveQueryExecution
## What changes were proposed in this pull request?
This patch removes HiveQueryExecution. As part of this, I consolidated all the describe commands into DescribeTableCommand.
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12588 from rxin/SPARK-14826.
Diffstat (limited to 'sql/hive/src/main/scala/org/apache')
5 files changed, 7 insertions, 167 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala deleted file mode 100644 index ed1340dccf..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQueryExecution.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.command.{ExecutedCommand, HiveNativeCommand, SetCommand} -import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand - - -/** - * A [[QueryExecution]] with hive specific features. - */ -protected[hive] class HiveQueryExecution(ctx: SQLContext, logicalPlan: LogicalPlan) - extends QueryExecution(ctx, logicalPlan) { - - /** - * Returns the result as a hive compatible sequence of strings. For native commands, the - * execution is simply passed back to Hive. - */ - def stringResult(): Seq[String] = executedPlan match { - case ExecutedCommand(desc: DescribeHiveTableCommand) => - // If it is a describe command for a Hive table, we want to have the output format - // be similar with Hive. - desc.run(ctx).map { - case Row(name: String, dataType: String, comment) => - Seq(name, dataType, - Option(comment.asInstanceOf[String]).getOrElse("")) - .map(s => String.format(s"%-20s", s)) - .mkString("\t") - } - case command: ExecutedCommand => - command.executeCollect().map(_.getString(0)) - - case other => - val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq - // We need the types so we can output struct field names - val types = analyzed.output.map(_.dataType) - // Reformat to match hive tab delimited output. - result.map(_.zip(types).map(HiveUtils.toHiveString)).map(_.mkString("\t")).toSeq - } - - override def simpleString: String = - logical match { - case _: HiveNativeCommand => "<Native command: executed by Hive>" - case _: SetCommand => "<SET command: executed by Hive, and noted by SQLContext>" - case _ => super.simpleString - } - -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index d8cc057fe2..b0877823c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -24,11 +24,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Analyzer -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} -import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.sql.internal.SessionState /** @@ -50,11 +49,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) */ lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession() - override lazy val conf: SQLConf = new SQLConf { - override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) - } - - /** * SQLConf and HiveConf contracts: * @@ -116,7 +110,6 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) experimentalMethods.extraStrategies ++ Seq( FileSourceStrategy, DataSourceStrategy, - HiveCommandStrategy, HiveDDLStrategy, DDLStrategy, SpecialLimits, @@ -141,16 +134,13 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) // Helper methods, partially leftover from pre-2.0 days // ------------------------------------------------------ - override def executePlan(plan: LogicalPlan): HiveQueryExecution = { - new HiveQueryExecution(ctx, plan) - } - /** * Overrides default Hive configurations to avoid breaking changes to Spark SQL users. * - allow SQL11 keywords to be used as identifiers */ def setDefaultOverrideConfs(): Unit = { setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false") + conf.setConfString("spark.sql.caseSensitive", "false") } override def setConf(key: String, value: String): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 8720e54ed6..5b7fbe0ce5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{DescribeCommand => _, _} -import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, CreateTempTableUsingAsSelect, DescribeCommand} +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, CreateTempTableUsingAsSelect} import org.apache.spark.sql.hive.execution._ private[hive] trait HiveStrategies { @@ -106,13 +106,4 @@ private[hive] trait HiveStrategies { case _ => Nil } } - - case object HiveCommandStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case describe: DescribeCommand => - ExecutedCommand( - DescribeHiveTableCommand(describe.table, describe.output, describe.isExtended)) :: Nil - case _ => Nil - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala deleted file mode 100644 index 8481324086..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.hive.metastore.api.FieldSchema - -import org.apache.spark.sql.{Row, SQLContext} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.command.{DescribeCommand, RunnableCommand} -import org.apache.spark.sql.hive.MetastoreRelation - -/** - * Implementation for "describe [extended] table". - */ -private[hive] -case class DescribeHiveTableCommand( - tableId: TableIdentifier, - override val output: Seq[Attribute], - isExtended: Boolean) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - // There are two modes here: - // For metastore tables, create an output similar to Hive's. - // For other tables, delegate to DescribeCommand. - - // In the future, we will consolidate the two and simply report what the catalog reports. - sqlContext.sessionState.catalog.lookupRelation(tableId) match { - case table: MetastoreRelation => - // Trying to mimic the format of Hive's output. But not exactly the same. - var results: Seq[(String, String, String)] = Nil - - val columns: Seq[FieldSchema] = table.hiveQlTable.getCols.asScala - val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols.asScala - results ++= columns.map(field => (field.getName, field.getType, field.getComment)) - if (partitionColumns.nonEmpty) { - val partColumnInfo = - partitionColumns.map(field => (field.getName, field.getType, field.getComment)) - results ++= - partColumnInfo ++ - Seq(("# Partition Information", "", "")) ++ - Seq((s"# ${output(0).name}", output(1).name, output(2).name)) ++ - partColumnInfo - } - - if (isExtended) { - results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) - } - - results.map { case (name, dataType, comment) => - Row(name, dataType, comment) - } - - case o: LogicalPlan => - DescribeCommand(tableId, output, isExtended).run(sqlContext) - } - } -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 7f8f6292cb..373c0730cc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand} import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClient @@ -207,7 +208,7 @@ private[hive] class TestHiveSparkSession( protected[hive] implicit class SqlCmd(sql: String) { def cmd: () => Unit = { - () => new TestHiveQueryExecution(sql).stringResult(): Unit + () => new TestHiveQueryExecution(sql).hiveResultString(): Unit } } @@ -462,7 +463,7 @@ private[hive] class TestHiveSparkSession( private[hive] class TestHiveQueryExecution( sparkSession: TestHiveSparkSession, logicalPlan: LogicalPlan) - extends HiveQueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging { + extends QueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging { def this(sparkSession: TestHiveSparkSession, sql: String) { this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql)) |