about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/rdd/CoalescedRDD.scala 29
1 files changed, 14 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 56f350b1f5..8d06b4ceb8 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -83,15 +83,13 @@ class CoalescedRDD[T: ClassManifest](
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
- val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
- for ((pg, i) <- packer.getPartitions.zipWithIndex) {
- val ids = pg.list.map(_.index).toArray
- res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ packer.getPartitions.zipWithIndex.map {
+ case (pg, i) =>
+ val ids = pg.list.map(_.index).toArray
+ new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
-
- res.toArray
}
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
@@ -119,10 +117,11 @@ class CoalescedRDD[T: ClassManifest](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
- if (partition.isInstanceOf[CoalescedRDDPartition])
+ if (partition.isInstanceOf[CoalescedRDDPartition]) {
List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
- else
+ } else {
super.getPreferredLocations(partition)
+ }
}
}
@@ -167,15 +166,15 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
// initializes/resets to start iterating from the beginning
private def resetIterator() = {
- val i1 = prev.partitions.view.map( (p: Partition) =>
+ val i1 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 0)
- Some((prev.preferredLocations(p)(0),p)) else None } )
- val i2 = prev.partitions.view.map( (p: Partition) =>
+ Some((prev.preferredLocations(p)(0),p)) else None } }
+ val i2 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 1)
- Some((prev.preferredLocations(p)(1),p)) else None } )
- val i3 = prev.partitions.view.map( (p: Partition) =>
+ Some((prev.preferredLocations(p)(1),p)) else None } }
+ val i3 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 2)
- Some((prev.preferredLocations(p)(2),p)) else None } )
+ Some((prev.preferredLocations(p)(2),p)) else None } }
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
}
@@ -215,7 +214,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
pgroup.list += part // already assign this element
initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets
true
- } else false
+ } else { false }
}
/**