about | summary | refs | log | tree | commit | diff
path: root/graph/src/main
diff options
context:
space:
mode:
author: Akihiro Matsukawa <amatsukawa@twitter.com> 2013-12-04 18:17:14 -0800
committer: Akihiro Matsukawa <amatsukawa@twitter.com> 2013-12-04 18:17:14 -0800
commit: 38c6f5f6122ba32e1ef5d8b8a48ec99e6446d7e1 (patch)
tree: 00677e506560baf5c7599c049ee537a72409ccf3 /graph/src/main
parent: 088995f917548ac549397f24916ea72b0c3fc9d0 (diff)
download: spark-38c6f5f6122ba32e1ef5d8b8a48ec99e6446d7e1.tar.gz
spark-38c6f5f6122ba32e1ef5d8b8a48ec99e6446d7e1.tar.bz2
spark-38c6f5f6122ba32e1ef5d8b8a48ec99e6446d7e1.zip
add a predicate to GraphLab to indicate active vertices at start
Diffstat (limited to 'graph/src/main')
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/GraphLab.scala | 7
1 files changed, 5 insertions, 2 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index bf1f4168da..799c0fc901 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -24,6 +24,8 @@ object GraphLab {
* @param scatterFunc Executed after the apply function the scatter function takes
* a triplet and signals whether the neighboring vertex program
* must be recomputed.
+ * @param startVertices predicate to determine which vertices to start the computation on.
+ * these will be the active vertices in the first iteration.
* @param numIter The maximum number of iterations to run.
* @param gatherDirection The direction of edges to consider during the gather phase
* @param scatterDirection The direction of edges to consider during the scatter phase
@@ -40,12 +42,13 @@ object GraphLab {
(gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vid, VD, Option[A]) => VD,
- scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = {
+ scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
- case (id, data) => (true, data)
+ case (id, data) => (startVertices(id, data), data)
}.cache()
// The gather function wrapper strips the active attribute and