author      Eric Liang <ekl@databricks.com>          2016-11-02 14:15:10 +0800
committer   Wenchen Fan <wenchen@databricks.com>     2016-11-02 14:15:10 +0800
commit      abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b (patch)
tree        c37c599640ae00418499d9467a025a47daebe43d /sql
parent      620da3b4828b3580c7ed7339b2a07938e6be1bb1 (diff)
download    spark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.tar.gz
            spark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.tar.bz2
            spark-abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b.zip
[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables
## What changes were proposed in this pull request?

There are a couple of issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged. There is one remaining issue: INSERT OVERWRITE with dynamic partitions still overwrites the entire table instead of just the updated partitions, but that behavior is fairly complicated to implement for Datasource tables. We should address it in a future release.

## How was this patch tested?

Unit tests.

Author: Eric Liang <ekl@databricks.com>

Closes #15705 from ericl/sc-4942.
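A minimal sketch of the behavior this patch targets, written against the SparkSession API in the same style as the tests below; the table `sales`, its `day` partition column, and the row counts are illustrative assumptions, not taken from the patch itself:

// assumes a SparkSession `spark` with filesource partition management enabled
spark.sql("create table sales (id int, day string) using parquet partitioned by (day)")
spark.sql("insert into sales partition (day='2016-11-01') select id from range(10)")
spark.sql("insert into sales partition (day='2016-11-02') select id from range(10)")
// before this patch, the INSERT OVERWRITE below wiped every partition of `sales`;
// with the fix, only day='2016-11-01' is replaced and day='2016-11-02' keeps its 10 rows
spark.sql("insert overwrite table sales partition (day='2016-11-01') select id from range(5)")
assert(spark.sql("select * from sales where day = '2016-11-02'").count() == 10)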
Diffstat (limited to 'sql')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 2
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 9
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 19
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 15
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala | 5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 30
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala | 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 3
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala | 52
11 files changed, 129 insertions, 21 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca68a..e901683be6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
InsertIntoTable(
analysis.UnresolvedRelation(TableIdentifier(tableName)),
- Map.empty, logicalPlan, overwrite, false)
+ Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6c16..ac1577b3ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
"partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
}
+ val overwrite = ctx.OVERWRITE != null
+ val overwritePartition =
+ if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+ Some(partitionKeys.map(t => (t._1, t._2.get)))
+ } else {
+ None
+ }
InsertIntoTable(
UnresolvedRelation(tableIdent, None),
partitionKeys,
query,
- ctx.OVERWRITE != null,
+ OverwriteOptions(overwrite, overwritePartition),
ctx.EXISTS != null)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a48974c632..7a15c2285d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -345,18 +346,32 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
}
+/**
+ * Options for writing new data into a table.
+ *
+ * @param enabled whether to overwrite existing data in the table.
+ * @param specificPartition only data in the specified partition will be overwritten.
+ */
+case class OverwriteOptions(
+ enabled: Boolean,
+ specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) {
+ if (specificPartition.isDefined) {
+ assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
+ }
+}
+
case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
child: LogicalPlan,
- overwrite: Boolean,
+ overwrite: OverwriteOptions,
ifNotExists: Boolean)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
- assert(overwrite || !ifNotExists)
+ assert(overwrite.enabled || !ifNotExists)
assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
override lazy val resolved: Boolean = childrenResolved && table.resolved
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index ca86304d4d..7400f3430e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest {
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifNotExists: Boolean = false): LogicalPlan =
- InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+ InsertIntoTable(
+ table("s"), partition, plan,
+ OverwriteOptions(
+ overwrite,
+ if (overwrite && partition.nonEmpty) {
+ Some(partition.map(kv => (kv._1, kv._2.get)))
+ } else {
+ None
+ }),
+ ifNotExists)
// Single inserts
assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +205,9 @@ class PlanParserSuite extends PlanTest {
val plan2 = table("t").where('x > 5).select(star())
assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
InsertIntoTable(
- table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
+ table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
InsertIntoTable(
- table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
+ table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
}
test ("insert with if not exists") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 11dd1df909..700f4835ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
- overwrite = mode == SaveMode.Overwrite,
+ overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
ifNotExists = false)).toRdd
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 092aabc89a..443a2ec033 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,10 @@ class CatalogFileIndex(
val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
table.identifier, filters)
val partitions = selectedPartitions.map { p =>
- PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+ val path = new Path(p.storage.locationUri.get)
+ val fs = path.getFileSystem(hadoopConf)
+ PartitionPath(
+ p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
new PrunedInMemoryFileIndex(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 34b77cab65..47c1f9d3fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable.ArrayBuffer
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -174,14 +176,32 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
}.flatten
- val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
- if (overwrite && inputPaths.contains(outputPath)) {
+ val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
+ if (overwrite.enabled && inputPaths.contains(outputPath)) {
throw new AnalysisException(
"Cannot overwrite a path that is also being read from.")
}
+ val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+ t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+ l.catalogTable.get.partitionProviderIsHive)
+
+ val effectiveOutputPath = if (overwritingSinglePartition) {
+ val partition = t.sparkSession.sessionState.catalog.getPartition(
+ l.catalogTable.get.identifier, overwrite.specificPartition.get)
+ new Path(partition.storage.locationUri.get)
+ } else {
+ outputPath
+ }
+
+ val effectivePartitionSchema = if (overwritingSinglePartition) {
+ Nil
+ } else {
+ query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+ }
+
def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
- if (l.catalogTable.isDefined &&
+ if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
l.catalogTable.get.partitionColumnNames.nonEmpty &&
l.catalogTable.get.partitionProviderIsHive) {
val metastoreUpdater = AlterTableAddPartitionCommand(
@@ -194,8 +214,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
}
val insertCmd = InsertIntoHadoopFsRelationCommand(
- outputPath,
- query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
+ effectiveOutputPath,
+ effectivePartitionSchema,
t.bucketSpec,
t.fileFormat,
refreshPartitionsCallback,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index b2ff68a833..2eba1e9986 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.sources.InsertableRelation
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
case class InsertIntoDataSourceCommand(
logicalRelation: LogicalRelation,
query: LogicalPlan,
- overwrite: Boolean)
+ overwrite: OverwriteOptions)
extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
val data = Dataset.ofRows(sparkSession, query)
// Apply the schema of the existing table to the new data.
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
- relation.insert(df, overwrite)
+ relation.insert(df, overwrite.enabled)
// Invalidate the cache.
sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9d2930948d..ce1e3eb1a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -46,7 +46,8 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
- InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+ InsertIntoHiveTable(
+ table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a001f..cac43597ae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.hive.MetastoreRelation
@@ -88,7 +88,8 @@ case class CreateHiveTableAsSelectCommand(
} else {
try {
sparkSession.sessionState.executePlan(InsertIntoTable(
- metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+ metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
+ ifNotExists = false)).toRdd
} catch {
case NonFatal(e) =>
// drop the created table.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 5f16960fb1..ac435bf619 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -134,4 +134,56 @@ class PartitionProviderCompatibilitySuite
}
}
}
+
+ test("insert overwrite partition of legacy datasource table overwrites entire table") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedDatasourceTable("test", dir)
+ spark.sql(
+ """insert overwrite table test
+ |partition (partCol=1)
+ |select * from range(100)""".stripMargin)
+ assert(spark.sql("select * from test").count() == 100)
+
+ // Dynamic partitions case
+ spark.sql("insert overwrite table test select id, id from range(10)".stripMargin)
+ assert(spark.sql("select * from test").count() == 10)
+ }
+ }
+ }
+ }
+
+ test("insert overwrite partition of new datasource table overwrites just partition") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedDatasourceTable("test", dir)
+ sql("msck repair table test")
+ spark.sql(
+ """insert overwrite table test
+ |partition (partCol=1)
+ |select * from range(100)""".stripMargin)
+ assert(spark.sql("select * from test").count() == 104)
+
+ // Test overwriting a partition that has a custom location
+ withTempDir { dir2 =>
+ sql(
+ s"""alter table test partition (partCol=1)
+ |set location '${dir2.getAbsolutePath}'""".stripMargin)
+ assert(sql("select * from test").count() == 4)
+ sql(
+ """insert overwrite table test
+ |partition (partCol=1)
+ |select * from range(30)""".stripMargin)
+ sql(
+ """insert overwrite table test
+ |partition (partCol=1)
+ |select * from range(20)""".stripMargin)
+ assert(sql("select * from test").count() == 24)
+ }
+ }
+ }
+ }
+ }
}