author     Daoyuan Wang <daoyuan.wang@intel.com>      2015-04-13 14:29:07 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-13 14:29:07 -0700
commit     85ee0cabe87a27b6947c2d3e8525f04c77f80f6f (patch)
tree       f008271b8daef5685c57e9f7c4dcc6bc3e68e6eb /sql
parent     1e340c3ae4d5361d048a3d6990f144cfc923666f (diff)
[SPARK-6130] [SQL] support if not exists for insert overwrite into partition in hiveQl
Standard syntax:
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
FROM from_statement
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;

Hive extension (dynamic partition inserts):
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #4865 from adrian-wang/insertoverwrite and squashes the following commits:

2fce94f [Daoyuan Wang] add assert
10ea6f3 [Daoyuan Wang] add name for boolean parameter
0bbe9b9 [Daoyuan Wang] fix failure
4391154 [Daoyuan Wang] support if not exists for insert overwrite into partition in hiveQl
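A minimal usage sketch of the new behavior (the table name `withparts`, the source table `src`, and the context setup are illustrative assumptions, not part of the patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("if-not-exists-demo").setMaster("local"))
    val hiveContext = new HiveContext(sc)

    hiveContext.sql("CREATE TABLE withparts (key INT, value STRING) PARTITIONED BY (ds STRING)")
    hiveContext.sql("INSERT OVERWRITE TABLE withparts PARTITION (ds='1') SELECT key, value FROM src")
    // With this patch the next statement parses, and the load is skipped at runtime
    // because partition ds='1' already exists:
    hiveContext.sql(
      "INSERT OVERWRITE TABLE withparts PARTITION (ds='1') IF NOT EXISTS SELECT key, value FROM src")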
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala | 7
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala | 1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 19
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 22
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 31
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53 | 5
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67 | 0
-rw-r--r--  sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53 | 5
30 files changed, 84 insertions, 35 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ee04cb579d..bc8d3751f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -155,7 +155,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
protected lazy val insert: Parser[LogicalPlan] =
INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
- case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
+ case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o, false)
}
protected lazy val cte: Parser[LogicalPlan] =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 50702ac683..8b68b0df35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -193,7 +193,7 @@ class Analyzer(
}
realPlan transform {
- case i@InsertIntoTable(u: UnresolvedRelation, _, _, _) =>
+ case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 145f062dd6..21c15ad14f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -293,7 +293,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
- analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
+ analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite, false)
def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d31a6eecf..17522976dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -125,12 +125,14 @@ case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: Boolean)
+ overwrite: Boolean,
+ ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = child.output
+ assert(overwrite || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall {
case (childAttr, tableAttr) =>
DataType.equalsIgnoreCompatibleNullability(childAttr.dataType, tableAttr.dataType)
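The added assert encodes Hive's rule that IF NOT EXISTS is only valid together with INSERT OVERWRITE, so constructing a plan with overwrite = false and ifNotExists = true now fails fast. A minimal sketch of the invariant (the helper name is illustrative, not from the patch):

    // The four flag combinations; only overwrite = false with ifNotExists = true is rejected.
    def legalFlags(overwrite: Boolean, ifNotExists: Boolean): Boolean =
      overwrite || !ifNotExists

    assert(legalFlags(overwrite = true, ifNotExists = true))   // INSERT OVERWRITE ... IF NOT EXISTS
    assert(legalFlags(overwrite = true, ifNotExists = false))  // plain INSERT OVERWRITE
    assert(legalFlags(overwrite = false, ifNotExists = false)) // plain INSERT INTO
    assert(!legalFlags(overwrite = false, ifNotExists = true)) // INSERT INTO ... IF NOT EXISTS: invalid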
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9b9adf8550..94ae2d65fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1209,7 +1209,7 @@ class DataFrame private[sql](
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit = {
sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
- Map.empty, logicalPlan, overwrite)).toRdd
+ Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 23f7e56094..5268b73340 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -211,7 +211,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite = false) :: Nil
- case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
+ case logical.InsertIntoTable(
+ table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index e13759b7fe..34d048e426 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -56,7 +56,7 @@ private[sql] object DataSourceStrategy extends Strategy {
execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), part, query, overwrite) if part.isEmpty =>
+ l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case _ => Nil
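Note that the pattern above matches the literal `false` for the new parameter rather than binding a variable, so an InsertIntoTable carrying ifNotExists = true deliberately falls through to `case _ => Nil` and is not planned as a data source write. A toy illustration of that filtering effect (names are illustrative):

    // Matching a Boolean literal in a pattern filters instead of binding.
    def claimedByStrategy(ifNotExists: Boolean): Boolean = ifNotExists match {
      case false => true  // handled, like the InsertableRelation case above
      case _     => false // falls through to other strategies
    }
    assert(claimedByStrategy(false) && !claimedByStrategy(true))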
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index 5a78001117..6ed68d179e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -37,7 +37,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
// We are inserting into an InsertableRelation.
case i @ InsertIntoTable(
- l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite) => {
+ l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => {
// First, make sure the data to be inserted have the same number of fields with the
// schema of the relation.
if (l.output.size != child.output.size) {
@@ -84,7 +84,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+ l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
// Right now, we do not support insert into a data source table with partition specs.
if (partition.nonEmpty) {
failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
@@ -102,7 +102,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
case i @ logical.InsertIntoTable(
- l: LogicalRelation, partition, query, overwrite) if !l.isInstanceOf[InsertableRelation] =>
+ l: LogicalRelation, partition, query, overwrite, ifNotExists)
+ if !l.isInstanceOf[InsertableRelation] =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 2ae9d018e1..81ee48ef41 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -532,6 +532,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
"inputddl7",
"inputddl8",
"insert1",
+ "insert1_overwrite_partitions",
"insert2_overwrite_partitions",
"insert_compressed",
"join0",
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3ed5c5b031..f1c0bd92aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Collects all `MetastoreRelation`s which should be replaced
val toBeReplaced = plan.collect {
// Write path
- case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+ case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
@@ -538,7 +538,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
(relation, parquetRelation, attributedRewrites)
// Write path
- case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+ case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
@@ -569,15 +569,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val alias = r.alias.getOrElse(r.tableName)
Subquery(alias, parquetRelation)
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
if relationMap.contains(r) =>
val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
- case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
if relationMap.contains(r) =>
val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
@@ -698,7 +698,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
+ case p @ InsertIntoTable(table: MetastoreRelation, _, child, _, _) =>
castChildOutput(p, table, child)
}
@@ -715,7 +715,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
.forall { case (left, right) => left.sameType(right) }) {
// If both types ignoring nullability of ArrayType, MapType, StructType are the same,
// use InsertIntoHiveTable instead of InsertIntoTable.
- InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite)
+ InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else {
// Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map {
@@ -753,7 +753,8 @@ private[hive] case class InsertIntoHiveTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: Boolean)
+ overwrite: Boolean,
+ ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b2ae74efeb..53a204b8c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1002,7 +1002,27 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
- InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, false)
+
+ case Token(destinationToken(),
+ Token("TOK_TAB",
+ tableArgs) ::
+ Token("TOK_IFNOTEXISTS",
+ ifNotExists) :: Nil) =>
+ val Some(tableNameParts) :: partitionClause :: Nil =
+ getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
+
+ val tableIdent = extractTableIdent(tableNameParts)
+
+ val partitionKeys = partitionClause.map(_.getChildren.map {
+ // Parse partitions. We also make keys case insensitive.
+ case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+ cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value))
+ case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) =>
+ cleanIdentifier(key.toLowerCase) -> None
+ }.toMap).getOrElse(Map.empty)
+
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, true)
case a: ASTNode =>
throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
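A self-contained toy model of what the new case adds (the `Tok` type and helper below are illustrative stand-ins for Hive's ASTNode API, not code from the patch): when the destination node carries a TOK_IFNOTEXISTS child beside TOK_TAB, the plan is built with overwrite = true and ifNotExists = true, while destinations without that child keep hitting the earlier case, which passes false.

    sealed trait Node
    case class Tok(name: String, children: List[Node] = Nil) extends Node

    // True when the destination AST carries a TOK_IFNOTEXISTS child.
    def hasIfNotExists(dest: Tok): Boolean =
      dest.children.exists {
        case Tok("TOK_IFNOTEXISTS", _) => true
        case _ => false
      }

    val dest = Tok("TOK_DESTINATION", List(
      Tok("TOK_TAB", List(Tok("TOK_TABNAME"), Tok("TOK_PARTSPEC"))),
      Tok("TOK_IFNOTEXISTS")))
    assert(hasIfNotExists(dest))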
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5f7e897295..1ccb0c279c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -184,12 +184,14 @@ private[hive] trait HiveStrategies {
object DataSinks extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
+ case logical.InsertIntoTable(
+ table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite) :: Nil
- case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) =>
+ table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+ case hive.InsertIntoHiveTable(
+ table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite) :: Nil
+ table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index fade9e5852..76a1965f3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -67,7 +67,7 @@ case class CreateTableAsSelect(
new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
}
} else {
- hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
}
Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 6c96747439..89995a91b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,8 @@ case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
child: SparkPlan,
- overwrite: Boolean) extends UnaryNode with HiveInspectors {
+ overwrite: Boolean,
+ ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@@ -219,15 +220,25 @@ case class InsertIntoHiveTable(
isSkewedStoreAsSubdir)
}
} else {
- catalog.synchronized {
- catalog.client.loadPartition(
- outputPath,
- qualifiedTableName,
- orderedPartitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ // scalastyle:off
+ // ifNotExists is only valid with static partition, refer to
+ // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+ // scalastyle:on
+ val oldPart = catalog.synchronized {
+ catalog.client.getPartition(
+ catalog.client.getTable(qualifiedTableName), partitionSpec, false)
+ }
+ if (oldPart == null || !ifNotExists) {
+ catalog.synchronized {
+ catalog.client.loadPartition(
+ outputPath,
+ qualifiedTableName,
+ orderedPartitionSpec,
+ overwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
}
}
} else {
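The runtime effect of the block above reduces to one predicate: load the partition unless it already exists and the statement carried IF NOT EXISTS. A minimal sketch (the helper is illustrative; the patch inlines this as `oldPart == null || !ifNotExists`):

    // Mirrors the guard: write when the partition is absent, or when IF NOT EXISTS was not given.
    def shouldLoad(partitionExists: Boolean, ifNotExists: Boolean): Boolean =
      !partitionExists || !ifNotExists

    assert(shouldLoad(partitionExists = false, ifNotExists = true))  // new partition: written
    assert(!shouldLoad(partitionExists = true, ifNotExists = true))  // exists + IF NOT EXISTS: skipped
    assert(shouldLoad(partitionExists = true, ifNotExists = false))  // plain OVERWRITE: replaced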
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98 val_98
+98 val_98
+97 val_97
+97 val_97
+96 val_96
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98 val_98
+98 val_98
+97 val_97
+97 val_97
+96 val_96