diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index b92ea01a87..f6703986bd 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private val now = new Date() private val conf = new SerializableWritable(jobConf) - + private var jobID = 0 private var splitID = 0 private var attemptID = 0 @@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def preSetup() { setIDs(0, 0, 0) HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value) - - val jCtxt = getJobContext() + + val jCtxt = getJobContext() getOutputCommitter().setupJob(jCtxt) } @@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) val numfmt = NumberFormat.getInstance() numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) - + val outputName = "part-" + numfmt.format(splitID) val path = FileOutputFormat.getOutputPath(conf.value) val fs: FileSystem = { @@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } - getOutputCommitter().setupTask(getTaskContext()) + getOutputCommitter().setupTask(getTaskContext()) writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL) } @@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf) def commit() { val taCtxt = getTaskContext() - val cmtr = getOutputCommitter() + val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { try { cmtr.commitTask(taCtxt) logInfo (taID + ": Committed") } catch { - case e: IOException => { + case e: IOException => { logError("Error committing the output of task: " + taID.value, e) cmtr.abortTask(taCtxt) throw e } - } + } } else { logWarning ("No need to commit output of task: " + taID.value) } @@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } private def getJobContext(): JobContext = { - if (jobContext == null) { + if (jobContext == null) { jobContext = newJobContext(conf.value, jID.value) } jobContext @@ -175,7 +175,7 @@ object SparkHadoopWriter { val jobtrackerID = formatter.format(time) new JobID(jobtrackerID, id) } - + def createPathFromString(path: String, conf: JobConf): Path = { if (path == null) { throw new IllegalArgumentException("Output path is null") |