aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src
diff options
context:
space:
mode:
author: Daoyuan Wang <daoyuan.wang@intel.com> 2015-04-13 14:29:07 -0700
committer: Michael Armbrust <michael@databricks.com> 2015-04-13 14:29:07 -0700
commit: 85ee0cabe87a27b6947c2d3e8525f04c77f80f6f (patch)
tree: f008271b8daef5685c57e9f7c4dcc6bc3e68e6eb /sql/hive/src
parent: 1e340c3ae4d5361d048a3d6990f144cfc923666f (diff)
download: spark-85ee0cabe87a27b6947c2d3e8525f04c77f80f6f.tar.gz
spark-85ee0cabe87a27b6947c2d3e8525f04c77f80f6f.tar.bz2
spark-85ee0cabe87a27b6947c2d3e8525f04c77f80f6f.zip
[SPARK-6130] [SQL] support if not exists for insert overwrite into partition in hiveQl
Standard syntax: INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement; INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;   Hive extension (multiple inserts): FROM from_statement INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...; FROM from_statement INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;   Hive extension (dynamic partition inserts): INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement; INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement; Author: Daoyuan Wang <daoyuan.wang@intel.com> Closes #4865 from adrian-wang/insertoverwrite and squashes the following commits: 2fce94f [Daoyuan Wang] add assert 10ea6f3 [Daoyuan Wang] add name for boolean parameter 0bbe9b9 [Daoyuan Wang] fix failure 4391154 [Daoyuan Wang] support if not exists for insert overwrite into partition in hiveQl
Diffstat (limited to 'sql/hive/src')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala19
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala22
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala31
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b7107740
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a620
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab0
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a20
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d250
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad8990
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab0
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc0
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe30
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d5554570
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee0
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d20
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff535
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f80
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a670
-rw-r--r--sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff535
21 files changed, 69 insertions, 25 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3ed5c5b031..f1c0bd92aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -527,7 +527,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Collects all `MetastoreRelation`s which should be replaced
val toBeReplaced = plan.collect {
// Write path
- case InsertIntoTable(relation: MetastoreRelation, _, _, _)
+ case InsertIntoTable(relation: MetastoreRelation, _, _, _, _)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
@@ -538,7 +538,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
(relation, parquetRelation, attributedRewrites)
// Write path
- case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+ case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _, _)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
@@ -569,15 +569,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val alias = r.alias.getOrElse(r.tableName)
Subquery(alias, parquetRelation)
- case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+ case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
if relationMap.contains(r) =>
val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
- case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+ case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists)
if relationMap.contains(r) =>
val parquetRelation = relationMap(r)
- InsertIntoTable(parquetRelation, partition, child, overwrite)
+ InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists)
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
@@ -698,7 +698,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
// Wait until children are resolved.
case p: LogicalPlan if !p.childrenResolved => p
- case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
+ case p @ InsertIntoTable(table: MetastoreRelation, _, child, _, _) =>
castChildOutput(p, table, child)
}
@@ -715,7 +715,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
.forall { case (left, right) => left.sameType(right) }) {
// If both types ignoring nullability of ArrayType, MapType, StructType are the same,
// use InsertIntoHiveTable instead of InsertIntoTable.
- InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite)
+ InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else {
// Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map {
@@ -753,7 +753,8 @@ private[hive] case class InsertIntoHiveTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: Boolean)
+ overwrite: Boolean,
+ ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index b2ae74efeb..53a204b8c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1002,7 +1002,27 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
- InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite)
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, false)
+
+ case Token(destinationToken(),
+ Token("TOK_TAB",
+ tableArgs) ::
+ Token("TOK_IFNOTEXISTS",
+ ifNotExists) :: Nil) =>
+ val Some(tableNameParts) :: partitionClause :: Nil =
+ getClauses(Seq("TOK_TABNAME", "TOK_PARTSPEC"), tableArgs)
+
+ val tableIdent = extractTableIdent(tableNameParts)
+
+ val partitionKeys = partitionClause.map(_.getChildren.map {
+ // Parse partitions. We also make keys case insensitive.
+ case Token("TOK_PARTVAL", Token(key, Nil) :: Token(value, Nil) :: Nil) =>
+ cleanIdentifier(key.toLowerCase) -> Some(PlanUtils.stripQuotes(value))
+ case Token("TOK_PARTVAL", Token(key, Nil) :: Nil) =>
+ cleanIdentifier(key.toLowerCase) -> None
+ }.toMap).getOrElse(Map.empty)
+
+ InsertIntoTable(UnresolvedRelation(tableIdent, None), partitionKeys, query, overwrite, true)
case a: ASTNode =>
throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5f7e897295..1ccb0c279c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -184,12 +184,14 @@ private[hive] trait HiveStrategies {
object DataSinks extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
+ case logical.InsertIntoTable(
+ table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite) :: Nil
- case hive.InsertIntoHiveTable(table: MetastoreRelation, partition, child, overwrite) =>
+ table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+ case hive.InsertIntoHiveTable(
+ table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
execution.InsertIntoHiveTable(
- table, partition, planLater(child), overwrite) :: Nil
+ table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index fade9e5852..76a1965f3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -67,7 +67,7 @@ case class CreateTableAsSelect(
new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
}
} else {
- hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+ hiveContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
}
Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 6c96747439..89995a91b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,8 @@ case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
child: SparkPlan,
- overwrite: Boolean) extends UnaryNode with HiveInspectors {
+ overwrite: Boolean,
+ ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@@ -219,15 +220,25 @@ case class InsertIntoHiveTable(
isSkewedStoreAsSubdir)
}
} else {
- catalog.synchronized {
- catalog.client.loadPartition(
- outputPath,
- qualifiedTableName,
- orderedPartitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ // scalastyle:off
+ // ifNotExists is only valid with static partition, refer to
+ // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+ // scalastyle:on
+ val oldPart = catalog.synchronized {
+ catalog.client.getPartition(
+ catalog.client.getTable(qualifiedTableName), partitionSpec, false)
+ }
+ if (oldPart == null || !ifNotExists) {
+ catalog.synchronized {
+ catalog.client.loadPartition(
+ outputPath,
+ qualifiedTableName,
+ orderedPartitionSpec,
+ overwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
}
}
} else {
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-0-d5edc0daa94b33915df794df3b710774
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-1-9eb9372f4855928fae16f5fa554b3a62
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-10-ec2cef3d37146c450c60202a572f5cab
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-11-8854d6001200fc11529b2e2da755e5a2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-12-71ff68fda0aa7a36cb50d8fab0d70d25
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-13-7e4e7d7003fc6ef17bc19c3461ad899
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-14-ec2cef3d37146c450c60202a572f5cab
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-15-a3b2e230efde74e970ae8a3b55f383fc
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-2-8396c17a66e3d9a374d4361873b9bfe3
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-3-3876bb356dd8af7e78d061093d555457
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-4-528e23afb272c2e69004c86ddaa70ee
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-5-de5d56456c28d63775554e56355911d2
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-6-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98 val_98
+98 val_98
+97 val_97
+97 val_97
+96 val_96
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-7-92f6af82704504968de078c133f222f8
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-8-316cad7c63ddd4fb043be2affa5b0a67
diff --git a/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53 b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
new file mode 100644
index 0000000000..185a91c110
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/insert1_overwrite_partitions-9-3efdc331b3b4bdac3e60c757600fff53
@@ -0,0 +1,5 @@
+98 val_98
+98 val_98
+97 val_97
+97 val_97
+96 val_96