From 048276799ae15ce5978733722e8ddde6a07302ff Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 6 Jun 2012 16:46:53 -0700 Subject: Commit task outputs to Hadoop-supported storage systems in parallel on the cluster instead of on the master. Fixes #110. --- core/src/main/scala/spark/HadoopWriter.scala | 19 ++++++++----------- core/src/main/scala/spark/PairRDDFunctions.scala | 6 +++--- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index 84b37218b5..790603581f 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -12,8 +12,8 @@ import java.io.IOException import java.net.URI import java.util.Date -import spark.SerializableWritable import spark.Logging +import spark.SerializableWritable /** * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also @@ -84,26 +84,23 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializabl writer.close(Reporter.NULL) } - def commit(): Boolean = { - var result = false + def commit() { val taCtxt = getTaskContext() val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { try { cmtr.commitTask(taCtxt) logInfo (taID + ": Committed") - result = true } catch { - case e:IOException => { - logError ("Error committing the output of task: " + taID.value) - e.printStackTrace() + case e: IOException => { + logError("Error committing the output of task: " + taID.value, e) cmtr.abortTask(taCtxt) + throw e } } - return result - } - logWarning ("No need to commit output of task: " + taID.value) - return true + } else { + logWarning ("No need to commit output of task: " + taID.value) + } } def cleanup() { diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 4982a1aa15..8b63d1aba1 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -335,7 +335,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( val writer = new HadoopWriter(conf) writer.preSetup() - def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopWriter = { + def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) { writer.setup(context.stageId, context.splitId, context.attemptId) writer.open() @@ -347,10 +347,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } writer.close() - return writer + writer.commit() } - self.context.runJob(self, writeToFile _ ).foreach(_.commit()) + self.context.runJob(self, writeToFile _) writer.cleanup() } -- cgit v1.2.3