author     CodingCat <zhunansjtu@gmail.com>  2014-03-24 21:55:03 -0700
committer  Aaron Davidson <aaron@databricks.com>  2014-03-24 21:55:03 -0700
commit     5140598df889f7227c9d6a7953031eeef524badd (patch)
tree       129e43867802653f6e29d48ff1ee6c2396392929 /core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
parent     dc126f2121d0cd1dc0caa50ae0c4cb9137d42562 (diff)
SPARK-1128: set hadoop task properties when constructing HadoopRDD
https://spark-project.atlassian.net/browse/SPARK-1128

The task properties are not set when constructing a HadoopRDD in the current implementation, which may limit implementations that depend on:

```
mapred.tip.id
mapred.task.id
mapred.task.is.map
mapred.task.partition
mapred.job.id
```

This patch also contains a small fix in createJobID (SparkHadoopWriter.scala), where the current implementation does not actually use the time parameter.

Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>

Closes #101 from CodingCat/SPARK-1128 and squashes the following commits:

ed0980f [CodingCat] make SparkHiveHadoopWriter belongs to spark package
5b1ad7d [CodingCat] move SparkHiveHadoopWriter to org.apache.spark package
258f92c [CodingCat] code cleanup
af88939 [CodingCat] update the comments and permission of SparkHadoopWriter
9bd1fe3 [CodingCat] move configuration for jobConf to HadoopRDD
b7bdfa5 [Nan Zhu] style fix
a3153a8 [Nan Zhu] style fix
c3258d2 [CodingCat] set hadoop task properties while using InputFormat
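For context on what the `HadoopRDD.addLocalConfiguration` call introduced by this diff is responsible for, here is a minimal sketch, assuming it mirrors the `setConfParams` body that the patch removes below. The `HadoopTaskProps` object name is hypothetical and used only for illustration; it is not Spark's actual source.

```scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.{JobConf, JobID, TaskAttemptID, TaskID}

// Hypothetical helper: stamps the standard Hadoop task properties onto a JobConf
// for one task attempt, following the setConfParams body removed in the diff below.
object HadoopTaskProps {
  def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int,
      attemptId: Int, conf: JobConf): Unit = {
    val jobID = new JobID(jobTrackerId, jobId)
    val taskId = new TaskID(jobID, true, splitId)            // isMap = true
    val taskAttemptId = new TaskAttemptID(taskId, attemptId)

    conf.set("mapred.job.id", jobID.toString)
    conf.set("mapred.tip.id", taskId.toString)
    conf.set("mapred.task.id", taskAttemptId.toString)
    conf.setBoolean("mapred.task.is.map", true)
    conf.setInt("mapred.task.partition", splitId)
  }
}

// Example: stamp the properties for job 0, partition 3, attempt 0.
// val conf = new JobConf()
// HadoopTaskProps.addLocalConfiguration(
//   new SimpleDateFormat("yyyyMMddHHmm").format(new Date()), 0, 3, 0, conf)
```

Centralizing this in HadoopRDD means the read path (tasks constructed from an InputFormat) sees the same mapred.* properties that the write path in SparkHadoopWriter already set.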
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala  29
1 file changed, 10 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index d404459a8e..b92ea01a87 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark
 
 import java.io.IOException
 import java.text.NumberFormat
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
+import org.apache.spark.rdd.HadoopRDD
 
 /**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
- * because we need to access this class from the `spark` package to use some package-private Hadoop
- * functions, but this class should not be used directly by users.
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
  *
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-private[apache]
+private[spark]
 class SparkHadoopWriter(@transient jobConf: JobConf)
   extends Logging
   with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def preSetup() {
     setIDs(0, 0, 0)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
 
     val jCtxt = getJobContext()
     getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def setup(jobid: Int, splitid: Int, attemptid: Int) {
     setIDs(jobid, splitid, attemptid)
-    setConfParams()
+    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
+      jobid, splitID, attemptID, conf.value)
   }
 
   def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
 }
 
-private[apache]
+private[spark]
 object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
+    val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
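The createJobID fix at the end of the diff is easy to see in isolation: before the patch the method formatted `new Date()` and silently ignored its `time` argument, so the caller-supplied timestamp never reached the resulting JobID. A minimal standalone sketch of the fixed behavior, restated here for illustration rather than taken verbatim from the Spark source:

```scala
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.JobID

object CreateJobIdCheck {
  // Re-statement of the fixed method for illustration only.
  def createJobID(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(time)   // was: formatter.format(new Date())
    new JobID(jobtrackerID, id)
  }

  def main(args: Array[String]): Unit = {
    // With the fix, a fixed Date yields a deterministic JobID.
    val t = new SimpleDateFormat("yyyyMMddHHmm").parse("201403242155")
    assert(createJobID(t, 0) == createJobID(t, 0))
    println(createJobID(t, 0))   // e.g. job_201403242155_0000
  }
}
```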