diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-15 19:16:36 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-15 19:16:36 +0530 |
commit | 19652a44be81f3b8fbbb9ecc4987dcd933d2eca9 (patch) | |
tree | 4093b69770afd410f3b0bfa8ae502bc8a2ff64d9 | |
parent | 54b3d45b816f26a9d3509c1f8bea70c6d99d3de0 (diff) | |
download | spark-19652a44be81f3b8fbbb9ecc4987dcd933d2eca9.tar.gz spark-19652a44be81f3b8fbbb9ecc4987dcd933d2eca9.tar.bz2 spark-19652a44be81f3b8fbbb9ecc4987dcd933d2eca9.zip |
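Fix issue with FileSuite failing (adds HadoopWriter.commitJob(), called after runJob in PairRDDFunctions before cleanup; removes debug println calls and unused imports)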
-rw-r--r-- | core/src/main/scala/spark/HadoopWriter.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 1 | ||||
-rw-r--r-- | core/src/test/scala/spark/FileSuite.scala | 1 |
3 files changed, 7 insertions, 17 deletions
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index 80421b6328..5e8396edb9 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -2,14 +2,10 @@ package org.apache.hadoop.mapred import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path -import org.apache.hadoop.util.ReflectionUtils -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text import java.text.SimpleDateFormat import java.text.NumberFormat import java.io.IOException -import java.net.URI import java.util.Date import spark.Logging @@ -25,8 +21,6 @@ import spark.SerializableWritable */ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable { - println("Created HadoopWriter") - private val now = new Date() private val conf = new SerializableWritable(jobConf) @@ -43,7 +37,6 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe @transient private var taskContext: TaskAttemptContext = null def preSetup() { - println("preSetup") setIDs(0, 0, 0) setConfParams() @@ -53,20 +46,17 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe def setup(jobid: Int, splitid: Int, attemptid: Int) { - println("setup") setIDs(jobid, splitid, attemptid) setConfParams() } def open() { - println("open") val numfmt = NumberFormat.getInstance() numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) val outputName = "part-" + numfmt.format(splitID) val path = FileOutputFormat.getOutputPath(conf.value) - println("open outputName = " + outputName + ", fs for " + conf.value) val fs: FileSystem = { if (path != null) { path.getFileSystem(conf.value) @@ -81,7 +71,6 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe } def write(key: AnyRef, value: AnyRef) { - println("write " + key + " = " + value) if (writer!=null) { //println (">>> Writing 
("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")") writer.write(key, value) @@ -91,19 +80,16 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe } def close() { - println("close") writer.close(Reporter.NULL) } def commit() { - println("commit") val taCtxt = getTaskContext() val cmtr = getOutputCommitter() if (cmtr.needsTaskCommit(taCtxt)) { try { cmtr.commitTask(taCtxt) logInfo (taID + ": Committed") - println("Committed = " + taID) } catch { case e: IOException => { logError("Error committing the output of task: " + taID.value, e) @@ -112,13 +98,17 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe } } } else { - println("No need to commit") logWarning ("No need to commit output of task: " + taID.value) } } + def commitJob() { + // always ? Or if cmtr.needsTaskCommit ? + val cmtr = getOutputCommitter() + cmtr.commitJob(getJobContext()) + } + def cleanup() { - println("cleanup") getOutputCommitter().cleanupJob(getJobContext()) } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 39469fa3c8..9a6966b3f1 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -636,6 +636,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } self.context.runJob(self, writeToFile _) + writer.commitJob() writer.cleanup() } diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala index a3840905f4..91b48c7456 100644 --- a/core/src/test/scala/spark/FileSuite.scala +++ b/core/src/test/scala/spark/FileSuite.scala @@ -18,7 +18,6 @@ class FileSuite extends FunSuite with LocalSparkContext { val outputDir = new File(tempDir, "output").getAbsolutePath val nums = sc.makeRDD(1 to 4) nums.saveAsTextFile(outputDir) - println("outputDir = " + outputDir) // Read the plain text file and check it's OK 
val outputFile = new File(outputDir, "part-00000") val content = Source.fromFile(outputFile).mkString |