diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-14 12:42:50 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-14 12:42:50 -0400 |
commit | 0ccfe20755665aa4c347b82e18297c5b3a2284ee (patch) | |
tree | 4230959322167b9f47b69c00d39dd428aca5ba77 | |
parent | 969644df8eec13f4b89597a0682c8037949d855b (diff) | |
download | spark-0ccfe20755665aa4c347b82e18297c5b3a2284ee.tar.gz spark-0ccfe20755665aa4c347b82e18297c5b3a2284ee.tar.bz2 spark-0ccfe20755665aa4c347b82e18297c5b3a2284ee.zip |
Forgot to add a file
-rw-r--r-- | core/src/main/scala/spark/HadoopWriter.scala | 170 |
1 file changed, 170 insertions, 0 deletions
package org.apache.hadoop.mapred

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text

import java.text.SimpleDateFormat
import java.text.NumberFormat
import java.io.IOException
import java.net.URI
import java.util.Date

import spark.SerializableWritable
import spark.Logging

/**
 * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
 * also contain an output key class, an output value class, a filename to write to, etc
 * exactly like in a Hadoop job.
 *
 * NOTE(review): this class is declared in the org.apache.hadoop.mapred package —
 * presumably to reach JobContext/TaskAttemptContext constructors that are not public;
 * confirm before moving it into the spark package.
 */
@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging {
  // Captured once per writer so every task attempt derives the same job ID.
  private val now = new Date()
  // Wrap the JobConf so it survives Java serialization when shipped with closures.
  private val conf = new SerializableWritable(jobConf)

  private var jobID = 0
  private var splitID = 0
  private var attemptID = 0
  private var jID: SerializableWritable[JobID] = null
  private var taID: SerializableWritable[TaskAttemptID] = null

  // Non-serializable Hadoop objects, created lazily on each task after deserialization.
  @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
  @transient private var format: OutputFormat[AnyRef,AnyRef] = null
  @transient private var committer: OutputCommitter = null
  @transient private var jobContext: JobContext = null
  @transient private var taskContext: TaskAttemptContext = null

  /** Runs the output committer's job-level setup; call once before any task runs. */
  def preSetup() {
    setIDs(0, 0, 0)
    setConfParams()

    val jCtxt = getJobContext()
    getOutputCommitter().setupJob(jCtxt)
  }

  /** Records this task's job/split/attempt identity and pushes it into the conf. */
  def setup(jobid: Int, splitid: Int, attemptid: Int) {
    setIDs(jobid, splitid, attemptid)
    setConfParams()
  }

  /** Sets up the task with the committer and opens a RecordWriter for "part-NNNNN". */
  def open() {
    val numfmt = NumberFormat.getInstance()
    // Zero-pad the split number to five digits, no thousands separators.
    numfmt.setMinimumIntegerDigits(5)
    numfmt.setGroupingUsed(false)

    val outputName = "part-" + numfmt.format(splitID)
    val path = FileOutputFormat.getOutputPath(conf.value)
    val fs: FileSystem = {
      if (path != null) {
        path.getFileSystem(conf.value)
      } else {
        FileSystem.get(conf.value)
      }
    }

    getOutputCommitter().setupTask(getTaskContext())
    writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
  }

  /**
   * Writes one key/value pair through the underlying RecordWriter.
   * @throws IOException if open() has not been called first
   */
  def write(key: AnyRef, value: AnyRef) {
    if (writer == null) {
      throw new IOException("Writer is null, open() has not been called")
    }
    writer.write(key, value)
  }

  /** Closes the underlying RecordWriter; open() must have been called. */
  def close() {
    writer.close(Reporter.NULL)
  }

  /**
   * Commits this task's output if the committer asks for a commit.
   * @return true when committed or no commit was needed; false when the commit
   *         failed (the task output is then aborted)
   */
  def commit(): Boolean = {
    val taCtxt = getTaskContext()
    val cmtr = getOutputCommitter()
    if (!cmtr.needsTaskCommit(taCtxt)) {
      logWarning("No need to commit output of task: " + taID.value)
      return true
    }
    try {
      cmtr.commitTask(taCtxt)
      logInfo(taID + ": Committed")
      true
    } catch {
      case e: IOException =>
        logError("Error committing the output of task: " + taID.value)
        e.printStackTrace()
        cmtr.abortTask(taCtxt)
        false
    }
  }

  /** Runs the committer's job-level cleanup; call once after all tasks finish. */
  def cleanup() {
    getOutputCommitter().cleanupJob(getJobContext())
  }

  // ********* Private Functions *********

  // Each getter below caches its (non-serializable) Hadoop object after first use.

  private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
    if (format == null) {
      format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
    }
    format
  }

  private def getOutputCommitter(): OutputCommitter = {
    if (committer == null) {
      committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
    }
    committer
  }

  private def getJobContext(): JobContext = {
    if (jobContext == null) {
      jobContext = new JobContext(conf.value, jID.value)
    }
    jobContext
  }

  private def getTaskContext(): TaskAttemptContext = {
    if (taskContext == null) {
      taskContext = new TaskAttemptContext(conf.value, taID.value)
    }
    taskContext
  }

  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
    jobID = jobid
    splitID = splitid
    attemptID = attemptid

    jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
    taID = new SerializableWritable[TaskAttemptID](
      new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
  }

  // Publish the IDs under the standard Hadoop property names so OutputFormats
  // that read them behave as they would inside a real MapReduce task.
  private def setConfParams() {
    conf.value.set("mapred.job.id", jID.value.toString)
    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
    conf.value.set("mapred.task.id", taID.value.toString)
    conf.value.setBoolean("mapred.task.is.map", true)
    conf.value.setInt("mapred.task.partition", splitID)
  }
}

object HadoopWriter {
  /**
   * Builds a JobID whose jobtracker part encodes the given timestamp.
   * Fix: the original formatted `new Date()` and ignored the `time` argument,
   * so repeated calls could yield inconsistent jobtracker IDs for one writer.
   */
  def createJobID(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(time)
    new JobID(jobtrackerID, id)
  }

  /**
   * Parses `path` into a fully-qualified Path resolved against its FileSystem.
   * @throws IllegalArgumentException if path is null or its FileSystem cannot be resolved
   */
  def createPathFromString(path: String, conf: JobConf): Path = {
    if (path == null) {
      throw new IllegalArgumentException("Output path is null")
    }
    // Note: `new Path(path)` never returns null, so the original's post-construction
    // null check on outputPath was dead code and is removed.
    val outputPath = new Path(path)
    val fs = outputPath.getFileSystem(conf)
    if (fs == null) {
      throw new IllegalArgumentException("Incorrectly formatted output path")
    }
    outputPath.makeQualified(fs)
  }
}