Diffstat (limited to 'sql/core')
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala                      | 20
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala              | 94
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala                | 26
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala | 61
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala               | 10
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala                    |  2
 -rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala        |  6
 7 files changed, 170 insertions, 49 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5d663949df..65422f1495 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -417,15 +417,17 @@ case class DataSource(
// will be adjusted within InsertIntoHadoopFsRelation.
val plan =
InsertIntoHadoopFsRelationCommand(
- outputPath,
- columns,
- bucketSpec,
- format,
- _ => Unit, // No existing table needs to be refreshed.
- options,
- data.logicalPlan,
- mode,
- catalogTable)
+ outputPath = outputPath,
+ staticPartitionKeys = Map.empty,
+ customPartitionLocations = Map.empty,
+ partitionColumns = columns,
+ bucketSpec = bucketSpec,
+ fileFormat = format,
+ refreshFunction = _ => Unit, // No existing table needs to be refreshed.
+ options = options,
+ query = data.logicalPlan,
+ mode = mode,
+ catalogTable = catalogTable)
sparkSession.sessionState.executePlan(plan).toRdd
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 739aeac877..4f19a2d00b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,10 +24,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -182,41 +182,53 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
"Cannot overwrite a path that is also being read from.")
}
- val overwritingSinglePartition =
- overwrite.specificPartition.isDefined &&
+ val partitionSchema = query.resolve(
+ t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+ val partitionsTrackedByCatalog =
t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+ l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
l.catalogTable.get.tracksPartitionsInCatalog
- val effectiveOutputPath = if (overwritingSinglePartition) {
- val partition = t.sparkSession.sessionState.catalog.getPartition(
- l.catalogTable.get.identifier, overwrite.specificPartition.get)
- new Path(partition.location)
- } else {
- outputPath
- }
-
- val effectivePartitionSchema = if (overwritingSinglePartition) {
- Nil
- } else {
- query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+ var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
+ var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+
+ // When partitions are tracked by the catalog, compute all custom partition locations that
+ // may be relevant to the insertion job.
+ if (partitionsTrackedByCatalog) {
+ val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
+ l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys))
+ initialMatchingPartitions = matchingPartitions.map(_.spec)
+ customPartitionLocations = getCustomPartitionLocations(
+ t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
}
+ // Callback for updating metastore partition metadata after the insertion job completes.
+ // TODO(ekl) consider moving this into InsertIntoHadoopFsRelationCommand
def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
- if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
- l.catalogTable.get.partitionColumnNames.nonEmpty &&
- l.catalogTable.get.tracksPartitionsInCatalog) {
- val metastoreUpdater = AlterTableAddPartitionCommand(
- l.catalogTable.get.identifier,
- updatedPartitions.map(p => (p, None)),
- ifNotExists = true)
- metastoreUpdater.run(t.sparkSession)
+ if (partitionsTrackedByCatalog) {
+ val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
+ if (newPartitions.nonEmpty) {
+ AlterTableAddPartitionCommand(
+ l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
+ ifNotExists = true).run(t.sparkSession)
+ }
+ if (overwrite.enabled) {
+ val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
+ if (deletedPartitions.nonEmpty) {
+ AlterTableDropPartitionCommand(
+ l.catalogTable.get.identifier, deletedPartitions.toSeq,
+ ifExists = true, purge = true).run(t.sparkSession)
+ }
+ }
}
t.location.refresh()
}
val insertCmd = InsertIntoHadoopFsRelationCommand(
- effectiveOutputPath,
- effectivePartitionSchema,
+ outputPath,
+ if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+ customPartitionLocations,
+ partitionSchema,
t.bucketSpec,
t.fileFormat,
refreshPartitionsCallback,
@@ -227,6 +239,34 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
insertCmd
}
+
+ /**
+ * Given a set of input partitions, returns those that have locations that differ from the
+ * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
+ * the user.
+ *
+ * @return a mapping from partition specs to their custom locations
+ */
+ private def getCustomPartitionLocations(
+ spark: SparkSession,
+ table: CatalogTable,
+ basePath: Path,
+ partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
+ val hadoopConf = spark.sessionState.newHadoopConf
+ val fs = basePath.getFileSystem(hadoopConf)
+ val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ partitions.flatMap { p =>
+ val defaultLocation = qualifiedBasePath.suffix(
+ "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
+ val catalogLocation = new Path(p.location).makeQualified(
+ fs.getUri, fs.getWorkingDirectory).toString
+ if (catalogLocation != defaultLocation) {
+ Some(p.spec -> catalogLocation)
+ } else {
+ None
+ }
+ }.toMap
+ }
}
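
Aside (not part of the patch): the new getCustomPartitionLocations helper flags a partition as "custom" when its catalog location, once qualified, differs from the default layout basePath/k1=v1/k2=v2. The standalone sketch below mirrors that comparison; pathFragment is a simplified stand-in for PartitioningUtils.getPathFragment with path escaping omitted, and the object and names are illustrative only.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object CustomLocationCheck {
  // Simplified stand-in for PartitioningUtils.getPathFragment: render the spec
  // in partition-column order as k1=v1/k2=v2 (escaping omitted for brevity).
  def pathFragment(spec: Map[String, String], partitionCols: Seq[String]): String =
    partitionCols.map(c => s"$c=${spec(c)}").mkString("/")

  // A partition has a custom location iff its qualified catalog location does not
  // equal the default location derived from the table's base path and its spec.
  def isCustomLocation(
      basePath: Path,
      spec: Map[String, String],
      partitionCols: Seq[String],
      catalogLocation: String): Boolean = {
    val fs = basePath.getFileSystem(new Configuration())
    val qualifiedBase = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    val defaultLocation = qualifiedBase.suffix("/" + pathFragment(spec, partitionCols)).toString
    val qualifiedCatalog =
      new Path(catalogLocation).makeQualified(fs.getUri, fs.getWorkingDirectory).toString
    qualifiedCatalog != defaultLocation
  }

  def main(args: Array[String]): Unit = {
    val base = new Path("file:/warehouse/t")
    val cols = Seq("k1", "k2")
    // Default layout: not custom, so it is omitted from customPartitionLocations.
    println(isCustomLocation(base, Map("k1" -> "a", "k2" -> "b"), cols,
      "file:/warehouse/t/k1=a/k2=b"))
    // User-assigned directory: custom, so the writer must be told about it.
    println(isCustomLocation(base, Map("k1" -> "a", "k2" -> "c"), cols,
      "file:/data/elsewhere"))
  }
}
```

Only the partitions that differ end up in the customPartitionLocations map passed to InsertIntoHadoopFsRelationCommand, which keeps the common all-default case cheap.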
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 69b3fa667e..4e4b0e48cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -47,6 +47,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/** A helper object for writing FileFormat data out to a location. */
object FileFormatWriter extends Logging {
+ /** Describes how output files should be placed in the filesystem. */
+ case class OutputSpec(
+ outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
+
/** A shared job description for all the write tasks. */
private class WriteJobDescription(
val uuid: String, // prevent collision between different (appending) write jobs
@@ -56,7 +60,8 @@ object FileFormatWriter extends Logging {
val partitionColumns: Seq[Attribute],
val nonPartitionColumns: Seq[Attribute],
val bucketSpec: Option[BucketSpec],
- val path: String)
+ val path: String,
+ val customPartitionLocations: Map[TablePartitionSpec, String])
extends Serializable {
assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
@@ -83,7 +88,7 @@ object FileFormatWriter extends Logging {
plan: LogicalPlan,
fileFormat: FileFormat,
committer: FileCommitProtocol,
- outputPath: String,
+ outputSpec: OutputSpec,
hadoopConf: Configuration,
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -93,7 +98,7 @@ object FileFormatWriter extends Logging {
val job = Job.getInstance(hadoopConf)
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[InternalRow])
- FileOutputFormat.setOutputPath(job, new Path(outputPath))
+ FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = plan.output.filterNot(partitionSet.contains)
@@ -111,7 +116,8 @@ object FileFormatWriter extends Logging {
partitionColumns = partitionColumns,
nonPartitionColumns = dataColumns,
bucketSpec = bucketSpec,
- path = outputPath)
+ path = outputSpec.outputPath,
+ customPartitionLocations = outputSpec.customPartitionLocations)
SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
// This call shouldn't be put into the `try` block below because it only initializes and
@@ -308,7 +314,17 @@ object FileFormatWriter extends Logging {
}
val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext)
- val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+ val customPath = partDir match {
+ case Some(dir) =>
+ description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
+ case _ =>
+ None
+ }
+ val path = if (customPath.isDefined) {
+ committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
+ } else {
+ committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+ }
val newWriter = description.outputWriterFactory.newInstance(
path = path,
dataSchema = description.nonPartitionColumns.toStructType,
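
Aside (not part of the patch): per task, the writer now parses the relative partition directory back into a spec and, on a hit in the custom-location map, asks the commit protocol for an absolute-path temp file instead of one under the job's output path. The sketch below mirrors that decision with a simplified parsePathFragment (unescaping omitted); the object and helper names are illustrative only.

```scala
object TaskPathChoice {
  type TablePartitionSpec = Map[String, String]

  // Simplified stand-in for PartitioningUtils.parsePathFragment:
  // "k1=a/k2=b" => Map("k1" -> "a", "k2" -> "b") (unescaping omitted).
  def parsePathFragment(fragment: String): TablePartitionSpec =
    fragment.split("/").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  // Mirrors the choice in the diff: a custom location triggers
  // newTaskTempFileAbsPath, everything else stays on newTaskTempFile.
  def describeTempFile(
      partDir: Option[String],
      customLocations: Map[TablePartitionSpec, String]): String = {
    val customPath = partDir.flatMap(dir => customLocations.get(parsePathFragment(dir)))
    customPath match {
      case Some(abs) => s"newTaskTempFileAbsPath under $abs"
      case None      => s"newTaskTempFile under the job output path (partDir=$partDir)"
    }
  }

  def main(args: Array[String]): Unit = {
    val custom = Map(Map("k1" -> "a", "k2" -> "c") -> "file:/data/elsewhere")
    println(describeTempFile(Some("k1=a/k2=b"), custom)) // regular temp file
    println(describeTempFile(Some("k1=a/k2=c"), custom)) // absolute-path temp file
    println(describeTempFile(None, custom))              // unpartitioned output
  }
}
```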
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a0a8cb5024..28975e1546 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.IOException
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
@@ -32,19 +32,32 @@ import org.apache.spark.sql.execution.command.RunnableCommand
/**
* A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
* Writing to dynamic partitions is also supported.
+ *
+ * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
+ * partition overwrites: when the spec is empty, all partitions are
+ * overwritten. When it covers a prefix of the partition keys, only
+ * partitions matching the prefix are overwritten.
+ * @param customPartitionLocations mapping of partition specs to their custom locations. The
+ * caller should guarantee that exactly those table partitions
+ * falling under the specified static partition keys are contained
+ * in this map, and that no other partitions are.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
+ staticPartitionKeys: TablePartitionSpec,
+ customPartitionLocations: Map[TablePartitionSpec, String],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
- refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+ refreshFunction: Seq[TablePartitionSpec] => Unit,
options: Map[String, String],
@transient query: LogicalPlan,
mode: SaveMode,
catalogTable: Option[CatalogTable])
extends RunnableCommand {
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -66,10 +79,7 @@ case class InsertIntoHadoopFsRelationCommand(
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
- if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
- throw new IOException(s"Unable to clear output " +
- s"directory $qualifiedOutputPath prior to writing to it")
- }
+ deleteMatchingPartitions(fs, qualifiedOutputPath)
true
case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
true
@@ -93,7 +103,8 @@ case class InsertIntoHadoopFsRelationCommand(
plan = query,
fileFormat = fileFormat,
committer = committer,
- outputPath = qualifiedOutputPath.toString,
+ outputSpec = FileFormatWriter.OutputSpec(
+ qualifiedOutputPath.toString, customPartitionLocations),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -105,4 +116,40 @@ case class InsertIntoHadoopFsRelationCommand(
Seq.empty[Row]
}
+
+ /**
+ * Deletes all partition files that match the specified static prefix. Partitions with custom
+ * locations are also cleared based on the custom locations map given to this class.
+ */
+ private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+ val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+ "/" + partitionColumns.flatMap { p =>
+ staticPartitionKeys.get(p.name) match {
+ case Some(value) =>
+ Some(escapePathName(p.name) + "=" + escapePathName(value))
+ case None =>
+ None
+ }
+ }.mkString("/")
+ } else {
+ ""
+ }
+ // first clear the path determined by the static partition keys (e.g. /table/foo=1)
+ val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
+ if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+ throw new IOException(s"Unable to clear output " +
+ s"directory $staticPrefixPath prior to writing to it")
+ }
+ // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
+ for ((spec, customLoc) <- customPartitionLocations) {
+ assert(
+ (staticPartitionKeys.toSet -- spec).isEmpty,
+ "Custom partition location did not match static partitioning keys")
+ val path = new Path(customLoc)
+ if (fs.exists(path) && !fs.delete(path, true)) {
+ throw new IOException(s"Unable to clear partition " +
+ s"directory $path prior to writing to it")
+ }
+ }
+ }
}
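
Aside (not part of the patch): deleteMatchingPartitions first clears the directory named by the static partition keys, then clears each custom location separately. The sketch below reproduces just the prefix construction (escaping omitted; names illustrative). It relies on the documented contract that staticPartitionKeys covers a prefix of the partition columns; a non-prefix spec would produce a misleading path.

```scala
object StaticPrefix {
  // Mirrors the prefix built in deleteMatchingPartitions: walk the partition
  // columns in order and keep only those present in the static spec.
  def staticPartitionPrefix(
      partitionColumns: Seq[String],
      staticPartitionKeys: Map[String, String]): String = {
    if (staticPartitionKeys.nonEmpty) {
      "/" + partitionColumns.flatMap { col =>
        staticPartitionKeys.get(col).map(v => s"$col=$v")
      }.mkString("/")
    } else {
      ""
    }
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("p1", "p2")
    println(staticPartitionPrefix(cols, Map.empty))                     // "" -> whole output path cleared
    println(staticPartitionPrefix(cols, Map("p1" -> "1")))              // "/p1=1"
    println(staticPartitionPrefix(cols, Map("p1" -> "1", "p2" -> "x"))) // "/p1=1/p2=x"
  }
}
```

An empty prefix reproduces the old Overwrite behaviour (delete the whole output directory); a partial spec narrows the deletion to the matching subtree, which is what makes partial-partition overwrites safe.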
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a28b04ca3f..bf9f318780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -62,6 +62,7 @@ object PartitioningUtils {
}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
/**
@@ -253,6 +254,15 @@ object PartitioningUtils {
}
/**
+ * This is the inverse of parsePathFragment().
+ */
+ def getPathFragment(spec: TablePartitionSpec, partitionSchema: StructType): String = {
+ partitionSchema.map { field =>
+ escapePathName(field.name) + "=" + escapePathName(spec(field.name))
+ }.mkString("/")
+ }
+
+ /**
* Normalize the column names in partition specification, w.r.t. the real partition column names
* and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
* partition column named `month`, and it's case insensitive, we will normalize `monTh` to
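
Aside (not part of the patch): getPathFragment is documented as the inverse of parsePathFragment. The sketch below shows the intended round trip with simplified stand-ins (escaping omitted, and a Seq[String] in place of the real StructType partitionSchema); the real pair uses ExternalCatalogUtils.escapePathName and unescapePathName so values containing characters such as '/' or '=' survive the trip.

```scala
object PathFragmentRoundTrip {
  type TablePartitionSpec = Map[String, String]

  // Simplified mirror of getPathFragment: spec -> "k1=v1/k2=v2" in column order.
  def getPathFragment(spec: TablePartitionSpec, partitionCols: Seq[String]): String =
    partitionCols.map(c => s"$c=${spec(c)}").mkString("/")

  // Simplified mirror of parsePathFragment: "k1=v1/k2=v2" -> spec.
  def parsePathFragment(fragment: String): TablePartitionSpec =
    fragment.split("/").map { kv =>
      val Array(k, v) = kv.split("=", 2)
      k -> v
    }.toMap

  def main(args: Array[String]): Unit = {
    val spec = Map("year" -> "2016", "month" -> "12")
    val fragment = getPathFragment(spec, Seq("year", "month"))
    println(fragment)                            // year=2016/month=12
    println(parsePathFragment(fragment) == spec) // true: the functions invert each other
  }
}
```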
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e849cafef4..f1c5f9ab50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -80,7 +80,7 @@ class FileStreamSink(
plan = data.logicalPlan,
fileFormat = fileFormat,
committer = committer,
- outputPath = path,
+ outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
hadoopConf = hadoopConf,
partitionColumns = partitionColumns,
bucketSpec = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 1fe13fa162..92191c8b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -96,6 +96,12 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
file
}
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+ throw new UnsupportedOperationException(
+ s"$this does not support adding files with an absolute path")
+ }
+
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
if (addedFiles.nonEmpty) {
val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)