author    Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-15 12:05:20 -0700
committer Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-20 16:13:37 -0700
commit    339598c0806d01e3d43cd49dd02e9d510b5f586b (patch)
tree      78e4a57b3e8457c24b7c8ff849b78daf04b87926 /core
parent    02d6464f2f51217ab7435ef594f003943a742bbf (diff)
several of Reynold's suggestions implemented
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  |  29
1 file changed, 14 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 56f350b1f5..8d06b4ceb8 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -83,15 +83,13 @@ class CoalescedRDD[T: ClassManifest](
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getPartitions: Array[Partition] = {
- val res = mutable.ArrayBuffer[CoalescedRDDPartition]()
val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
- for ((pg, i) <- packer.getPartitions.zipWithIndex) {
- val ids = pg.list.map(_.index).toArray
- res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ packer.getPartitions.zipWithIndex.map {
+ case (pg, i) =>
+ val ids = pg.list.map(_.index).toArray
+ new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
-
- res.toArray
}
override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
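Note on the hunk above: it replaces the mutable ArrayBuffer accumulator in getPartitions with a direct map over zipWithIndex. Below is a minimal standalone sketch of the same refactor, using hypothetical Group/Part stand-ins rather than the actual Spark classes (the real method must still return an Array[Partition], so the mapped result may need an explicit conversion):

import scala.collection.mutable

object CoalesceSketch {
  // hypothetical stand-ins for PartitionGroup and CoalescedRDDPartition
  case class Group(indices: Seq[Int], prefLoc: Option[String])
  case class Part(id: Int, parentIndices: Array[Int], prefLoc: Option[String])

  // before: accumulate into a mutable buffer, then convert
  def buildWithBuffer(groups: Seq[Group]): Array[Part] = {
    val res = mutable.ArrayBuffer[Part]()
    for ((g, i) <- groups.zipWithIndex) {
      res += Part(i, g.indices.toArray, g.prefLoc)
    }
    res.toArray
  }

  // after: map over zipWithIndex directly, no mutable state
  def buildWithMap(groups: Seq[Group]): Array[Part] =
    groups.zipWithIndex.map { case (g, i) =>
      Part(i, g.indices.toArray, g.prefLoc)
    }.toArray
}

Both forms produce the same array; the second simply avoids the intermediate mutable buffer.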
@@ -119,10 +117,11 @@ class CoalescedRDD[T: ClassManifest](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
- if (partition.isInstanceOf[CoalescedRDDPartition])
+ if (partition.isInstanceOf[CoalescedRDDPartition]) {
List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
- else
+ } else {
super.getPreferredLocations(partition)
+ }
}
}
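Note on the hunk above: it only adds braces around both branches of the type test in getPreferredLocations. A hedged sketch of that dispatch with hypothetical stand-in types (not the Spark classes); the same check can also be written as a pattern match:

object PreferredLocationSketch {
  trait Partition { def index: Int }
  case class CoalescedPart(index: Int, preferredLocation: String) extends Partition
  case class PlainPart(index: Int) extends Partition

  // braces-on-every-branch form, as in the hunk above
  def preferredLocations(p: Partition, fallback: Partition => Seq[String]): Seq[String] = {
    if (p.isInstanceOf[CoalescedPart]) {
      List(p.asInstanceOf[CoalescedPart].preferredLocation)
    } else {
      fallback(p)
    }
  }

  // equivalent written as a pattern match
  def preferredLocationsPM(p: Partition, fallback: Partition => Seq[String]): Seq[String] =
    p match {
      case c: CoalescedPart => List(c.preferredLocation)
      case other            => fallback(other)
    }
}

For example, preferredLocations(PreferredLocationSketch.CoalescedPart(0, "host1"), _ => Nil) returns List("host1").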
@@ -167,15 +166,15 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
// initializes/resets to start iterating from the beginning
private def resetIterator() = {
- val i1 = prev.partitions.view.map( (p: Partition) =>
+ val i1 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 0)
- Some((prev.preferredLocations(p)(0),p)) else None } )
- val i2 = prev.partitions.view.map( (p: Partition) =>
+ Some((prev.preferredLocations(p)(0),p)) else None } }
+ val i2 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 1)
- Some((prev.preferredLocations(p)(1),p)) else None } )
- val i3 = prev.partitions.view.map( (p: Partition) =>
+ Some((prev.preferredLocations(p)(1),p)) else None } }
+ val i3 = prev.partitions.view.map{ p: Partition =>
{ if (prev.preferredLocations(p).length > 2)
- Some((prev.preferredLocations(p)(2),p)) else None } )
+ Some((prev.preferredLocations(p)(2),p)) else None } }
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
}
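Note on the hunk above: resetIterator builds three lazy views that pair each parent partition with its 1st, 2nd, and 3rd preferred location (where present) and fuses them into a single iterator. A self-contained sketch of that idea, with partitions modelled as plain Ints and prefLocs as an assumed lookup function rather than the real RDD API:

object FusedIteratorSketch {
  def fusedLocationIterator(
      parts: Seq[Int],
      prefLocs: Int => Seq[String]): Iterator[(String, Int)] = {
    // one lazy view per replica rank; None where a partition has fewer locations
    def replicaView(rank: Int) = parts.view.map { p =>
      val locs = prefLocs(p)
      if (locs.length > rank) Some((locs(rank), p)) else None
    }
    val i1 = replicaView(0)
    val i2 = replicaView(1)
    val i3 = replicaView(2)
    // fuse the three views: every 1st replica, then every 2nd, then every 3rd
    List(i1, i2, i3).view.flatMap(x => x).flatten.iterator
  }
}

For parts = Seq(0, 1) with prefLocs(0) = Seq("a", "b") and prefLocs(1) = Seq("c"), the fused iterator yields ("a", 0), ("c", 1), ("b", 0): every first replica is visited before any second replica.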
@@ -215,7 +214,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double)
pgroup.list += part // already assign this element
initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets
true
- } else false
+ } else { false }
}
/**