author Eric Liang <ekl@databricks.com> 2017-01-12 17:45:55 +0800
committer Wenchen Fan <wenchen@databricks.com> 2017-01-12 17:45:55 +0800
commit c71b25481aa5f7bc27d5c979e66bed54cd46b97e
tree 16c7a3450f83aa3ea9f43d93a6ade9e9ae7cbc74 /core
parent 5db35b312e96dea07f03100c64b58723c2430cd7
[SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API

## What changes were proposed in this pull request?

Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal, since the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well.

## How was this patch tested?

Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one.

cc rxin cloud-fan

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #16554 from ericl/add-delete-protocol.

Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala 9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index afd2250c93..2394cf361c 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -17,6 +17,7 @@
package org.apache.spark.internal.io
+import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce._
import org.apache.spark.util.Utils
@@ -112,6 +113,14 @@ abstract class FileCommitProtocol {
    * just crashes (or killed) before it can call abort.
    */
   def abortTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Specifies that a file should be deleted with the commit of this job. The default
+   * implementation deletes the file immediately.
+   */
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+    fs.delete(path, recursive)
+  }
 }
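
The default `deleteWithJob` above still deletes immediately, so the abort-safety described in the commit message only appears once a protocol implementation overrides the hook. The following is a minimal sketch of that idea, not code from this patch: `DeleteHook`, `DeferredDeleteProtocol`, and the argument-free `commitJob`/`abortJob` are hypothetical names, and it uses `java.nio.file` in place of Hadoop's `FileSystem` so that it runs without Spark or Hadoop on the classpath.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the hook added in this patch (not Spark's FileCommitProtocol).
trait DeleteHook {
  // Mirrors the patch's default behaviour: delete right away.
  def deleteWithJob(path: Path): Boolean = Files.deleteIfExists(path)
}

// Hypothetical protocol that queues deletes and applies them only on job commit.
class DeferredDeleteProtocol extends DeleteHook {
  private val pending = ArrayBuffer.empty[Path]

  // Record the request instead of touching the file system.
  override def deleteWithJob(path: Path): Boolean = {
    pending += path
    true
  }

  // The job succeeded: now it is safe to remove the old data.
  def commitJob(): Unit = {
    pending.foreach(p => Files.deleteIfExists(p))
    pending.clear()
  }

  // The job failed: forget the queued deletes so the original files survive.
  def abortJob(): Unit = pending.clear()
}

object DeferredDeleteDemo {
  def main(args: Array[String]): Unit = {
    val oldData = Files.createTempFile("old-data", ".txt")
    val protocol = new DeferredDeleteProtocol

    protocol.deleteWithJob(oldData)
    protocol.abortJob()
    println(s"after abort, file exists: ${Files.exists(oldData)}")

    protocol.deleteWithJob(oldData)
    protocol.commitJob()
    println(s"after commit, file exists: ${Files.exists(oldData)}")
  }
}
```

In this sketch an aborted job leaves the original file in place, while a committed job removes it. A real implementation would also have to decide what to do when new output is written to a path whose delete is still pending, which this example does not address.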