aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2014-11-18 16:25:44 -0800
committerXiangrui Meng <meng@databricks.com>2014-11-18 16:25:44 -0800
commitbb46046154a438df4db30a0e1fd557bd3399ee7b (patch)
tree30e2ac8c1785670596cad195676c9c5036945e0e /core
parent4a377aff2d36b64a65b54192a987aba44b8f78e0 (diff)
downloadspark-bb46046154a438df4db30a0e1fd557bd3399ee7b.tar.gz
spark-bb46046154a438df4db30a0e1fd557bd3399ee7b.tar.bz2
spark-bb46046154a438df4db30a0e1fd557bd3399ee7b.zip
[SPARK-4433] fix a racing condition in zipWithIndex
Spark hangs with the following code: ~~~ sc.parallelize(1 to 10).zipWithIndex.repartition(10).count() ~~~ This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction. This should be applied to branch-1.0, branch-1.1, and branch-1.2. pwendell Author: Xiangrui Meng <meng@databricks.com> Closes #3291 from mengxr/SPARK-4433 and squashes the following commits: c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala31
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala5
2 files changed, 22 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e2c301603b..8c43a55940 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -39,21 +39,24 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
private[spark]
class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
- override def getPartitions: Array[Partition] = {
+ /** The start index of each partition. */
+ @transient private val startIndices: Array[Long] = {
val n = prev.partitions.size
- val startIndices: Array[Long] =
- if (n == 0) {
- Array[Long]()
- } else if (n == 1) {
- Array(0L)
- } else {
- prev.context.runJob(
- prev,
- Utils.getIteratorSize _,
- 0 until n - 1, // do not need to count the last partition
- false
- ).scanLeft(0L)(_ + _)
- }
+ if (n == 0) {
+ Array[Long]()
+ } else if (n == 1) {
+ Array(0L)
+ } else {
+ prev.context.runJob(
+ prev,
+ Utils.getIteratorSize _,
+ 0 until n - 1, // do not need to count the last partition
+ allowLocal = false
+ ).scanLeft(0L)(_ + _)
+ }
+ }
+
+ override def getPartitions: Array[Partition] = {
firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d2e696dc2..e079ca3b1e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -739,6 +739,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ test("zipWithIndex chained with other RDDs (SPARK-4433)") {
+ val count = sc.parallelize(0 until 10, 2).zipWithIndex().repartition(4).count()
+ assert(count === 10)
+ }
+
test("zipWithUniqueId") {
val n = 10
val data = sc.parallelize(0 until n, 3)