diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-09-03 23:49:47 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-09-03 23:49:47 -0700 |
commit | 00362dac976cd05b06638deb11d990d612429e0b (patch) | |
tree | c9f9e1b0c9499b6687002a412d3f0823daa96ce4 | |
parent | 1bed0a3869a526241381d2a74ba064e5b3721336 (diff) | |
download | spark-00362dac976cd05b06638deb11d990d612429e0b.tar.gz spark-00362dac976cd05b06638deb11d990d612429e0b.tar.bz2 spark-00362dac976cd05b06638deb11d990d612429e0b.zip |
[HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests
to fail nondeterministically, which is blocking development for others.
Author: Ankur Dave <ankurdave@gmail.com>
Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits:
10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac "fix GraphX EdgeRDD zipPartitions"
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 4 | ||||
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 16 |
2 files changed, 2 insertions, 18 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 35fbd47e6c..5bcb96b136 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark._ +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = - partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index eaaa4499b6..6506bac73d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("non-default number of edge partitions") { - val n = 10 - val defaultParallelism = 3 - val numEdgePartitions = 4 - assert(defaultParallelism != numEdgePartitions) - val conf = new SparkConf() - .set("spark.default.parallelism", defaultParallelism.toString) - val sc = new SparkContext("local", "test", conf) - val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), - numEdgePartitions) - val graph = Graph.fromEdgeTuples(edges, 1) - val neighborAttrSums = graph.mapReduceTriplets[Int]( - et => Iterator((et.dstId, et.srcAttr)), _ + _) - assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) - } } |