aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/main
diff options
context:
space:
mode:
authorBrennon York <brennon.york@capitalone.com>2015-03-26 19:08:09 -0700
committerAnkur Dave <ankurdave@gmail.com>2015-03-26 19:08:09 -0700
commit39fb57968352549f2276ac4fcd2b92988ed6fe42 (patch)
tree811c1ecc0e813d299cdfdac6f1d6521883beb4b8 /graphx/src/main
parentaad00322765d6041e817a6bd3fcff2187d212057 (diff)
downloadspark-39fb57968352549f2276ac4fcd2b92988ed6fe42.tar.gz
spark-39fb57968352549f2276ac4fcd2b92988ed6fe42.tar.bz2
spark-39fb57968352549f2276ac4fcd2b92988ed6fe42.zip
[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference
Adds a `Graph#minus` method which will return only unique `VertexId`'s from the calling `VertexRDD`. To demonstrate a basic example with pseudocode: ``` Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2))) > Set((0L,0)) ``` Author: Brennon York <brennon.york@capitalone.com> Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits: 248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid createUsingIndex and updated the mask operations to simplify with andNot call 3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus method 6575d92 [Brennon York] updated mima exclude aaa030b [Brennon York] completed graph#minus functionality 7227c0f [Brennon York] beginning work on minus functionality
Diffstat (limited to 'graphx/src/main')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala16
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala15
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala25
3 files changed, 56 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0772..a9f04b559c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@ abstract class VertexRDD[VD](
def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
/**
+ * For each VertexId present in both `this` and `other`, minus will act as a set difference
+ * operation returning only those unique VertexId's present in `this`.
+ *
+ * @param other an RDD to run the set operation against
+ */
+ def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+ /**
+ * For each VertexId present in both `this` and `other`, minus will act as a set difference
+ * operation returning only those unique VertexId's present in `this`.
+ *
+ * @param other a VertexRDD to run the set operation against
+ */
+ def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+ /**
* For each vertex present in both `this` and `other`, `diff` returns only those vertices with
* differing values; for values that are different, keeps the values from `other`. This is
* only guaranteed to work if the VertexRDDs share a common ancestor.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548b7f..b90f9fa327 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -88,6 +88,21 @@ private[graphx] abstract class VertexPartitionBaseOps
this.withMask(newMask)
}
+ /** Hides the VertexId's that are the same between `this` and `other`. */
+ def minus(other: Self[VD]): Self[VD] = {
+ if (self.index != other.index) {
+ logWarning("Minus operations on two VertexPartitions with different indexes is slow.")
+ minus(createUsingIndex(other.iterator))
+ } else {
+ self.withMask(self.mask.andNot(other.mask))
+ }
+ }
+
+ /** Hides the VertexId's that are the same between `this` and `other`. */
+ def minus(other: Iterator[(VertexId, VD)]): Self[VD] = {
+ minus(createUsingIndex(other))
+ }
+
/**
* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`. The indices of `this` and `other` must be the same.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 125692ddaa..349c8545bf 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,31 @@ class VertexRDDImpl[VD] private[graphx] (
override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
+ override def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+ minus(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+ }
+
+ override def minus (other: VertexRDD[VD]): VertexRDD[VD] = {
+ other match {
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+ this.withPartitionsRDD[VD](
+ partitionsRDD.zipPartitions(
+ other.partitionsRDD, preservesPartitioning = true) {
+ (thisIter, otherIter) =>
+ val thisPart = thisIter.next()
+ val otherPart = otherIter.next()
+ Iterator(thisPart.minus(otherPart))
+ })
+ case _ =>
+ this.withPartitionsRDD[VD](
+ partitionsRDD.zipPartitions(
+ other.partitionBy(this.partitioner.get), preservesPartitioning = true) {
+ (partIter, msgs) => partIter.map(_.minus(msgs))
+ }
+ )
+ }
+ }
+
override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
}