aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 66ccb6d437..d643a32af0 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.SparkHadoopWriter
import org.apache.spark.internal.Logging
import org.apache.spark.mapred.SparkHadoopMapRedUtil
@@ -69,7 +68,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def setupJob(jobContext: JobContext): Unit = {
// Setup IDs
- val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+ val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
val taskId = new TaskID(jobId, TaskType.MAP, 0)
val taskAttemptId = new TaskAttemptID(taskId, 0)
@@ -108,4 +107,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
override def abortTask(taskContext: TaskAttemptContext): Unit = {
committer.abortTask(taskContext)
}
+
+ /** Whether we are using a direct output committer */
+ def isDirectOutput(): Boolean = committer.getClass.getSimpleName.contains("Direct")
}