aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2014-12-10 14:27:53 -0800
committerAndrew Or <andrew@databricks.com>2014-12-10 14:27:53 -0800
commit4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4 (patch)
treeb5886d48a7bcacdd8b7b6423547367277829320a /core
parent447ae2de5d4c2af865fdb63f8b876b865de60f74 (diff)
downloadspark-4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4.tar.gz
spark-4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4.tar.bz2
spark-4f93d0cabe5d1fc7c0fd0a33d992fd85df1fecb4.zip
[SPARK-4759] Fix driver hanging from coalescing partitions
The driver hangs sometimes when we coalesce RDD partitions. See JIRA for more details and reproduction. This is because our use of empty string as default preferred location in `CoalescedRDDPartition` causes the `TaskSetManager` to schedule the corresponding task on host `""` (empty string). The intended semantics here, however, is that the partition does not have a preferred location, and the TSM should schedule the corresponding task accordingly. Author: Andrew Or <andrew@databricks.com> Closes #3633 from andrewor14/coalesce-preferred-loc and squashes the following commits: e520d6b [Andrew Or] Oops 3ebf8bd [Andrew Or] A few comments f370a4e [Andrew Or] Fix tests 2f7dfb6 [Andrew Or] Avoid using empty string as default preferred location
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala36
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala2
2 files changed, 22 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 9fab1d78ab..b073eba8a1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
* @param preferredLocation the preferred location for this partition
*/
private[spark] case class CoalescedRDDPartition(
- index: Int,
- @transient rdd: RDD[_],
- parentsIndices: Array[Int],
- @transient preferredLocation: String = ""
- ) extends Partition {
+ index: Int,
+ @transient rdd: RDD[_],
+ parentsIndices: Array[Int],
+ @transient preferredLocation: Option[String] = None) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
@throws(classOf[IOException])
@@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
- val loc = parents.count(p =>
- rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
+ val loc = parents.count { p =>
+ val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
+ preferredLocation.exists(parentPreferredLocations.contains)
+ }
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
@@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
private[spark] class CoalescedRDD[T: ClassTag](
- @transient var prev: RDD[T],
- maxPartitions: Int,
- balanceSlack: Double = 0.10)
+ @transient var prev: RDD[T],
+ maxPartitions: Int,
+ balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
@@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
- List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}
@@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
*
*/
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
}
}
-private[spark] case class PartitionGroup(prefLoc: String = "") {
+private case class PartitionGroup(prefLoc: Option[String] = None) {
var arr = mutable.ArrayBuffer[Partition]()
-
def size = arr.size
}
+
+private object PartitionGroup {
+ def apply(prefLoc: String): PartitionGroup = {
+ require(prefLoc != "", "Preferred location must not be empty")
+ PartitionGroup(Some(prefLoc))
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 46fcb80fa1..6836e9ab0f 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
- val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5