about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala20
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index b92ea01a87..f6703986bd 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
-
+
private var jobID = 0
private var splitID = 0
private var attemptID = 0
@@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def preSetup() {
setIDs(0, 0, 0)
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
-
- val jCtxt = getJobContext()
+
+ val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
}
@@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)
-
+
val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
val fs: FileSystem = {
@@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
- getOutputCommitter().setupTask(getTaskContext())
+ getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
}
@@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def commit() {
val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
+ val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
- case e: IOException => {
+ case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
- }
+ }
} else {
logWarning ("No need to commit output of task: " + taID.value)
}
@@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
private def getJobContext(): JobContext = {
- if (jobContext == null) {
+ if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
@@ -175,7 +175,7 @@ object SparkHadoopWriter {
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}
-
+
def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")