Diffstat:
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala                                                                            | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                                                                                | 25
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                                                                         |  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala (renamed from sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala) |  8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala                                                                   |  2
5 files changed, 41 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index d404459a8e..b92ea01a87 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,28 +15,26 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred
+package org.apache.spark
import java.io.IOException
import java.text.NumberFormat
import java.text.SimpleDateFormat
import java.util.Date
+import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
+import org.apache.spark.rdd.HadoopRDD
/**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
- * because we need to access this class from the `spark` package to use some package-private Hadoop
- * functions, but this class should not be used directly by users.
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat.
*
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
-private[apache]
+private[spark]
class SparkHadoopWriter(@transient jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
@@ -59,7 +57,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def preSetup() {
setIDs(0, 0, 0)
- setConfParams()
+ HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
@@ -68,7 +66,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def setup(jobid: Int, splitid: Int, attemptid: Int) {
setIDs(jobid, splitid, attemptid)
- setConfParams()
+ HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(now),
+ jobid, splitID, attemptID, conf.value)
}
def open() {
@@ -167,21 +166,13 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString)
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
- conf.value.set("mapred.task.id", taID.value.toString)
- conf.value.setBoolean("mapred.task.is.map", true)
- conf.value.setInt("mapred.task.partition", splitID)
- }
}
-private[apache]
+private[spark]
object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
+ val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}
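
Aside (not part of the diff): the writer above is never called directly by user code; it is driven through PairRDDFunctions.saveAsHadoopFile, which builds the JobConf with the output key class, output value class and target path described in the scaladoc. A minimal sketch of such a driver follows; the local master, sample data and output path are made up for illustration:

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings PairRDDFunctions into scope

object SaveExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "save-example")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
    // saveAsHadoopFile fills in the JobConf and hands each partition to a
    // SparkHadoopWriter on the executors, which sets up, writes and commits it.
    pairs.saveAsHadoopFile[TextOutputFormat[String, Int]]("file:///tmp/save-example-out")
    sc.stop()
  }
}
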
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 100ddb3607..932ff5bf36 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.rdd
+import java.text.SimpleDateFormat
+import java.util.Date
import java.io.EOFException
import scala.collection.immutable.Map
@@ -27,6 +29,9 @@ import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapred.TaskAttemptID
+import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.util.ReflectionUtils
import org.apache.spark._
@@ -111,6 +116,9 @@ class HadoopRDD[K, V](
protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+ // used to build JobTracker ID
+ private val createTime = new Date()
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -165,12 +173,14 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = {
val iter = new NextIterator[(K, V)] {
+
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
-
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
+ HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
+ context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
@@ -222,4 +232,17 @@ private[spark] object HadoopRDD {
def putCachedMetadata(key: String, value: Any) =
SparkEnv.get.hadoopJobMetadata.put(key, value)
+
+ /** Add Hadoop configuration specific to a single partition and attempt. */
+ def addLocalConfiguration(jobTrackerId: String, jobId: Int, splitId: Int, attemptId: Int,
+ conf: JobConf) {
+ val jobID = new JobID(jobTrackerId, jobId)
+ val taId = new TaskAttemptID(new TaskID(jobID, true, splitId), attemptId)
+
+ conf.set("mapred.tip.id", taId.getTaskID.toString)
+ conf.set("mapred.task.id", taId.toString)
+ conf.setBoolean("mapred.task.is.map", true)
+ conf.setInt("mapred.task.partition", splitId)
+ conf.set("mapred.job.id", jobID.toString)
+ }
}
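
Aside (not part of the diff): addLocalConfiguration replaces the old per-writer setConfParams and is now shared by the read path (HadoopRDD.compute) and the write path (SparkHadoopWriter). A rough sketch of what it leaves in a task-local JobConf, with made-up stage/partition/attempt numbers; note the helper is private[spark], so the call only compiles from code inside the org.apache.spark package:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.HadoopRDD

val conf = new JobConf()
val jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
// Pretend this task is stage 3, partition 7, first attempt.
HadoopRDD.addLocalConfiguration(jobTrackerId, 3, 7, 0, conf)

// The conf now carries the standard mapred identifiers, roughly:
//   mapred.job.id         = job_<jobTrackerId>_0003
//   mapred.tip.id         = task_<jobTrackerId>_0003_m_000007
//   mapred.task.id        = attempt_<jobTrackerId>_0003_m_000007_0
//   mapred.task.is.map    = true
//   mapred.task.partition = 7
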
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 75fc02acd1..14386ff5b9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -34,14 +34,13 @@ import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
+RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
-import org.apache.hadoop.mapred.SparkHadoopWriter
-
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
diff --git a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 0b38731919..d96c2f70e0 100644
--- a/sql/hive/src/main/scala/org/apache/hadoop/mapred/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapred
+package org.apache.spark
import java.io.IOException
import java.text.NumberFormat
@@ -25,16 +25,14 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.mapred._
import org.apache.hadoop.io.Writable
-import org.apache.spark.Logging
-import org.apache.spark.SerializableWritable
-
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
* It is based on [[SparkHadoopWriter]].
*/
-protected[apache]
+protected[spark]
class SparkHiveHadoopWriter(
@transient jobConf: JobConf,
fileSinkConf: FileSinkDesc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
index 9aa9e173a8..78f69e7ff5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveOperators.scala
@@ -35,7 +35,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
-import org.apache.spark.{TaskContext, SparkException}
+import org.apache.spark.{SparkHiveHadoopWriter, TaskContext, SparkException}
/* Implicits */
import scala.collection.JavaConversions._