author     Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-18 20:42:35 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-20 16:16:04 -0700
commit     f1c853d76dce4fbc34f580be0a3ae15cc5be9c80 (patch)
tree       272ebca4b55cd5d2dd2b40d82dd68f1fe86cf40e /core
parent     890ea6ba792f9d3d07916a0833c3a83f0150e8cc (diff)
fixed Matei's comments
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala     |   2
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  | 145
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala          |  25
3 files changed, 99 insertions(+), 73 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 544c971efc..7639749ecb 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -620,7 +620,7 @@ class SparkContext(
* @param partition to be looked up for locality
* @return list of preferred locations for the partition
*/
- def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] =
+ private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] =
dagScheduler.getPreferredLocs(rdd, partition)
/**
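The visibility change above means getPreferredLocs is now reachable only from code compiled inside the spark package, such as the partition coalescer changed below. A minimal sketch of what such internal code can do; the LocalityInspector helper is hypothetical and not part of this patch:

    package spark

    // Hypothetical helper: only code inside the `spark` package can still see
    // the now private[spark] SparkContext.getPreferredLocs.
    private[spark] object LocalityInspector {
      // For each partition of `rdd`, return the hosts the DAGScheduler currently prefers.
      def preferredHosts(sc: SparkContext, rdd: RDD[_]): Seq[List[String]] =
        rdd.partitions.map(p => sc.getPreferredLocs(rdd, p.index)).toSeq
    }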
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index a5e3d74447..c475b7a8aa 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -21,13 +21,21 @@ import spark._
import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
import scala.Some
+import scala.collection.mutable.ArrayBuffer
+/**
+ * Class that captures a coalesced RDD partition by keeping track of the parent partitions it contains
+ * @param index index of this coalesced partition
+ * @param rdd the parent RDD whose partitions are being coalesced
+ * @param parentsIndices list of indices in the parent that have been coalesced into this partition
+ * @param preferredLocation the preferred location for this partition
+ */
case class CoalescedRDDPartition(
- index: Int,
- @transient rdd: RDD[_],
- parentsIndices: Array[Int],
- @transient preferredLocation: String = ""
- ) extends Partition {
+ index: Int,
+ @transient rdd: RDD[_],
+ parentsIndices: Array[Int],
+ @transient preferredLocation: String = ""
+ ) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
@throws(classOf[IOException])
@@ -43,45 +51,27 @@ case class CoalescedRDDPartition(
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
- val loc = parents.count(p => rdd.context.getPreferredLocs(rdd, p.index)
- .contains(preferredLocation))
+ val loc = parents.count(p =>
+ rdd.context.getPreferredLocs(rdd, p.index).contains(preferredLocation))
+
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
/**
- * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
- * parent had more than this many partitions, or fewer if the parent had fewer.
- *
- * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
- * or to avoid having a large number of small tasks when processing a directory with many files.
- *
- * If there is no locality information (no preferredLocations) in the parent, then the coalescing
- * is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following three goals:
- *
- * (1) Balance the groups so they roughly have the same number of parent partitions
- * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
- * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
- * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
- *
- * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
- * We assume the final number of desired partitions is small, e.g. less than 1000.
- *
- * The algorithm tries to assign unique preferred machines to each partition. If the number of
- * desired partitions is greater than the number of preferred machines (can happen), it needs to
- * start picking duplicate preferred machines. This is determined using coupon collector estimation
- * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
- * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
- * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
- * according to locality. (contact alig for questions)
- *
+ * Represents a coalesced RDD that has fewer partitions than its parent RDD.
+ * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
+ * so that each new partition has roughly the same number of parent partitions and so that
+ * the preferred location of each new partition overlaps with as many preferred locations of its
+ * parent partitions as possible.
+ * @param prev RDD to be coalesced
+ * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param balanceSlack used to trade off balance and locality. 1.0 is all locality, 0 is all balance
*/
class CoalescedRDD[T: ClassManifest](
- @transient var prev: RDD[T],
- maxPartitions: Int,
- balanceSlack: Double = 0.10)
+ @transient var prev: RDD[T],
+ maxPartitions: Int,
+ balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
@@ -128,44 +118,75 @@ class CoalescedRDD[T: ClassManifest](
}
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
- balanceSlack: Double) {
- protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
- protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes one or more of the parent ones. Will produce exactly `maxPartitions` if the
+ * parent had more than this many partitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ *
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
+ * is very simple: group parents that are adjacent in the Array into chunks.
+ * If there is locality information, it proceeds to pack them with the following four goals:
+ *
+ * (1) Balance the groups so they roughly have the same number of parent partitions
+ * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
+ * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
+ *
+ * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
+ * We assume the final number of desired partitions is small, e.g. less than 1000.
+ *
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (can happen), it needs to
+ * start picking duplicate preferred machines. This is determined using coupon collector estimation
+ * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
+ * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
+ * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
+ * according to locality. (contact alig for questions)
+ *
+ */
+
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+
+ def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+ def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
- protected val rnd = new scala.util.Random(7919) // keep this class deterministic
+ val rnd = new scala.util.Random(7919) // keep this class deterministic
// each element of groupArr represents one coalesced partition
- protected val groupArr = mutable.ArrayBuffer[PartitionGroup]()
+ val groupArr = ArrayBuffer[PartitionGroup]()
// hash used to check whether some machine is already in groupArr
- protected val groupHash = mutable.Map[String, mutable.ListBuffer[PartitionGroup]]()
+ val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
// hash used for the first maxPartitions (to avoid duplicates)
- protected val initialHash = mutable.Map[Partition, Boolean]()
+ val initialHash = mutable.Set[Partition]()
// determines the tradeoff between load-balancing the partitions sizes and their locality
// e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
- protected val slack = (balanceSlack * prev.partitions.size).toInt
+ val slack = (balanceSlack * prev.partitions.size).toInt
- protected var noLocality = true // if true if no preferredLocations exists for parent RDD
+ var noLocality = true // true iff no preferredLocations exist for the parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- protected def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
+ def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
rdd.context.getPreferredLocs(rdd, part.index)
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
// the rotator first goes through the first replica copy of each partition, then second, third
// the iterators return type is a tuple: (replicaString, partition)
- protected class RotateLocations(prev: RDD[_]) extends Iterator[(String, Partition)] {
+ class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
+
+ var it: Iterator[(String, Partition)] = resetIterator()
- protected var it: Iterator[(String, Partition)] = resetIterator()
override val isEmpty = !it.hasNext
// initializes/resets to start iterating from the beginning
- protected def resetIterator() = {
+ def resetIterator() = {
val i1 = prev.partitions.view.map{ p: Partition =>
{ if (currentPreferredLocations(prev, p).length > 0)
Some((currentPreferredLocations(prev, p)(0),p)) else None } }
@@ -177,10 +198,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
Some((currentPreferredLocations(prev, p)(2),p)) else None } }
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
+
+ // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(0),p) }} ++
+ // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(1),p) }} ++
+ // prev.partitions.iterator.map{p: Partition => { (currentPreferredLocations(prev, p)(2),p) }}
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
- def hasNext(): Boolean = !isEmpty
+ def hasNext(): Boolean = { !isEmpty }
// return the next preferredLocation of some partition of the RDD
def next(): (String, Partition) = {
@@ -199,14 +224,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
* @param key string representing a partitioned group on preferred machine key
* @return Option of PartitionGroup that has least elements for key
*/
- protected def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+ def getLeastGroupHash(key: String): Option[PartitionGroup] = {
groupHash.get(key).map(_.sortWith(compare).head)
}
def addPartToPGroup(part : Partition, pgroup : PartitionGroup) : Boolean = {
if (!initialHash.contains(part)) {
pgroup.list += part // already assign this element
- initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets
+ initialHash += part // needed to avoid assigning partitions to multiple buckets
true
} else { false }
}
@@ -217,8 +242,8 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
* until it has seen most of the preferred locations (2 * n log(n))
* @param targetLen
*/
- protected def setupGroups(targetLen: Int) {
- val rotIt = new RotateLocations(prev)
+ def setupGroups(targetLen: Int) {
+ val rotIt = new LocationIterator(prev)
// deal with empty case, just create targetLen partition groups with no preferred location
if (!rotIt.hasNext()) {
@@ -243,7 +268,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
val pgroup = PartitionGroup(nxt_replica)
groupArr += pgroup
addPartToPGroup(nxt_part, pgroup)
- groupHash += (nxt_replica -> (mutable.ListBuffer(pgroup))) // list in case we have multiple
+ groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
numCreated += 1
}
}
@@ -270,7 +295,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
* @param p partition (ball to be thrown)
* @return partition group (bin to be put in)
*/
- protected def pickBin(p: Partition): PartitionGroup = {
+ def pickBin(p: Partition): PartitionGroup = {
val pref = prev.context.getPreferredLocs(prev, p.index).
map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations
val prefPart = if (pref == Nil) None else pref.head
@@ -290,7 +315,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
}
}
- protected def throwBalls() {
+ def throwBalls() {
if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
if (maxPartitions > groupArr.size) { // just return prev.partitions
for ((p,i) <- prev.partitions.zipWithIndex) {
@@ -310,7 +335,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
}
}
- protected def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+ def getPartitions : Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
/**
* Runs the packing algorithm and returns an array of PartitionGroups that if possible are
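The power-of-two-choices-with-slack rule described in the comment above, and implemented in pickBin, can be summarized by the following standalone sketch. The Bin type and all names here are illustrative simplifications, not the patch's actual code:

    import scala.util.Random

    // Toy stand-in for a PartitionGroup: tracks only its preferred host
    // and how many parent partitions ("balls") it currently holds.
    case class Bin(host: String, var size: Int)

    object PickBinSketch {
      val rnd = new Random(7919) // fixed seed, mirroring the coalescer's determinism

      // bins:  all coalesced groups built so far
      // pref:  least-loaded bin on one of this partition's preferred hosts, if any
      // slack: how much imbalance we tolerate in exchange for locality
      def pickBin(bins: Array[Bin], pref: Option[Bin], slack: Int): Bin = {
        // power of two choices: sample two bins at random and keep the emptier one
        val r1 = rnd.nextInt(bins.length)
        val r2 = rnd.nextInt(bins.length)
        val minPowerOfTwo = if (bins(r1).size < bins(r2).size) bins(r1) else bins(r2)
        pref match {
          case None => minPowerOfTwo // no locality info: plain balls-into-bins
          case Some(local) =>
            if (minPowerOfTwo.size + slack <= local.size) minPowerOfTwo // too imbalanced: favor balance
            else local                                                  // within slack: favor locality
        }
      }
    }

Setting slack to 0 degenerates to pure load balancing, while a very large slack always follows locality, which matches the balanceSlack knob on CoalescedRDD (0 is all balance, 1.0 is all locality).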
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index da5f78aef5..14f7c62782 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -176,7 +176,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
test("cogrouped RDDs with locality") {
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
- val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
+ val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
val coalesced1 = data.coalesce(3)
assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
@@ -184,7 +184,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(splits.length == 3, "Supposed to coalesce to 3 but got " + splits.length)
assert(splits.foldLeft(true)
- ( (x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
+ ((x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
@@ -193,7 +193,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
(x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
map(x => List(x)).toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
+ }
+ test("cogrouped RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
@@ -202,30 +204,29 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val machines = mutable.ListBuffer[String]()
(1 to numMachines).foreach(machines += "m"+_)
- val blocks = (1 to partitions).map( i => (i, (i to (i+2))
- .map{ j => machines(rnd.nextInt(machines.size)) } ))
+ val blocks = (1 to partitions).map(i => (i, (i to (i+2))
+ .map{ j => machines(rnd.nextInt(machines.size))}))
- val data2 = sc.makeRDD(blocks) // .map( i => i*2 )
+ val data2 = sc.makeRDD(blocks)
val coalesced2 = data2.coalesce(numMachines*2)
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
- .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
- .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
- .map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
- // TDD: Test for later when we have implemented functionality to get locality from DAGScheduler
- val data3 = sc.makeRDD(blocks).map( i => i*2 ) // derived RDD to test *current* pref locs
+ val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
- .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
- .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
+ .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
+ .foldLeft(1.)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
(minLocality2*100.).toInt + "%")
}
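For reference, the API these tests exercise can be used roughly as follows. This is only a usage sketch built from the same shapes as the test data above; it assumes an existing SparkContext named sc:

    // Nine elements, each preferring three of the hosts m0..m5 (as in the test above).
    val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map(j => "m" + (j % 6)))))
    val coalesced = data.coalesce(3)

    // Each resulting partition is a CoalescedRDDPartition wrapping several parent partitions.
    coalesced.partitions.foreach { part =>
      val cp = part.asInstanceOf[CoalescedRDDPartition]
      println(cp.index + ": parents=" + cp.parentsIndices.toList + " locality=" + cp.localFraction)
    }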