aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-11-10 17:00:43 -0800
committerReynold Xin <rxin@databricks.com>2016-11-10 17:00:43 -0800
commita3356343cbf58b930326f45721fb4ecade6f8029 (patch)
tree67d7c367c0dea780f5c8d3ff78a5525c4e3ad520 /core
parente0deee1f7df31177cfc14bbb296f0baa372f473d (diff)
downloadspark-a3356343cbf58b930326f45721fb4ecade6f8029.tar.gz
spark-a3356343cbf58b930326f45721fb4ecade6f8029.tar.bz2
spark-a3356343cbf58b930326f45721fb4ecade6f8029.zip
[SPARK-18185] Fix all forms of INSERT / OVERWRITE TABLE for Datasource tables
## What changes were proposed in this pull request? As of current 2.1, INSERT OVERWRITE with dynamic partitions against a Datasource table will overwrite the entire table instead of only the partitions matching the static keys, as in Hive. It also doesn't respect custom partition locations. This PR adds support for all these operations to Datasource tables managed by the Hive metastore. It is implemented as follows - During planning time, the full set of partitions affected by an INSERT or OVERWRITE command is read from the Hive metastore. - The planner identifies any partitions with custom locations and includes this in the write task metadata. - FileFormatWriter tasks refer to this custom locations map when determining where to write for dynamic partition output. - When the write job finishes, the set of written partitions is compared against the initial set of matched partitions, and the Hive metastore is updated to reflect the newly added / removed partitions. It was necessary to introduce a method for staging files with absolute output paths to `FileCommitProtocol`. These files are not handled by the Hadoop output committer but are moved to their final locations when the job commits. The overwrite behavior of legacy Datasource tables is also changed: no longer will the entire table be overwritten if a partial partition spec is present. cc cloud-fan yhuai ## How was this patch tested? Unit tests, existing tests. Author: Eric Liang <ekl@databricks.com> Author: Wenchen Fan <wenchen@databricks.com> Closes #15814 from ericl/sc-5027.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala63
2 files changed, 71 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index fb8020585c..afd2250c93 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -82,10 +82,25 @@ abstract class FileCommitProtocol {
*
* The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
* are left to the commit protocol implementation to decide.
+ *
+ * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
+ * if a task is going to write out multiple files to the same dir. The file commit protocol only
+ * guarantees that files written by different tasks will not conflict.
*/
def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
/**
+ * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
+ * Depending on the implementation, there may be weaker guarantees around adding files this way.
+ *
+ * Important: it is the caller's responsibility to add uniquely identifying content to "ext"
+ * if a task is going to write out multiple files to the same dir. The file commit protocol only
+ * guarantees that files written by different tasks will not conflict.
+ */
+ def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String
+
+ /**
* Commits a task after the writes succeed. Must be called on the executors when running tasks.
*/
def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 6b0bcb8f90..b2d9b8d2a0 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -17,7 +17,9 @@
package org.apache.spark.internal.io
-import java.util.Date
+import java.util.{Date, UUID}
+
+import scala.collection.mutable
import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
@@ -42,6 +44,19 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
/** OutputCommitter from Hadoop is not serializable so marking it transient. */
@transient private var committer: OutputCommitter = _
+ /**
+ * Tracks files staged by this task for absolute output paths. These outputs are not managed by
+ * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
+ *
+ * The mapping is from the temp output path to the final desired output path of the file.
+ */
+ @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
+
+ /**
+ * The staging directory for all files committed with absolute output paths.
+ */
+ private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
+
protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
// If OutputFormat is Configurable, we should set conf to it.
@@ -54,11 +69,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def newTaskTempFile(
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
- // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
- // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
- // the file name is fine and won't overflow.
- val split = taskContext.getTaskAttemptID.getTaskID.getId
- val filename = f"part-$split%05d-$jobId$ext"
+ val filename = getFilename(taskContext, ext)
val stagingDir: String = committer match {
// For FileOutputCommitter it has its own staging path called "work path".
@@ -73,6 +84,28 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
}
}
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+ val filename = getFilename(taskContext, ext)
+ val absOutputPath = new Path(absoluteDir, filename).toString
+
+ // Include a UUID here to prevent file collisions for one task writing to different dirs.
+ // In principle we could include hash(absoluteDir) instead but this is simpler.
+ val tmpOutputPath = new Path(
+ absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+
+ addedAbsPathFiles(tmpOutputPath) = absOutputPath
+ tmpOutputPath
+ }
+
+ private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
+ // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+ // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+ // the file name is fine and won't overflow.
+ val split = taskContext.getTaskAttemptID.getTaskID.getId
+ f"part-$split%05d-$jobId$ext"
+ }
+
override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
@@ -93,26 +126,42 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
+ val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
+ .foldLeft(Map[String, String]())(_ ++ _)
+ logDebug(s"Committing files staged for absolute locations $filesToMove")
+ val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+ for ((src, dst) <- filesToMove) {
+ fs.rename(new Path(src), new Path(dst))
+ }
+ fs.delete(absPathStagingDir, true)
}
override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
+ val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+ fs.delete(absPathStagingDir, true)
}
override def setupTask(taskContext: TaskAttemptContext): Unit = {
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
+ addedAbsPathFiles = mutable.Map[String, String]()
}
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
- EmptyTaskCommitMessage
+ new TaskCommitMessage(addedAbsPathFiles.toMap)
}
override def abortTask(taskContext: TaskAttemptContext): Unit = {
committer.abortTask(taskContext)
+ // best effort cleanup of other staged files
+ for ((src, _) <- addedAbsPathFiles) {
+ val tmp = new Path(src)
+ tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+ }
}
/** Whether we are using a direct output committer */