diff options
author | Eric Liang <ekl@databricks.com> | 2017-01-12 17:45:55 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-01-12 17:45:55 +0800 |
commit | c71b25481aa5f7bc27d5c979e66bed54cd46b97e (patch) | |
tree | 16c7a3450f83aa3ea9f43d93a6ade9e9ae7cbc74 /core | |
parent | 5db35b312e96dea07f03100c64b58723c2430cd7 (diff) | |
download | spark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.tar.gz spark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.tar.bz2 spark-c71b25481aa5f7bc27d5c979e66bed54cd46b97e.zip |
[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API
## What changes were proposed in this pull request?
Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since we the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well.
## How was this patch tested?
Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one.
cc rxin cloud-fan
Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>
Closes #16554 from ericl/add-delete-protocol.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index afd2250c93..2394cf361c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal.io +import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ import org.apache.spark.util.Utils @@ -112,6 +113,14 @@ abstract class FileCommitProtocol { * just crashes (or killed) before it can call abort. */ def abortTask(taskContext: TaskAttemptContext): Unit + + /** + * Specifies that a file should be deleted with the commit of this job. The default + * implementation deletes the file immediately. + */ + def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { + fs.delete(path, recursive) + } } |