diff options
author | Nezih Yigitbasi <nyigitbasi@netflix.com> | 2016-04-19 14:35:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-19 14:35:26 -0700 |
commit | 3c91afec20607e0d853433a904105ee22df73c73 (patch) | |
tree | 428fd278fbffed6115e6fbf6b6b2bd9a3903c0f4 /core/src/main/scala | |
parent | 0b8369d8548c0204b9c24d826c731063b72360b8 (diff) | |
download | spark-3c91afec20607e0d853433a904105ee22df73c73.tar.gz spark-3c91afec20607e0d853433a904105ee22df73c73.tar.bz2 spark-3c91afec20607e0d853433a904105ee22df73c73.zip |
[SPARK-14042][CORE] Add custom coalescer support
## What changes were proposed in this pull request?
This PR adds support for specifying an optional custom coalescer to the `coalesce()` method. Currently I have only added this feature to the `RDD` interface, and once we sort out the details we can proceed with adding this feature to the other APIs (`Dataset` etc.)
## How was this patch tested?
Added a unit test for this functionality.
/cc rxin (per our discussion on the mailing list)
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>
Closes #11865 from nezihyigitbasi/custom_coalesce_policy.
Diffstat (limited to 'core/src/main/scala')
3 files changed, 107 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 35665ab7c0..e75f1dbf81 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -70,23 +70,23 @@ private[spark] case class CoalescedRDDPartition( * parent partitions * @param prev RDD to be coalesced * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive) - * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance + * @param partitionCoalescer [[PartitionCoalescer]] implementation to use for coalescing */ private[spark] class CoalescedRDD[T: ClassTag]( @transient var prev: RDD[T], maxPartitions: Int, - balanceSlack: Double = 0.10) + partitionCoalescer: Option[PartitionCoalescer] = None) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies require(maxPartitions > 0 || maxPartitions == prev.partitions.length, s"Number of partitions ($maxPartitions) must be positive.") override def getPartitions: Array[Partition] = { - val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack) + val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer()) - pc.run().zipWithIndex.map { + pc.coalesce(maxPartitions, prev).zipWithIndex.map { case (pg, i) => - val ids = pg.arr.map(_.index).toArray + val ids = pg.partitions.map(_.index).toArray new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) } } @@ -144,15 +144,15 @@ private[spark] class CoalescedRDD[T: ClassTag]( * desired partitions is greater than the number of preferred machines (can happen), it needs to * start picking duplicate preferred machines. This is determined using coupon collector estimation * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist: - * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two - * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions - * according to locality. (contact alig for questions) - * + * it tries to also achieve locality. This is done by allowing a slack (balanceSlack, where + * 1.0 is all locality, 0 is all balance) between two bins. If two bins are within the slack + * in terms of balance, the algorithm will assign partitions according to locality. + * (contact alig for questions) */ -private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { - - def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size +private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) + extends PartitionCoalescer { + def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean = if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get) @@ -167,14 +167,10 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: // hash used for the first maxPartitions (to avoid duplicates) val initialHash = mutable.Set[Partition]() - // determines the tradeoff between load-balancing the partitions sizes and their locality - // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality - val slack = (balanceSlack * prev.partitions.length).toInt - var noLocality = true // if true if no preferredLocations exists for parent RDD // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currPrefLocs(part: Partition): Seq[String] = { + def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) } @@ -192,7 +188,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: def resetIterator(): Iterator[(String, Partition)] = { val iterators = (0 to 2).map { x => prev.partitions.iterator.flatMap { p => - if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None + if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None } } iterators.reduceLeft((x, y) => x ++ y) @@ -215,8 +211,9 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: /** * Sorts and gets the least element of the list associated with key in groupHash * The returned PartitionGroup is the least loaded of all groups that represent the machine "key" + * * @param key string representing a partitioned group on preferred machine key - * @return Option of PartitionGroup that has least elements for key + * @return Option of [[PartitionGroup]] that has least elements for key */ def getLeastGroupHash(key: String): Option[PartitionGroup] = { groupHash.get(key).map(_.sortWith(compare).head) @@ -224,7 +221,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = { if (!initialHash.contains(part)) { - pgroup.arr += part // already assign this element + pgroup.partitions += part // already assign this element initialHash += part // needed to avoid assigning partitions to multiple buckets true } else { false } @@ -236,12 +233,12 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: * until it has seen most of the preferred locations (2 * n log(n)) * @param targetLen */ - def setupGroups(targetLen: Int) { + def setupGroups(targetLen: Int, prev: RDD[_]) { val rotIt = new LocationIterator(prev) // deal with empty case, just create targetLen partition groups with no preferred location if (!rotIt.hasNext) { - (1 to targetLen).foreach(x => groupArr += PartitionGroup()) + (1 to targetLen).foreach(x => groupArr += new PartitionGroup()) return } @@ -259,7 +256,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: tries += 1 val (nxt_replica, nxt_part) = rotIt.next() if (!groupHash.contains(nxt_replica)) { - val pgroup = PartitionGroup(nxt_replica) + val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup addPartToPGroup(nxt_part, pgroup) groupHash.put(nxt_replica, ArrayBuffer(pgroup)) // list in case we have multiple @@ -269,7 +266,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates var (nxt_replica, nxt_part) = rotIt.next() - val pgroup = PartitionGroup(nxt_replica) + val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup var tries = 0 @@ -285,17 +282,29 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: /** * Takes a parent RDD partition and decides which of the partition groups to put it in * Takes locality into account, but also uses power of 2 choices to load balance - * It strikes a balance between the two use the balanceSlack variable + * It strikes a balance between the two using the balanceSlack variable * @param p partition (ball to be thrown) + * @param balanceSlack determines the trade-off between load-balancing the partitions sizes and + * their locality. e.g., balanceSlack=0.10 means that it allows up to 10% + * imbalance in favor of locality * @return partition group (bin to be put in) */ - def pickBin(p: Partition): PartitionGroup = { - val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs + def pickBin(p: Partition, prev: RDD[_], balanceSlack: Double): PartitionGroup = { + val slack = (balanceSlack * prev.partitions.length).toInt + // least loaded pref locs + val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare) val prefPart = if (pref == Nil) None else pref.head val r1 = rnd.nextInt(groupArr.size) val r2 = rnd.nextInt(groupArr.size) - val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2) + val minPowerOfTwo = { + if (groupArr(r1).numPartitions < groupArr(r2).numPartitions) { + groupArr(r1) + } + else { + groupArr(r2) + } + } if (prefPart.isEmpty) { // if no preferred locations, just use basic power of two return minPowerOfTwo @@ -303,55 +312,45 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: val prefPartActual = prefPart.get - if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows + // more imbalance than the slack allows + if (minPowerOfTwo.numPartitions + slack <= prefPartActual.numPartitions) { minPowerOfTwo // prefer balance over locality } else { prefPartActual // prefer locality over balance } } - def throwBalls() { + def throwBalls(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions for ((p, i) <- prev.partitions.zipWithIndex) { - groupArr(i).arr += p + groupArr(i).partitions += p } } else { // no locality available, then simply split partitions based on positions in array for (i <- 0 until maxPartitions) { val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt - (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) } + (rangeStart until rangeEnd).foreach{ j => groupArr(i).partitions += prev.partitions(j) } } } } else { for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group - pickBin(p).arr += p + pickBin(p, prev, balanceSlack).partitions += p } } } - def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray + def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.numPartitions > 0).toArray /** * Runs the packing algorithm and returns an array of PartitionGroups that if possible are * load balanced and grouped by locality - * @return array of partition groups + * + * @return array of partition groups */ - def run(): Array[PartitionGroup] = { - setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins) - throwBalls() // assign partitions (balls) to each group (bins) + def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = { + setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins) + throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins) getPartitions } } - -private case class PartitionGroup(prefLoc: Option[String] = None) { - var arr = mutable.ArrayBuffer[Partition]() - def size: Int = arr.size -} - -private object PartitionGroup { - def apply(prefLoc: String): PartitionGroup = { - require(prefLoc != "", "Preferred location must not be empty") - PartitionGroup(Some(prefLoc)) - } -} diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f6e0148f78..499a8b9aa1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -433,7 +433,9 @@ abstract class RDD[T: ClassTag]( * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. */ - def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) + def coalesce(numPartitions: Int, shuffle: Boolean = false, + partitionCoalescer: Option[PartitionCoalescer] = Option.empty) + (implicit ord: Ordering[T] = null) : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ @@ -451,9 +453,10 @@ abstract class RDD[T: ClassTag]( new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), - numPartitions).values + numPartitions, + partitionCoalescer).values } else { - new CoalescedRDD(this, numPartitions) + new CoalescedRDD(this, numPartitions, partitionCoalescer) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/coalesce-public.scala b/core/src/main/scala/org/apache/spark/rdd/coalesce-public.scala new file mode 100644 index 0000000000..d8a80aa5ae --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/coalesce-public.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import scala.collection.mutable + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Partition + +/** + * ::DeveloperApi:: + * A PartitionCoalescer defines how to coalesce the partitions of a given RDD. + */ +@DeveloperApi +trait PartitionCoalescer { + + /** + * Coalesce the partitions of the given RDD. + * + * @param maxPartitions the maximum number of partitions to have after coalescing + * @param parent the parent RDD whose partitions to coalesce + * @return an array of [[PartitionGroup]]s, where each element is itself an array of + * [[Partition]]s and represents a partition after coalescing is performed. + */ + def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] +} + +/** + * ::DeveloperApi:: + * A group of [[Partition]]s + * @param prefLoc preferred location for the partition group + */ +@DeveloperApi +class PartitionGroup(val prefLoc: Option[String] = None) { + val partitions = mutable.ArrayBuffer[Partition]() + def numPartitions: Int = partitions.size +} |