aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 00:07:45 +0000
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 00:07:45 +0000
commit94479673eb0ea839d5f6b6bd43c5abf75af7b9eb (patch)
tree17feddf6c9954be8b25705e2ebf3ec03ae4af7a9 /core
parente9165d2a391c73d7e06436426047759aa62807c2 (diff)
downloadspark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.tar.gz
spark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.tar.bz2
spark-94479673eb0ea839d5f6b6bd43c5abf75af7b9eb.zip
Fixed bug in PartitionerAwareUnionRDD
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala15
1 file changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 995042e590..3cbf3b4c4f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -9,8 +9,8 @@ class PartitionerAwareUnionRDDPartition(
@transient val rdds: Seq[RDD[_]],
val idx: Int
) extends Partition {
- var parents = rdds.map(_.partitions(index)).toArray
-
+ var parents = rdds.map(_.partitions(idx)).toArray
+
override val index = idx
override def hashCode(): Int = idx
@@ -42,7 +42,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
// Get the location where most of the partitions of parent RDDs are located
override def getPreferredLocations(s: Partition): Seq[String] = {
- logDebug("Getting preferred locations for " + this)
+ logDebug("Finding preferred location for " + this + ", partition " + s.index)
val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
val locations = rdds.zip(parentPartitions).flatMap {
case (rdd, part) => {
@@ -51,11 +51,14 @@ class PartitionerAwareUnionRDD[T: ClassTag](
parentLocations
}
}
- if (locations.isEmpty) {
- Seq.empty
+ val location = if (locations.isEmpty) {
+ None
} else {
- Seq(locations.groupBy(x => x).map(x => (x._1, x._2.length)).maxBy(_._2)._1)
+ // Find the location where maximum number of parent partitions prefer
+ Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
}
+ logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
+ location.toSeq
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {