author    Daoyuan Wang <daoyuan.wang@intel.com>  2015-04-13 14:29:07 -0700
committer Michael Armbrust <michael@databricks.com>  2015-04-13 14:29:07 -0700
commit    85ee0cabe87a27b6947c2d3e8525f04c77f80f6f (patch)
tree      f008271b8daef5685c57e9f7c4dcc6bc3e68e6eb /sql/core/src/main
parent    1e340c3ae4d5361d048a3d6990f144cfc923666f (diff)
[SPARK-6130] [SQL] support if not exists for insert overwrite into partition in hiveQl
Standard syntax:

    INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
    INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

Hive extension (multiple inserts):

    FROM from_statement
    INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;

    FROM from_statement
    INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
    [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
    [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;

Hive extension (dynamic partition inserts):

    INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
    INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #4865 from adrian-wang/insertoverwrite and squashes the following commits:

2fce94f [Daoyuan Wang] add assert
10ea6f3 [Daoyuan Wang] add name for boolean parameter
0bbe9b9 [Daoyuan Wang] fix failure
4391154 [Daoyuan Wang] support if not exists for insert overwrite into partition in hiveQl
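For illustration, a minimal sketch of the clause this patch parses (hypothetical names: `t` is assumed to be a Hive table partitioned by `p`, and `src` any table with a compatible schema):

    // Hypothetical usage. With IF NOT EXISTS, the statement becomes a no-op
    // when partition p=1 already exists, instead of overwriting its data.
    hiveContext.sql(
      """INSERT OVERWRITE TABLE t PARTITION (p = 1) IF NOT EXISTS
        |SELECT key, value FROM src""".stripMargin)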
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                  | 2 +-
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala  | 3 ++-
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 2 +-
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala              | 7 ++++---
4 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9b9adf8550..94ae2d65fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1209,7 +1209,7 @@ class DataFrame private[sql](
   @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit = {
     sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
-      Map.empty, logicalPlan, overwrite)).toRdd
+      Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
   }
 
   /**
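The extra `ifNotExists = false` argument implies that the Catalyst `InsertIntoTable` logical plan gained a fifth field. That definition lives under sql/catalyst and is outside this diff; a minimal sketch of its assumed post-patch shape:

    // Sketch only: the real node is in org.apache.spark.sql.catalyst.plans.logical
    // and mixes in the full LogicalPlan machinery.
    trait LogicalPlan  // stand-in for the Catalyst trait

    case class InsertIntoTable(
        table: LogicalPlan,                      // target relation
        partition: Map[String, Option[String]],  // static partition spec, if any
        child: LogicalPlan,                      // the query producing the rows
        overwrite: Boolean,                      // INSERT OVERWRITE vs INSERT INTO
        ifNotExists: Boolean)                    // new: skip when the partition exists
      extends LogicalPlan

The five pattern positions used throughout the hunks below match this field order.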
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 23f7e56094..5268b73340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -211,7 +211,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
         // Note: overwrite=false because otherwise the metadata we just created will be deleted
         InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
-      case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
+      case logical.InsertIntoTable(
+          table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e13759b7fe..34d048e426 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -56,7 +56,7 @@ private[sql] object DataSourceStrategy extends Strategy {
       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
+      l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case _ => Nil
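Note the literal `false` in the `ifNotExists` position: unlike the other call sites, which bind the flag to a name, this pattern only matches plans where IF NOT EXISTS was absent, so such inserts fall through to `case _ => Nil` rather than being planned as a plain InsertIntoDataSource. A small self-contained sketch (hypothetical names) of how a Boolean literal constrains a match:

    object PatternLiteralDemo extends App {
      // Hypothetical, simplified stand-in for logical.InsertIntoTable.
      case class Insert(table: String, overwrite: Boolean, ifNotExists: Boolean)

      def plan(i: Insert): String = i match {
        // Literal `false` here: this case fires only when IF NOT EXISTS was absent.
        case Insert(t, ow, false) => s"planned insert into $t (overwrite=$ow)"
        case _                    => "left for another strategy"
      }

      println(plan(Insert("t1", overwrite = true, ifNotExists = false))) // planned
      println(plan(Insert("t1", overwrite = true, ifNotExists = true)))  // falls through
    }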
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 5a78001117..6ed68d179e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
 
       // We are inserting into an InsertableRelation.
       case i @ InsertIntoTable(
-        l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite) => {
+        l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => {
         // First, make sure the data to be inserted have the same number of fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -84,7 +84,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -102,7 +102,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
       }
 
       case i @ logical.InsertIntoTable(
-        l: LogicalRelation, partition, query, overwrite) if !l.isInstanceOf[InsertableRelation] =>
+        l: LogicalRelation, partition, query, overwrite, ifNotExists)
+          if !l.isInstanceOf[InsertableRelation] =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")