diff options
author | Brennon York <brennon.york@capitalone.com> | 2015-03-14 17:38:12 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-03-14 17:38:12 +0000 |
commit | c49d156624624a719c0d1262a58933ea3e346963 (patch) | |
tree | f689ce05e7a2fa091905d66d9e21ccb6574a03cb /graphx/src/test/scala/org | |
parent | 127268bc3999201ec1c0a040a29c7fa9ac25476b (diff) | |
download | spark-c49d156624624a719c0d1262a58933ea3e346963.tar.gz spark-c49d156624624a719c0d1262a58933ea3e346963.tar.bz2 spark-c49d156624624a719c0d1262a58933ea3e346963.zip |
[SPARK-5790][GraphX]: VertexRDD's won't zip properly for `diff` capability (added tests)
Added tests that maropu [created](https://github.com/maropu/spark/blob/1f64794b2ce33e64f340e383d4e8a60639a7eb4b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala) for vertices with differing partition counts. Wanted to make sure his work got captured /merged as its not in the master branch and I don't believe there's a PR out already for it.
Author: Brennon York <brennon.york@capitalone.com>
Closes #5023 from brennonyork/SPARK-5790 and squashes the following commits:
83bbd29 [Brennon York] added maropu's tests for vertices with differing partition counts
Diffstat (limited to 'graphx/src/test/scala/org')
-rw-r--r-- | graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 38 |
1 files changed, 37 insertions, 1 deletions
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 131959cea3..97533dd3aa 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -import org.apache.spark.SparkContext +import org.apache.spark.{HashPartitioner, SparkContext} import org.apache.spark.storage.StorageLevel class VertexRDDSuite extends FunSuite with LocalSparkContext { @@ -58,6 +58,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { } } + test("diff vertices with the non-equal number of partitions") { + withSpark { sc => + val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0))) + val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => (i.toLong, 1))) + assert(vertexA.partitions.size != vertexB.partitions.size) + val vertexC = vertexA.diff(vertexB) + assert(vertexC.map(_._1).collect.toSet === (8 until 16).toSet) + } + } + test("leftJoin") { withSpark { sc => val n = 100 @@ -73,6 +83,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { } } + test("leftJoin vertices with the non-equal number of partitions") { + withSpark { sc => + val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1))) + val vertexB = VertexRDD( + vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3))) + assert(vertexA.partitions.size != vertexB.partitions.size) + val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) => + old - newOpt.getOrElse(0) + } + assert(vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet == (1 to 99 by 2).toSet) + } + } + test("innerJoin") { withSpark { sc => val n = 100 @@ -87,6 +110,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { (0 to n by 2).map(x => (x.toLong, 0)).toSet) } } + test("innerJoin vertices with the non-equal number of partitions") { + withSpark { sc => + val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1))) + val vertexB = VertexRDD( + vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3))) + assert(vertexA.partitions.size != vertexB.partitions.size) + val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) => + old - newVal + } + assert(vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet == (0 to 98 by 2).toSet) + } + } + test("aggregateUsingIndex") { withSpark { sc => val n = 100 |