From c71b25481aa5f7bc27d5c979e66bed54cd46b97e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 12 Jan 2017 17:45:55 +0800 Subject: [SPARK-19183][SQL] Add deleteWithJob hook to internal commit protocol API ## What changes were proposed in this pull request? Currently in SQL we implement overwrites by calling fs.delete() directly on the original data. This is not ideal since the original files end up deleted even if the job aborts. We should extend the commit protocol to allow file overwrites to be managed as well. ## How was this patch tested? Existing tests. I also fixed a bunch of tests that were depending on the commit protocol implementation being set to the legacy mapreduce one. cc rxin cloud-fan Author: Eric Liang Author: Eric Liang Closes #16554 from ericl/add-delete-protocol. --- .../scala/org/apache/spark/internal/io/FileCommitProtocol.scala | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'core/src/main') diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala index afd2250c93..2394cf361c 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala @@ -17,6 +17,7 @@ package org.apache.spark.internal.io +import org.apache.hadoop.fs._ import org.apache.hadoop.mapreduce._ import org.apache.spark.util.Utils @@ -112,6 +113,14 @@ abstract class FileCommitProtocol { * just crashes (or killed) before it can call abort. */ def abortTask(taskContext: TaskAttemptContext): Unit + + /** + * Specifies that a file should be deleted with the commit of this job. The default + * implementation deletes the file immediately. + */ + def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = { + fs.delete(path, recursive) + } } -- cgit v1.2.3