aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-12-06 00:51:12 -0800
committerReynold Xin <rxin@apache.org>2013-12-06 00:51:12 -0800
commit41721b1494fe33c184374925bada3981296dee69 (patch)
treee8a219fa3ccf685c168101d27eb62e554126e18d /graph/src
parent3b0ee53eda906eef109e6a4332e62e09e432aeb8 (diff)
downloadspark-41721b1494fe33c184374925bada3981296dee69.tar.gz
spark-41721b1494fe33c184374925bada3981296dee69.tar.bz2
spark-41721b1494fe33c184374925bada3981296dee69.zip
Fixed a bug in VTableReplicated that we only process the first block.
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala3
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala5
-rw-r--r--graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala3
3 files changed, 6 insertions, 5 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index 0c50ad09c7..8ee631bdba 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -60,8 +60,7 @@ class VTableReplicated[VD: ClassManifest](
prev.zipPartitions(msgsByPartition) { (vTableIter, msgsIter) =>
val (pid, vertexPartition) = vTableIter.next()
- val (_, block) = msgsIter.next()
- val newVPart = vertexPartition.updateUsingIndex(block.iterator)(vdManifest)
+ val newVPart = vertexPartition.updateUsingIndex(msgsIter.flatMap(_._2.iterator))(vdManifest)
Iterator((pid, newVPart))
}.cache()
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 0af445fa7d..fe005c8723 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -42,7 +42,10 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
/** Return the vertex attribute for the given vertex ID. */
def apply(vid: Vid): VD = values(index.getPos(vid))
- def isDefined(vid: Vid): Boolean = mask.get(index.getPos(vid))
+ def isDefined(vid: Vid): Boolean = {
+ val pos = index.getPos(vid)
+ pos >= 0 && mask.get(pos)
+ }
/**
* Pass each vertex attribute along with the vertex id through a map
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index e719ca0872..a3ac7470a5 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -64,7 +64,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
if (pr1 != pr2) { 1 } else { 0 }
}.map { case (vid, test) => test }.sum
assert(notMatching === 0)
- prGraph2.vertices.foreach(println(_))
+ //prGraph2.vertices.foreach(println(_))
val errors = prGraph2.vertices.map { case (vid, pr) =>
val correct = (vid > 0 && pr == resetProb) ||
(vid == 0 && math.abs(pr - (resetProb + (1.0 - resetProb) * (resetProb * (nVertices - 1)) )) < 1.0E-5)
@@ -141,7 +141,6 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
else { assert(cc === 10) }
}
val ccMap = vertices.toMap
- println(ccMap)
for (id <- 0 until 20) {
if (id < 10) {
assert(ccMap(id) === 0)