about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Aaron Staple <aaron.staple@gmail.com>, 2014-08-01 12:04:04 -0700
committer: Matei Zaharia <matei@databricks.com>, 2014-08-01 12:04:04 -0700
commit: eb5bdcaf6c7834558cb76b7132f68b8d94230356 (patch)
tree: c8ba46bf6ba40aa506a1b69905f955d0b44dfd22
parent: c82fe4781cd0356bcfdd25c7eadf1da624bb2228 (diff)
download: spark-eb5bdcaf6c7834558cb76b7132f68b8d94230356.tar.gz
spark-eb5bdcaf6c7834558cb76b7132f68b8d94230356.tar.bz2
spark-eb5bdcaf6c7834558cb76b7132f68b8d94230356.zip
[SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.
getPreferredLocs traverses a dependency graph of partitions using depth-first search. Given a complex dependency graph, the old implementation may explore a set of paths in the graph that is exponential in the number of nodes. By maintaining a set of visited nodes, the new implementation avoids revisiting nodes, preventing exponential blowup.

Some comment and whitespace cleanups are also included.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1362 from staple/SPARK-695 and squashes the following commits:

ecea0f3 [Aaron Staple] address review comments
751c661 [Aaron Staple] [SPARK-695] Add a unit test.
5adf326 [Aaron Staple] Replace getPreferredLocsInternal's HashMap argument with a simpler HashSet.
58e37d0 [Aaron Staple] Replace comment documenting NarrowDependency.
6751ced [Aaron Staple] Revert "Remove unused variable."
04c7097 [Aaron Staple] Fix indentation.
0030884 [Aaron Staple] Remove unused variable.
33f67c6 [Aaron Staple] Clarify comment.
4e42b46 [Aaron Staple] Remove apparently incorrect comment describing NarrowDependency.
65c2d3d [Aaron Staple] [SPARK-695] In DAGScheduler's getPreferredLocs, track set of visited partitions.
-rw-r--r-- core/src/main/scala/org/apache/spark/Dependency.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 18
-rw-r--r-- core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 16
5 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 3935c87722..ab2594cfc0 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -34,8 +34,8 @@ abstract class Dependency[T] extends Serializable {
/**
* :: DeveloperApi ::
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD. Narrow dependencies allow for pipelined execution.
+ * Base class for dependencies where each partition of the child RDD depends on a small number
+ * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5f75c1dd2c..368835a867 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -458,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging {
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
- def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+ def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index e7221e3032..11ebafbf6d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -49,8 +49,8 @@ private[spark] case class CoalescedRDDPartition(
}
/**
- * Computes how many of the parents partitions have getPreferredLocation
- * as one of their preferredLocations
+ * Computes the fraction of the parents' partitions containing preferredLocation within
+ * their getPreferredLocs.
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c7e3d7c5f8..5110785de3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1148,6 +1148,22 @@ class DAGScheduler(
*/
private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
+ getPreferredLocsInternal(rdd, partition, new HashSet)
+ }
+
+ /** Recursive implementation for getPreferredLocs. */
+ private def getPreferredLocsInternal(
+ rdd: RDD[_],
+ partition: Int,
+ visited: HashSet[(RDD[_],Int)])
+ : Seq[TaskLocation] =
+ {
+ // If the partition has already been visited, no need to re-visit.
+ // This avoids exponential path exploration. SPARK-695
+ if (!visited.add((rdd,partition))) {
+ // Nil has already been returned for previously visited partitions.
+ return Nil
+ }
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (!cached.isEmpty) {
@@ -1164,7 +1180,7 @@ class DAGScheduler(
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
- val locs = getPreferredLocs(n.rdd, inPart)
+ val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 0ce13d015d..36e238b4c9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,6 +23,8 @@ import scala.language.reflectiveCalls
import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestActorRef}
import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.rdd.RDD
@@ -64,7 +66,7 @@ class MyRDD(
class DAGSchedulerSuiteDummyException extends Exception
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
- with ImplicitSender with BeforeAndAfter with LocalSparkContext {
+ with ImplicitSender with BeforeAndAfter with LocalSparkContext with Timeouts {
val conf = new SparkConf
/** Set of TaskSets the DAGScheduler has requested executed. */
@@ -294,6 +296,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ test("avoid exponential blowup when getting preferred locs list") {
+ // Build up a complex dependency graph with repeated zip operations, without preferred locations.
+ var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
+ (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
+ // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
+ failAfter(10 seconds) {
+ val preferredLocs = scheduler.getPreferredLocs(rdd,0)
+ // No preferred locations are returned.
+ assert(preferredLocs.length === 0)
+ }
+ }
+
test("unserializable task") {
val unserializableRdd = new MyRDD(sc, 1, Nil) {
class UnserializableClass