aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-06 16:48:59 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-06 16:48:59 -0700
commit7e1c97fc4b5a225e496ebd95c0ef6095dc4aeae9 (patch)
tree89660f22ef8ad9a3b494f346e010aafae93e4faa
parent6888bc71915dd6ff9932493bf934b7a9d53fd75f (diff)
parent048276799ae15ce5978733722e8ddde6a07302ff (diff)
downloadspark-7e1c97fc4b5a225e496ebd95c0ef6095dc4aeae9.tar.gz
spark-7e1c97fc4b5a225e496ebd95c0ef6095dc4aeae9.tar.bz2
spark-7e1c97fc4b5a225e496ebd95c0ef6095dc4aeae9.zip
Merge branch 'master' into mesos-0.9
-rw-r--r--core/src/main/scala/spark/HadoopWriter.scala19
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala6
2 files changed, 11 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index 84b37218b5..790603581f 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -12,8 +12,8 @@ import java.io.IOException
import java.net.URI
import java.util.Date
-import spark.SerializableWritable
import spark.Logging
+import spark.SerializableWritable
/**
* Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also
@@ -84,26 +84,23 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl
writer.close(Reporter.NULL)
}
- def commit(): Boolean = {
- var result = false
+ def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
- result = true
} catch {
- case e:IOException => {
- logError ("Error committing the output of task: " + taID.value)
- e.printStackTrace()
+ case e: IOException => {
+ logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
+ throw e
}
}
- return result
- }
- logWarning ("No need to commit output of task: " + taID.value)
- return true
+ } else {
+ logWarning ("No need to commit output of task: " + taID.value)
+ }
}
def cleanup() {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 4982a1aa15..8b63d1aba1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -335,7 +335,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val writer = new HadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopWriter = {
+ def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
writer.setup(context.stageId, context.splitId, context.attemptId)
writer.open()
@@ -347,10 +347,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
writer.close()
- return writer
+ writer.commit()
}
- self.context.runJob(self, writeToFile _ ).foreach(_.commit())
+ self.context.runJob(self, writeToFile _)
writer.cleanup()
}