aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
author: Brennon York <brennon.york@capitalone.com> 2015-02-25 14:11:12 -0800
committer: Ankur Dave <ankurdave@gmail.com> 2015-02-25 14:11:12 -0800
commit: 9f603fce78fcc997926e9a72dec44d48cbc396fc (patch)
tree: 3f1d1cc53a7c24dbc2b05ee41d66c8dc77bb4466 /graphx
parent: a777c65da9bc636e5cf5426e15a2e76d6b21b744 (diff)
download: spark-9f603fce78fcc997926e9a72dec44d48cbc396fc.tar.gz
spark-9f603fce78fcc997926e9a72dec44d48cbc396fc.tar.bz2
spark-9f603fce78fcc997926e9a72dec44d48cbc396fc.zip
[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Fixes an issue whereby VertexRDDs that are `diff`ed, `innerJoin`ed, or `leftJoin`ed while being partitioned differently (for example, into a different number of partitions) fail under the `zipPartitions` method. This fix tests whether the two partitioners are equal and, if they are not, repartitions the other VertexRDD to match the partitioning of the calling VertexRDD. Author: Brennon York <brennon.york@capitalone.com> Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits: 0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs
Diffstat (limited to 'graphx')
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 12
1 files changed, 9 insertions, 3 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 6dad167fa7..904be21314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
this.mapVertexPartitions(_.map(f))
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+ val otherPartition = other match {
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+ other.partitionsRDD
+ case _ =>
+ VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+ }
val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
+ otherPartition, preservesPartitioning = true
) { (thisIter, otherIter) =>
val thisPart = thisIter.next()
val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
leftZipJoin(other)(f)
case _ =>
this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
innerZipJoin(other)(f)
case _ =>
this.withPartitionsRDD(