author     Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-14 19:40:24 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-20 16:13:36 -0700
commit     7a2a33e32dede41937570ec77cf1dfad070e963f (patch)
tree       14d2dd487888908b06a742189cc84ad20551816d /core
parent     66edf854aa585d23e47fc0bfb7fdd4e23c0ea592 (diff)
Added large-scale load and locality tests for the coalesced partitions
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  143
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala            38
2 files changed, 118 insertions(+), 63 deletions(-)
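For context, a minimal usage sketch of the locality-aware coalescing this patch tests. It mirrors the test added below; sc is an assumed SparkContext, and the cast to the private[spark] CoalescedRDDPartition only compiles from code inside the spark package, as the test is:

    // build an RDD whose elements carry (random) machine preferences, then coalesce it
    val rnd = new scala.util.Random(42)
    val machines = (1 to 50).map("m" + _)
    val blocks = (1 to 10000).map(i => (i, Seq.fill(3)(machines(rnd.nextInt(machines.size)))))
    val data = sc.makeRDD(blocks)            // preferred locations come from the second tuple element
    val coalesced = data.coalesce(100)       // ~100 groups, load-balanced and locality-aware
    val minLocality = coalesced.partitions   // how local is the least local group?
      .map(_.asInstanceOf[spark.rdd.CoalescedRDDPartition].localFraction)
      .min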
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index f999e9b0ec..61c4d0c004 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -36,7 +36,21 @@ private[spark] case class CoalescedRDDPartition(
oos.defaultWriteObject()
}
+ /**
+ * Gets the preferred location for this coalesced RDD partition. Most of the parent partitions should prefer this machine.
+ * @return preferred location
+ */
def getPreferredLocation = prefLoc
+
+ /**
+ * Computes the fraction of the parent partitions that have getPreferredLocation as one of their preferredLocations
+ * @return locality of this coalesced partition between 0 and 1
+ */
+ def localFraction: Double = {
+ var loc: Int = 0
+ parents.foreach(p => if (rdd.preferredLocations(p).contains(getPreferredLocation)) loc += 1)
+ if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+ }
}
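A quick worked example of localFraction, as standalone Scala with hypothetical preference lists (no Spark needed):

    val groupPref = "m1"                                          // the group's chosen machine
    val parentPrefs = Seq(Seq("m1", "m2"), Seq("m3"), Seq("m1"))  // preferred locations of the parents
    val localFraction =
      if (parentPrefs.isEmpty) 0.0
      else parentPrefs.count(_.contains(groupPref)).toDouble / parentPrefs.size
    // two of the three parents list "m1", so localFraction == 2.0 / 3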
/**
@@ -55,13 +69,9 @@ class CoalescedRDD[T: ClassManifest](
override def getPartitions: Array[Partition] = {
val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
- val targetLen = math.min(prev.partitions.length, maxPartitions)
val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
- packer.setupGroups(targetLen) // setup the groups (bins) and preferred locations
- packer.throwBalls() // assign partitions (balls) to each group (bins)
-
- for ((pg, i) <- packer.groupArr.zipWithIndex) {
+ for ((pg, i) <- packer.getPartitions.zipWithIndex) {
val ids = pg.list.map(_.index).toArray
res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
@@ -86,27 +96,68 @@ class CoalescedRDD[T: ClassManifest](
super.clearDependencies()
prev = null
}
+
+ /**
+ * Returns the preferred machine for the split. If the split is of type CoalescedRDDPartition,
+ * then the preferred machine is the one that most of its parent splits prefer too.
+ * @param split the partition whose preferred location is requested
+ * @return the machine most preferred by split
+ */
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ if (split.isInstanceOf[CoalescedRDDPartition])
+ List(split.asInstanceOf[CoalescedRDDPartition].getPreferredLocation)
+ else
+ super.getPreferredLocations(split)
+ }
+
}
class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+ private def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+ private def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+ if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+ private val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+ // each element of groupArr represents one coalesced partition
+ private val groupArr = mutable.ArrayBuffer[PartitionGroup]()
+
+ // hash used to check whether some machine is already in groupArr
+ private val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
+
+ // hash used for the first maxPartitions (to avoid duplicates)
+ private val initialHash = mutable.Map[Partition, Boolean]()
+
+ // determines the tradeoff between load-balancing the partitions' sizes and their locality
+ // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+ private val slack = (balanceSlack * prev.partitions.size).toInt
+
+ private var noLocality = true // true if no preferredLocations exist for the parent RDD
+
+ this.setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) and preferred locations
+ this.throwBalls() // assign partitions (balls) to each group (bins)
+
+ def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.size > 0).toArray
+
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
// the rotator first goes through the first replica copy of each partition, then second, then third
- private class RotateLocations(prev: RDD[_]) extends Iterator[String] {
+ // the iterator's return type is a tuple: (replicaString, partition)
+ private class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
- private var it: Iterator[String] = resetIterator()
+ private var it: Iterator[(String, Partition)] = resetIterator()
override val isEmpty = !it.hasNext
// initializes/resets to start iterating from the beginning
private def resetIterator() = {
val i1 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 0) Some(prev.preferredLocations(p)(0)) else None } )
+ { if (prev.preferredLocations(p).length > 0) Some((prev.preferredLocations(p)(0),p)) else None } )
val i2 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 1) Some(prev.preferredLocations(p)(1)) else None } )
+ { if (prev.preferredLocations(p).length > 1) Some((prev.preferredLocations(p)(1),p)) else None } )
val i3 = prev.partitions.view.map( (p: Partition) =>
- { if (prev.preferredLocations(p).length > 2) Some(prev.preferredLocations(p)(2)) else None } )
+ { if (prev.preferredLocations(p).length > 2) Some((prev.preferredLocations(p)(2),p)) else None } )
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) into one iterator
}
@@ -115,7 +166,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
def hasNext(): Boolean = !isEmpty
// return the next preferredLocation of some partition of the RDD
- def next(): String = {
+ def next(): (String, Partition) = {
if (it.hasNext)
it.next()
else {
@@ -126,42 +177,11 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
case class PartitionGroup(prefLoc: String = "") {
- var list = mutable.MutableList[Partition]()
+ var list = mutable.ListBuffer[Partition]()
def size = list.size
-
- // returns number of partitions that got locality in this group
- def local = {
- var loc: Int = 0
- list.foreach(p => if (prev.preferredLocations(p).contains(prefLoc)) loc += 1)
- loc
- }
-
- override def toString(): String = {
- val localityStr = if (size == 0) "0" else (local*100. / size).toInt.toString
- "PartitionGroup(\"" + prefLoc + "\") size: " + size + " locality: " + localityStr +"% \n"
- // list.map("\t\t" + _.toString).mkString("\n") + "\n"
- }
}
- def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
- def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
- if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
-
- val rnd = new scala.util.Random(7919) // keep this class deterministic
-
- // each element of groupArr represents one coalesced partition
- val groupArr = mutable.ArrayBuffer[PartitionGroup]()
-
- // hash used to check whether some machine is already in groupArr
- val groupHash = mutable.Map[String, mutable.MutableList[PartitionGroup]]()
-
- // determines the tradeoff between load-balancing the partitions sizes and their locality
- // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
- val slack = (balanceSlack * prev.partitions.size).toInt
-
- private var noLocality = true // if true if no preferredLocations exists for parent RDD
-
/**
* Sorts and gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
@@ -172,13 +192,21 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
groupHash.get(key).map(_.sortWith(compare).head)
}
+ def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
+ if (!initialHash.contains(part)) {
+ pgroup.list += part // pre-assign this element so that every bucket will have at least one element
+ initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets/groups
+ true
+ } else false
+ }
+
/**
* Initializes targetLen partition groups and assigns a preferredLocation
* This uses the coupon-collector problem to estimate how many preferredLocations it must rotate through
* until it has seen most of the preferred locations (about 2 * n * log(n) draws)
* @param targetLen
*/
- def setupGroups(targetLen: Int) {
+ private def setupGroups(targetLen: Int) {
val rotIt = new RotateLocations(prev)
// deal with empty rotator case, just create targetLen partition groups with no preferred location
@@ -199,22 +227,29 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
// likely targetLen >> number of preferred locations (more buckets than there are machines)
while (numCreated < targetLen && tries < expectedCoupons2) {
tries += 1
- val nxt = rotIt.next()
- if (!groupHash.contains(nxt)) {
- val pgroup = PartitionGroup(nxt)
+ val (nxt_replica, nxt_part) = rotIt.next()
+ if (!groupHash.contains(nxt_replica)) {
+ val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
- groupHash += (nxt -> (mutable.MutableList(pgroup))) // use list in case we have multiple groups for same machine
+ addPartToPGroup(nxt_part, pgroup)
+ groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple groups per machine
numCreated += 1
}
}
while (numCreated < targetLen) { // if we don't have enough partition groups, just create duplicates
- val nxt = rotIt.next()
- val pgroup = PartitionGroup(nxt)
+ var (nxt_replica, nxt_part) = rotIt.next()
+ val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
- groupHash.get(nxt).get += pgroup
+ groupHash.get(nxt_replica).get += pgroup
+ var tries = 0
+ while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure each group has at least one partition
+ nxt_part = rotIt.next()._2
+ tries += 1
+ }
numCreated += 1
}
+
}
/**
@@ -224,7 +259,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
* @param p partition (ball to be thrown)
* @return partition group (bin to be put in)
*/
- def pickBin(p: Partition): PartitionGroup = {
+ private def pickBin(p: Partition): PartitionGroup = {
val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded bin of replicas
val prefPart = if (pref == Nil) None else pref.head
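The rest of pickBin is elided from this hunk. A hedged sketch of how the locality/balance tradeoff can be resolved with the slack computed earlier, using power-of-two-choices for the non-local fallback (variable names follow the surrounding code, but this body is an assumption, not the committed one):

    val r1 = rnd.nextInt(groupArr.size)
    val r2 = rnd.nextInt(groupArr.size)
    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
    prefPart match {
      case None => minPowerOfTwo                                     // no locality info: balance only
      case Some(pref) =>
        if (minPowerOfTwo.size + slack <= pref.size) minPowerOfTwo   // too imbalanced: favor balance
        else pref                                                    // within the slack: favor locality
    }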
@@ -243,7 +278,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
}
- def throwBalls() {
+ private def throwBalls() {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p,i) <- prev.partitions.zipWithIndex) {
@@ -257,7 +292,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
}
}
} else {
- for (p <- prev.partitions) { // throw every partition (ball) into a partition group (bin)
+ for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into a partition group
pickBin(p).list += p
}
}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 881bdedfe5..c200bfe909 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -22,7 +22,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd._
+import scala.collection.parallel.mutable
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -184,16 +185,35 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(splits.foldLeft(true)( (x,y) => if (!x) false else y.length >= 2) === true) // decent balance (2+ per bin)
- val prefs = List(List("m1","m2","m3"), List("m4","m5","m6"))
- val data2 = sc.makeRDD((1 to 100).map( i => (i, prefs(i % 2) ))) // alternate machine prefs
- val coalesced2 = data2.coalesce(10)
- val splits2 = coalesced2.glom().collect().map(_.toList).toList
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = data.coalesce(20)
+ assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
+ (x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).map(x => List(x)).toList)
+
+
+ // large scale experiment
+ import collection.mutable
+ val rnd = scala.util.Random
+ val partitions = 10000
+ val numMachines = 50
+ val machines = mutable.ListBuffer[String]()
+ (1 to numMachines).foreach(machines += "m"+_)
+
+ val blocks = (1 to partitions).map( i => (i, (i to (i+2)).map{ j => machines(rnd.nextInt(machines.size)) } ))
+
+ val data2 = sc.makeRDD(blocks)
+ val coalesced2 = data2.coalesce(numMachines*2)
- // this gives a list of pairs, each pair is of the form (even,odd), where even is the number of even elements...
- val list = splits2.map( ls => ls.foldLeft((0,0))( (x,y) => if (y % 2 == 0) (x._1+1,x._2) else (x._1,x._2+1)) )
- val maxes = list.map( { case (a,b) => if (a>b) a else b } ) // get the maxs, this represents the locality
- maxes.foreach( locality => assert( locality > 7) ) // at least 70% locality in each partition
+ // test that you get over 95% locality in each group
+ val minLocality = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
+ .foldLeft(1.0)((perc, loc) => math.min(perc, loc)) // localFraction is a fraction in [0, 1]
+ assert(minLocality > 0.95)
+ // test that the groups are load balanced with 100 +/- 15 elements in each
+ val maxImbalance = coalesced2.partitions.map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
+ .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
+ assert(maxImbalance < 15)
}
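For intuition on the balance assertion: 10000 parent partitions over numMachines * 2 = 100 groups gives an expected 100 parents per group, so maxImbalance < 15 means every group ends up with roughly 86 to 114 parents. A standalone restatement of that arithmetic:

    val partitions = 10000
    val groups = 50 * 2                       // numMachines * 2
    val avgPerGroup = partitions / groups     // 100 parents per coalesced partition on average
    // maxImbalance < 15  means each group holds between 86 and 114 parents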
test("zipped RDDs") {