diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 00:07:45 +0000 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 00:07:45 +0000 |
commit | 94479673eb0ea839d5f6b6bd43c5abf75af7b9eb (patch) | |
tree | 17feddf6c9954be8b25705e2ebf3ec03ae4af7a9 /core | |
parent | e9165d2a391c73d7e06436426047759aa62807c2 (diff) | |
download | spark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.tar.gz spark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.tar.bz2 spark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.zip |
Fixed bug in PartitionerAwareUnionRDD
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 15 |
1 file changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 995042e590..3cbf3b4c4f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -9,8 +9,8 @@ class PartitionerAwareUnionRDDPartition( @transient val rdds: Seq[RDD[_]], val idx: Int ) extends Partition { - var parents = rdds.map(_.partitions(index)).toArray - + var parents = rdds.map(_.partitions(idx)).toArray + override val index = idx override def hashCode(): Int = idx @@ -42,7 +42,7 @@ class PartitionerAwareUnionRDD[T: ClassTag]( // Get the location where most of the partitions of parent RDDs are located override def getPreferredLocations(s: Partition): Seq[String] = { - logDebug("Getting preferred locations for " + this) + logDebug("Finding preferred location for " + this + ", partition " + s.index) val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents val locations = rdds.zip(parentPartitions).flatMap { case (rdd, part) => { @@ -51,11 +51,14 @@ class PartitionerAwareUnionRDD[T: ClassTag]( parentLocations } } - if (locations.isEmpty) { - Seq.empty + val location = if (locations.isEmpty) { + None } else { - Seq(locations.groupBy(x => x).map(x => (x._1, x._2.length)).maxBy(_._2)._1) + // Find the location where maximum number of parent partitions prefer + Some(locations.groupBy(x => x).maxBy(_._2.length)._1) } + logDebug("Selected location for " + this + ", partition " + s.index + " = " + location) + location.toSeq } override def compute(s: Partition, context: TaskContext): Iterator[T] = { |