about summary refs log tree commit diff
path: root/core/src/main/scala
diff options
context:
space:
mode:
author	Reynold Xin <rxin@apache.org>	2014-09-28 20:32:54 -0700
committer	Reynold Xin <rxin@apache.org>	2014-09-28 20:32:54 -0700
commitf350cd307045c2c02e713225d8f1247f18ba123e (patch)
tree9cdd6fe5905d9ff9160ec0232b9d6079837a75e0 /core/src/main/scala
parent25164a89dd32eef58d9b6823ae259439f796e81a (diff)
downloadspark-f350cd307045c2c02e713225d8f1247f18ba123e.tar.gz
spark-f350cd307045c2c02e713225d8f1247f18ba123e.tar.bz2
spark-f350cd307045c2c02e713225d8f1247f18ba123e.zip
[SPARK-3543] TaskContext remaining cleanup work.
Author: Reynold Xin <rxin@apache.org>

Closes #2560 from rxin/TaskContext and squashes the following commits:

9eff95a [Reynold Xin] [SPARK-3543] remaining cleanup work.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala	2
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala	3
2 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 036dcc4966..21d0cc7b5c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -194,7 +194,7 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
- context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+ context.getStageId, theSplit.index, context.getAttemptId.toInt, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 7f578bc5da..67833743f3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -86,7 +86,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
- self.mapPartitionsWithContext((context, iter) => {
+ self.mapPartitions(iter => {
+ val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {