path: root/core
diff options
authorMatei Zaharia <matei@eecs.berkeley.edu>2011-07-14 12:42:50 -0400
committerMatei Zaharia <matei@eecs.berkeley.edu>2011-07-14 12:42:50 -0400
commit0ccfe20755665aa4c347b82e18297c5b3a2284ee (patch)
tree4230959322167b9f47b69c00d39dd428aca5ba77 /core
parent969644df8eec13f4b89597a0682c8037949d855b (diff)
Forgot to add a file
Diffstat (limited to 'core')
1 files changed, 170 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
new file mode 100644
index 0000000000..ae421a243e
--- /dev/null
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -0,0 +1,170 @@
+package org.apache.hadoop.mapred
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import java.text.SimpleDateFormat
+import java.text.NumberFormat
+import java.io.IOException
+import java.net.URI
+import java.util.Date
+import spark.SerializableWritable
+import spark.Logging
+ * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
+ * also contain an output key class, an output value class, a filename to write to, etc
+ * exactly like in a Hadoop job.
+ */
+@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging {
+ private val now = new Date()
+ private val conf = new SerializableWritable(jobConf)
+ private var jobID = 0
+ private var splitID = 0
+ private var attemptID = 0
+ private var jID: SerializableWritable[JobID] = null
+ private var taID: SerializableWritable[TaskAttemptID] = null
+ @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
+ @transient private var format: OutputFormat[AnyRef,AnyRef] = null
+ @transient private var committer: OutputCommitter = null
+ @transient private var jobContext: JobContext = null
+ @transient private var taskContext: TaskAttemptContext = null
+ def preSetup() {
+ setIDs(0, 0, 0)
+ setConfParams()
+ val jCtxt = getJobContext()
+ getOutputCommitter().setupJob(jCtxt)
+ }
+ def setup(jobid: Int, splitid: Int, attemptid: Int) {
+ setIDs(jobid, splitid, attemptid)
+ setConfParams()
+ }
+ def open() {
+ val numfmt = NumberFormat.getInstance()
+ numfmt.setMinimumIntegerDigits(5)
+ numfmt.setGroupingUsed(false)
+ val outputName = "part-" + numfmt.format(splitID)
+ val path = FileOutputFormat.getOutputPath(conf.value)
+ val fs: FileSystem = {
+ if (path != null)
+ path.getFileSystem(conf.value)
+ else
+ FileSystem.get(conf.value)
+ }
+ getOutputCommitter().setupTask(getTaskContext())
+ writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
+ }
+ def write(key: AnyRef, value: AnyRef) {
+ if (writer!=null) {
+ //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
+ writer.write(key, value)
+ } else
+ throw new IOException("Writer is null, open() has not been called")
+ }
+ def close() {
+ writer.close(Reporter.NULL)
+ }
+ def commit(): Boolean = {
+ var result = false
+ val taCtxt = getTaskContext()
+ val cmtr = getOutputCommitter()
+ if (cmtr.needsTaskCommit(taCtxt)) {
+ try {
+ cmtr.commitTask(taCtxt)
+ logInfo (taID + ": Committed")
+ result = true
+ } catch {
+ case e:IOException => {
+ logError ("Error committing the output of task: " + taID.value)
+ e.printStackTrace()
+ cmtr.abortTask(taCtxt)
+ }
+ }
+ return result
+ }
+ logWarning ("No need to commit output of task: " + taID.value)
+ return true
+ }
+ def cleanup() {
+ getOutputCommitter().cleanupJob(getJobContext())
+ }
+ // ********* Private Functions *********
+ private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
+ if (format == null)
+ format = conf.value.getOutputFormat().asInstanceOf[OutputFormat[AnyRef,AnyRef]]
+ return format
+ }
+ private def getOutputCommitter(): OutputCommitter = {
+ if (committer == null)
+ committer = conf.value.getOutputCommitter().asInstanceOf[OutputCommitter]
+ return committer
+ }
+ private def getJobContext(): JobContext = {
+ if (jobContext == null)
+ jobContext = new JobContext(conf.value, jID.value)
+ return jobContext
+ }
+ private def getTaskContext(): TaskAttemptContext = {
+ if (taskContext == null)
+ taskContext = new TaskAttemptContext(conf.value, taID.value)
+ return taskContext
+ }
+ private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
+ jobID = jobid
+ splitID = splitid
+ attemptID = attemptid
+ jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
+ taID = new SerializableWritable[TaskAttemptID] (new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ }
+ private def setConfParams() {
+ conf.value.set("mapred.job.id", jID.value.toString);
+ conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
+ conf.value.set("mapred.task.id", taID.value.toString);
+ conf.value.setBoolean("mapred.task.is.map", true);
+ conf.value.setInt("mapred.task.partition", splitID);
+ }
+object HadoopWriter {
+ def createJobID(time: Date, id: Int): JobID = {
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ val jobtrackerID = formatter.format(new Date())
+ return new JobID(jobtrackerID, id)
+ }
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null)
+ throw new IllegalArgumentException("Output path is null")
+ var outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (outputPath == null || fs == null)
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ outputPath = outputPath.makeQualified(fs)
+ return outputPath
+ }