aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala9
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala4
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala13
3 files changed, 26 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7107..ad4bfe0772 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -126,6 +126,15 @@ abstract class VertexRDD[VD](
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
*
+ * @param other the other RDD[(VertexId, VD)] with which to diff against.
+ */
+ def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+ /**
+ * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
+ * differing values; for values that are different, keeps the values from `other`. This is
+ * only guaranteed to work if the VertexRDDs share a common ancestor.
+ *
* @param other the other VertexRDD with which to diff against.
*/
def diff(other: VertexRDD[VD]): VertexRDD[VD]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21314..125692ddaa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
+ override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+ }
+
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
val otherPartition = other match {
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd3aa..4f7a442ab5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
import org.apache.spark.{HashPartitioner, SparkContext}
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}
+ test("diff with RDD[(VertexId, VD)]") {
+ withSpark { sc =>
+ val n = 100
+ val verts = vertices(sc, n).cache()
+ val flipEvens: RDD[(VertexId, Int)] =
+ sc.parallelize(0L to 100L)
+ .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
+ // diff should keep only the changed vertices
+ assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
+ }
+ }
+
test("diff vertices with the non-equal number of partitions") {
withSpark { sc =>
val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))