diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 6 |
1 files changed, 3 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index d237797aa6..911a002884 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -21,7 +21,7 @@ import java.io.{ObjectOutputStream, IOException} import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext} +import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} import org.apache.spark.util.AppendOnlyMap @@ -125,12 +125,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach { + fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach { kv => getSeq(kv._1)(depNum) += kv._2 } } } - map.iterator + new InterruptibleIterator(context, map.iterator) } override def clearDependencies() { |