diff options
author | luluorta <luluorta@gmail.com> | 2014-09-02 19:25:52 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-09-02 19:26:27 -0700 |
commit | 9b225ac3072de522b40b46aba6df1f1c231f13ef (patch) | |
tree | c43b1a6a9176d93410079fd77d833c40c4cd9116 /graphx/src/test/scala | |
parent | e9bb12bea9fbef94332fbec88e3cd9197a27b7ad (diff) | |
download | spark-9b225ac3072de522b40b46aba6df1f1c231f13ef.tar.gz spark-9b225ac3072de522b40b46aba6df1f1c231f13ef.tar.bz2 spark-9b225ac3072de522b40b46aba6df1f1c231f13ef.zip |
[SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions
If the users set “spark.default.parallelism” and the value is different with the EdgeRDD partition number, GraphX jobs will throw:
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
Author: luluorta <luluorta@gmail.com>
Closes #1763 from luluorta/fix-graph-zip and squashes the following commits:
8338961 [luluorta] fix GraphX EdgeRDD zipPartitions
Diffstat (limited to 'graphx/src/test/scala')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 6506bac73d..eaaa4499b6 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("non-default number of edge partitions") { + val n = 10 + val defaultParallelism = 3 + val numEdgePartitions = 4 + assert(defaultParallelism != numEdgePartitions) + val conf = new SparkConf() + .set("spark.default.parallelism", defaultParallelism.toString) + val sc = new SparkContext("local", "test", conf) + val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), + numEdgePartitions) + val graph = Graph.fromEdgeTuples(edges, 1) + val neighborAttrSums = graph.mapReduceTriplets[Int]( + et => Iterator((et.dstId, et.srcAttr)), _ + _) + assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) + } } |