From 339598c0806d01e3d43cd49dd02e9d510b5f586b Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Thu, 15 Aug 2013 12:05:20 -0700 Subject: several of Reynold's suggestions implemented --- core/src/main/scala/spark/rdd/CoalescedRDD.scala | 29 ++++++++++++------------ 1 file changed, 14 insertions(+), 15 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 56f350b1f5..8d06b4ceb8 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -83,15 +83,13 @@ class CoalescedRDD[T: ClassManifest]( extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies override def getPartitions: Array[Partition] = { - val res = mutable.ArrayBuffer[CoalescedRDDPartition]() val packer = new PartitionCoalescer(maxPartitions, prev, balanceSlack) - for ((pg, i) <- packer.getPartitions.zipWithIndex) { - val ids = pg.list.map(_.index).toArray - res += new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) + packer.getPartitions.zipWithIndex.map { + case (pg, i) => + val ids = pg.list.map(_.index).toArray + new CoalescedRDDPartition(i, prev, ids, pg.prefLoc) } - - res.toArray } override def compute(partition: Partition, context: TaskContext): Iterator[T] = { @@ -119,10 +117,11 @@ class CoalescedRDD[T: ClassManifest]( * @return the machine most preferred by split */ override def getPreferredLocations(partition: Partition): Seq[String] = { - if (partition.isInstanceOf[CoalescedRDDPartition]) + if (partition.isInstanceOf[CoalescedRDDPartition]) { List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation) - else + } else { super.getPreferredLocations(partition) + } } } @@ -167,15 +166,15 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) // initializes/resets to start iterating from the beginning private def resetIterator() = { - val i1 = prev.partitions.view.map( (p: Partition) => + val i1 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 0) - Some((prev.preferredLocations(p)(0),p)) else None } ) - val i2 = prev.partitions.view.map( (p: Partition) => + Some((prev.preferredLocations(p)(0),p)) else None } } + val i2 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 1) - Some((prev.preferredLocations(p)(1),p)) else None } ) - val i3 = prev.partitions.view.map( (p: Partition) => + Some((prev.preferredLocations(p)(1),p)) else None } } + val i3 = prev.partitions.view.map{ p: Partition => { if (prev.preferredLocations(p).length > 2) - Some((prev.preferredLocations(p)(2),p)) else None } ) + Some((prev.preferredLocations(p)(2),p)) else None } } val res = List(i1,i2,i3) res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd) } @@ -215,7 +214,7 @@ class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) pgroup.list += part // already assign this element initialHash += (part -> true) // needed to avoid assigning partitions to multiple buckets true - } else false + } else { false } } /** -- cgit v1.2.3