diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-15 12:05:20 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:13:37 -0700 |
commit | 339598c0806d01e3d43cd49dd02e9d510b5f586b (patch) | |
tree | 78e4a57b3e8457c24b7c8ff849b78daf04b87926 | |
parent | 02d6464f2f51217ab7435ef594f003943a742bbf (diff) | |
download | spark-339598c0806d01e3d43cd49dd02e9d510b5f586b.tar.gz spark-339598c0806d01e3d43cd49dd02e9d510b5f586b.tar.bz2 spark-339598c0806d01e3d43cd49dd02e9d510b5f586b.zip |
several of Reynold's suggestions implemented
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 29 |
1 file changed, 14 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 56f350b1f5..8d06b4ceb8 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -83,15 +83,13 @@ class CoalescedRDD[T: ClassManifest]( extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { - val res = mutable.ArrayBuffer[CoalescedRDDPartition]() val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack) - for ((pg, i) <- packer.getPartitions.zipWithIndex) { - val ids = pg.list.map(_.index).toArray - res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) + packer.getPartitions.zipWithIndex.map { + case (pg, i) => + val ids = pg.list.map(_.index).toArray + new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) } - - res.toArray } override def compute(partition: Partition, context: TaskContext): Iterator[T] = { @@ -119,10 +117,11 @@ class CoalescedRDD[T: ClassManifest]( * @return the machine most preferred by split */ override def getPreferredLocations(partition: Partition): Seq[String] = { - if (partition.isInstanceOf[CoalescedRDDPartition]) + if (partition.isInstanceOf[CoalescedRDDPartition]) { List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation) - else + } else { super.getPreferredLocations(partition) + } } } @@ -167,15 +166,15 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) // initializes/resets to start iterating from the beginning private def resetIterator() = { - val i1 = prev.partitions.view.map( (p: Partition) => + val i1 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 0) - Some((prev.preferredLocations(p)(0),p)) else None } ) - val i2 = prev.partitions.view.map( (p: Partition) => + Some((prev.preferredLocations(p)(0),p)) else None } } + val i2 = prev.partitions.view.map{ p: Partition => { if 
(prev.preferredLocations(p).length > 1) - Some((prev.preferredLocations(p)(1),p)) else None } ) - val i3 = prev.partitions.view.map( (p: Partition) => + Some((prev.preferredLocations(p)(1),p)) else None } } + val i3 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 2) - Some((prev.preferredLocations(p)(2),p)) else None } ) + Some((prev.preferredLocations(p)(2),p)) else None } } val res = List(i1,i2,i3) res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) } @@ -215,7 +214,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) pgroup.list += part // already assign this element initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets true - } else false + } else { false } } /** |