diff options
author | Takeshi Yamamuro <linguin.m.s@gmail.com> | 2015-01-23 19:25:15 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2015-01-23 19:26:39 -0800 |
commit | e224dbb011789297cd6c6ba095f702c042869ed6 (patch) | |
tree | b5d0fa187205ed0b2d3383e835efcec1bcc8f817 | |
parent | cef1f092a628ac20709857b4388bb10e0b5143b0 (diff) | |
download | spark-e224dbb011789297cd6c6ba095f702c042869ed6.tar.gz spark-e224dbb011789297cd6c6ba095f702c042869ed6.tar.bz2 spark-e224dbb011789297cd6c6ba095f702c042869ed6.zip |
[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...
If the value of 'spark.default.parallelism' does not match the number of partitoins in EdgePartition(EdgeRDDImpl),
the following error occurs in ReplicatedVertexView.scala:72;
object GraphTest extends Logging {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
graph.aggregateMessages(
ctx => {
ctx.sendToSrc(1)
ctx.sendToDst(2)
},
_ + _)
}
}
val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
...
Author: Takeshi Yamamuro <linguin.m.s@gmail.com>
Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:
0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 | ||||
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 20 |
2 files changed, 22 insertions, 2 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 897c7ee12a..f1550ac2e1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} -import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = - partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) + partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size))) override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 9da0064104..ed9876b8dc 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -386,4 +386,24 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("non-default number of edge partitions") { + val n = 10 + val defaultParallelism = 3 + val numEdgePartitions = 4 + assert(defaultParallelism != numEdgePartitions) + val conf = new org.apache.spark.SparkConf() + .set("spark.default.parallelism", defaultParallelism.toString) + val sc = new SparkContext("local", "test", conf) + try { + val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), + numEdgePartitions) + val graph = Graph.fromEdgeTuples(edges, 1) + val neighborAttrSums = graph.mapReduceTriplets[Int]( + et => Iterator((et.dstId, et.srcAttr)), _ + _) + assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) + } finally { + sc.stop() + } + } + } |