diff options
author | Yin Huai <yhuai@databricks.com> | 2015-02-16 15:51:59 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-16 15:51:59 -0800 |
commit | 5b6cd65cd611b1a46a7d5eb33139c6224b96264e (patch) | |
tree | e9f19e5498290e0174cf55ad072314a9b665f3a1 /sql/hive/src/main | |
parent | 04b401da811e62a4365cf39ea95cadd0e737001c (diff) | |
download | spark-5b6cd65cd611b1a46a7d5eb33139c6224b96264e.tar.gz spark-5b6cd65cd611b1a46a7d5eb33139c6224b96264e.tar.bz2 spark-5b6cd65cd611b1a46a7d5eb33139c6224b96264e.zip |
[SPARK-5746][SQL] Check invalid cases for the write path of data source API
JIRA: https://issues.apache.org/jira/browse/SPARK-5746
liancheng marmbrus
Author: Yin Huai <yhuai@databricks.com>
Closes #4617 from yhuai/insertOverwrite and squashes the following commits:
8e3019d [Yin Huai] Fix compilation error.
499e8e7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
e76e85a [Yin Huai] Address comments.
ac31b3c [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
f30bdad [Yin Huai] Use toDF.
99da57e [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
6b7545c [Yin Huai] Add a pre write check to the data source API.
a88c516 [Yin Huai] DDLParser will take a parsering function to take care CTAS statements.
Diffstat (limited to 'sql/hive/src/main')
3 files changed, 10 insertions, 12 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 87b380f950..6c55bc6be1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, Ov import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand} import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} -import org.apache.spark.sql.sources.DataSourceStrategy +import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy} import org.apache.spark.sql.types._ /** @@ -64,14 +64,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) + @transient + protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_)) + override def sql(sqlText: String): DataFrame = { val substituted = new VariableSubstitution().substitute(hiveconf, sqlText) // TODO: Create a framework for registering parsers instead of just hardcoding if statements. if (conf.dialect == "sql") { super.sql(substituted) } else if (conf.dialect == "hiveql") { - DataFrame(this, - ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted))) + val ddlPlan = ddlParserWithHiveQL(sqlText, exceptionOnError = false) + DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted))) } else { sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") } @@ -241,12 +244,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] lazy val analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = false) { - override val extendedRules = + override val extendedResolutionRules = catalog.ParquetConversions :: catalog.CreateTables :: catalog.PreInsertionCasts :: ExtractPythonUdfs :: ResolveUdtfsAlias :: + sources.PreWriteCheck(catalog) :: sources.PreInsertCastAndRename :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 580c5706dd..72211fe2e4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -663,7 +663,7 @@ private[hive] case class MetastoreRelation } object HiveMetastoreTypes { - protected val ddlParser = new DDLParser + protected val ddlParser = new DDLParser(HiveQl.parseSql(_)) def toDataType(metastoreType: String): DataType = synchronized { ddlParser.parseType(metastoreType) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 965d159656..d2c39ab621 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing} +import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing} import org.apache.spark.sql.types.StringType @@ -227,12 +227,6 @@ private[hive] trait HiveStrategies { tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) => - val logicalPlan = hiveContext.parseSql(query) - val cmd = - CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan) - ExecutedCommand(cmd) :: Nil - - case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) => val cmd = CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query) ExecutedCommand(cmd) :: Nil |