aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluluorta <luluorta@gmail.com>2014-09-02 19:25:52 -0700
committerAnkur Dave <ankurdave@gmail.com>2014-09-02 19:28:57 -0700
commitffdb2fcf8cd5880375bee52ee101e0373bf63e27 (patch)
tree94821c1d421e69ce9c5d83406e89aa3a8179b515
parent0c8183cb30b69902cb6fef220980ea51c4533396 (diff)
downloadspark-ffdb2fcf8cd5880375bee52ee101e0373bf63e27.tar.gz
spark-ffdb2fcf8cd5880375bee52ee101e0373bf63e27.tar.bz2
spark-ffdb2fcf8cd5880375bee52ee101e0373bf63e27.zip
[SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions
If the users set “spark.default.parallelism” and the value is different with the EdgeRDD partition number, GraphX jobs will throw: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions Author: luluorta <luluorta@gmail.com> Closes #1763 from luluorta/fix-graph-zip and squashes the following commits: 8338961 [luluorta] fix GraphX EdgeRDD zipPartitions (cherry picked from commit 9b225ac3072de522b40b46aba6df1f1c231f13ef) Signed-off-by: Ankur Dave <ankurdave@gmail.com>
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala4
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala16
2 files changed, 18 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 899a3cbd62..0f1a101156 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
override val partitioner =
- partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+ partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6506bac73d..eaaa4499b6 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
@@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("non-default number of edge partitions") {
+ val n = 10
+ val defaultParallelism = 3
+ val numEdgePartitions = 4
+ assert(defaultParallelism != numEdgePartitions)
+ val conf = new SparkConf()
+ .set("spark.default.parallelism", defaultParallelism.toString)
+ val sc = new SparkContext("local", "test", conf)
+ val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+ numEdgePartitions)
+ val graph = Graph.fromEdgeTuples(edges, 1)
+ val neighborAttrSums = graph.mapReduceTriplets[Int](
+ et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+ }
}