path: root/core
author    Mridul Muralidharan <mridul@gmail.com>    2013-04-15 18:26:50 +0530
committer Mridul Muralidharan <mridul@gmail.com>    2013-04-15 18:26:50 +0530
commit    54b3d45b816f26a9d3509c1f8bea70c6d99d3de0 (patch)
tree      c671b92ece3d568009bc5927a2c6811bd5fbaec5 /core
parent    d90d2af1036e909f81cf77c85bfe589993c4f9f3 (diff)
Checkpoint commit - compiles and passes a lot of tests - not all though, looking into FileSuite issues
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/HadoopWriter.scala | 12
1 file changed, 12 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index afcf9f6db4..80421b6328 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -24,6 +24,8 @@ import spark.SerializableWritable
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
+
+ println("Created HadoopWriter")
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@@ -41,6 +43,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
@transient private var taskContext: TaskAttemptContext = null
def preSetup() {
+ println("preSetup")
setIDs(0, 0, 0)
setConfParams()
@@ -50,17 +53,20 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
def setup(jobid: Int, splitid: Int, attemptid: Int) {
+ println("setup")
setIDs(jobid, splitid, attemptid)
setConfParams()
}
def open() {
+ println("open")
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)
val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
+ println("open outputName = " + outputName + ", fs for " + conf.value)
val fs: FileSystem = {
if (path != null) {
path.getFileSystem(conf.value)
@@ -75,6 +81,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
def write(key: AnyRef, value: AnyRef) {
+ println("write " + key + " = " + value)
if (writer!=null) {
//println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
writer.write(key, value)
@@ -84,16 +91,19 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
def close() {
+ println("close")
writer.close(Reporter.NULL)
}
def commit() {
+ println("commit")
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
+ println("Committed = " + taID)
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
@@ -102,11 +112,13 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
}
} else {
+ println("No need to commit")
logWarning ("No need to commit output of task: " + taID.value)
}
}
def cleanup() {
+ println("cleanup")
getOutputCommitter().cleanupJob(getJobContext())
}