authorTakeshi Yamamuro <linguin.m.s@gmail.com>2015-01-23 19:25:15 -0800
committerAnkur Dave <ankurdave@gmail.com>2015-01-23 19:26:39 -0800
commite224dbb011789297cd6c6ba095f702c042869ed6 (patch)
treeb5d0fa187205ed0b2d3383e835efcec1bcc8f817 /graphx/src/test
parentcef1f092a628ac20709857b4388bb10e0b5143b0 (diff)
[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl
If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition (EdgeRDDImpl), the following error occurs in ReplicatedVertexView.scala:72:

object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
    graph.aggregateMessages(
      ctx => {
        ctx.sendToSrc(1)
        ctx.sendToDst(2)
      },
      _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
	at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
	...

Author: Takeshi Yamamuro <linguin.m.s@gmail.com>

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl
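The fix itself lands outside this test-only diffstat, but the idea behind the title is simple: instead of asking Partitioner.defaultPartitioner (whose size follows spark.default.parallelism) for the vertex-side partitioner, derive it from the edge RDD so both sides of the later zip have the same number of partitions. The following is a minimal Scala sketch of that idea only; the helper name partitionerForEdges is hypothetical and the actual call site in the GraphX sources may differ from what is shown.

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

object EdgePartitionerSketch {
  // Reuse the edge RDD's own partitioner when it has one; otherwise fall back
  // to a HashPartitioner with exactly as many partitions as the edge RDD, so
  // the partition count no longer depends on spark.default.parallelism.
  def partitionerForEdges(edges: RDD[_]): Partitioner =
    edges.partitioner.getOrElse(new HashPartitioner(edges.partitions.length))
}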
Diffstat (limited to 'graphx/src/test')
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 20
1 file changed, 20 insertions, 0 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9da0064104..ed9876b8dc 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -386,4 +386,24 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("non-default number of edge partitions") {
+ val n = 10
+ val defaultParallelism = 3
+ val numEdgePartitions = 4
+ assert(defaultParallelism != numEdgePartitions)
+ val conf = new org.apache.spark.SparkConf()
+ .set("spark.default.parallelism", defaultParallelism.toString)
+ val sc = new SparkContext("local", "test", conf)
+ try {
+ val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+ numEdgePartitions)
+ val graph = Graph.fromEdgeTuples(edges, 1)
+ val neighborAttrSums = graph.mapReduceTriplets[Int](
+ et => Iterator((et.dstId, et.srcAttr)), _ + _)
+ assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+ } finally {
+ sc.stop()
+ }
+ }
+
}