aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-11-10 17:00:43 -0800
committerReynold Xin <rxin@databricks.com>2016-11-10 17:00:43 -0800
commita3356343cbf58b930326f45721fb4ecade6f8029 (patch)
tree67d7c367c0dea780f5c8d3ff78a5525c4e3ad520 /sql/core
parente0deee1f7df31177cfc14bbb296f0baa372f473d (diff)
downloadspark-a3356343cbf58b930326f45721fb4ecade6f8029.tar.gz
spark-a3356343cbf58b930326f45721fb4ecade6f8029.tar.bz2
spark-a3356343cbf58b930326f45721fb4ecade6f8029.zip
[SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables
## What changes were proposed in this pull request? As of current 2.1, INSERT OVERWRITE with dynamic partitions against a Datasource table will overwrite the entire table instead of only the partitions matching the static keys, as in Hive. It also doesn't respect custom partition locations. This PR adds support for all these operations to Datasource tables managed by the Hive metastore. It is implemented as follows - During planning time, the full set of partitions affected by an INSERT or OVERWRITE command is read from the Hive metastore. - The planner identifies any partitions with custom locations and includes this in the write task metadata. - FileFormatWriter tasks refer to this custom locations map when determining where to write for dynamic partition output. - When the write job finishes, the set of written partitions is compared against the initial set of matched partitions, and the Hive metastore is updated to reflect the newly added / removed partitions. It was necessary to introduce a method for staging files with absolute output paths to `FileCommitProtocol`. These files are not handled by the Hadoop output committer but are moved to their final locations when the job commits. The overwrite behavior of legacy Datasource tables is also changed: no longer will the entire table be overwritten if a partial partition spec is present. cc cloud-fan yhuai ## How was this patch tested? Unit tests, existing tests. Author: Eric Liang <ekl@databricks.com> Author: Wenchen Fan <wenchen@databricks.com> Closes #15814 from ericl/sc-5027.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala20
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala94
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala61
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala6
7 files changed, 170 insertions, 49 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5d663949df..65422f1495 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -417,15 +417,17 @@ case class DataSource(
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
- outputPath,
- columns,
- bucketSpec,
- format,
- _ => Unit, // No existing table needs to be refreshed.
- options,
- data.logicalPlan,
- mode,
- catalogTable)
+ outputPath = outputPath,
+ staticPartitionKeys = Map.empty,
+ customPartitionLocations = Map.empty,
+ partitionColumns = columns,
+ bucketSpec = bucketSpec,
+ fileFormat = format,
+ refreshFunction = _ => Unit, // No existing table needs to be refreshed.
+ options = options,
+ query = data.logicalPlan,
+ mode = mode,
+ catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 739aeac877..4f19a2d00b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,10 +24,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -182,41 +182,53 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
"Cannot overwrite a path that is also being read from.")
}
- val overwritingSinglePartition =
- overwrite.specificPartition.isDefined &&
+ val partitionSchema = query.resolve(
+ t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+ val partitionsTrackedByCatalog =
t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+ l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
l.catalogTable.get.tracksPartitionsInCatalog
- val effectiveOutputPath = if (overwritingSinglePartition) {
- val partition = t.sparkSession.sessionState.catalog.getPartition(
- l.catalogTable.get.identifier, overwrite.specificPartition.get)
- new Path(partition.location)
- } else {
- outputPath
- }
-
- val effectivePartitionSchema = if (overwritingSinglePartition) {
- Nil
- } else {
- query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+ var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
+ var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+
+ // When partitions are tracked by the catalog, compute all custom partition locations that
+ // may be relevant to the insertion job.
+ if (partitionsTrackedByCatalog) {
+ val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
+ l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys))
+ initialMatchingPartitions = matchingPartitions.map(_.spec)
+ customPartitionLocations = getCustomPartitionLocations(
+ t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
}
+ // Callback for updating metastore partition metadata after the insertion job completes.
+ // TODO(ekl) consider moving this into InsertIntoHadoopFsRelationCommand
def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
- if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
- l.catalogTable.get.partitionColumnNames.nonEmpty &&
- l.catalogTable.get.tracksPartitionsInCatalog) {
- val metastoreUpdater = AlterTableAddPartitionCommand(
- l.catalogTable.get.identifier,
- updatedPartitions.map(p => (p, None)),
- ifNotExists = true)
- metastoreUpdater.run(t.sparkSession)
+ if (partitionsTrackedByCatalog) {
+ val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
+ if (newPartitions.nonEmpty) {
+ AlterTableAddPartitionCommand(
+ l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
+ ifNotExists = true).run(t.sparkSession)
+ }
+ if (overwrite.enabled) {
+ val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
+ if (deletedPartitions.nonEmpty) {
+ AlterTableDropPartitionCommand(
+ l.catalogTable.get.identifier, deletedPartitions.toSeq,
+ ifExists = true, purge = true).run(t.sparkSession)
+ }
+ }
}
t.location.refresh()
}
val insertCmd = InsertIntoHadoopFsRelationCommand(
- effectiveOutputPath,
- effectivePartitionSchema,
+ outputPath,
+ if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+ customPartitionLocations,
+ partitionSchema,
t.bucketSpec,
t.fileFormat,
refreshPartitionsCallback,
@@ -227,6 +239,34 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
insertCmd
}
+
+ /**
+ * Given a set of input partitions, returns those that have locations that differ from the
+ * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
+ * the user.
+ *
+ * @return a mapping from partition specs to their custom locations
+ */
+ private def getCustomPartitionLocations(
+ spark: SparkSession,
+ table: CatalogTable,
+ basePath: Path,
+ partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
+ val hadoopConf = spark.sessionState.newHadoopConf
+ val fs = basePath.getFileSystem(hadoopConf)
+ val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ partitions.flatMap { p =>
+ val defaultLocation = qualifiedBasePath.suffix(
+ "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
+ val catalogLocation = new Path(p.location).makeQualified(
+ fs.getUri, fs.getWorkingDirectory).toString
+ if (catalogLocation != defaultLocation) {
+ Some(p.spec -> catalogLocation)
+ } else {
+ None
+ }
+ }.toMap
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 69b3fa667e..4e4b0e48cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -47,6 +47,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/** A helper object for writing FileFormat data out to a location. */
object FileFormatWriter extends Logging {
+ /** Describes how output files should be placed in the filesystem. */
+ case class OutputSpec(
+ outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
+
/** A shared job description for all the write tasks. */
private class WriteJobDescription(
val uuid: String, // prevent collision between different (appending) write jobs
@@ -56,7 +60,8 @@ object FileFormatWriter extends Logging {
val partitionColumns: Seq[Attribute],
val nonPartitionColumns: Seq[Attribute],
val bucketSpec: Option[BucketSpec],
- val path: String)
+ val path: String,
+ val customPartitionLocations: Map[TablePartitionSpec, String])
extends Serializable {
assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
@@ -83,7 +88,7 @@ object FileFormatWriter extends Logging {
plan: LogicalPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
- outputPath: String,
+ outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -93,7 +98,7 @@ object FileFormatWriter extends Logging {
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
- FileOutputFormat.setOutputPath(job, new Path(outputPath))
+ FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = plan.output.filterNot(partitionSet.contains)
@@ -111,7 +116,8 @@ object FileFormatWriter extends Logging {
partitionColumns = partitionColumns,
nonPartitionColumns = dataColumns,
bucketSpec = bucketSpec,
- path = outputPath)
+ path = outputSpec.outputPath,
+ customPartitionLocations = outputSpec.customPartitionLocations)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
// This call shouldn't be put into the `try` block below because it only initializes and
@@ -308,7 +314,17 @@ object FileFormatWriter extends Logging {
}
val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext)
- val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+ val customPath = partDir match {
+ case Some(dir) =>
+ description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
+ case _ =>
+ None
+ }
+ val path = if (customPath.isDefined) {
+ committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
+ } else {
+ committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+ }
val newWriter = description.outputWriterFactory.newInstance(
path = path,
dataSchema = description.nonPartitionColumns.toStructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a0a8cb5024..28975e1546 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
@@ -32,19 +32,32 @@ import org.apache.spark.sql.execution.command.RunnableCommand
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported.
+ *
+ * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
+ * partition overwrites: when the spec is empty, all partitions are
+ * overwritten. When it covers a prefix of the partition keys, only
+ * partitions matching the prefix are overwritten.
+ * @param customPartitionLocations mapping of partition specs to their custom locations. The
+ * caller should guarantee that exactly those table partitions
+ * falling under the specified static partition keys are contained
+ * in this map, and that no other partitions are.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
+ staticPartitionKeys: TablePartitionSpec,
+ customPartitionLocations: Map[TablePartitionSpec, String],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
- refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+ refreshFunction: Seq[TablePartitionSpec] => Unit,
options: Map[String, String],
@transient query: LogicalPlan,
mode: SaveMode,
catalogTable: Option[CatalogTable])
extends RunnableCommand {
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -66,10 +79,7 @@ case class InsertIntoHadoopFsRelationCommand(
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
- if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
- throw new IOException(s"Unable to clear output " +
- s"directory $qualifiedOutputPath prior to writing to it")
- }
+ deleteMatchingPartitions(fs, qualifiedOutputPath)
true
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
@@ -93,7 +103,8 @@ case class InsertIntoHadoopFsRelationCommand(
plan = query,
fileFormat = fileFormat,
committer = committer,
- outputPath = qualifiedOutputPath.toString,
+ outputSpec = FileFormatWriter.OutputSpec(
+ qualifiedOutputPath.toString, customPartitionLocations),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -105,4 +116,40 @@ case class InsertIntoHadoopFsRelationCommand(
Seq.empty[Row]
}
+
+ /**
+ * Deletes all partition files that match the specified static prefix. Partitions with custom
+ * locations are also cleared based on the custom locations map given to this class.
+ */
+ private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+ val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+ "/" + partitionColumns.flatMap { p =>
+ staticPartitionKeys.get(p.name) match {
+ case Some(value) =>
+ Some(escapePathName(p.name) + "=" + escapePathName(value))
+ case None =>
+ None
+ }
+ }.mkString("/")
+ } else {
+ ""
+ }
+ // first clear the path determined by the static partition keys (e.g. /table/foo=1)
+ val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
+ if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+ throw new IOException(s"Unable to clear output " +
+ s"directory $staticPrefixPath prior to writing to it")
+ }
+ // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
+ for ((spec, customLoc) <- customPartitionLocations) {
+ assert(
+ (staticPartitionKeys.toSet -- spec).isEmpty,
+ "Custom partition location did not match static partitioning keys")
+ val path = new Path(customLoc)
+ if (fs.exists(path) && !fs.delete(path, true)) {
+ throw new IOException(s"Unable to clear partition " +
+ s"directory $path prior to writing to it")
+ }
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a28b04ca3f..bf9f318780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -62,6 +62,7 @@ object PartitioningUtils {
}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
/**
@@ -253,6 +254,15 @@ object PartitioningUtils {
}
/**
+ * This is the inverse of parsePathFragment().
+ */
+ def getPathFragment(spec: TablePartitionSpec, partitionSchema: StructType): String = {
+ partitionSchema.map { field =>
+ escapePathName(field.name) + "=" + escapePathName(spec(field.name))
+ }.mkString("/")
+ }
+
+ /**
* Normalize the column names in partition specification, w.r.t. the real partition column names
* and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
* partition column named `month`, and it's case insensitive, we will normalize `monTh` to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e849cafef4..f1c5f9ab50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -80,7 +80,7 @@ class FileStreamSink(
plan = data.logicalPlan,
fileFormat = fileFormat,
committer = committer,
- outputPath = path,
+ outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 1fe13fa162..92191c8b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -96,6 +96,12 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
file
}
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+ throw new UnsupportedOperationException(
+ s"$this does not support adding files with an absolute path")
+ }
+
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
if (addedFiles.nonEmpty) {
val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)