diff options
Diffstat (limited to 'core/src')
5 files changed, 37 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 3935c87722..ab2594cfc0 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -34,8 +34,8 @@ abstract class Dependency[T] extends Serializable { /** * :: DeveloperApi :: - * Base class for dependencies where each partition of the parent RDD is used by at most one - * partition of the child RDD. Narrow dependencies allow for pipelined execution. + * Base class for dependencies where each partition of the child RDD depends on a small number + * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. */ @DeveloperApi abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5f75c1dd2c..368835a867 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -458,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging { /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index e7221e3032..11ebafbf6d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -49,8 +49,8 @@ private[spark] case class CoalescedRDDPartition( } /** - * Computes how many of the parents partitions have getPreferredLocation - * as one of their preferredLocations + * Computes the fraction of the parents' partitions containing preferredLocation within + * their getPreferredLocs. * @return locality of this coalesced partition between 0 and 1 */ def localFraction: Double = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c7e3d7c5f8..5110785de3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1148,6 +1148,22 @@ class DAGScheduler( */ private[spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized { + getPreferredLocsInternal(rdd, partition, new HashSet) + } + + /** Recursive implementation for getPreferredLocs. */ + private def getPreferredLocsInternal( + rdd: RDD[_], + partition: Int, + visited: HashSet[(RDD[_],Int)]) + : Seq[TaskLocation] = + { + // If the partition has already been visited, no need to re-visit. + // This avoids exponential path exploration. SPARK-695 + if (!visited.add((rdd,partition))) { + // Nil has already been returned for previously visited partitions. + return Nil + } // If the partition is cached, return the cache locations val cached = getCacheLocs(rdd)(partition) if (!cached.isEmpty) { @@ -1164,7 +1180,7 @@ class DAGScheduler( rdd.dependencies.foreach { case n: NarrowDependency[_] => for (inPart <- n.getParents(partition)) { - val locs = getPreferredLocs(n.rdd, inPart) + val locs = getPreferredLocsInternal(n.rdd, inPart, visited) if (locs != Nil) { return locs } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0ce13d015d..36e238b4c9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -23,6 +23,8 @@ import scala.language.reflectiveCalls import akka.actor._ import akka.testkit.{ImplicitSender, TestKit, TestActorRef} import org.scalatest.{BeforeAndAfter, FunSuiteLike} +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -64,7 +66,7 @@ class MyRDD( class DAGSchedulerSuiteDummyException extends Exception class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike - with ImplicitSender with BeforeAndAfter with LocalSparkContext { + with ImplicitSender with BeforeAndAfter with LocalSparkContext with Timeouts { val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ @@ -294,6 +296,18 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + test("avoid exponential blowup when getting preferred locs list") { + // Build up a complex dependency graph with repeated zip operations, without preferred locations. + var rdd: RDD[_] = new MyRDD(sc, 1, Nil) + (1 to 30).foreach(_ => rdd = rdd.zip(rdd)) + // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided. + failAfter(10 seconds) { + val preferredLocs = scheduler.getPreferredLocs(rdd,0) + // No preferred locations are returned. + assert(preferredLocs.length === 0) + } + } + test("unserializable task") { val unserializableRdd = new MyRDD(sc, 1, Nil) { class UnserializableClass |