author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-15 18:26:50 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-15 18:26:50 +0530 |
commit | 54b3d45b816f26a9d3509c1f8bea70c6d99d3de0 (patch) | |
tree | c671b92ece3d568009bc5927a2c6811bd5fbaec5 /core | |
parent | d90d2af1036e909f81cf77c85bfe589993c4f9f3 (diff) | |
Checkpoint commit - compiles and passes a lot of tests - not all though, looking into FileSuite issues
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/HadoopWriter.scala | 12 |
1 files changed, 12 insertions, 0 deletions
```diff
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index afcf9f6db4..80421b6328 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -24,6 +24,8 @@ import spark.SerializableWritable
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
 class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
+
+  println("Created HadoopWriter")
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -41,6 +43,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
   @transient private var taskContext: TaskAttemptContext = null
 
   def preSetup() {
+    println("preSetup")
     setIDs(0, 0, 0)
     setConfParams()
 
@@ -50,17 +53,20 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
   }
 
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
+    println("setup")
     setIDs(jobid, splitid, attemptid)
     setConfParams()
   }
 
   def open() {
+    println("open")
     val numfmt = NumberFormat.getInstance()
     numfmt.setMinimumIntegerDigits(5)
     numfmt.setGroupingUsed(false)
 
     val outputName = "part-" + numfmt.format(splitID)
     val path = FileOutputFormat.getOutputPath(conf.value)
+    println("open outputName = " + outputName + ", fs for " + conf.value)
     val fs: FileSystem = {
       if (path != null) {
         path.getFileSystem(conf.value)
@@ -75,6 +81,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
   }
 
   def write(key: AnyRef, value: AnyRef) {
+    println("write " + key + " = " + value)
     if (writer!=null) {
       //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
       writer.write(key, value)
@@ -84,16 +91,19 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
   }
 
   def close() {
+    println("close")
     writer.close(Reporter.NULL)
   }
 
   def commit() {
+    println("commit")
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
       try {
         cmtr.commitTask(taCtxt)
         logInfo (taID + ": Committed")
+        println("Committed = " + taID)
       } catch {
         case e: IOException => {
           logError("Error committing the output of task: " + taID.value, e)
@@ -102,11 +112,13 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
         }
       }
     } else {
+      println("No need to commit")
       logWarning ("No need to commit output of task: " + taID.value)
     }
   }
 
   def cleanup() {
+    println("cleanup")
     getOutputCommitter().cleanupJob(getJobContext())
   }
 
```
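The added lines are plain `println` calls tracing each stage of the writer's lifecycle (construction, `preSetup`, `setup`, `open`, `write`, `close`, `commit`, `cleanup`), presumably temporary instrumentation for the FileSuite failures mentioned in the commit message. Since `HadoopWriter` already mixes in Spark's `Logging` trait, the same trace points could instead go through `logDebug`. The snippet below is only a sketch of that pattern, not part of this commit; the `TraceSketch` object and `debugEnabled` flag are invented for a self-contained example, and it assumes a by-name `logDebug(msg: => String)` like the one the `Logging` trait exposes.

```scala
// Sketch only: lifecycle tracing via a by-name logDebug instead of
// unconditional println. The object and flag are hypothetical; they stand
// in for spark.Logging so the example runs on its own.
object TraceSketch {
  var debugEnabled = false

  // `msg` is call-by-name, so the string is only built when debug is on.
  def logDebug(msg: => String): Unit = {
    if (debugEnabled) println("DEBUG " + msg)
  }

  def main(args: Array[String]): Unit = {
    val key = "part-00000"
    val value = Seq(1, 2, 3)

    // Silent, and the concatenation below never runs. A bare println would
    // build and print this string for every record written.
    logDebug("write " + key + " = " + value.mkString(","))

    debugEnabled = true
    logDebug("commit")   // prints: DEBUG commit
  }
}
```

The by-name parameter matters most on the `write(key, value)` path, which runs once per output record, whereas the one-off calls in `setup`, `commit`, and `cleanup` cost essentially nothing either way.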