author     Yin Huai <yhuai@databricks.com>            2015-02-16 15:51:59 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-16 15:51:59 -0800
commit     5b6cd65cd611b1a46a7d5eb33139c6224b96264e (patch)
tree       e9f19e5498290e0174cf55ad072314a9b665f3a1 /sql/hive
parent     04b401da811e62a4365cf39ea95cadd0e737001c (diff)
[SPARK-5746][SQL] Check invalid cases for the write path of data source API
JIRA: https://issues.apache.org/jira/browse/SPARK-5746

liancheng marmbrus

Author: Yin Huai <yhuai@databricks.com>

Closes #4617 from yhuai/insertOverwrite and squashes the following commits:

8e3019d [Yin Huai] Fix compilation error.
499e8e7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
e76e85a [Yin Huai] Address comments.
ac31b3c [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
f30bdad [Yin Huai] Use toDF.
99da57e [Yin Huai] Merge remote-tracking branch 'upstream/master' into insertOverwrite
6b7545c [Yin Huai] Add a pre-write check to the data source API.
a88c516 [Yin Huai] DDLParser will take a parsing function to take care of CTAS statements.
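The commit's central idea is a pre-write check that runs during analysis and rejects invalid writes before any data is touched. Below is a minimal, self-contained sketch of that idea using toy plan classes rather than Spark's actual Catalyst API: the check walks the plan and fails when the insert target also appears as a source of the query (e.g. an INSERT OVERWRITE that selects from the same table).

    // Toy plan classes standing in for Catalyst's LogicalPlan (not Spark's API).
    sealed trait Plan { def children: Seq[Plan] }
    case class Relation(name: String) extends Plan { def children: Seq[Plan] = Nil }
    case class Project(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
    case class InsertIntoTable(table: Relation, query: Plan) extends Plan {
      def children: Seq[Plan] = Seq(table, query)
    }

    object PreWriteCheckSketch {
      private def relations(plan: Plan): Seq[Relation] = plan match {
        case r: Relation => Seq(r)
        case other       => other.children.flatMap(relations)
      }

      // Fail fast when the insert target is also one of the query's sources.
      def apply(plan: Plan): Unit = plan match {
        case InsertIntoTable(target, query) if relations(query).contains(target) =>
          sys.error(s"Cannot insert into table ${target.name}: it is also being read.")
        case _ => ()
      }

      def main(args: Array[String]): Unit = {
        val t = Relation("t")
        apply(InsertIntoTable(t, Project(t))) // throws: t is both target and source
      }
    }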
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala          | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala       |  8
3 files changed, 10 insertions(+), 12 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 87b380f950..6c55bc6be1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, Ov
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, QueryExecutionException, SetCommand}
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
@@ -64,14 +64,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
+  @transient
+  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
+
   override def sql(sqlText: String): DataFrame = {
     val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
     if (conf.dialect == "sql") {
       super.sql(substituted)
     } else if (conf.dialect == "hiveql") {
-      DataFrame(this,
-        ddlParser(sqlText, exceptionOnError = false).getOrElse(HiveQl.parseSql(substituted)))
+      val ddlPlan = ddlParserWithHiveQL(sqlText, exceptionOnError = false)
+      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
     } else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
     }
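The hiveql branch above now tries the Hive-aware DDL parser first and only falls back to HiveQl.parseSql when the statement is not data source DDL. A sketch of that dispatch pattern, with hypothetical parser stubs (strings stand in for logical plans):

    object ParserDispatchSketch {
      type LogicalPlan = String // stand-in for Catalyst's LogicalPlan

      // Stub: succeeds only for statements the DDL grammar recognizes.
      def parseDdl(sql: String): Option[LogicalPlan] =
        if (sql.trim.toUpperCase.startsWith("CREATE TABLE")) Some(s"DDL($sql)") else None

      // Stub for HiveQl.parseSql, which accepts general HiveQL.
      def parseHiveQl(sql: String): LogicalPlan = s"HiveQL($sql)"

      // DDL is tried first; anything else falls through to HiveQL.
      def parse(sql: String): LogicalPlan = parseDdl(sql).getOrElse(parseHiveQl(sql))

      def main(args: Array[String]): Unit = {
        println(parse("CREATE TABLE t USING parquet")) // DDL(CREATE TABLE t USING parquet)
        println(parse("SELECT * FROM t"))              // HiveQL(SELECT * FROM t)
      }
    }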
@@ -241,12 +244,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient
   override protected[sql] lazy val analyzer =
     new Analyzer(catalog, functionRegistry, caseSensitive = false) {
-      override val extendedRules =
+      override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUdfs ::
         ResolveUdtfsAlias ::
+        sources.PreWriteCheck(catalog) ::
         sources.PreInsertCastAndRename ::
         Nil
     }
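Note that extendedResolutionRules is an ordered cons-list, so sources.PreWriteCheck(catalog) is applied before sources.PreInsertCastAndRename. A toy sketch (hypothetical rule names, plain functions instead of Catalyst rules) of how such an ordered chain behaves:

    object RuleChainSketch {
      type Plan = String
      type Rule = Plan => Plan

      // A validation rule: passes the plan through unchanged or aborts analysis.
      val preWriteCheck: Rule = plan =>
        if (plan.contains("invalid")) sys.error(s"pre-write check failed: $plan") else plan

      // A rewrite rule: returns a modified plan.
      val preInsertCast: Rule = plan => s"cast($plan)"

      // Rules run in list order, exactly as built with `::`.
      val rules: List[Rule] = preWriteCheck :: preInsertCast :: Nil

      def analyze(plan: Plan): Plan = rules.foldLeft(plan)((p, r) => r(p))

      def main(args: Array[String]): Unit =
        println(analyze("insert into t")) // cast(insert into t)
    }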
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 580c5706dd..72211fe2e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -663,7 +663,7 @@ private[hive] case class MetastoreRelation
 }
 
 object HiveMetastoreTypes {
-  protected val ddlParser = new DDLParser
+  protected val ddlParser = new DDLParser(HiveQl.parseSql(_))
 
   def toDataType(metastoreType: String): DataType = synchronized {
     ddlParser.parseType(metastoreType)
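Regardless of which function the parser is constructed with, parseType still turns metastore type strings into Catalyst DataTypes. A usage sketch, assuming caller code that lives in the org.apache.spark.sql.hive package (the object is package-visible); the demo object is hypothetical:

    package org.apache.spark.sql.hive

    // Hypothetical demo; type strings follow Hive metastore syntax.
    object MetastoreTypeDemo {
      def main(args: Array[String]): Unit = {
        println(HiveMetastoreTypes.toDataType("array<int>"))             // an ArrayType
        println(HiveMetastoreTypes.toDataType("struct<a:int,b:string>")) // a StructType
      }
    }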
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 965d159656..d2c39ab621 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
+import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, CreateTableUsing}
 import org.apache.spark.sql.types.StringType
@@ -227,12 +227,6 @@ private[hive] trait HiveStrategies {
           tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
 
       case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
-        val logicalPlan = hiveContext.parseSql(query)
-        val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan)
-        ExecutedCommand(cmd) :: Nil
-
-      case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) =>
         val cmd =
           CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
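Because DDLParser now receives the query-parsing function in its constructor, a CTAS statement reaches the planner with its SELECT already parsed into a LogicalPlan, so the planner no longer needs separate string-based and plan-based CTAS cases. A toy sketch (hypothetical names, deliberately simplified grammar) of the constructor-injection idea:

    // The DDL parser receives the function that parses the CTAS query part, so
    // a Hive context can inject its HiveQL parser while a plain SQL context
    // injects its own; the node always carries a parsed plan, never a raw string.
    class CtasParserSketch(parseQuery: String => String) {
      private val Ctas =
        """(?is)\s*CREATE\s+TABLE\s+(\w+)\s+USING\s+(\S+)\s+AS\s+(.+)""".r

      def apply(sql: String): Option[String] = sql match {
        case Ctas(table, provider, query) =>
          Some(s"CreateTableUsingAsSelect($table, $provider, ${parseQuery(query)})")
        case _ => None
      }
    }

    object CtasParserDemo {
      def main(args: Array[String]): Unit = {
        val withHiveQl = new CtasParserSketch(q => s"HiveQl($q)")
        println(withHiveQl("CREATE TABLE t USING parquet AS SELECT * FROM src"))
        // Some(CreateTableUsingAsSelect(t, parquet, HiveQl(SELECT * FROM src)))
      }
    }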