aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
authorBrennon York <brennon.york@capitalone.com>2015-03-16 01:06:26 -0700
committerAnkur Dave <ankurdave@gmail.com>2015-03-16 01:06:26 -0700
commit45f4c66122c57011e74c694a424756812ab77d99 (patch)
treed2ecaeac9337457be3b345b95f1b632c36866f0c /graphx/src
parentaa6536fa3c2ed1cac47abc79fc22e273f0814858 (diff)
downloadspark-45f4c66122c57011e74c694a424756812ab77d99.tar.gz
spark-45f4c66122c57011e74c694a424756812ab77d99.tar.bz2
spark-45f4c66122c57011e74c694a424756812ab77d99.zip
[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD
Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other. Author: Brennon York <brennon.york@capitalone.com> Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits: e800f08 [Brennon York] fixed merge conflicts b9274af [Brennon York] fixed merge conflicts f86375c [Brennon York] fixed minor include line 398ddb4 [Brennon York] fixed merge conflicts aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly 2af0b88 [Brennon York] removed deprecation line 753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method 2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD 93186f3 [Brennon York] added back the original diff method to sustain binary compatibility f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala9
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala4
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala13
3 files changed, 26 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7107..ad4bfe0772 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -126,6 +126,15 @@ abstract class VertexRDD[VD](
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
+ * @param other the other RDD[(VertexId, VD)] with which to diff against.
+ */
+ def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+ /**
+ * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
+ * differing values; for values that are different, keeps the values from `other`. This is
+ * only guaranteed to work if the VertexRDDs share a common ancestor.
+ *
* @param other the other VertexRDD with which to diff against.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21314..125692ddaa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
+ override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+ }
+
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd3aa..4f7a442ab5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
import org.apache.spark.{HashPartitioner, SparkContext}
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}
+ test("diff with RDD[(VertexId, VD)]") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n).cache()
+ val flipEvens: RDD[(VertexId, Int)] =
+ sc.parallelize(0L to 100L)
+ .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
+ // diff should keep only the changed vertices
+ assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
+ }
+ }
+
test("diff vertices with the non-equal number of partitions") {
withSpark { sc =>
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))