author    Ankur Dave <ankurdave@gmail.com>  2013-12-13 16:36:46 -0800
committer Ankur Dave <ankurdave@gmail.com>  2013-12-14 15:28:23 -0800
commit    77b92748ad0224f55df9e2dbb80aa98f1597a49a (patch)
tree      d510bfc3445310f11be4c9ac60a1e447d26e3dc6 /graph/src
parent    d161caa6eb42b2c399b7f0878bd5aea3978febcf (diff)
Replace update with innerJoin (has a bug)
There is a conflict between vertices that did not change (so they are not shipped, but still need to run) and vertices that were dropped by the innerJoin (so they should not run).
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala                 |  7
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Pregel.scala                | 11
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala             | 19
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala        | 23
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala |  3
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala  | 30
6 files changed, 58 insertions(+), 35 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 475553b6ab..8c28cd4373 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -281,9 +281,12 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
: Graph[VD2, ED]
/**
- * Replace vertices in the graph with corresponding vertices in `updates`.
+ * Replace vertices in the graph with corresponding vertices in `table`, and hide vertices
+ * without a corresponding entry in `table`. Edges adjacent to hidden vertices will still
+ * appear in graph.edges, but not in triplets or mapReduceTriplets.
*/
- def updateVertices(updates: VertexRDD[VD]): Graph[VD, ED]
+ def innerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
+ (f: (Vid, VD, U) => VD2): Graph[VD2, ED]
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
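A minimal usage sketch of the new API (not part of this commit; it assumes a
SparkContext `sc` and a graph `g: Graph[Double, Int]`, both hypothetical):

// Scale each vertex attribute by a per-vertex weight. Vertices with no entry
// in `weights` become hidden: their edges stay in g.edges, but they no longer
// appear in triplets or mapReduceTriplets.
val weights: RDD[(Vid, Double)] = sc.parallelize(Seq((1L, 0.5), (3L, 2.0)))
val scaled: Graph[Double, Int] =
  g.innerJoinVertices(weights) { (vid, attr, w) => attr * w }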
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 6a267d8c28..6bb8bd77e8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -98,13 +98,14 @@ object Pregel {
// Loop
var i = 0
while (activeMessages > 0 && i < maxIterations) {
- // receive the messages
- val changedVerts = g.vertices.zipJoin(messages)(vprog).cache() // updating the vertices
- // replicate the changed vertices
- g = g.updateVertices(changedVerts)
+ // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
+ val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
+ // Update the graph with the new vertices, removing vertices that didn't get any messages.
+ g = g.innerJoinVertices(newVerts)
val oldMessages = messages
- // compute the messages
+ // Send new messages. Vertices that received no messages in the previous round do not appear
+ // in the graph, so they do not get to send messages.
messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
activeMessages = messages.count()
// after counting we can unpersist the old messages
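The conflict named in the commit message can be reproduced with a plain-Scala
model of inner-join semantics (a sketch only, not code from this commit):

// Model vertices and one round of messages as maps keyed by vertex id.
val verts    = Map(1L -> "a", 2L -> "b", 3L -> "c")
val messages = Map(1L -> 10, 3L -> 30)  // vertex 2 received no message
// An inner join keeps only the vertices that received a message...
val newVerts = verts.collect {
  case (vid, attr) if messages.contains(vid) => vid -> (attr, messages(vid))
}
// ...so vertex 2 is dropped: it is hidden from the next mapReduceTriplets and
// can never send or receive messages again, even though it was not deleted,
// only quiescent for one round.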
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
index 0c6b20fd1d..5b8e00aa13 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -279,26 +279,31 @@ class VertexRDD[@specialized VD: ClassManifest](
* Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
* must have the same index.
*/
- def zipUpdate(other: VertexRDD[VD]): VertexRDD[VD] = {
+ def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U])
+ (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
- thisPart.update(otherPart.iterator)
+ thisPart.innerJoin(otherPart)(f)
}
}
- /** Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) } */
- def update(other: RDD[(Vid, VD)]): VertexRDD[VD] = {
+ /**
+ * Replace vertices with corresponding vertices in `other`, and drop vertices without a
+ * corresponding vertex in `other`.
+ */
+ def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)])
+ (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
- // If the other set is a VertexRDD then we use the much more efficient leftOuterZipJoin
+ // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
case other: VertexRDD[_] =>
- zipUpdate(other)
+ innerZipJoin(other)(f)
case _ =>
- new VertexRDD[VD](
+ new VertexRDD[VD2](
partitionsRDD.zipPartitions(
other.partitionBy(this.partitioner.get), preservesPartitioning = true)
{ (part, msgs) =>
val vertexPartition: VertexPartition[VD] = part.next()
- Iterator(vertexPartition.update(msgs))
+ Iterator(vertexPartition.innerJoin(msgs)(f))
}
)
}
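A usage sketch (not from this diff; it assumes a SparkContext `sc` and a
`ranks: VertexRDD[Double]`, both hypothetical):

// innerJoin dispatches on the runtime type of `other`: a VertexRDD sharing
// this RDD's index takes the shuffle-free innerZipJoin path, while a plain
// RDD is first partitioned by this RDD's partitioner.
val msgs: RDD[(Vid, Double)] = sc.parallelize(Seq((1L, 0.1), (2L, 0.2)))
val updated: VertexRDD[Double] =
  ranks.innerJoin(msgs) { (vid, rank, msg) => rank + msg }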
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 8ba90fd9bb..a954e7f907 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -268,11 +268,24 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
}
- override def updateVertices(updates: VertexRDD[VD]): Graph[VD, ED] = {
- val newVerts = vertices.update(updates)
- val newVTableReplicated = new VTableReplicated(
- updates, edges, vertexPlacement, Some(vTableReplicated))
- new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ override def innerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
+ (f: (Vid, VD, U) => VD2): Graph[VD2, ED] = {
+ if (classManifest[VD] equals classManifest[VD2]) {
+ // f preserves type, so we can use incremental replication
+ val newVerts = vertices.innerJoin(table)(f)
+ val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
+ // TODO(ankurdave): Resolve the conflict between vertices that did not change (they are not
+ // shipped, but still need to run) and vertices that were dropped by the innerJoin (they
+ // should not run).
+ val newVTableReplicated = new VTableReplicated(
+ changedVerts, edges, vertexPlacement,
+ Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
+ new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ } else {
+ // f does not preserve type, so we must re-replicate all vertices in table
+ val newVerts = vertices.innerJoin(table)(f)
+ new GraphImpl(newVerts, edges, vertexPlacement)
+ }
}
} // end of class GraphImpl
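The branch above is gated by a runtime manifest comparison. A small sketch of
that test (ClassManifest matches this Scala 2.9-era codebase; the helper name
is hypothetical):

// True only when the old and new vertex attribute types are the same class,
// which is what makes the incremental (diff-based) replication sound.
def sameAttrType[A: ClassManifest, B: ClassManifest]: Boolean =
  classManifest[A] equals classManifest[B]

sameAttrType[Double, Double]  // true  -> diff against old vertices, ship delta
sameAttrType[Double, String]  // false -> rebuild the replicated view in full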
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index 4cccac02cf..b90d2f14ee 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -79,8 +79,7 @@ class VTableReplicated[VD: ClassManifest](
// VertexPartitions
prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart.updateHideUnchanged(
- shippedVertsIter.flatMap(_._2.iterator))
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
Iterator((pid, newVPart))
}.cache().setName("VTableReplicated delta %s %s".format(includeSrcAttr, includeDstAttr))
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 55e7d74a28..c4aff6455d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -172,6 +172,19 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
leftJoin(createUsingIndex(other))(f)
}
+ /** Inner join another VertexPartition. */
+ def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U])
+ (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ val newMask = mask & other.mask
+ val newValues = new Array[VD2](capacity)
+ var i = newMask.nextSetBit(0)
+ while (i >= 0) {
+ newValues(i) = f(index.getValue(i), values(i), other.values(i))
+ i = newMask.nextSetBit(i + 1)
+ }
+ new VertexPartition(index, newValues, newMask)
+ }
+
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
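The join works slot by slot because both partitions share one index, so
position i holds the same vertex on both sides. A model of the mask logic
using scala.collection.immutable.BitSet in place of Spark's BitSet (an
assumption made for illustration only):

import scala.collection.immutable.BitSet
val leftMask  = BitSet(0, 1, 3)
val rightMask = BitSet(1, 2, 3)
val newMask   = leftMask & rightMask  // slots live on both sides: {1, 3}
// For each set bit i, the joined value is f(index(i), left(i), right(i));
// all other slots hold arbitrary values but stay hidden behind the mask.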
@@ -187,22 +200,11 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition[VD2](index, newValues, newMask)
}
- /** Same effect as leftJoin(iter) { (vid, a, bOpt) => bOpt.getOrElse(a) } */
- def update(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = {
- val newValues = new Array[VD](capacity)
- System.arraycopy(values, 0, newValues, 0, newValues.length)
- iter.foreach { case (vid, vdata) =>
- val pos = index.getPos(vid)
- newValues(pos) = vdata
- }
- new VertexPartition(index, newValues, mask)
- }
-
/**
- * Same effect as leftJoin(iter) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but unchanged vertices
- * are hidden using the bitmask.
+ * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
+ * the partition, hidden by the bitmask.
*/
- def updateHideUnchanged(iter: Iterator[Product2[Vid, VD]])
+ def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]])
: VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity)
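A plain-Scala model of the innerJoinKeepLeft semantics (hypothetical, using a
map for the backing array and a set for the bitmask): every left value is
retained, but only vertices present in the iterator end up active.

val values  = Map(0 -> "a", 1 -> "b", 2 -> "c")  // slot -> left value
val shipped = Seq((1, "B"))                      // shipped (slot, value) pairs
val patched = values ++ shipped                  // left kept, slot 1 patched
val mask    = shipped.map(_._1).toSet            // only slot 1 is active
// Slots 0 and 2 remain stored (useful for later deltas) but are masked out.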