author | CodingCat <zhunansjtu@gmail.com> | 2014-03-24 21:55:03 -0700
---|---|---
committer | Aaron Davidson <aaron@databricks.com> | 2014-03-24 21:55:03 -0700
commit | 5140598df889f7227c9d6a7953031eeef524badd |
tree | 129e43867802653f6e29d48ff1ee6c2396392929 /core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala |
parent | dc126f2121d0cd1dc0caa50ae0c4cb9137d42562 |
SPARK-1128: set hadoop task properties when constructing HadoopRDD
https://spark-project.atlassian.net/browse/SPARK-1128
In the current implementation, the task properties are not set when constructing a HadoopRDD; this may limit implementations that depend on the following properties:
```
mapred.tip.id
mapred.task.id
mapred.task.is.map
mapred.task.partition
mapred.job.id
```
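For context, these keys are what task-side code reads back from the JobConf. A minimal, purely illustrative sketch (the `TaskInfoSketch` object and `describeTask` helper are hypothetical, not part of this patch):

```scala
import org.apache.hadoop.mapred.JobConf

// Hypothetical task-side code (e.g. inside a custom InputFormat or
// RecordReader) that depends on the per-task properties listed above.
object TaskInfoSketch {
  def describeTask(conf: JobConf): String = {
    val jobId     = conf.get("mapred.job.id", "<unset>")
    val taskId    = conf.get("mapred.task.id", "<unset>")
    val partition = conf.getInt("mapred.task.partition", -1)
    val isMap     = conf.getBoolean("mapred.task.is.map", true)
    s"job=$jobId task=$taskId partition=$partition isMap=$isMap"
  }
}
```

Before this patch, such code would see none of these keys when the input was read through a HadoopRDD.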
This patch also contains a small fix in createJobID (SparkHadoopWriter.scala), where the current implementation does not actually use the time parameter.
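Concretely, a minimal before/after sketch of the createJobID fix, adapted from the diff below (the object wrapper and the Old/Fixed names are added here for illustration):

```scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.JobID

object CreateJobIdSketch {
  // Before the fix: `time` was accepted but silently ignored, so the JobID
  // was always stamped with the current wall-clock minute.
  def createJobIDOld(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    new JobID(formatter.format(new Date()), id) // bug: formats "now", not `time`
  }

  // After the fix: the caller-supplied timestamp is honored.
  def createJobIDFixed(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    new JobID(formatter.format(time), id)
  }
}
```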
Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Closes #101 from CodingCat/SPARK-1128 and squashes the following commits:
ed0980f [CodingCat] make SparkHiveHadoopWriter belong to the spark package
5b1ad7d [CodingCat] move SparkHiveHadoopWriter to org.apache.spark package
258f92c [CodingCat] code cleanup
af88939 [CodingCat] update the comments and permission of SparkHadoopWriter
9bd1fe3 [CodingCat] move configuration for jobConf to HadoopRDD
b7bdfa5 [Nan Zhu] style fix
a3153a8 [Nan Zhu] style fix
c3258d2 [CodingCat] set hadoop task properties while using InputFormat
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 29 |
1 file changed, 10 insertions(+), 19 deletions(-)
```diff
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index d404459a8e..b92ea01a87 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark
 
 import java.io.IOException
 import java.text.NumberFormat
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
+import org.apache.spark.rdd.HadoopRDD
 
 /**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
- * because we need to access this class from the `spark` package to use some package-private Hadoop
- * functions, but this class should not be used directly by users.
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
  *
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-private[apache]
+private[spark]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def preSetup() {
     setIDs(0, 0, 0)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
 
     val jCtxt = getJobContext()
     getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
     setIDs(jobid, splitid, attemptid)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
+      jobid, splitID, attemptID, conf.value)
   }
 
   def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     taID = new SerializableWritable[TaskAttemptID](
       new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
 }
 
-private[apache]
+private[spark]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
+    val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
```
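The removed setConfParams body and the two new call sites pin down what the shared helper has to do. A sketch of HadoopRDD.addLocalConfiguration reconstructed from them (not the verbatim upstream source) might look like:

```scala
import org.apache.hadoop.mapred.{JobConf, JobID, TaskAttemptID, TaskID}

object HadoopRDDConfSketch {
  // Reconstructed from the removed setConfParams body and the call sites in
  // the diff above; the real helper lives in org.apache.spark.rdd.HadoopRDD.
  def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int,
      attemptId: Int, conf: JobConf): Unit = {
    val jobID = new JobID(jobTrackerId, jobId)
    val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)

    conf.set("mapred.job.id", jobID.toString)
    conf.set("mapred.tip.id", taId.getTaskID.toString)
    conf.set("mapred.task.id", taId.toString)
    conf.setBoolean("mapred.task.is.map", true)
    conf.setInt("mapred.task.partition", splitId)
  }
}
```

Centralizing the logic in HadoopRDD lets both the read path (HadoopRDD itself) and the write path (SparkHadoopWriter) stamp the same per-task properties instead of each duplicating them.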