author    Wenchen Fan <wenchen@databricks.com>  2017-01-05 14:11:05 +0800
committer Wenchen Fan <wenchen@databricks.com>  2017-01-05 14:11:05 +0800
commit    30345c43b7d17bb00184b60a547225bae8ee78e7 (patch)
tree      f7bdd72e3f720c5e8491803f9246bb140c79baa7 /sql/core
parent    6873430cb5ec0096991a02d4e01266945e79ebb9 (diff)
[SPARK-19058][SQL] fix partition related behaviors with DataFrameWriter.saveAsTable
## What changes were proposed in this pull request?

When we append data to a partitioned table with `DataFrameWriter.saveAsTable`, there are two issues:

1. the append does not work when a partition has a custom location;
2. the append recovers all partitions of the table instead of only registering the newly written ones.

This PR fixes both by moving the special partition handling code from `DataSourceAnalysis` to `InsertIntoHadoopFsRelationCommand`, so that the `DataFrameWriter.saveAsTable` code path can also benefit from it.

## How was this patch tested?

Newly added regression tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16460 from cloud-fan/append.
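For context, the scenario described above can be reproduced roughly as follows. This is a minimal sketch, not taken from the newly added regression tests: the table name, partition column, and custom location are illustrative, and it assumes a running `SparkSession` named `spark` with file source partition management enabled (`spark.sql.hive.manageFilesourcePartitions`, the default).

```scala
// Illustrative repro of the append scenario (assumed names; not the actual regression test).
val df = spark.range(10).selectExpr("id", "id % 2 AS part")

// Create a partitioned data source table.
df.write.partitionBy("part").saveAsTable("t")

// Give one partition a custom location outside the table directory.
spark.sql("ALTER TABLE t PARTITION (part=1) SET LOCATION '/tmp/custom_part_1'")

// Before this patch, an append via saveAsTable wrote the part=1 rows to the
// default partition directory (ignoring the custom location) and then ran a
// full partition recovery; after the patch, the custom location is honored
// and only the written partitions are updated in the metastore.
df.write.mode("append").saveAsTable("t")

spark.table("t").show()
```

With the change, `saveAsTable` appends go through the same partition handling in `InsertIntoHadoopFsRelationCommand` as the `INSERT INTO` path, which is what makes the custom location and incremental partition registration work.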
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                                        |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala               | 32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala                       | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala               | 78
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala               |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala | 88
6 files changed, 106 insertions(+), 108 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 405f38ad49..3127ebf679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -393,7 +393,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Drop the existing table
catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
- createTable(tableIdent)
+ createTable(tableIdentWithDB)
+ // Refresh the cache of the table in the catalog.
+ catalog.refreshTable(tableIdentWithDB)
case _ => createTable(tableIdent)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c64c7ad943..73b21533a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -138,7 +138,7 @@ case class CreateDataSourceTableAsSelectCommand(
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
- val result = if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+ if (sessionState.catalog.tableExists(tableIdentWithDB)) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableName has been dropped when the save mode is Overwrite")
@@ -150,14 +150,16 @@ case class CreateDataSourceTableAsSelectCommand(
return Seq.empty
}
- saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode)
+ saveDataIntoTable(
+ sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
} else {
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
Some(sessionState.catalog.defaultTablePath(table.identifier))
} else {
table.storage.locationUri
}
- val result = saveDataIntoTable(sparkSession, table, tableLocation, query, mode)
+ val result = saveDataIntoTable(
+ sparkSession, table, tableLocation, query, mode, tableExists = false)
val newTable = table.copy(
storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -165,20 +167,17 @@ case class CreateDataSourceTableAsSelectCommand(
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
schema = result.schema)
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
- result
- }
- result match {
- case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
- sparkSession.sqlContext.conf.manageFilesourcePartitions =>
- // Need to recover partitions into the metastore so our saved data is visible.
- sparkSession.sessionState.executePlan(
- AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
- case _ =>
+ result match {
+ case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
+ sparkSession.sqlContext.conf.manageFilesourcePartitions =>
+ // Need to recover partitions into the metastore so our saved data is visible.
+ sparkSession.sessionState.executePlan(
+ AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+ case _ =>
+ }
}
- // Refresh the cache of the table in the catalog.
- sessionState.catalog.refreshTable(tableIdentWithDB)
Seq.empty[Row]
}
@@ -187,7 +186,8 @@ case class CreateDataSourceTableAsSelectCommand(
table: CatalogTable,
tableLocation: Option[String],
data: LogicalPlan,
- mode: SaveMode): BaseRelation = {
+ mode: SaveMode,
+ tableExists: Boolean): BaseRelation = {
// Create the relation based on the input logical plan: `data`.
val pathOption = tableLocation.map("path" -> _)
val dataSource = DataSource(
@@ -196,7 +196,7 @@ case class CreateDataSourceTableAsSelectCommand(
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption,
- catalogTable = Some(table))
+ catalogTable = if (tableExists) Some(table) else None)
try {
dataSource.write(mode, Dataset.ofRows(session, query))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ac3f0688bb..7e23260e65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -473,6 +473,11 @@ case class DataSource(
s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]")
}.asInstanceOf[Attribute]
}
+ val fileIndex = catalogTable.map(_.identifier).map { tableIdent =>
+ sparkSession.table(tableIdent).queryExecution.analyzed.collect {
+ case LogicalRelation(t: HadoopFsRelation, _, _) => t.location
+ }.head
+ }
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
@@ -480,15 +485,14 @@ case class DataSource(
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
staticPartitions = Map.empty,
- customPartitionLocations = Map.empty,
partitionColumns = columns,
bucketSpec = bucketSpec,
fileFormat = format,
- refreshFunction = _ => Unit, // No existing table needs to be refreshed.
options = options,
query = data.logicalPlan,
mode = mode,
- catalogTable = catalogTable)
+ catalogTable = catalogTable,
+ fileIndex = fileIndex)
sparkSession.sessionState.executePlan(plan).toRdd
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 61f0d43f24..3d3db06eee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -197,91 +197,19 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
val partitionSchema = actualQuery.resolve(
t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
- val partitionsTrackedByCatalog =
- t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
- l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
- l.catalogTable.get.tracksPartitionsInCatalog
-
- var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
- var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
-
val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
- // When partitions are tracked by the catalog, compute all custom partition locations that
- // may be relevant to the insertion job.
- if (partitionsTrackedByCatalog) {
- val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
- l.catalogTable.get.identifier, Some(staticPartitions))
- initialMatchingPartitions = matchingPartitions.map(_.spec)
- customPartitionLocations = getCustomPartitionLocations(
- t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
- }
-
- // Callback for updating metastore partition metadata after the insertion job completes.
- // TODO(ekl) consider moving this into InsertIntoHadoopFsRelationCommand
- def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
- if (partitionsTrackedByCatalog) {
- val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
- if (newPartitions.nonEmpty) {
- AlterTableAddPartitionCommand(
- l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
- ifNotExists = true).run(t.sparkSession)
- }
- if (overwrite) {
- val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
- if (deletedPartitions.nonEmpty) {
- AlterTableDropPartitionCommand(
- l.catalogTable.get.identifier, deletedPartitions.toSeq,
- ifExists = true, purge = false,
- retainData = true /* already deleted */).run(t.sparkSession)
- }
- }
- }
- t.location.refresh()
- }
-
- val insertCmd = InsertIntoHadoopFsRelationCommand(
+ InsertIntoHadoopFsRelationCommand(
outputPath,
staticPartitions,
- customPartitionLocations,
partitionSchema,
t.bucketSpec,
t.fileFormat,
- refreshPartitionsCallback,
t.options,
actualQuery,
mode,
- table)
-
- insertCmd
- }
-
- /**
- * Given a set of input partitions, returns those that have locations that differ from the
- * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
- * the user.
- *
- * @return a mapping from partition specs to their custom locations
- */
- private def getCustomPartitionLocations(
- spark: SparkSession,
- table: CatalogTable,
- basePath: Path,
- partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
- val hadoopConf = spark.sessionState.newHadoopConf
- val fs = basePath.getFileSystem(hadoopConf)
- val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- partitions.flatMap { p =>
- val defaultLocation = qualifiedBasePath.suffix(
- "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
- val catalogLocation = new Path(p.location).makeQualified(
- fs.getUri, fs.getWorkingDirectory).toString
- if (catalogLocation != defaultLocation) {
- Some(p.spec -> catalogLocation)
- } else {
- None
- }
- }.toMap
+ table,
+ Some(t.location))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ead3233202..6d0671d4cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -106,7 +106,7 @@ object FileSourceStrategy extends Strategy with Logging {
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
- new FileSourceScanExec(
+ FileSourceScanExec(
fsRelation,
outputAttributes,
outputSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 53c884c22b..84ea58b68a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -23,11 +23,11 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command._
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
@@ -37,23 +37,18 @@ import org.apache.spark.sql.execution.command.RunnableCommand
* overwrites: when the spec is empty, all partitions are overwritten.
* When it covers a prefix of the partition keys, only partitions matching
* the prefix are overwritten.
- * @param customPartitionLocations mapping of partition specs to their custom locations. The
- * caller should guarantee that exactly those table partitions
- * falling under the specified static partition keys are contained
- * in this map, and that no other partitions are.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
staticPartitions: TablePartitionSpec,
- customPartitionLocations: Map[TablePartitionSpec, String],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
- refreshFunction: Seq[TablePartitionSpec] => Unit,
options: Map[String, String],
@transient query: LogicalPlan,
mode: SaveMode,
- catalogTable: Option[CatalogTable])
+ catalogTable: Option[CatalogTable],
+ fileIndex: Option[FileIndex])
extends RunnableCommand {
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
@@ -74,12 +69,30 @@ case class InsertIntoHadoopFsRelationCommand(
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions &&
+ catalogTable.isDefined &&
+ catalogTable.get.partitionColumnNames.nonEmpty &&
+ catalogTable.get.tracksPartitionsInCatalog
+
+ var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
+ var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+
+ // When partitions are tracked by the catalog, compute all custom partition locations that
+ // may be relevant to the insertion job.
+ if (partitionsTrackedByCatalog) {
+ val matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
+ catalogTable.get.identifier, Some(staticPartitions))
+ initialMatchingPartitions = matchingPartitions.map(_.spec)
+ customPartitionLocations = getCustomPartitionLocations(
+ fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
+ }
+
val pathExists = fs.exists(qualifiedOutputPath)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
- deleteMatchingPartitions(fs, qualifiedOutputPath)
+ deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations)
true
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
@@ -98,6 +111,27 @@ case class InsertIntoHadoopFsRelationCommand(
outputPath = outputPath.toString,
isAppend = isAppend)
+ // Callback for updating metastore partition metadata after the insertion job completes.
+ def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
+ if (partitionsTrackedByCatalog) {
+ val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
+ if (newPartitions.nonEmpty) {
+ AlterTableAddPartitionCommand(
+ catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
+ ifNotExists = true).run(sparkSession)
+ }
+ if (mode == SaveMode.Overwrite) {
+ val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
+ if (deletedPartitions.nonEmpty) {
+ AlterTableDropPartitionCommand(
+ catalogTable.get.identifier, deletedPartitions.toSeq,
+ ifExists = true, purge = false,
+ retainData = true /* already deleted */).run(sparkSession)
+ }
+ }
+ }
+ }
+
FileFormatWriter.write(
sparkSession = sparkSession,
queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
@@ -108,8 +142,10 @@ case class InsertIntoHadoopFsRelationCommand(
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
- refreshFunction = refreshFunction,
+ refreshFunction = refreshPartitionsCallback,
options = options)
+
+ fileIndex.foreach(_.refresh())
} else {
logInfo("Skipping insertion into a relation that already exists.")
}
@@ -121,7 +157,10 @@ case class InsertIntoHadoopFsRelationCommand(
* Deletes all partition files that match the specified static prefix. Partitions with custom
* locations are also cleared based on the custom locations map given to this class.
*/
- private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+ private def deleteMatchingPartitions(
+ fs: FileSystem,
+ qualifiedOutputPath: Path,
+ customPartitionLocations: Map[TablePartitionSpec, String]): Unit = {
val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
"/" + partitionColumns.flatMap { p =>
staticPartitions.get(p.name) match {
@@ -152,4 +191,29 @@ case class InsertIntoHadoopFsRelationCommand(
}
}
}
+
+ /**
+ * Given a set of input partitions, returns those that have locations that differ from the
+ * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
+ * the user.
+ *
+ * @return a mapping from partition specs to their custom locations
+ */
+ private def getCustomPartitionLocations(
+ fs: FileSystem,
+ table: CatalogTable,
+ qualifiedOutputPath: Path,
+ partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
+ partitions.flatMap { p =>
+ val defaultLocation = qualifiedOutputPath.suffix(
+ "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
+ val catalogLocation = new Path(p.location).makeQualified(
+ fs.getUri, fs.getWorkingDirectory).toString
+ if (catalogLocation != defaultLocation) {
+ Some(p.spec -> catalogLocation)
+ } else {
+ None
+ }
+ }.toMap
+ }
}