Diffstat (limited to 'sql/core')
7 files changed, 170 insertions, 49 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5d663949df..65422f1495 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -417,15 +417,17 @@ case class DataSource(
       // will be adjusted within InsertIntoHadoopFsRelation.
       val plan =
         InsertIntoHadoopFsRelationCommand(
-          outputPath,
-          columns,
-          bucketSpec,
-          format,
-          _ => Unit, // No existing table needs to be refreshed.
-          options,
-          data.logicalPlan,
-          mode,
-          catalogTable)
+          outputPath = outputPath,
+          staticPartitionKeys = Map.empty,
+          customPartitionLocations = Map.empty,
+          partitionColumns = columns,
+          bucketSpec = bucketSpec,
+          fileFormat = format,
+          refreshFunction = _ => Unit, // No existing table needs to be refreshed.
+          options = options,
+          query = data.logicalPlan,
+          mode = mode,
+          catalogTable = catalogTable)
       sparkSession.sessionState.executePlan(plan).toRdd
       // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
       copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 739aeac877..4f19a2d00b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,10 +24,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -182,41 +182,53 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
           "Cannot overwrite a path that is also being read from.")
       }

-      val overwritingSinglePartition =
-        overwrite.specificPartition.isDefined &&
+      val partitionSchema = query.resolve(
+        t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      val partitionsTrackedByCatalog =
         t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+        l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
         l.catalogTable.get.tracksPartitionsInCatalog

-      val effectiveOutputPath = if (overwritingSinglePartition) {
-        val partition = t.sparkSession.sessionState.catalog.getPartition(
-          l.catalogTable.get.identifier, overwrite.specificPartition.get)
-        new Path(partition.location)
-      } else {
-        outputPath
-      }
-
-      val effectivePartitionSchema = if (overwritingSinglePartition) {
-        Nil
-      } else {
-        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
+      var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+
+      // When partitions are tracked by the catalog, compute all custom partition locations that
+      // may be relevant to the insertion job.
+      if (partitionsTrackedByCatalog) {
+        val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
+          l.catalogTable.get.identifier, Some(overwrite.staticPartitionKeys))
+        initialMatchingPartitions = matchingPartitions.map(_.spec)
+        customPartitionLocations = getCustomPartitionLocations(
+          t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
       }

+      // Callback for updating metastore partition metadata after the insertion job completes.
+      // TODO(ekl) consider moving this into InsertIntoHadoopFsRelationCommand
       def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
-        if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
-            l.catalogTable.get.partitionColumnNames.nonEmpty &&
-            l.catalogTable.get.tracksPartitionsInCatalog) {
-          val metastoreUpdater = AlterTableAddPartitionCommand(
-            l.catalogTable.get.identifier,
-            updatedPartitions.map(p => (p, None)),
-            ifNotExists = true)
-          metastoreUpdater.run(t.sparkSession)
+        if (partitionsTrackedByCatalog) {
+          val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
+          if (newPartitions.nonEmpty) {
+            AlterTableAddPartitionCommand(
+              l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
+              ifNotExists = true).run(t.sparkSession)
+          }
+          if (overwrite.enabled) {
+            val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
+            if (deletedPartitions.nonEmpty) {
+              AlterTableDropPartitionCommand(
+                l.catalogTable.get.identifier, deletedPartitions.toSeq,
+                ifExists = true, purge = true).run(t.sparkSession)
+            }
+          }
         }
         t.location.refresh()
       }

       val insertCmd = InsertIntoHadoopFsRelationCommand(
-        effectiveOutputPath,
-        effectivePartitionSchema,
+        outputPath,
+        if (overwrite.enabled) overwrite.staticPartitionKeys else Map.empty,
+        customPartitionLocations,
+        partitionSchema,
         t.bucketSpec,
         t.fileFormat,
         refreshPartitionsCallback,
@@ -227,6 +239,34 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       insertCmd
   }
+
+  /**
+   * Given a set of input partitions, returns those that have locations that differ from the
+   * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
+   * the user.
+   *
+   * @return a mapping from partition specs to their custom locations
+   */
+  private def getCustomPartitionLocations(
+      spark: SparkSession,
+      table: CatalogTable,
+      basePath: Path,
+      partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
+    val hadoopConf = spark.sessionState.newHadoopConf
+    val fs = basePath.getFileSystem(hadoopConf)
+    val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    partitions.flatMap { p =>
+      val defaultLocation = qualifiedBasePath.suffix(
+        "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
+      val catalogLocation = new Path(p.location).makeQualified(
+        fs.getUri, fs.getWorkingDirectory).toString
+      if (catalogLocation != defaultLocation) {
+        Some(p.spec -> catalogLocation)
+      } else {
+        None
+      }
+    }.toMap
+  }
 }
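Note: refreshPartitionsCallback above reconciles the metastore with what the write job actually produced: partitions seen for the first time are added, and, on overwrite, previously matching partitions that received no new data are dropped. A minimal self-contained sketch of that delta (illustration only, not part of the commit; the specs are made up and a TablePartitionSpec is just a Map[String, String]):

    // Partitions that matched the static spec before the job ran (from listPartitions).
    val initialMatchingPartitions = Seq(Map("p" -> "1"), Map("p" -> "2"))
    // Partitions the write tasks reported as updated.
    val updatedPartitions = Seq(Map("p" -> "1"), Map("p" -> "3"))

    val newPartitions = updatedPartitions.toSet -- initialMatchingPartitions
    val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions

    // newPartitions == Set(Map("p" -> "3"))     => AlterTableAddPartitionCommand
    // deletedPartitions == Set(Map("p" -> "2")) => AlterTableDropPartitionCommand (overwrite only)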
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 69b3fa667e..4e4b0e48cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -47,6 +47,10 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 /** A helper object for writing FileFormat data out to a location. */
 object FileFormatWriter extends Logging {
 
+  /** Describes how output files should be placed in the filesystem. */
+  case class OutputSpec(
+      outputPath: String, customPartitionLocations: Map[TablePartitionSpec, String])
+
   /** A shared job description for all the write tasks. */
   private class WriteJobDescription(
       val uuid: String,  // prevent collision between different (appending) write jobs
@@ -56,7 +60,8 @@ object FileFormatWriter extends Logging {
       val partitionColumns: Seq[Attribute],
       val nonPartitionColumns: Seq[Attribute],
       val bucketSpec: Option[BucketSpec],
-      val path: String)
+      val path: String,
+      val customPartitionLocations: Map[TablePartitionSpec, String])
     extends Serializable {
 
     assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
@@ -83,7 +88,7 @@
       plan: LogicalPlan,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
-      outputPath: String,
+      outputSpec: OutputSpec,
       hadoopConf: Configuration,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
@@ -93,7 +98,7 @@
     val job = Job.getInstance(hadoopConf)
     job.setOutputKeyClass(classOf[Void])
     job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, new Path(outputPath))
+    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = plan.output.filterNot(partitionSet.contains)
@@ -111,7 +116,8 @@
       partitionColumns = partitionColumns,
       nonPartitionColumns = dataColumns,
       bucketSpec = bucketSpec,
-      path = outputPath)
+      path = outputSpec.outputPath,
+      customPartitionLocations = outputSpec.customPartitionLocations)
 
     SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
       // This call shouldn't be put into the `try` block below because it only initializes and
@@ -308,7 +314,17 @@ object FileFormatWriter extends Logging {
         }
         val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext)
-        val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+        val customPath = partDir match {
+          case Some(dir) =>
+            description.customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
+          case _ =>
+            None
+        }
+        val path = if (customPath.isDefined) {
+          committer.newTaskTempFileAbsPath(taskAttemptContext, customPath.get, ext)
+        } else {
+          committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+        }
         val newWriter = description.outputWriterFactory.newInstance(
           path = path,
           dataSchema = description.nonPartitionColumns.toStructType,
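Note: the new OutputSpec carries the custom-location map down to each write task. When a task's dynamic partition directory fragment resolves to an entry in that map, the task asks the committer for an absolute-path temp file; otherwise it keeps using a relative temp file under the job's output path. A rough sketch of just the lookup (illustration only; the fragment and location are invented, and PartitioningUtils is assumed to be accessible from the calling code's package):

    val customPartitionLocations = Map(Map("p" -> "2") -> "hdfs://nn/custom/p2")
    val partDir: Option[String] = Some("p=2")  // this task's dynamic partition subdirectory, if any

    val customPath: Option[String] = partDir.flatMap { dir =>
      customPartitionLocations.get(PartitioningUtils.parsePathFragment(dir))
    }
    // customPath == Some("hdfs://nn/custom/p2") => committer.newTaskTempFileAbsPath(...)
    // customPath == None                        => committer.newTaskTempFile(...) as before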
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a0a8cb5024..28975e1546 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
@@ -32,19 +32,32 @@
 /**
  * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported.
+ *
+ * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
+ *                            partition overwrites: when the spec is empty, all partitions are
+ *                            overwritten. When it covers a prefix of the partition keys, only
+ *                            partitions matching the prefix are overwritten.
+ * @param customPartitionLocations mapping of partition specs to their custom locations. The
+ *                                 caller should guarantee that exactly those table partitions
+ *                                 falling under the specified static partition keys are contained
+ *                                 in this map, and that no other partitions are.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
+    staticPartitionKeys: TablePartitionSpec,
+    customPartitionLocations: Map[TablePartitionSpec, String],
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
-    refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+    refreshFunction: Seq[TablePartitionSpec] => Unit,
     options: Map[String, String],
     @transient query: LogicalPlan,
     mode: SaveMode,
     catalogTable: Option[CatalogTable])
   extends RunnableCommand {
 
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -66,10 +79,7 @@
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        if (!fs.delete(qualifiedOutputPath, true /* recursively */)) {
-          throw new IOException(s"Unable to clear output " +
-            s"directory $qualifiedOutputPath prior to writing to it")
-        }
+        deleteMatchingPartitions(fs, qualifiedOutputPath)
         true
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
@@ -93,7 +103,8 @@
           plan = query,
           fileFormat = fileFormat,
           committer = committer,
-          outputPath = qualifiedOutputPath.toString,
+          outputSpec = FileFormatWriter.OutputSpec(
+            qualifiedOutputPath.toString, customPartitionLocations),
           hadoopConf = hadoopConf,
           partitionColumns = partitionColumns,
           bucketSpec = bucketSpec,
@@ -105,4 +116,40 @@ case class InsertIntoHadoopFsRelationCommand(
 
     Seq.empty[Row]
   }
+
+  /**
+   * Deletes all partition files that match the specified static prefix. Partitions with custom
+   * locations are also cleared based on the custom locations map given to this class.
+   */
+  private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+      "/" + partitionColumns.flatMap { p =>
+        staticPartitionKeys.get(p.name) match {
+          case Some(value) =>
+            Some(escapePathName(p.name) + "=" + escapePathName(value))
+          case None =>
+            None
+        }
+      }.mkString("/")
+    } else {
+      ""
+    }
+    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
+    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
+    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
+      throw new IOException(s"Unable to clear output " +
+        s"directory $staticPrefixPath prior to writing to it")
+    }
+    // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
+    for ((spec, customLoc) <- customPartitionLocations) {
+      assert(
+        (staticPartitionKeys.toSet -- spec).isEmpty,
+        "Custom partition location did not match static partitioning keys")
+      val path = new Path(customLoc)
+      if (fs.exists(path) && !fs.delete(path, true)) {
+        throw new IOException(s"Unable to clear partition " +
+          s"directory $path prior to writing to it")
+      }
+    }
+  }
 }
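Note: deleteMatchingPartitions narrows what an overwrite clears. Only the subtree selected by the static partition keys (plus any matching custom-location directories) is deleted, instead of the whole output directory as the old code did. A self-contained sketch of the prefix computation, with made-up column names and without the escapePathName escaping the real code applies:

    val partitionColumnNames = Seq("year", "month")  // table partitioned by (year, month)
    val staticPartitionKeys = Map("year" -> "2016")  // e.g. INSERT OVERWRITE ... PARTITION (year=2016, month)

    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
      "/" + partitionColumnNames.flatMap { name =>
        staticPartitionKeys.get(name).map(value => name + "=" + value)
      }.mkString("/")
    } else {
      ""  // no static keys: the whole table directory is cleared, as before
    }

    assert(staticPartitionPrefix == "/year=2016")
    // Only <outputPath>/year=2016 is deleted before writing; other years are left untouched.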
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index a28b04ca3f..bf9f318780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -62,6 +62,7 @@ object PartitioningUtils {
   }
 
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+  import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
   import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.unescapePathName
 
   /**
@@ -253,6 +254,15 @@
   }
 
   /**
+   * This is the inverse of parsePathFragment().
+   */
+  def getPathFragment(spec: TablePartitionSpec, partitionSchema: StructType): String = {
+    partitionSchema.map { field =>
+      escapePathName(field.name) + "=" + escapePathName(spec(field.name))
+    }.mkString("/")
+  }
+
+  /**
    * Normalize the column names in partition specification, w.r.t. the real partition column names
    * and case sensitivity. e.g., if the partition spec has a column named `monTh`, and there is a
    * partition column named `month`, and it's case insensitive, we will normalize `monTh` to
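Note: getPathFragment produces the Hive-style default directory fragment for a partition spec; getCustomPartitionLocations above compares <qualifiedBasePath>/<fragment> against the location stored in the catalog to decide whether a partition was given a custom location. A small usage sketch (illustration only, assuming it runs somewhere PartitioningUtils is visible and that no value needs escaping):

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val partitionSchema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType)))

    // Expected: "a=1/b=cat", so the default partition location would be <basePath>/a=1/b=cat.
    val fragment = PartitioningUtils.getPathFragment(Map("a" -> "1", "b" -> "cat"), partitionSchema)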
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e849cafef4..f1c5f9ab50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -80,7 +80,7 @@ class FileStreamSink(
         plan = data.logicalPlan,
         fileFormat = fileFormat,
         committer = committer,
-        outputPath = path,
+        outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
         hadoopConf = hadoopConf,
         partitionColumns = partitionColumns,
         bucketSpec = None,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 1fe13fa162..92191c8b64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -96,6 +96,12 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
     file
   }
 
+  override def newTaskTempFileAbsPath(
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    throw new UnsupportedOperationException(
+      s"$this does not support adding files with an absolute path")
+  }
+
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     if (addedFiles.nonEmpty) {
       val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
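Note: the streaming sink opts out by passing Map.empty for custom locations, so its manifest-based commit protocol can simply reject absolute-path files. For the batch path, the intended user-visible effect of the change is roughly the following (illustration only; the table, the staged data, and the spark-shell session are assumptions, not part of the commit):

    // `logs` is a partitioned datasource table whose partitions are tracked in the catalog.
    // Only the ds='2016-11-01' directory, or its custom location if one was assigned with
    // ALTER TABLE ... PARTITION ... SET LOCATION, is cleared and rewritten; other partitions
    // and their metastore entries are left alone.
    spark.sql("INSERT OVERWRITE TABLE logs PARTITION (ds = '2016-11-01') SELECT * FROM staged")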