-rwxr-xr-x  bin/compute-classpath.sh | 4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala | 6
-rw-r--r--  graphx/pom.xml (renamed from graph/pom.xml) | 0
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala (renamed from graph/src/main/scala/org/apache/spark/graph/Analytics.scala) | 10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Edge.scala (renamed from graph/src/main/scala/org/apache/spark/graph/Edge.scala) | 12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala (renamed from graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala) | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala (renamed from graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala) | 21
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala (renamed from graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala) | 8
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala (renamed from graph/src/main/scala/org/apache/spark/graph/Graph.scala) | 33
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala (renamed from graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala) | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala (renamed from graph/src/main/scala/org/apache/spark/graph/GraphLab.scala) | 40
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala (renamed from graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala) | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala (renamed from graph/src/main/scala/org/apache/spark/graph/GraphOps.scala) | 32
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala (renamed from graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala) | 22
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala (renamed from graph/src/main/scala/org/apache/spark/graph/Pregel.scala) | 27
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala (renamed from graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala) | 53
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala | 56
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala (renamed from graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala) | 12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala (renamed from graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala) | 65
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala (renamed from graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala) | 30
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala (renamed from graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala) | 25
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala (renamed from graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala) | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala | 8
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala) | 20
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala) | 14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala) | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala) | 105
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala) | 22
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala) | 63
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala) | 22
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala) | 14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala) | 55
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/package.scala (renamed from graph/src/main/scala/org/apache/spark/graph/package.scala) | 10
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala (renamed from graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala) | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala (renamed from graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala) | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala (renamed from graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala) | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala (renamed from graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala) | 24
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala (renamed from graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala) | 2
-rw-r--r--  graphx/src/test/resources/log4j.properties (renamed from graph/src/test/resources/log4j.properties) | 2
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala) | 32
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala) | 43
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala (renamed from graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala) | 4
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala) | 14
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala) | 24
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala) | 18
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala) | 22
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala) | 31
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala) | 17
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala) | 12
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala) | 14
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala) | 6
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/impl/VertexPartitionSuite.scala) | 4
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala (renamed from graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala) | 2
-rw-r--r--  project/SparkBuild.scala | 12
54 files changed, 597 insertions, 513 deletions
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index c10725e708..e01cfa1eb8 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -40,7 +40,7 @@ if [ -f "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-dep
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graph/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/classes"
DEPS_ASSEMBLY_JAR=`ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*-deps.jar`
@@ -61,7 +61,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/test-classes"
- CLASSPATH="$CLASSPATH:$FWDIR/graph/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/graphx/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
fi
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
index 8dd7fb40e8..cfafbaf23e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala
@@ -31,16 +31,16 @@ import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
import com.esotericsoftware.kryo._
class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double, terminateSteps: Int = 10)(
+ def computeWithCombiner(numVertices: Long, epsilon: Double)(
self: PRVertex, messageSum: Option[Double], superstep: Int
): (PRVertex, Array[PRMessage]) = {
val newValue = messageSum match {
case Some(msgSum) if msgSum != 0 =>
- 0.15 + 0.85 * msgSum
+ 0.15 / numVertices + 0.85 * msgSum
case _ => self.value
}
- val terminate = superstep >= terminateSteps
+ val terminate = superstep >= 10
val outbox: Array[PRMessage] =
if (!terminate)
diff --git a/graph/pom.xml b/graphx/pom.xml
index fd3dcaad7c..fd3dcaad7c 100644
--- a/graph/pom.xml
+++ b/graphx/pom.xml
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala
index 14b9be73f1..2c4c885a04 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Analytics.scala
@@ -1,7 +1,7 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.apache.spark._
-import org.apache.spark.graph.algorithms._
+import org.apache.spark.graphx.algorithms._
/**
@@ -49,7 +49,7 @@ object Analytics extends Logging {
val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
//System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
taskType match {
case "pagerank" => {
@@ -83,7 +83,7 @@ object Analytics extends Logging {
println("GRAPHX: Number of edges " + graph.edges.count)
//val pr = Analytics.pagerank(graph, numIter)
- val pr = PageRank.runStandalone(graph, tol)
+ val pr = graph.pageRank(tol).vertices.cache()
println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_+_))
@@ -400,7 +400,7 @@ object Analytics extends Logging {
// System.setProperty("spark.serializer", "spark.KryoSerializer")
// //System.setProperty("spark.shuffle.compress", "false")
- // System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
+ // System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator")
// taskType match {
// case "pagerank" => {
diff --git a/graph/src/main/scala/org/apache/spark/graph/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 5ac77839eb..29b46674f1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
/**
@@ -11,11 +11,11 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
/**
* The vertex id of the source vertex
*/
- var srcId: Vid = 0,
+ var srcId: VertexID = 0,
/**
* The vertex id of the target vertex.
*/
- var dstId: Vid = 0,
+ var dstId: VertexID = 0,
/**
* The attribute associated with the edge.
*/
@@ -27,7 +27,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @param vid the id one of the two vertices on the edge.
* @return the id of the other vertex on the edge.
*/
- def otherVertexId(vid: Vid): Vid =
+ def otherVertexId(vid: VertexID): VertexID =
if (srcId == vid) dstId else { assert(dstId == vid); srcId }
/**
@@ -38,13 +38,13 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
* @return the relative direction of the edge to the corresponding
* vertex.
*/
- def relativeDirection(vid: Vid): EdgeDirection =
+ def relativeDirection(vid: VertexID): EdgeDirection =
if (vid == srcId) EdgeDirection.Out else { assert(vid == dstId); EdgeDirection.In }
}
object Edge {
def lexicographicOrdering[ED] = new Ordering[Edge[ED]] {
override def compare(a: Edge[ED], b: Edge[ED]): Int =
- Ordering[(Vid, Vid)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
+ Ordering[(VertexID, VertexID)].compare((a.srcId, a.dstId), (b.srcId, b.dstId))
}
}
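(Note: the hunk above renames Vid to VertexID throughout the Edge case class. A minimal sketch of the renamed API, not part of the patch; values are arbitrary.)

    import org.apache.spark.graphx._

    val e = Edge(1L, 2L, "follows")                       // srcId, dstId, attr
    assert(e.otherVertexId(1L) == 2L)                     // the opposite endpoint
    assert(e.relativeDirection(1L) == EdgeDirection.Out)  // 1L is the source vertex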
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
index a1468a152b..785f941650 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeDirection.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeDirection.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
/**
@@ -7,7 +7,7 @@ package org.apache.spark.graph
*/
sealed abstract class EdgeDirection {
/**
- * Reverse the direction of an edge. An in becomes out,
+ * Reverse the direction of an edge. An in becomes out,
* out becomes in and both remains both.
*/
def reverse: EdgeDirection = this match {
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 230202d6b0..7fd6580626 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -1,15 +1,15 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
class EdgeRDD[@specialized ED: ClassTag](
- val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
partitionsRDD.setName("EdgeRDD")
@@ -17,7 +17,7 @@ class EdgeRDD[@specialized ED: ClassTag](
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
/**
- * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the PartitionIDs in
* partitionsRDD correspond to the actual partitions and create a new partitioner that allows
* co-partitioning with partitionsRDD.
*/
@@ -25,7 +25,7 @@ class EdgeRDD[@specialized ED: ClassTag](
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
- firstParent[(Pid, EdgePartition[ED])].iterator(part, context).next._2.iterator
+ firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context).next._2.iterator
}
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
@@ -44,7 +44,12 @@ class EdgeRDD[@specialized ED: ClassTag](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): EdgeRDD[ED] = persist()
- def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
+ override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+ partitionsRDD.unpersist(blocking)
+ this
+ }
+
+ def mapEdgePartitions[ED2: ClassTag](f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2])
: EdgeRDD[ED2] = {
// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
@@ -55,7 +60,7 @@ class EdgeRDD[@specialized ED: ClassTag](
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
- (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -66,7 +71,7 @@ class EdgeRDD[@specialized ED: ClassTag](
})
}
- def collectVids(): RDD[Vid] = {
+ def collectVertexIDs(): RDD[VertexID] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
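(Note: a hedged sketch of the renamed EdgeRDD methods above. It assumes a SparkContext `sc` and that `Graph.edges` exposes an EdgeRDD[ED], as elsewhere in this patch.)

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Build a tiny graph just to get at its EdgeRDD.
    val g = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 0)
    val edges: EdgeRDD[Int] = g.edges
    val vids: RDD[VertexID] = edges.collectVertexIDs().distinct()  // renamed from collectVids
    edges.unpersist(blocking = false)                              // new override added above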
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 5a384a5f84..b0565b7e0e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -1,6 +1,6 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
-import org.apache.spark.graph.impl.VertexPartition
+import org.apache.spark.graphx.impl.VertexPartition
/**
* An edge triplet represents two vertices and edge along with their
@@ -47,7 +47,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id one of the two vertices on the edge.
* @return the attribute for the other vertex on the edge.
*/
- def otherVertexAttr(vid: Vid): VD =
+ def otherVertexAttr(vid: VertexID): VD =
if (srcId == vid) dstAttr else { assert(dstId == vid); srcAttr }
/**
@@ -56,7 +56,7 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
* @param vid the id of one of the two vertices on the edge
* @return the attr for the vertex with that id.
*/
- def vertexAttr(vid: Vid): VD =
+ def vertexAttr(vid: VertexID): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 9dd26f7679..6f2d19d0da 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.ClassTag
-import org.apache.spark.graph.impl._
+import org.apache.spark.graphx.impl._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -94,6 +94,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
def cache(): Graph[VD, ED]
/**
+ * Uncache only the vertices of this graph, leaving the edges alone. This is useful because most
+ * graph operations modify the vertices but reuse the edges.
+ */
+ def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
+
+ /**
* Repartition the edges in the graph according to partitionStrategy.
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
@@ -125,7 +131,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* }}}
*
*/
- def mapVertices[VD2: ClassTag](map: (Vid, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassTag](map: (VertexID, VD) => VD2): Graph[VD2, ED]
/**
* Construct a new graph where the value of each edge is
@@ -170,8 +176,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
*/
def mapEdges[ED2: ClassTag](
- map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]):
- Graph[VD, ED2]
+ map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
/**
* Construct a new graph where the value of each edge is
@@ -222,7 +227,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
*/
def mapTriplets[ED2: ClassTag](
- map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
+ map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
Graph[VD, ED2]
/**
@@ -255,7 +260,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* satisfy the predicates.
*/
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
- vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+ vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
/**
* Subgraph of this graph with only vertices and edges from the other graph.
@@ -304,7 +309,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* vertex
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
- * val inDeg: RDD[(Vid, Int)] =
+ * val inDeg: RDD[(VertexID, Int)] =
* mapReduceTriplets[Int](et => Array((et.dst.id, 1)), _ + _)
* }}}
*
@@ -316,7 +321,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
*/
def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
: VertexRDD[A]
@@ -343,15 +348,15 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
*
* {{{
* val rawGraph: Graph[(),()] = Graph.textFile("webgraph")
- * val outDeg: RDD[(Vid, Int)] = rawGraph.outDegrees()
+ * val outDeg: RDD[(VertexID, Int)] = rawGraph.outDegrees()
* val graph = rawGraph.outerJoinVertices(outDeg) {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
*
*/
- def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(Vid, U)])
- (mapFunc: (Vid, VD, Option[U]) => VD2)
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+ (mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Save a copy of the GraphOps object so there is always one unique GraphOps object
@@ -379,7 +384,7 @@ object Graph {
* (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
- rawEdges: RDD[(Vid, Vid)],
+ rawEdges: RDD[(VertexID, VertexID)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
@@ -421,7 +426,7 @@ object Graph {
* partitioning the edges.
*/
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(Vid, VD)],
+ vertices: RDD[(VertexID, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
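(Note: a hedged sketch of the renamed Graph API from this file, mirroring the in-degree example in the scaladoc above; it assumes a SparkContext `sc`.)

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val rawEdges: RDD[(VertexID, VertexID)] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
    val graph: Graph[Int, Int] = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)

    // In-degree per vertex via mapReduceTriplets, as in the scaladoc example.
    val inDeg: VertexRDD[Int] =
      graph.mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)

    // The new unpersistVertices drops only the cached vertices, keeping the (reused) edges.
    graph.cache()
    graph.unpersistVertices(blocking = false)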
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index b8c1b5b0f0..f8aab951f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import com.esotericsoftware.kryo.Kryo
-import org.apache.spark.graph.impl._
+import org.apache.spark.graphx.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.BoundedPriorityQueue
@@ -14,7 +14,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[Edge[Object]])
kryo.register(classOf[MessageToPartition[Object]])
kryo.register(classOf[VertexBroadcastMsg[Object]])
- kryo.register(classOf[(Vid, Object)])
+ kryo.register(classOf[(VertexID, Object)])
kryo.register(classOf[EdgePartition[Object]])
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
index c1ce5cd9cc..7efc69c64e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.ClassTag
@@ -42,11 +42,12 @@ object GraphLab extends Logging {
(graph: Graph[VD, ED], numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out)
- (gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
+ (gatherFunc: (VertexID, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
- applyFunc: (Vid, VD, Option[A]) => VD,
- scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
- startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
+ applyFunc: (VertexID, VD, Option[A]) => VD,
+ scatterFunc: (VertexID, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (VertexID, VD) => Boolean = (vid: VertexID, data: VD) => true)
+ : Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
@@ -56,7 +57,7 @@ object GraphLab extends Logging {
// The gather function wrapper strips the active attribute and
// only invokes the gather function on active vertices
- def gather(vid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
+ def gather(vid: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[A] = {
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.set(e)
@@ -70,7 +71,7 @@ object GraphLab extends Logging {
// The apply function wrapper strips the vertex of the active attribute
// and only invokes the apply function on active vertices
- def apply(vid: Vid, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
+ def apply(vid: VertexID, data: (Boolean, VD), accum: Option[A]): (Boolean, VD) = {
val (active, vData) = data
if (active) (true, applyFunc(vid, vData, accum))
else (false, vData)
@@ -78,8 +79,8 @@ object GraphLab extends Logging {
// The scatter function wrapper strips the vertex of the active attribute
// and only invokes the scatter function on active vertices
- def scatter(rawVid: Vid, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
- val vid = e.otherVertexId(rawVid)
+ def scatter(rawVertexID: VertexID, e: EdgeTriplet[(Boolean, VD), ED]): Option[Boolean] = {
+ val vid = e.otherVertexId(rawVertexID)
if (e.vertexAttr(vid)._1) {
val edgeTriplet = new EdgeTriplet[VD,ED]
edgeTriplet.set(e)
@@ -92,7 +93,8 @@ object GraphLab extends Logging {
}
// Used to set the active status of vertices for the next round
- def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
+ def applyActive(
+ vid: VertexID, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
val (prevActive, vData) = data
(newActiveOpt.getOrElse(false), vData)
}
@@ -100,29 +102,29 @@ object GraphLab extends Logging {
// Main Loop ---------------------------------------------------------------------
var i = 0
var numActive = activeGraph.numVertices
+ var prevActiveGraph: Graph[(Boolean, VD), ED] = null
while (i < numIter && numActive > 0) {
// Gather
- val gathered: RDD[(Vid, A)] =
+ val gathered: RDD[(VertexID, A)] =
activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection)
// Apply
- activeGraph = activeGraph.outerJoinVertices(gathered)(apply).cache()
-
-
+ val applied = activeGraph.outerJoinVertices(gathered)(apply).cache()
// Scatter is basically a gather in the opposite direction so we reverse the edge direction
- // activeGraph: Graph[(Boolean, VD), ED]
- val scattered: RDD[(Vid, Boolean)] =
- activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
+ val scattered: RDD[(VertexID, Boolean)] =
+ applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
- activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
+ prevActiveGraph = activeGraph
+ activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache()
- // Calculate the number of active vertices
+ // Calculate the number of active vertices.
numActive = activeGraph.vertices.map{
case (vid, data) => if (data._1) 1 else 0
}.reduce(_ + _)
logInfo("Number active vertices: " + numActive)
+
i += 1
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 7daac4fcc5..473cfb18cf 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import java.util.{Arrays => JArrays}
import scala.reflect.ClassTag
-import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl}
import org.apache.spark.util.collection.PrimitiveVector
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 11c6120beb..cacfcb1c90 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.ClassTag
@@ -112,7 +112,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
*/
def aggregateNeighbors[A: ClassTag](
- mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
+ mapFunc: (VertexID, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
dir: EdgeDirection)
: VertexRDD[A] = {
@@ -151,25 +151,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* @return the vertex set of neighboring ids for each vertex.
*/
def collectNeighborIds(edgeDirection: EdgeDirection) :
- VertexRDD[Array[Vid]] = {
+ VertexRDD[Array[VertexID]] = {
val nbrs =
if (edgeDirection == EdgeDirection.Both) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId)), (et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _
)
} else if (edgeDirection == EdgeDirection.Out) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.srcId, Array(et.dstId))),
reduceFunc = _ ++ _)
} else if (edgeDirection == EdgeDirection.In) {
- graph.mapReduceTriplets[Array[Vid]](
+ graph.mapReduceTriplets[Array[VertexID]](
mapFunc = et => Iterator((et.dstId, Array(et.srcId))),
reduceFunc = _ ++ _)
} else {
throw new SparkException("It doesn't make sense to collect neighbor ids without a direction.")
}
- graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[VertexID])
+ }
} // end of collectNeighborIds
@@ -187,14 +189,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* vertex.
*/
def collectNeighbors(edgeDirection: EdgeDirection) :
- VertexRDD[ Array[(Vid, VD)] ] = {
- val nbrs = graph.aggregateNeighbors[Array[(Vid,VD)]](
+ VertexRDD[ Array[(VertexID, VD)] ] = {
+ val nbrs = graph.aggregateNeighbors[Array[(VertexID,VD)]](
(vid, edge) =>
Some(Array( (edge.otherVertexId(vid), edge.otherVertexAttr(vid)) )),
(a, b) => a ++ b,
edgeDirection)
- graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
+ graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
+ nbrsOpt.getOrElse(Array.empty[(VertexID, VD)])
+ }
} // end of collectNeighbor
@@ -228,9 +232,9 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* }}}
*
*/
- def joinVertices[U: ClassTag](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
+ def joinVertices[U: ClassTag](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD)
: Graph[VD, ED] = {
- val uf = (id: Vid, data: VD, o: Option[U]) => {
+ val uf = (id: VertexID, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
@@ -259,7 +263,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
* val degrees: VertexSetRDD[Int] = graph.outDegrees
* graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
* },
- * vpred = (vid: Vid, deg:Int) => deg > 0
+ * vpred = (vid: VertexID, deg:Int) => deg > 0
* )
* }}}
*
@@ -267,7 +271,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
- vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = {
+ vpred: (VertexID, VD2) => Boolean = (v:VertexID, d:VD2) => true): Graph[VD, ED] = {
graph.mask(preprocess(graph).subgraph(epred, vpred))
}
} // end of GraphOps
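(Note: a hedged sketch of the renamed GraphOps methods, assuming a SparkContext `sc` and the implicit Graph-to-GraphOps conversion provided by the Graph companion object.)

    import org.apache.spark.graphx._

    val g: Graph[Int, Int] =
      Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L), (1L, 3L))), defaultValue = 0)

    // Neighbor ids per vertex, following the collectNeighborIds signature above.
    val nbrs: VertexRDD[Array[VertexID]] = g.collectNeighborIds(EdgeDirection.Both)

    // joinVertices keeps the vertex type; vertices missing from the table keep their old value.
    val withDeg: Graph[Int, Int] =
      g.joinVertices(g.outDegrees) { (vid, attr, deg) => attr + deg }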
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 293a9d588a..5e80a535f1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
sealed trait PartitionStrategy extends Serializable {
- def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid
+ def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
@@ -51,19 +51,19 @@ sealed trait PartitionStrategy extends Serializable {
*
*/
case object EdgePartition2D extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
- val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
- val mixingPrime: Vid = 1125899906842597L
- val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
- val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
+ val mixingPrime: VertexID = 1125899906842597L
+ val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
}
}
case object EdgePartition1D extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
- val mixingPrime: Vid = 1125899906842597L
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
+ val mixingPrime: VertexID = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
}
}
@@ -74,7 +74,7 @@ case object EdgePartition1D extends PartitionStrategy {
* random vertex cut.
*/
case object RandomVertexCut extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
}
}
@@ -86,7 +86,7 @@ case object RandomVertexCut extends PartitionStrategy {
* will end up on the same partition.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
- override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
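(Note: a short worked example of the renamed EdgePartition2D.getPartition; the values are chosen arbitrarily.)

    import org.apache.spark.graphx._

    // With numParts = 9, ceilSqrtNumParts = 3. Each endpoint id is scaled by the mixing prime
    // before the modulo, so col and row each land in [0, 3) and the result in [0, 9).
    val part: PartitionID = EdgePartition2D.getPartition(src = 1L, dst = 2L, numParts = 9)
    // col  = ((math.abs(1L) * 1125899906842597L) % 3).toInt
    // row  = ((math.abs(2L) * 1125899906842597L) % 3).toInt
    // part = (col * 3 + row) % 9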
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 4664091b57..0af230ed29 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.ClassTag
@@ -25,9 +25,9 @@ import scala.reflect.ClassTag
* // Set the vertex attributes to the initial pagerank values
* .mapVertices( (id, attr) => 1.0 )
*
- * def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ * def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
* resetProb + (1.0 - resetProb) * msgSum
- * def sendMessage(id: Vid, edge: EdgeTriplet[Double, Double]): Option[Double] =
+ * def sendMessage(id: VertexID, edge: EdgeTriplet[Double, Double]): Option[Double] =
* Some(edge.srcAttr * edge.attr)
* def messageCombiner(a: Double, b: Double): Double = a + b
* val initialMessage = 0.0
@@ -88,30 +88,39 @@ object Pregel {
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
- vprog: (Vid, VD, A) => VD,
- sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
+ vprog: (VertexID, VD, A) => VD,
+ sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
- var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) )
+ var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
- var messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
+ var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop
+ var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Update the graph with the new vertices.
+ prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
+ g.cache()
val oldMessages = messages
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
- // get to send messages.
+ // get to send messages. We must cache messages so it can be materialized on the next line,
+ // allowing us to uncache the previous iteration.
messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, EdgeDirection.Out))).cache()
+ // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
+ // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
+ // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
activeMessages = messages.count()
- // after counting we can unpersist the old messages
+ // Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking=false)
+ newVerts.unpersist(blocking=false)
+ prevG.unpersistVertices(blocking=false)
// count the iteration
i += 1
}
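(Note: a hedged sketch of calling the renamed Pregel API, following the PageRank example in the scaladoc above; `pagerankGraph: Graph[Double, Double]` is assumed to have been prepared as that scaladoc describes.)

    import org.apache.spark.graphx._

    def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
      0.15 + 0.85 * msgSum
    def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexID, Double)] =
      Iterator((edge.dstId, edge.srcAttr * edge.attr))
    def messageCombiner(a: Double, b: Double): Double = a + b

    val ranks: Graph[Double, Double] =
      Pregel(pagerankGraph, initialMsg = 0.0)(vertexProgram, sendMessage, messageCombiner)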
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index c5fb4aeca7..971e2615d4 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.graph
+package org.apache.spark.graphx
import scala.reflect.ClassTag
@@ -24,12 +24,12 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.graph.impl.MsgRDDFunctions
-import org.apache.spark.graph.impl.VertexPartition
+import org.apache.spark.graphx.impl.MsgRDDFunctions
+import org.apache.spark.graphx.impl.VertexPartition
/**
- * A `VertexRDD[VD]` extends the `RDD[(Vid, VD)]` by ensuring that there is
+ * A `VertexRDD[VD]` extends the `RDD[(VertexID, VD)]` by ensuring that there is
* only one entry for each vertex and by pre-indexing the entries for fast,
* efficient joins.
*
@@ -40,12 +40,12 @@ import org.apache.spark.graph.impl.VertexPartition
* @example Construct a `VertexRDD` from a plain RDD
* {{{
* // Construct an intial vertex set
- * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val someData: RDD[(VertexID, SomeType)] = loadData(someFile)
* val vset = VertexRDD(someData)
* // If there were redundant values in someData we would use a reduceFunc
* val vset2 = VertexRDD(someData, reduceFunc)
* // Finally we can use the VertexRDD to index another dataset
- * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val otherData: RDD[(VertexID, OtherType)] = loadData(otherFile)
* val vset3 = VertexRDD(otherData, vset.index)
* // Now we can construct very fast joins between the two sets
* val vset4: VertexRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
@@ -54,7 +54,7 @@ import org.apache.spark.graph.impl.VertexPartition
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[VertexPartition[VD]])
- extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+ extends RDD[(VertexID, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -98,15 +98,20 @@ class VertexRDD[@specialized VD: ClassTag](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): VertexRDD[VD] = persist()
+ override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+ partitionsRDD.unpersist(blocking)
+ this
+ }
+
/** Return the number of vertices in this set. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
}
/**
- * Provide the `RDD[(Vid, VD)]` equivalent output.
+ * Provide the `RDD[(VertexID, VD)]` equivalent output.
*/
- override def compute(part: Partition, context: TaskContext): Iterator[(Vid, VD)] = {
+ override def compute(part: Partition, context: TaskContext): Iterator[(VertexID, VD)] = {
firstParent[VertexPartition[VD]].iterator(part, context).next.iterator
}
@@ -125,14 +130,14 @@ class VertexRDD[@specialized VD: ClassTag](
* given predicate.
*
* @param pred the user defined predicate, which takes a tuple to conform to
- * the RDD[(Vid, VD)] interface
+ * the RDD[(VertexID, VD)] interface
*
* @note The vertex set preserves the original index structure
* which means that the returned RDD can be easily joined with
* the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- override def filter(pred: Tuple2[Vid, VD] => Boolean): VertexRDD[VD] =
+ override def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] =
this.mapVertexPartitions(_.filter(Function.untupled(pred)))
/**
@@ -160,7 +165,7 @@ class VertexRDD[@specialized VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassTag](f: (Vid, VD) => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
@@ -197,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
*
*/
def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
- (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
+ (other: VertexRDD[VD2])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -228,8 +233,8 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD with the attribute emitted by f.
*/
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: RDD[(Vid, VD2)])
- (f: (Vid, VD, Option[VD2]) => VD3)
+ (other: RDD[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] =
{
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
@@ -254,7 +259,7 @@ class VertexRDD[@specialized VD: ClassTag](
* must have the same index.
*/
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
- (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
other.partitionsRDD, preservesPartitioning = true
) { (thisIter, otherIter) =>
@@ -269,8 +274,8 @@ class VertexRDD[@specialized VD: ClassTag](
* Replace vertices with corresponding vertices in `other`, and drop vertices without a
* corresponding vertex in `other`.
*/
- def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(Vid, U)])
- (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
+ (f: (VertexID, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
other match {
@@ -293,7 +298,7 @@ class VertexRDD[@specialized VD: ClassTag](
* co-indexed with this one.
*/
def aggregateUsingIndex[VD2: ClassTag](
- messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
+ messages: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
@@ -319,8 +324,8 @@ object VertexRDD {
*
* @param rdd the collection of vertex-attribute pairs
*/
- def apply[VD: ClassTag](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = {
- val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)]): VertexRDD[VD] = {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -339,9 +344,9 @@ object VertexRDD {
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassTag](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
+ def apply[VD: ClassTag](rdd: RDD[(VertexID, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
{
- val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
+ val partitioned: RDD[(VertexID, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
}
@@ -351,7 +356,7 @@ object VertexRDD {
new VertexRDD(vertexPartitions)
}
- def apply[VD: ClassTag](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
+ def apply[VD: ClassTag](vids: RDD[VertexID], rdd: RDD[(VertexID, VD)], defaultVal: VD)
: VertexRDD[VD] =
{
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
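(Note: a hedged sketch of the renamed VertexRDD constructors and joins shown above; it assumes a SparkContext `sc`.)

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    val someData: RDD[(VertexID, Double)] = sc.parallelize(Seq((1L, 1.0), (2L, 2.0)))
    val vset: VertexRDD[Double] = VertexRDD(someData)

    // leftJoin accepts any RDD[(VertexID, U)]; vertices absent from `other` see None.
    val other: RDD[(VertexID, String)] = sc.parallelize(Seq((1L, "a")))
    val joined: VertexRDD[(Double, Option[String])] =
      vset.leftJoin(other) { (vid, v, uOpt) => (v, uOpt) }

    // The new unpersist override delegates to the underlying partitionsRDD.
    vset.cache()
    vset.unpersist(blocking = false)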
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala
new file mode 100644
index 0000000000..4af7af545c
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/Algorithms.scala
@@ -0,0 +1,56 @@
+package org.apache.spark.graphx.algorithms
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
+
+class Algorithms[VD: ClassTag, ED: ClassTag](self: Graph[VD, ED]) {
+ /**
+ * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
+ * PageRank and edge attributes containing the normalized edge weight.
+ *
+ * @see [[org.apache.spark.graphx.algorithms.PageRank]], method `runUntilConvergence`.
+ */
+ def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
+ PageRank.runUntilConvergence(self, tol, resetProb)
+ }
+
+ /**
+ * Run PageRank for a fixed number of iterations returning a graph with vertex attributes
+ * containing the PageRank and edge attributes the normalized edge weight.
+ *
+ * @see [[org.apache.spark.graphx.algorithms.PageRank]], method `run`.
+ */
+ def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
+ PageRank.run(self, numIter, resetProb)
+ }
+
+ /**
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @see [[org.apache.spark.graphx.algorithms.ConnectedComponents]]
+ */
+ def connectedComponents(): Graph[VertexID, ED] = {
+ ConnectedComponents.run(self)
+ }
+
+ /**
+ * Compute the number of triangles passing through each vertex.
+ *
+ * @see [[org.apache.spark.graphx.algorithms.TriangleCount]]
+ */
+ def triangleCount(): Graph[Int, ED] = {
+ TriangleCount.run(self)
+ }
+
+ /**
+ * Compute the strongly connected component (SCC) of each vertex and return a graph with the
+ * vertex value containing the lowest vertex id in the SCC containing that vertex.
+ *
+ * @see [[org.apache.spark.graphx.algorithms.StronglyConnectedComponents]]
+ */
+ def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] = {
+ StronglyConnectedComponents.run(self, numIter)
+ }
+}
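(Note: the new Algorithms wrapper is presumably exposed through an implicit conversion in the algorithms package object, which this patch adds but this excerpt does not show. A hedged usage sketch, assuming a SparkContext `sc`.)

    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._

    val graph: Graph[Int, Int] =
      Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L))), defaultValue = 1)

    val ranks: VertexRDD[Double] = graph.pageRank(tol = 0.001).vertices  // dynamic PageRank
    val components: Graph[VertexID, Int] = graph.connectedComponents()   // lowest id per component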
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala
index 7cd947d2ba..137a81f4d5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/ConnectedComponents.scala
@@ -1,11 +1,13 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
-import org.apache.spark.graph._
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
object ConnectedComponents {
/**
- * Compute the connected component membership of each vertex and return an RDD with the vertex
+ * Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
@@ -16,10 +18,10 @@ object ConnectedComponents {
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
- def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]): Graph[Vid, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
- def sendMessage(edge: EdgeTriplet[Vid, ED]) = {
+ def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala
index f77dffd7b4..ab447d5422 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/PageRank.scala
@@ -1,7 +1,9 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
+
+import scala.reflect.ClassTag
import org.apache.spark.Logging
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
object PageRank extends Logging {
@@ -42,7 +44,7 @@ object PageRank extends Logging {
* containing the normalized weight.
*
*/
- def run[VD: Manifest, ED: Manifest](
+ def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
@@ -59,13 +61,14 @@ object PageRank extends Logging {
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to the initial pagerank values
.mapVertices( (id, attr) => 1.0 )
+ .cache()
// Display statistics about pagerank
logInfo(pagerankGraph.statistics.toString)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: Vid, attr: Double, msgSum: Double): Double =
+ def vertexProgram(id: VertexID, attr: Double, msgSum: Double): Double =
resetProb + (1.0 - resetProb) * msgSum
def sendMessage(edge: EdgeTriplet[Double, Double]) =
Iterator((edge.dstId, edge.srcAttr * edge.attr))
@@ -109,7 +112,7 @@ object PageRank extends Logging {
* @return the graph containing with each vertex containing the PageRank and each edge
* containing the normalized weight.
*/
- def runUntillConvergence[VD: Manifest, ED: Manifest](
+ def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
{
// Initialize the pagerankGraph with each edge attribute
@@ -123,13 +126,14 @@ object PageRank extends Logging {
.mapTriplets( e => 1.0 / e.srcAttr )
// Set the vertex attributes to (initalPR, delta = 0)
.mapVertices( (id, attr) => (0.0, 0.0) )
+ .cache()
// Display statistics about pagerank
logInfo(pagerankGraph.statistics.toString)
// Define the three functions needed to implement PageRank in the GraphX
// version of Pregel
- def vertexProgram(id: Vid, attr: (Double, Double), msgSum: Double): (Double, Double) = {
+ def vertexProgram(id: VertexID, attr: (Double, Double), msgSum: Double): (Double, Double) = {
val (oldPR, lastDelta) = attr
val newPR = oldPR + (1.0 - resetProb) * msgSum
(newPR, newPR - oldPR)
@@ -153,53 +157,4 @@ object PageRank extends Logging {
.mapVertices((vid, attr) => attr._1)
} // end of deltaPageRank
- def runStandalone[VD: Manifest, ED: Manifest](
- graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): VertexRDD[Double] = {
-
- // Initialize the ranks
- var ranks: VertexRDD[Double] = graph.vertices.mapValues((vid, attr) => resetProb).cache()
-
- // Initialize the delta graph where each vertex stores its delta and each edge knows its weight
- var deltaGraph: Graph[Double, Double] =
- graph.outerJoinVertices(graph.outDegrees)((vid, vdata, deg) => deg.getOrElse(0))
- .mapTriplets(e => 1.0 / e.srcAttr)
- .mapVertices((vid, degree) => resetProb).cache()
- var numDeltas: Long = ranks.count()
-
- var prevDeltas: Option[VertexRDD[Double]] = None
-
- var i = 0
- val weight = (1.0 - resetProb)
- while (numDeltas > 0) {
- // Compute new deltas. Only deltas that existed in the last round (i.e., were greater than
- // `tol`) get to send messages; those that were less than `tol` would send messages less than
- // `tol` as well.
- val deltas = deltaGraph
- .mapReduceTriplets[Double](
- et => Iterator((et.dstId, et.srcAttr * et.attr * weight)),
- _ + _,
- prevDeltas.map((_, EdgeDirection.Out)))
- .filter { case (vid, delta) => delta > tol }
- .cache()
- prevDeltas = Some(deltas)
- numDeltas = deltas.count()
- logInfo("Standalone PageRank: iter %d has %d deltas".format(i, numDeltas))
-
- // Update deltaGraph with the deltas
- deltaGraph = deltaGraph.outerJoinVertices(deltas) { (vid, old, newOpt) =>
- newOpt.getOrElse(old)
- }.cache()
-
- // Update ranks
- ranks = ranks.leftZipJoin(deltas) { (vid, oldRank, deltaOpt) =>
- oldRank + deltaOpt.getOrElse(0.0)
- }
- ranks.foreach(x => {}) // force the iteration for ease of debugging
-
- i += 1
- }
-
- ranks
- }
-
}
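(Note: a hedged sketch of the renamed entry points; runUntillConvergence becomes runUntilConvergence and the standalone variant is removed in favour of the Pregel-based one. `graph` is assumed to be any Graph[VD, ED].)

    import org.apache.spark.graphx.algorithms.PageRank

    val staticRanks = PageRank.run(graph, numIter = 20)                  // fixed number of iterations
    val dynamicRanks = PageRank.runUntilConvergence(graph, tol = 0.001)  // iterate until deltas < tol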
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala
index 18395bdc5f..2a13553d79 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/SVDPlusPlus.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.apache.spark.rdd._
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import scala.util.Random
import org.apache.commons.math.linear._
-class SvdppConf( // Svdpp parameters
+class SVDPlusPlusConf( // SVDPlusPlus parameters
var rank: Int,
var maxIters: Int,
var minVal: Double,
@@ -15,7 +15,7 @@ class SvdppConf( // Svdpp parameters
var gamma6: Double,
var gamma7: Double) extends Serializable
-object Svdpp {
+object SVDPlusPlus {
/**
* Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model",
* paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
@@ -23,12 +23,12 @@ object Svdpp {
*
* @param edges edges for constructing the graph
*
- * @param conf Svdpp parameters
+ * @param conf SVDPlusPlus parameters
*
* @return a graph with vertex attributes containing the trained model
*/
- def run(edges: RDD[Edge[Double]], conf: SvdppConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
+ def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
// generate default vertex attribute
def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
@@ -42,6 +42,7 @@ object Svdpp {
}
// calculate global rating mean
+ edges.cache()
val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
val u = rs / rc
@@ -51,12 +52,12 @@ object Svdpp {
// calculate initial bias and norm
var t0 = g.mapReduceTriplets(et =>
Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
- g = g.outerJoinVertices(t0) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
- def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(Vid, (RealVector, RealVector, Double))] = {
+ def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -72,20 +73,22 @@ object Svdpp {
for (i <- 0 until conf.maxIters) {
// phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
+ g.cache()
var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
- g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
// phase 2, update p for user nodes and q, y for item nodes
+ g.cache()
val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
(g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
- g = g.outerJoinVertices(t2) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
+ g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
(vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
}
}
// calculate error on training set
- def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(Vid, Double)] = {
+ def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -94,8 +97,9 @@ object Svdpp {
val err = (et.attr - pred) * (et.attr - pred)
Iterator((et.dstId, err))
}
+ g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
- g = g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
(g, u)
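A minimal sketch of calling the renamed entry point, matching the run signature above. Here `sc` is assumed to be an existing SparkContext, and the comma-separated "user,item,rating" file format is a hypothetical example; run returns the trained model graph together with the global rating mean computed above.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.{SVDPlusPlus, SVDPlusPlusConf}

    def trainSVDPlusPlus(sc: SparkContext, path: String, conf: SVDPlusPlusConf): Double = {
      val edges: RDD[Edge[Double]] = sc.textFile(path).map { line =>
        val fields = line.split(",")
        Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)   // (user, item, rating)
      }
      val (model, mean) = SVDPlusPlus.run(edges, conf)   // trained graph and global rating mean
      mean
    }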
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala
index c324c984d7..864f0ec57c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponents.scala
@@ -1,12 +1,14 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
-import org.apache.spark.graph._
+import scala.reflect.ClassTag
+
+import org.apache.spark.graphx._
object StronglyConnectedComponents {
/**
- * Compute the strongly connected component (SCC) of each vertex and return an RDD with the vertex
- * value containing the lowest vertex id in the SCC containing that vertex.
+ * Compute the strongly connected component (SCC) of each vertex and return a graph with the
+ * vertex value containing the lowest vertex id in the SCC containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
@@ -15,12 +17,12 @@ object StronglyConnectedComponents {
*
* @return a graph with vertex attributes containing the smallest vertex id in each SCC
*/
- def run[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int): Graph[Vid, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexID, ED] = {
// the graph we update with final SCC ids, and the graph we return at the end
var sccGraph = graph.mapVertices { case (vid, _) => vid }
// graph we are going to work with in our iterations
- var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }
+ var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache()
var numVertices = sccWorkGraph.numVertices
var iter = 0
@@ -30,10 +32,9 @@ object StronglyConnectedComponents {
numVertices = sccWorkGraph.numVertices
sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) {
(vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
- }
- sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.inDegrees) {
+ }.outerJoinVertices(sccWorkGraph.inDegrees) {
(vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
- }
+ }.cache()
// get all vertices to be removed
val finalVertices = sccWorkGraph.vertices
@@ -45,14 +46,14 @@ object StronglyConnectedComponents {
(vid, scc, opt) => opt.getOrElse(scc)
}
// only keep vertices that are not final
- sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2)
+ sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
} while (sccWorkGraph.numVertices < numVertices)
sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
- sccWorkGraph = GraphLab[(Vid, Boolean), ED, Vid](sccWorkGraph, Integer.MAX_VALUE)(
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, VertexID](sccWorkGraph, Integer.MAX_VALUE)(
(vid, e) => e.otherVertexAttr(vid)._1,
(vid1, vid2) => math.min(vid1, vid2),
(vid, scc, optScc) =>
@@ -62,7 +63,7 @@ object StronglyConnectedComponents {
// start at root of SCCs. Traverse values in reverse, notify all my neighbors
// do not propagate if colors do not match!
- sccWorkGraph = GraphLab[(Vid, Boolean), ED, Boolean](
+ sccWorkGraph = GraphLab[(VertexID, Boolean), ED, Boolean](
sccWorkGraph,
Integer.MAX_VALUE,
EdgeDirection.Out,
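A minimal sketch of the confirmed entry point above, with ClassTag bounds in place of the old Manifest ones; the returned graph carries the lowest vertex id of each strongly connected component.

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms.StronglyConnectedComponents

    def sccIds[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): VertexRDD[VertexID] =
      StronglyConnectedComponents.run(graph, numIter).vertices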
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala
index a6384320ba..b5a93c1bd1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/TriangleCount.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import scala.reflect.ClassTag
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
object TriangleCount {
@@ -46,7 +46,7 @@ object TriangleCount {
(vid, _, optSet) => optSet.getOrElse(null)
}
// Edge function computes intersection of smaller vertex with larger vertex
- def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(Vid, Int)] = {
+ def edgeFunc(et: EdgeTriplet[VertexSet, ED]): Iterator[(VertexID, Int)] = {
assert(et.srcAttr != null)
assert(et.dstAttr != null)
val (smallSet, largeSet) = if (et.srcAttr.size < et.dstAttr.size) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala
new file mode 100644
index 0000000000..fbabf1257c
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/algorithms/package.scala
@@ -0,0 +1,8 @@
+package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+package object algorithms {
+ implicit def graphToAlgorithms[VD: ClassTag, ED: ClassTag](
+ graph: Graph[VD, ED]): Algorithms[VD, ED] = new Algorithms(graph)
+}
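A minimal sketch of what the new implicit conversion enables: importing the package object lets the wrappers defined in the new Algorithms class be called as methods directly on a Graph. The connectedComponents() name below is a hypothetical illustration of such a wrapper, not an API confirmed by this diff.

    import scala.reflect.ClassTag
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._   // brings graphToAlgorithms into scope

    def components[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) =
      graph.connectedComponents()   // hypothetical wrapper resolved through graphToAlgorithms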
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 7ae4d7df43..4176563d22 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
/**
@@ -16,10 +16,10 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
* @tparam ED the edge attribute type.
*/
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
- val srcIds: Array[Vid],
- val dstIds: Array[Vid],
+ val srcIds: Array[VertexID],
+ val dstIds: Array[VertexID],
val data: Array[ED],
- val index: PrimitiveKeyOpenHashMap[Vid, Int]) extends Serializable {
+ val index: PrimitiveKeyOpenHashMap[VertexID, Int]) extends Serializable {
/**
* Reverse all the edges in this partition.
@@ -101,8 +101,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
val builder = new EdgePartitionBuilder[ED]
var firstIter: Boolean = true
- var currSrcId: Vid = nullValue[Vid]
- var currDstId: Vid = nullValue[Vid]
+ var currSrcId: VertexID = nullValue[VertexID]
+ var currDstId: VertexID = nullValue[VertexID]
var currAttr: ED = nullValue[ED]
var i = 0
while (i < size) {
@@ -136,7 +136,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2])
- (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = {
+ (f: (VertexID, VertexID, ED, ED2) => ED3): EdgePartition[ED3] = {
val builder = new EdgePartitionBuilder[ED3]
var i = 0
var j = 0
@@ -193,14 +193,14 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* iterator is generated using an index scan, so it is efficient at skipping edges that don't
* match srcIdPred.
*/
- def indexIterator(srcIdPred: Vid => Boolean): Iterator[Edge[ED]] =
+ def indexIterator(srcIdPred: VertexID => Boolean): Iterator[Edge[ED]] =
index.iterator.filter(kv => srcIdPred(kv._1)).flatMap(Function.tupled(clusterIterator))
/**
* Get an iterator over the cluster of edges in this partition with source vertex id `srcId`. The
* cluster must start at position `index`.
*/
- private def clusterIterator(srcId: Vid, index: Int) = new Iterator[Edge[ED]] {
+ private def clusterIterator(srcId: VertexID, index: Int) = new Iterator[Edge[ED]] {
private[this] val edge = new Edge[ED]
private[this] var pos = index
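An illustrative standalone sketch (not the GraphX class itself) of the columnar layout above: parallel srcIds/dstIds/data arrays sorted by source id, plus an index from each source id to its first position, which is what makes indexIterator's index scan cheap.

    class ColumnarEdges[ED](
        val srcIds: Array[Long],
        val dstIds: Array[Long],
        val data: Array[ED],
        val index: Map[Long, Int]) {

      // Scan the contiguous cluster of edges that share the given source id.
      def edgesFrom(src: Long): Iterator[(Long, Long, ED)] = index.get(src) match {
        case None => Iterator.empty
        case Some(start) =>
          Iterator.from(start)
            .takeWhile(i => i < srcIds.length && srcIds(i) == src)
            .map(i => (srcIds(i), dstIds(i), data(i)))
      }
    }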
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index ae3f3a6d03..d4f08497a2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -1,9 +1,9 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import scala.util.Sorting
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -13,22 +13,22 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I
var edges = new PrimitiveVector[Edge[ED]](size)
/** Add a new edge to the partition. */
- def add(src: Vid, dst: Vid, d: ED) {
+ def add(src: VertexID, dst: VertexID, d: ED) {
edges += Edge(src, dst, d)
}
def toEdgePartition: EdgePartition[ED] = {
val edgeArray = edges.trim().array
Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
- val srcIds = new Array[Vid](edgeArray.size)
- val dstIds = new Array[Vid](edgeArray.size)
+ val srcIds = new Array[VertexID](edgeArray.size)
+ val dstIds = new Array[VertexID](edgeArray.size)
val data = new Array[ED](edgeArray.size)
- val index = new PrimitiveKeyOpenHashMap[Vid, Int]
+ val index = new PrimitiveKeyOpenHashMap[VertexID, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
if (edgeArray.length > 0) {
index.update(srcIds(0), 0)
- var currSrcId: Vid = srcIds(0)
+ var currSrcId: VertexID = srcIds(0)
var i = 0
while (i < edgeArray.size) {
srcIds(i) = edgeArray(i).srcId
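A minimal usage sketch of the builder above with the renamed VertexID types. It lives in the impl package, so treat this as illustrative internal API rather than a public entry point; the initial size is passed explicitly to avoid assuming a default.

    import org.apache.spark.graphx.impl.EdgePartitionBuilder

    object BuilderSketch {
      def build() = {
        val builder = new EdgePartitionBuilder[Double](4)
        builder.add(1L, 2L, 0.5)
        builder.add(1L, 3L, 0.25)
        builder.toEdgePartition   // columnar arrays sorted by source id, plus the source-id index
      }
    }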
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index 4d5eb240a9..79fd962ffd 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
@@ -25,7 +25,7 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
// allocating too many temporary Java objects.
private val triplet = new EdgeTriplet[VD, ED]
- private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+ private val vmap = new PrimitiveKeyOpenHashMap[VertexID, VD](vidToIndex, vertexArray)
override def hasNext: Boolean = pos < edgePartition.size
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 4d35755e7e..987a646c0c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -1,14 +1,14 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.impl.GraphImpl._
-import org.apache.spark.graph.impl.MsgRDDFunctions._
-import org.apache.spark.graph.util.BytecodeUtils
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.impl.GraphImpl._
+import org.apache.spark.graphx.impl.MsgRDDFunctions._
+import org.apache.spark.graphx.util.BytecodeUtils
import org.apache.spark.rdd.{ShuffledRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ClosureCleaner
@@ -30,20 +30,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val edges: EdgeRDD[ED],
@transient val routingTable: RoutingTable,
@transient val replicatedVertexView: ReplicatedVertexView[VD])
- extends Graph[VD, ED] {
-
- def this(
- vertices: VertexRDD[VD],
- edges: EdgeRDD[ED],
- routingTable: RoutingTable) = {
- this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
- }
-
- def this(
- vertices: VertexRDD[VD],
- edges: EdgeRDD[ED]) = {
- this(vertices, edges, new RoutingTable(edges, vertices))
- }
+ extends Graph[VD, ED] with Serializable {
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
@@ -65,11 +52,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
+ vertices.unpersist(blocking)
+ replicatedVertexView.unpersist(blocking)
+ this
+ }
+
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
val numPartitions = edges.partitions.size
val edTag = classTag[ED]
val newEdges = new EdgeRDD(edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+ val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -84,12 +77,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
}, preservesPartitioning = true).cache())
- new GraphImpl(vertices, newEdges)
+ GraphImpl(vertices, newEdges)
}
override def statistics: Map[String, Any] = {
// Get the total number of vertices after replication, used to compute the replication ratio.
- def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
+ def numReplicatedVertices(vid2pids: RDD[Array[Array[VertexID]]]): Double = {
vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
}
@@ -157,10 +150,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- override def mapVertices[VD2: ClassTag](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ override def mapVertices[VD2: ClassTag](f: (VertexID, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// The map preserves type, so we can use incremental replication
- val newVerts = vertices.mapVertexPartitions(_.map(f))
+ val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = new ReplicatedVertexView[VD2](
changedVerts, edges, routingTable,
@@ -168,18 +161,18 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
- new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
+ GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
}
}
override def mapEdges[ED2: ClassTag](
- f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
}
override def mapTriplets[ED2: ClassTag](
- f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
+ f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdTag = classTag[VD]
@@ -208,7 +201,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def subgraph(
epred: EdgeTriplet[VD, ED] => Boolean = x => true,
- vpred: (Vid, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
+ vpred: (VertexID, VD) => Boolean = (a, b) => true): Graph[VD, ED] = {
// Filter the vertices, reusing the partitioner and the index from this graph
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
@@ -250,7 +243,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
//////////////////////////////////////////////////////////////////////////////////////////////////
override def mapReduceTriplets[A: ClassTag](
- mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
+ mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -280,14 +273,14 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val edgeIter = activeDirectionOpt match {
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
.filter(e => vPart.isActive(e.dstId))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId))
}
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
- edgePartition.indexIterator(srcVid => vPart.isActive(srcVid))
+ edgePartition.indexIterator(srcVertexID => vPart.isActive(srcVertexID))
} else {
edgePartition.iterator.filter(e => vPart.isActive(e.srcId))
}
@@ -318,7 +311,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
- (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+ (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
@@ -330,7 +323,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
- new GraphImpl(newVerts, edges, routingTable)
+ GraphImpl(newVerts, edges, routingTable)
}
}
@@ -354,13 +347,13 @@ object GraphImpl {
}
def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
- edgePartitions: RDD[(Pid, EdgePartition[ED])],
+ edgePartitions: RDD[(PartitionID, EdgePartition[ED])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
def apply[VD: ClassTag, ED: ClassTag](
- vertices: RDD[(Vid, VD)],
+ vertices: RDD[(VertexID, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
@@ -369,19 +362,41 @@ object GraphImpl {
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner)
+ val vidsFromEdges = collectVertexIDsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
- new GraphImpl(vertexRDD, edgeRDD)
+ GraphImpl(vertexRDD, edgeRDD)
+ }
+
+ def apply[VD: ClassTag, ED: ClassTag](
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+ // Cache RDDs that are referenced multiple times
+ edges.cache()
+
+ GraphImpl(vertices, edges, new RoutingTable(edges, vertices))
+ }
+
+ def apply[VD: ClassTag, ED: ClassTag](
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED],
+ routingTable: RoutingTable): GraphImpl[VD, ED] = {
+ // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we
+ // don't cache it explicitly.
+ vertices.cache()
+ edges.cache()
+
+ new GraphImpl(
+ vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
}
/**
* Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
- * data structure (RDD[(Vid, Vid, ED)]).
+ * data structure (RDD[(VertexID, VertexID, ED)]).
*
* The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
* pair: the key is the partition id, and the value is an EdgePartition object containing all the
@@ -404,19 +419,19 @@ object GraphImpl {
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache()
// Get the set of all vids
- val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ val vids = collectVertexIDsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
- new GraphImpl(vertices, edges)
+ GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
- private def collectVidsFromEdges(
+ private def collectVertexIDsFromEdges(
edges: EdgeRDD[_],
- partitioner: Partitioner): RDD[(Vid, Int)] = {
+ partitioner: Partitioner): RDD[(VertexID, Int)] = {
// TODO: Consider doing map side distinct before shuffle.
- new ShuffledRDD[Vid, Int, (Vid, Int)](
- edges.collectVids.map(vid => (vid, 0)), partitioner)
- .setSerializer(classOf[VidMsgSerializer].getName)
+ new ShuffledRDD[VertexID, Int, (VertexID, Int)](
+ edges.collectVertexIDs.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VertexIDMsgSerializer].getName)
}
} // end of object GraphImpl
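A minimal sketch of building a graph from vertex and edge RDDs through the factory path above; `sc` is assumed to be an existing SparkContext, and the new GraphImpl factories cache the vertex and edge RDDs they reuse internally.

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    def smallGraph(sc: SparkContext): Graph[String, Int] = {
      val vertices = sc.parallelize(Seq[(VertexID, String)]((1L, "a"), (2L, "b"), (3L, "c")))
      val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
      Graph(vertices, edges).cache()
    }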
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
index bf033945de..ad5daf8f6a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala
@@ -1,17 +1,17 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
import org.apache.spark.Partitioner
-import org.apache.spark.graph.{Pid, Vid}
+import org.apache.spark.graphx.{PartitionID, VertexID}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
- @transient var partition: Pid,
- var vid: Vid,
+ @transient var partition: PartitionID,
+ var vid: VertexID,
var data: T)
- extends Product2[Pid, (Vid, T)] with Serializable {
+ extends Product2[PartitionID, (VertexID, T)] with Serializable {
override def _1 = partition
@@ -27,9 +27,9 @@ class VertexBroadcastMsg[@specialized(Int, Long, Double, Boolean) T](
* @param data value to send
*/
class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T](
- @transient var partition: Pid,
+ @transient var partition: PartitionID,
var data: T)
- extends Product2[Pid, T] with Serializable {
+ extends Product2[PartitionID, T] with Serializable {
override def _1 = partition
@@ -41,7 +41,7 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
- val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner)
+ val rdd = new ShuffledRDD[PartitionID, (VertexID, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
@@ -62,7 +62,7 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[MessageToPartition[T]] = {
- new ShuffledRDD[Pid, T, MessageToPartition[T]](self, partitioner)
+ new ShuffledRDD[PartitionID, T, MessageToPartition[T]](self, partitioner)
}
}
@@ -77,8 +77,8 @@ object MsgRDDFunctions {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- def partitionForAggregation[T: ClassTag](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
- val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner)
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexID, T)], partitioner: Partitioner) = {
+ val rdd = new ShuffledRDD[VertexID, T, (VertexID, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
if (classTag[T] == ClassTag.Int) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 970acfed27..0e2f5a9dd9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.{classTag, ClassTag}
@@ -6,7 +6,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
/**
* A view of the vertices after they are shipped to the join sites specified in
@@ -14,9 +14,11 @@ import org.apache.spark.graph._
* specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
* fresh view is created.
*
- * The view is always cached (i.e., once it is created, it remains materialized). This avoids
+ * The view is always cached (i.e., once it is evaluated, it remains materialized). This avoids
* constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
- * example.
+ * example. However, it means iterative algorithms must manually call `Graph.unpersist` on previous
+ * iterations' graphs for best GC performance. See the implementation of
+ * [[org.apache.spark.graphx.Pregel]] for an example.
*/
private[impl]
class ReplicatedVertexView[VD: ClassTag](
@@ -31,9 +33,9 @@ class ReplicatedVertexView[VD: ClassTag](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ private val localVertexIDMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
case Some(prevView) =>
- prevView.localVidMap
+ prevView.localVertexIDMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -43,15 +45,25 @@ class ReplicatedVertexView[VD: ClassTag](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIDMap")
}
- private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
- private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
- private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
- private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+ private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true)
+ private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false)
+ private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true)
+ private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false)
+
+ def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = {
+ bothAttrs.unpersist(blocking)
+ srcAttrOnly.unpersist(blocking)
+ dstAttrOnly.unpersist(blocking)
+ noAttrs.unpersist(blocking)
+ // Don't unpersist localVertexIDMap because a future ReplicatedVertexView may be using it
+ // without modification
+ this
+ }
- def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+ def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = {
(includeSrc, includeDst) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -63,7 +75,7 @@ class ReplicatedVertexView[VD: ClassTag](
def get(
includeSrc: Boolean,
includeDst: Boolean,
- actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+ actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = {
// Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
@@ -81,7 +93,7 @@ class ReplicatedVertexView[VD: ClassTag](
}
private def create(includeSrc: Boolean, includeDst: Boolean)
- : RDD[(Pid, VertexPartition[VD])] = {
+ : RDD[(PartitionID, VertexPartition[VD])] = {
val vdTag = classTag[VD]
// Ship vertex attributes to edge partitions according to vertexPlacement
@@ -104,8 +116,8 @@ class ReplicatedVertexView[VD: ClassTag](
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
- // locations specified in localVidMap
- localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ // locations specified in localVertexIDMap
+ localVertexIDMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
@@ -128,15 +140,15 @@ class ReplicatedVertexView[VD: ClassTag](
object ReplicatedVertexView {
protected def buildBuffer[VD: ClassTag](
- pid2vidIter: Iterator[Array[Array[Vid]]],
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
- val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
val vertexPart: VertexPartition[VD] = vertexPartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val vids = new PrimitiveVector[Vid](pid2vid(pid).size)
+ val vids = new PrimitiveVector[VertexID](pid2vid(pid).size)
val attrs = new PrimitiveVector[VD](pid2vid(pid).size)
var i = 0
while (i < size) {
@@ -152,16 +164,16 @@ object ReplicatedVertexView {
}
protected def buildActiveBuffer(
- pid2vidIter: Iterator[Array[Array[Vid]]],
+ pid2vidIter: Iterator[Array[Array[VertexID]]],
activePartIter: Iterator[VertexPartition[_]])
- : Iterator[(Int, Array[Vid])] = {
- val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
+ : Iterator[(Int, Array[VertexID])] = {
+ val pid2vid: Array[Array[VertexID]] = pid2vidIter.next()
val activePart: VertexPartition[_] = activePartIter.next()
Iterator.tabulate(pid2vid.size) { pid =>
val vidsCandidate = pid2vid(pid)
val size = vidsCandidate.length
- val actives = new PrimitiveVector[Vid](vidsCandidate.size)
+ val actives = new PrimitiveVector[VertexID](vidsCandidate.size)
var i = 0
while (i < size) {
val vid = vidsCandidate(i)
@@ -175,7 +187,8 @@ object ReplicatedVertexView {
}
}
-class VertexAttributeBlock[VD: ClassTag](val vids: Array[Vid], val attrs: Array[VD])
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexID], val attrs: Array[VD])
extends Serializable {
- def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+ def iterator: Iterator[(VertexID, VD)] =
+ (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
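A minimal sketch of the unpersist pattern the comment above asks iterative algorithms to follow: cache the new graph, materialize it, then unpersist the previous iteration's vertices so cached replicated views can be garbage collected. The message computation here is an arbitrary placeholder.

    import org.apache.spark.graphx._

    def iterate(graph: Graph[Double, Double], maxIters: Int): Graph[Double, Double] = {
      var g = graph.cache()
      var i = 0
      while (i < maxIters) {
        val msgs = g.mapReduceTriplets[Double](
          et => Iterator((et.dstId, et.srcAttr * et.attr)),   // placeholder message
          _ + _)
        val newG = g.outerJoinVertices(msgs) { (vid, old, msgOpt) =>
          msgOpt.getOrElse(old)
        }.cache()
        newG.triplets.count()                    // materialize before dropping the old view
        g.unpersistVertices(blocking = false)    // release the previous iteration's vertices/views
        g = newG
        i += 1
      }
      g
    }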
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
index b6cd048b33..3bd8b24133 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala
@@ -1,7 +1,7 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.PrimitiveVector
@@ -14,12 +14,12 @@ import org.apache.spark.util.collection.PrimitiveVector
*/
class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
- val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false)
+ val bothAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[VertexID]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[VertexID]]] = createPid2Vid(false, false)
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] =
(includeSrcAttr, includeDstAttr) match {
case (true, true) => bothAttrs
case (true, false) => srcAttrOnly
@@ -28,10 +28,10 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
}
private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexID]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
- val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+ val vid2pid: RDD[(VertexID, PartitionID)] = edges.partitionsRDD.mapPartitions { iter =>
+ val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
if (includeSrcAttr) { // Add src vertices to the set.
@@ -53,7 +53,7 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val numPartitions = vertices.partitions.size
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexID])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
index dcf619fa85..1c3c87f08d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala
@@ -1,18 +1,18 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.apache.spark.serializer._
-class VidMsgSerializer(conf: SparkConf) extends Serializer {
+class VertexIDMsgSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new ShuffleSerializerInstance {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, _)]
+ val msg = t.asInstanceOf[(VertexID, _)]
writeVarLong(msg._1, optimizePositive = false)
this
}
@@ -101,7 +101,7 @@ class IntAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Int)]
+ val msg = t.asInstanceOf[(VertexID, Int)]
writeVarLong(msg._1, optimizePositive = false)
writeUnsignedVarInt(msg._2)
this
@@ -124,7 +124,7 @@ class LongAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Long)]
+ val msg = t.asInstanceOf[(VertexID, Long)]
writeVarLong(msg._1, optimizePositive = false)
writeVarLong(msg._2, optimizePositive = true)
this
@@ -147,7 +147,7 @@ class DoubleAggMsgSerializer(conf: SparkConf) extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
- val msg = t.asInstanceOf[(Vid, Double)]
+ val msg = t.asInstanceOf[(VertexID, Double)]
writeVarLong(msg._1, optimizePositive = false)
writeDouble(msg._2)
this
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index 7048a40f42..7c83497ca9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -1,27 +1,27 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.Logging
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
-private[graph] object VertexPartition {
+private[graphx] object VertexPartition {
- def apply[VD: ClassTag](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = {
- val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)]): VertexPartition[VD] = {
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
iter.foreach { case (k, v) =>
map(k) = v
}
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
- def apply[VD: ClassTag](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
+ def apply[VD: ClassTag](iter: Iterator[(VertexID, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
- val map = new PrimitiveKeyOpenHashMap[Vid, VD]
+ val map = new PrimitiveKeyOpenHashMap[VertexID, VD]
iter.foreach { case (k, v) =>
map.setMerge(k, v, mergeFunc)
}
@@ -30,7 +30,7 @@ private[graph] object VertexPartition {
}
-private[graph]
+private[graphx]
class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
val index: VertexIdToIndexMap,
val values: Array[VD],
@@ -44,15 +44,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
def size: Int = mask.cardinality()
/** Return the vertex attribute for the given vertex ID. */
- def apply(vid: Vid): VD = values(index.getPos(vid))
+ def apply(vid: VertexID): VD = values(index.getPos(vid))
- def isDefined(vid: Vid): Boolean = {
+ def isDefined(vid: VertexID): Boolean = {
val pos = index.getPos(vid)
pos >= 0 && mask.get(pos)
}
/** Look up vid in activeSet, throwing an exception if it is None. */
- def isActive(vid: Vid): Boolean = {
+ def isActive(vid: VertexID): Boolean = {
activeSet.get.contains(vid)
}
@@ -72,7 +72,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
- def map[VD2: ClassTag](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
+ def map[VD2: ClassTag](f: (VertexID, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0)
@@ -92,7 +92,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
- def filter(pred: (Vid, VD) => Boolean): VertexPartition[VD] = {
+ def filter(pred: (VertexID, VD) => Boolean): VertexPartition[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
@@ -130,7 +130,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2])
- (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
@@ -149,14 +149,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
- (other: Iterator[(Vid, VD2)])
- (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
+ (other: Iterator[(VertexID, VD2)])
+ (f: (VertexID, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
- (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
@@ -176,15 +176,15 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Inner join an iterator of messages.
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
- (iter: Iterator[Product2[Vid, U]])
- (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
+ (iter: Iterator[Product2[VertexID, U]])
+ (f: (VertexID, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
- def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[Vid, VD2]])
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexID, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -202,7 +202,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
- def innerJoinKeepLeft(iter: Iterator[Product2[Vid, VD]]): VertexPartition[VD] = {
+ def innerJoinKeepLeft(iter: Iterator[Product2[VertexID, VD]]): VertexPartition[VD] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD](capacity)
System.arraycopy(values, 0, newValues, 0, newValues.length)
@@ -217,8 +217,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
}
def aggregateUsingIndex[VD2: ClassTag](
- iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
- {
+ iter: Iterator[Product2[VertexID, VD2]],
+ reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
iter.foreach { product =>
@@ -237,7 +237,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition[VD2](index, newValues, newMask)
}
- def replaceActives(iter: Iterator[Vid]): VertexPartition[VD] = {
+ def replaceActives(iter: Iterator[VertexID]): VertexPartition[VD] = {
val newActiveSet = new VertexSet
iter.foreach(newActiveSet.add(_))
new VertexPartition(index, values, mask, Some(newActiveSet))
@@ -247,7 +247,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): VertexPartition[VD] = {
- val hashMap = new PrimitiveKeyOpenHashMap[Vid, VD]
+ val hashMap = new PrimitiveKeyOpenHashMap[VertexID, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- this.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
@@ -255,7 +255,8 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet)
}
- def iterator: Iterator[(Vid, VD)] = mask.iterator.map(ind => (index.getValue(ind), values(ind)))
+ def iterator: Iterator[(VertexID, VD)] =
+ mask.iterator.map(ind => (index.getValue(ind), values(ind)))
- def vidIterator: Iterator[Vid] = mask.iterator.map(ind => index.getValue(ind))
+ def vidIterator: Iterator[VertexID] = mask.iterator.map(ind => index.getValue(ind))
}
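An illustrative standalone sketch (not the GraphX class) of the bitmask technique above: filter builds a fresh mask over the shared values array, so no new attribute values are allocated.

    import java.util.BitSet

    class MaskedValues[VD](val values: Array[VD], val mask: BitSet) {
      def filter(pred: (Int, VD) => Boolean): MaskedValues[VD] = {
        val newMask = new BitSet(values.length)
        var i = mask.nextSetBit(0)
        while (i >= 0) {
          if (pred(i, values(i))) newMask.set(i)
          i = mask.nextSetBit(i + 1)
        }
        new MaskedValues(values, newMask)   // the values array is shared, not copied
      }
    }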
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index 655ae53bf8..96f0d91c9b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -3,17 +3,17 @@ package org.apache.spark
import org.apache.spark.util.collection.OpenHashSet
-package object graph {
+package object graphx {
- type Vid = Long
+ type VertexID = Long
// TODO: Consider using Char.
- type Pid = Int
+ type PartitionID = Int
- type VertexSet = OpenHashSet[Vid]
+ type VertexSet = OpenHashSet[VertexID]
// type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
- type VertexIdToIndexMap = OpenHashSet[Vid]
+ type VertexIdToIndexMap = OpenHashSet[VertexID]
/**
* Return the default null-like value for a data type T.
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala
index eaff27a33e..81332e0800 100644
--- a/graph/src/main/scala/org/apache/spark/graph/perf/BagelTest.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/BagelTest.scala
@@ -1,6 +1,6 @@
///// This file creates circular dependencies between examples bagel and graph
-// package org.apache.spark.graph.perf
+// package org.apache.spark.graphx.perf
// import org.apache.spark._
// import org.apache.spark.SparkContext._
@@ -8,7 +8,7 @@
// import org.apache.spark.examples.bagel
// //import org.apache.spark.bagel.examples._
-// import org.apache.spark.graph._
+// import org.apache.spark.graphx._
// object BagelTest {
diff --git a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala
index 01bd968550..24262640ab 100644
--- a/graph/src/main/scala/org/apache/spark/graph/perf/SparkTest.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/perf/SparkTest.scala
@@ -1,13 +1,13 @@
///// This file creates circular dependencies between examples bagel and graph
-// package org.apache.spark.graph.perf
+// package org.apache.spark.graphx.perf
// import org.apache.spark._
// import org.apache.spark.SparkContext._
// import org.apache.spark.bagel.Bagel
// import org.apache.spark.bagel.examples._
-// import org.apache.spark.graph._
+// import org.apache.spark.graphx._
// object SparkTest {
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index bc00ce2151..ec8d534333 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph.util
+package org.apache.spark.graphx.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index d61f358bb0..57117241ad 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph.util
+package org.apache.spark.graphx.util
import scala.annotation.tailrec
import scala.math._
@@ -10,10 +10,10 @@ import org.apache.spark.serializer._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.Graph
-import org.apache.spark.graph.Edge
-import org.apache.spark.graph.impl.GraphImpl
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.Graph
+import org.apache.spark.graphx.Edge
+import org.apache.spark.graphx.impl.GraphImpl
/**
* @todo cleanup and modularize code
@@ -31,7 +31,7 @@ object GraphGenerators {
val serializer = "org.apache.spark.serializer.KryoSerializer"
System.setProperty("spark.serializer", serializer)
//System.setProperty("spark.shuffle.compress", "false")
- System.setProperty("spark.kryo.registrator", "spark.graph.GraphKryoRegistrator")
+ System.setProperty("spark.kryo.registrator", "spark.graphx.GraphKryoRegistrator")
val host = "local[4]"
val sc = new SparkContext(host, "Lognormal graph generator")
@@ -70,7 +70,7 @@ object GraphGenerators {
val sigma = 1.3
//val vertsAndEdges = (0 until numVertices).flatMap { src => {
- val vertices: RDD[(Vid, Int)] = sc.parallelize(0 until numVertices).map{
+ val vertices: RDD[(VertexID, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
}
@@ -92,11 +92,11 @@ object GraphGenerators {
}
- def generateRandomEdges(src: Int, numEdges: Int, maxVid: Int): Array[Edge[Int]] = {
+ def generateRandomEdges(src: Int, numEdges: Int, maxVertexID: Int): Array[Edge[Int]] = {
val rand = new Random()
var dsts: Set[Int] = Set()
while (dsts.size < numEdges) {
- val nextDst = rand.nextInt(maxVid)
+ val nextDst = rand.nextInt(maxVertexID)
if (nextDst != src) {
dsts += nextDst
}
@@ -251,9 +251,9 @@ object GraphGenerators {
*/
def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int,Int), Double] = {
// Convert row column address into vertex ids (row major order)
- def sub2ind(r: Int, c: Int): Vid = r * cols + c
+ def sub2ind(r: Int, c: Int): VertexID = r * cols + c
- val vertices: RDD[(Vid, (Int,Int))] =
+ val vertices: RDD[(VertexID, (Int,Int))] =
sc.parallelize(0 until rows).flatMap( r => (0 until cols).map( c => (sub2ind(r,c), (r,c)) ) )
val edges: RDD[Edge[Double]] =
vertices.flatMap{ case (vid, (r,c)) =>
@@ -273,7 +273,7 @@ object GraphGenerators {
* being the center vertex.
*/
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int] = {
- val edges: RDD[(Vid, Vid)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
+ val edges: RDD[(VertexID, VertexID)] = sc.parallelize(1 until nverts).map(vid => (vid, 0))
Graph.fromEdgeTuples(edges, 1)
} // end of starGraph
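A minimal usage sketch for the generators above; `sc` is assumed to be an existing SparkContext.

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.util.GraphGenerators

    def demo(sc: SparkContext): Unit = {
      val star: Graph[Int, Int] = GraphGenerators.starGraph(sc, 100)          // 99 spokes into vertex 0
      val grid: Graph[(Int, Int), Double] = GraphGenerators.gridGraph(sc, 10, 10)
      println((star.edges.count(), grid.vertices.count()))
    }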
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala
index cb18ef3d26..7a79d33350 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/HashUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/HashUtils.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph.util
+package org.apache.spark.graphx.util
object HashUtils {
diff --git a/graph/src/test/resources/log4j.properties b/graphx/src/test/resources/log4j.properties
index 896936d8c4..85e57f0c4b 100644
--- a/graph/src/test/resources/log4j.properties
+++ b/graphx/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=graph/target/unit-tests.log
+log4j.appender.file.file=graphx/target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index 9e9213631f..cd3c0bbd30 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.apache.spark.SparkContext
-import org.apache.spark.graph.Graph._
-import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd._
import org.scalatest.FunSuite
@@ -11,7 +11,8 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test("aggregateNeighbors") {
withSpark { sc =>
val n = 3
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid))), 1)
+ val star =
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID))), 1)
val indegrees = star.aggregateNeighbors(
(vid, edge) => Some(1),
@@ -26,21 +27,22 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
assert(outdegrees.collect().toSet === Set((0, n)))
val noVertexValues = star.aggregateNeighbors[Int](
- (vid: Vid, edge: EdgeTriplet[Int, Int]) => None,
+ (vid: VertexID, edge: EdgeTriplet[Int, Int]) => None,
(a: Int, b: Int) => throw new Exception("reduceFunc called unexpectedly"),
EdgeDirection.In)
- assert(noVertexValues.collect().toSet === Set.empty[(Vid, Int)])
+ assert(noVertexValues.collect().toSet === Set.empty[(VertexID, Int)])
}
}
test("joinVertices") {
withSpark { sc =>
- val vertices = sc.parallelize(Seq[(Vid, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+ val vertices =
+ sc.parallelize(Seq[(VertexID, String)]((1, "one"), (2, "two"), (3, "three")), 2)
val edges = sc.parallelize((Seq(Edge(1, 2, "onetwo"))))
val g: Graph[String, String] = Graph(vertices, edges)
- val tbl = sc.parallelize(Seq[(Vid, Int)]((1, 10), (2, 20)))
- val g1 = g.joinVertices(tbl) { (vid: Vid, attr: String, u: Int) => attr + u }
+ val tbl = sc.parallelize(Seq[(VertexID, Int)]((1, 10), (2, 20)))
+ val g1 = g.joinVertices(tbl) { (vid: VertexID, attr: String, u: Int) => attr + u }
val v = g1.vertices.collect().toSet
assert(v === Set((1, "one10"), (2, "two20"), (3, "three")))
@@ -51,8 +53,8 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val chain = (0 until 100).map(x => (x, (x+1)%100) )
val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val graph = Graph.fromEdgeTuples(rawEdges, 1.0)
- val nbrs = graph.collectNeighborIds(EdgeDirection.Both)
+ val graph = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
+ val nbrs = graph.collectNeighborIds(EdgeDirection.Both).cache()
assert(nbrs.count === chain.size)
assert(graph.numVertices === nbrs.count)
nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) }
@@ -67,16 +69,16 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
test ("filter") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
- val graph: Graph[Int, Int] = Graph(vertices, edges)
+ val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
val filteredGraph = graph.filter(
graph => {
val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
},
- vpred = (vid: Vid, deg:Int) => deg > 0
- )
+ vpred = (vid: VertexID, deg:Int) => deg > 0
+ ).cache()
val v = filteredGraph.vertices.collect().toSet
assert(v === Set((0,0)))
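
Note: several hunks in this suite add .cache() before a graph or VertexRDD is reused by more than one action; without it the underlying RDDs are recomputed for each action. A hedged sketch of the same pattern outside the test harness (names and the local-mode context are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.Graph._

    object CacheBeforeReuse {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CacheBeforeReuse")
        val chain = (0L until 100L).map(x => (x, (x + 1) % 100))
        // Cache the graph (and the derived VertexRDD) before issuing several actions,
        // otherwise each count below rebuilds the graph from scratch.
        val graph = Graph.fromEdgeTuples(sc.parallelize(chain, 3), 1.0).cache()
        val nbrs = graph.collectNeighborIds(EdgeDirection.Both).cache()
        println(graph.numVertices + " vertices, " + nbrs.count + " neighbor lists")
        sc.stop()
      }
    }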
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index e6c19dbc40..c32a6cbb81 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -1,15 +1,15 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
-import org.apache.spark.graph.Graph._
+import org.apache.spark.graphx.Graph._
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
def starGraph(sc: SparkContext, n: Int): Graph[String, Int] = {
- Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), "v")
}
test("Graph.fromEdgeTuples") {
@@ -39,7 +39,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L)
val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) }
- val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
+ val vertices: RDD[(VertexID, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true)))
val graph = Graph(vertices, edges, false)
assert( graph.edges.count() === rawEdges.size )
// Vertices not explicitly provided but referenced by edges should be created automatically
@@ -56,7 +56,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
assert(star.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)).collect.toSet ===
- (1 to n).map(x => (0: Vid, x: Vid, "v", "v")).toSet)
+ (1 to n).map(x => (0: VertexID, x: VertexID, "v", "v")).toSet)
}
}
@@ -92,7 +92,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val p = 100
val verts = 1 to n
val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x =>
- verts.filter(y => y % x == 0).map(y => (x: Vid, y: Vid))), p), 0)
+ verts.filter(y => y % x == 0).map(y => (x: VertexID, y: VertexID))), p), 0)
assert(graph.edges.partitions.length === p)
val partitionedGraph = graph.partitionBy(EdgePartition2D)
assert(graph.edges.partitions.length === p)
@@ -118,10 +118,10 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val star = starGraph(sc, n)
// mapVertices preserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
- assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: Vid, "v2")).toSet)
+ assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, "v2")).toSet)
// mapVertices changing type
val mappedVAttrs2 = star.mapVertices((vid, attr) => attr.length)
- assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: Vid, 1)).toSet)
+ assert(mappedVAttrs2.vertices.collect.toSet === (0 to n).map(x => (x: VertexID, 1)).toSet)
}
}
@@ -150,7 +150,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
- assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: Vid, 1)).toSet)
+ assert(star.reverse.outDegrees.collect.toSet === (1 to n).map(x => (x: VertexID, 1)).toSet)
}
}
@@ -173,9 +173,9 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mask") {
withSpark { sc =>
val n = 5
- val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
+ val vertices = sc.parallelize((0 to n).map(x => (x:VertexID, x)))
val edges = sc.parallelize((1 to n).map(x => Edge(0, x, x)))
- val graph: Graph[Int, Int] = Graph(vertices, edges)
+ val graph: Graph[Int, Int] = Graph(vertices, edges).cache()
val subgraph = graph.subgraph(
e => e.dstId != 4L,
@@ -199,7 +199,8 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val n = 5
val star = starGraph(sc, n)
val doubleStar = Graph.fromEdgeTuples(
- sc.parallelize((1 to n).flatMap(x => List((0: Vid, x: Vid), (0: Vid, x: Vid))), 1), "v")
+ sc.parallelize((1 to n).flatMap(x =>
+ List((0: VertexID, x: VertexID), (0: VertexID, x: VertexID))), 1), "v")
val star2 = doubleStar.groupEdges { (a, b) => a}
assert(star2.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]) ===
star.edges.collect.toArray.sorted(Edge.lexicographicOrdering[Int]))
@@ -210,7 +211,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("mapReduceTriplets") {
withSpark { sc =>
val n = 5
- val star = starGraph(sc, n).mapVertices { (_, _) => 0 }
+ val star = starGraph(sc, n).mapVertices { (_, _) => 0 }.cache()
val starDeg = star.joinVertices(star.degrees){ (vid, oldV, deg) => deg }
val neighborDegreeSums = starDeg.mapReduceTriplets(
edge => Iterator((edge.srcId, edge.dstAttr), (edge.dstId, edge.srcAttr)),
@@ -218,7 +219,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(neighborDegreeSums.collect().toSet === (0 to n).map(x => (x, n)).toSet)
// activeSetOpt
- val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: Vid, y: Vid)
+ val allPairs = for (x <- 1 to n; y <- 1 to n) yield (x: VertexID, y: VertexID)
val complete = Graph.fromEdgeTuples(sc.parallelize(allPairs, 3), 0)
val vids = complete.mapVertices((vid, attr) => vid).cache()
val active = vids.vertices.filter { case (vid, attr) => attr % 2 == 0 }
@@ -229,12 +230,12 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.srcId, 1))
}, (a: Int, b: Int) => a + b, Some((active, EdgeDirection.In))).collect.toSet
- assert(numEvenNeighbors === (1 to n).map(x => (x: Vid, n / 2)).toSet)
+ assert(numEvenNeighbors === (1 to n).map(x => (x: VertexID, n / 2)).toSet)
// outerJoinVertices followed by mapReduceTriplets(activeSetOpt)
- val ring = Graph.fromEdgeTuples(sc.parallelize((0 until n).map(x => (x: Vid, (x+1) % n: Vid)), 3), 0)
- .mapVertices((vid, attr) => vid).cache()
- val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_)
+ val ringEdges = sc.parallelize((0 until n).map(x => (x: VertexID, (x+1) % n: VertexID)), 3)
+ val ring = Graph.fromEdgeTuples(ringEdges, 0).mapVertices((vid, attr) => vid).cache()
+ val changed = ring.vertices.filter { case (vid, attr) => attr % 2 == 1 }.mapValues(-_).cache()
val changedGraph = ring.outerJoinVertices(changed) { (vid, old, newOpt) => newOpt.getOrElse(old) }
val numOddNeighbors = changedGraph.mapReduceTriplets(et => {
// Map function should only run on edges with source in the active set
@@ -243,7 +244,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
Iterator((et.dstId, 1))
}, (a: Int, b: Int) => a + b, Some(changed, EdgeDirection.Out)).collect.toSet
- assert(numOddNeighbors === (2 to n by 2).map(x => (x: Vid, 1)).toSet)
+ assert(numOddNeighbors === (2 to n by 2).map(x => (x: VertexID, 1)).toSet)
}
}
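
Note: the mapReduceTriplets test above exercises the core aggregation primitive touched throughout this file. A minimal sketch of the same call outside the suite (the degree-counting use is illustrative; the API is exactly the one shown above):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    object DegreesViaMapReduceTriplets {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "DegreesViaMapReduceTriplets")
        val n = 5
        val star = Graph.fromEdgeTuples(
          sc.parallelize((1 to n).map(x => (0: VertexID, x: VertexID)), 3), 1).cache()
        // Send a 1 to both endpoints of every edge and sum per vertex: an (in + out) degree count.
        val degrees: VertexRDD[Int] = star.mapReduceTriplets[Int](
          et => Iterator((et.srcId, 1), (et.dstId, 1)),
          (a, b) => a + b)
        degrees.collect.foreach(println)
        sc.stop()
      }
    }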
@@ -251,14 +252,14 @@ class GraphSuite extends FunSuite with LocalSparkContext {
test("outerJoinVertices") {
withSpark { sc =>
val n = 5
- val reverseStar = starGraph(sc, n).reverse
+ val reverseStar = starGraph(sc, n).reverse.cache()
// outerJoinVertices changing type
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
- assert(neighborDegreeSums === Set((0: Vid, n)) ++ (1 to n).map(x => (x: Vid, 0)))
+ assert(neighborDegreeSums === Set((0: VertexID, n)) ++ (1 to n).map(x => (x: VertexID, 0)))
// outerJoinVertices preserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
diff --git a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
index 5c20d559aa..6aec2ea8a9 100644
--- a/graph/src/test/scala/org/apache/spark/graph/LocalSparkContext.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterEach
@@ -12,7 +12,7 @@ import org.apache.spark.SparkContext
*/
trait LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
+ System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
def withSpark[T](f: SparkContext => T) = {
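
Note: the test harness above switches the Kryo registrator property to the new graphx package. A sketch of the same configuration in a standalone driver (the object name is illustrative; the two property values are exactly those in the hunk above):

    import org.apache.spark.SparkContext

    object KryoGraphXContext {
      def main(args: Array[String]) {
        // Register GraphX's Kryo serializers before the SparkContext is created.
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
        val sc = new SparkContext("local", "KryoGraphXContext")
        // ... build and query graphs here ...
        sc.stop()
      }
    }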
diff --git a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
index 44182e85ee..1ff3d75633 100644
--- a/graph/src/test/scala/org/apache/spark/graph/PregelSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/PregelSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.scalatest.FunSuite
@@ -10,7 +10,8 @@ class PregelSuite extends FunSuite with LocalSparkContext {
test("1 iteration") {
withSpark { sc =>
val n = 5
- val star = Graph.fromEdgeTuples(sc.parallelize((1 to n).map(x => (0: Vid, x: Vid)), 3), "v")
+ val starEdges = (1 to n).map(x => (0: VertexID, x: VertexID))
+ val star = Graph.fromEdgeTuples(sc.parallelize(starEdges, 3), "v").cache()
val result = Pregel(star, 0)(
(vid, attr, msg) => attr,
et => Iterator.empty,
@@ -23,11 +24,12 @@ class PregelSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val chain = Graph.fromEdgeTuples(
- sc.parallelize((1 until n).map(x => (x: Vid, x + 1: Vid)), 3),
+ sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3),
0).cache()
- assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: Vid, 0)).toSet)
- val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }
- assert(chainWithSeed.vertices.collect.toSet === Set((1: Vid, 1)) ++ (2 to n).map(x => (x: Vid, 0)).toSet)
+ assert(chain.vertices.collect.toSet === (1 to n).map(x => (x: VertexID, 0)).toSet)
+ val chainWithSeed = chain.mapVertices { (vid, attr) => if (vid == 1) 1 else 0 }.cache()
+ assert(chainWithSeed.vertices.collect.toSet ===
+ Set((1: VertexID, 1)) ++ (2 to n).map(x => (x: VertexID, 0)).toSet)
val result = Pregel(chainWithSeed, 0)(
(vid, attr, msg) => math.max(msg, attr),
et => Iterator((et.dstId, et.srcAttr)),
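
Note: the chain-propagation test above is the canonical Pregel usage after the rename. A self-contained sketch of the same computation (object name and local-mode setup are illustrative; the Pregel call mirrors the suite):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    object PregelChainExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "PregelChainExample")
        val n = 5
        // Chain 1 -> 2 -> ... -> n; vertex 1 is seeded with 1, everything else with 0.
        val chain = Graph.fromEdgeTuples(
            sc.parallelize((1 until n).map(x => (x: VertexID, x + 1: VertexID)), 3), 0)
          .mapVertices((vid, attr) => if (vid == 1) 1 else 0)
          .cache()
        // Flood the maximum value seen so far down the chain; the run ends once
        // no vertex receives a new message.
        val result = Pregel(chain, 0)(
          (vid, attr, msg) => math.max(msg, attr),
          et => Iterator((et.dstId, et.srcAttr)),
          (a: Int, b: Int) => math.max(a, b))
        result.vertices.collect.foreach(println)
        sc.stop()
      }
    }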
diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
index 4014cbe440..3ba412c1f8 100644
--- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/SerializerSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import java.io.{EOFException, ByteArrayInputStream, ByteArrayOutputStream}
@@ -7,8 +7,8 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark._
-import org.apache.spark.graph.impl._
-import org.apache.spark.graph.impl.MsgRDDFunctions._
+import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.impl.MsgRDDFunctions._
import org.apache.spark.serializer.SerializationStream
@@ -82,7 +82,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("IntAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 5)
+ val outMsg = (4: VertexID, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -90,8 +90,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Int) = inStrm.readObject()
- val inMsg2: (Vid, Int) = inStrm.readObject()
+ val inMsg1: (VertexID, Int) = inStrm.readObject()
+ val inMsg2: (VertexID, Int) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -102,7 +102,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("LongAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 1L << 32)
+ val outMsg = (4: VertexID, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -110,8 +110,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Long) = inStrm.readObject()
- val inMsg2: (Vid, Long) = inStrm.readObject()
+ val inMsg1: (VertexID, Long) = inStrm.readObject()
+ val inMsg2: (VertexID, Long) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
@@ -122,7 +122,7 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
test("DoubleAggMsgSerializer") {
val conf = new SparkConf(false)
- val outMsg = (4: Vid, 5.0)
+ val outMsg = (4: VertexID, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer(conf).newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
@@ -130,8 +130,8 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer(conf).newInstance().deserializeStream(bin)
- val inMsg1: (Vid, Double) = inStrm.readObject()
- val inMsg2: (Vid, Double) = inStrm.readObject()
+ val inMsg1: (VertexID, Double) = inStrm.readObject()
+ val inMsg2: (VertexID, Double) = inStrm.readObject()
assert(outMsg === inMsg1)
assert(outMsg === inMsg2)
diff --git a/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index e876b8e4e8..d94a3aa67c 100644
--- a/graph/src/test/scala/org/apache/spark/graph/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -1,8 +1,8 @@
-package org.apache.spark.graph
+package org.apache.spark.graphx
import org.apache.spark.SparkContext
-import org.apache.spark.graph.Graph._
-import org.apache.spark.graph.impl.EdgePartition
+import org.apache.spark.graphx.Graph._
+import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.rdd._
import org.scalatest.FunSuite
@@ -33,8 +33,8 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
test("diff") {
withSpark { sc =>
val n = 100
- val verts = vertices(sc, n)
- val flipEvens = verts.mapValues(x => if (x % 2 == 0) -x else x)
+ val verts = vertices(sc, n).cache()
+ val flipEvens = verts.mapValues(x => if (x % 2 == 0) -x else x).cache()
// diff should keep only the changed vertices
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
// diff should keep the vertex values from `other`
@@ -45,8 +45,8 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
test("leftJoin") {
withSpark { sc =>
val n = 100
- val verts = vertices(sc, n)
- val evens = verts.filter(q => ((q._2 % 2) == 0))
+ val verts = vertices(sc, n).cache()
+ val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// leftJoin with another VertexRDD
assert(verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }.collect.toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet ++ (1 to n by 2).map(x => (x.toLong, x)).toSet)
@@ -60,8 +60,8 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
test("innerJoin") {
withSpark { sc =>
val n = 100
- val verts = vertices(sc, n)
- val evens = verts.filter(q => ((q._2 % 2) == 0))
+ val verts = vertices(sc, n).cache()
+ val evens = verts.filter(q => ((q._2 % 2) == 0)).cache()
// innerJoin with another VertexRDD
assert(verts.innerJoin(evens) { (id, a, b) => a - b }.collect.toSet ===
(0 to n by 2).map(x => (x.toLong, 0)).toSet)
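
Note: these VertexRDD hunks mostly add caching, but they also exercise the two join flavours. A brief sketch contrasting them (names are illustrative; the operators are the ones used above):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._

    object VertexRDDJoins {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "VertexRDDJoins")
        val n = 100
        val verts = VertexRDD(sc.parallelize((0 to n).map(x => (x.toLong, x)), 5)).cache()
        val evens = verts.filter(q => (q._2 % 2) == 0).cache()
        // leftJoin keeps every vertex of `verts` (missing values arrive as None);
        // innerJoin keeps only the vertices present on both sides.
        val left  = verts.leftJoin(evens) { (id, a, bOpt) => a - bOpt.getOrElse(0) }
        val inner = verts.innerJoin(evens) { (id, a, b) => a - b }
        println(left.count + " left-joined vs " + inner.count + " inner-joined vertices")
        sc.stop()
      }
    }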
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
index 81a1b7337f..16fc3fe5a2 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/ConnectedComponentsSuite.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
@@ -13,8 +13,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Grid Connected Components") {
withSpark { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).cache()
- val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
+ val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
assert(maxCCid === 0)
}
@@ -23,8 +23,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
test("Reverse Grid Connected Components") {
withSpark { sc =>
- val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse.cache()
- val ccGraph = ConnectedComponents.run(gridGraph).cache()
+ val gridGraph = GraphGenerators.gridGraph(sc, 10, 10).reverse
+ val ccGraph = gridGraph.connectedComponents()
val maxCCid = ccGraph.vertices.map { case (vid, ccId) => ccId }.sum
assert(maxCCid === 0)
}
@@ -36,8 +36,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
- val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val twoChains = Graph.fromEdgeTuples(rawEdges, 1.0)
+ val ccGraph = twoChains.connectedComponents()
val vertices = ccGraph.vertices.collect()
for ( (id, cc) <- vertices ) {
if(id < 10) { assert(cc === 0) }
@@ -59,8 +59,8 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val chain1 = (0 until 9).map(x => (x, x+1) )
val chain2 = (10 until 20).map(x => (x, x+1) )
val rawEdges = sc.parallelize(chain1 ++ chain2, 3).map { case (s,d) => (s.toLong, d.toLong) }
- val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse.cache()
- val ccGraph = ConnectedComponents.run(twoChains).cache()
+ val twoChains = Graph.fromEdgeTuples(rawEdges, true).reverse
+ val ccGraph = twoChains.connectedComponents()
val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) {
if (id < 10) {
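
Note: the suite now calls connectedComponents() directly on the graph instead of ConnectedComponents.run(graph). A sketch of the same call from user code, assuming the implicit enrichment added under graphx/algorithms is brought in via import org.apache.spark.graphx.algorithms._:

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._

    object ConnectedComponentsExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "ConnectedComponentsExample")
        // Two disjoint chains, 0..9 and 10..20, as in the suite above.
        val chain1 = (0 until 9).map(x => (x.toLong, x.toLong + 1))
        val chain2 = (10 until 20).map(x => (x.toLong, x.toLong + 1))
        val twoChains = Graph.fromEdgeTuples(sc.parallelize(chain1 ++ chain2, 3), 1.0)
        // Every vertex is labelled with the smallest vertex id in its component.
        val ccGraph = twoChains.connectedComponents()
        ccGraph.vertices.collect.foreach { case (id, cc) => println(id + " -> " + cc) }
        sc.stop()
      }
    }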
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
index 81d82a5a6b..de2c2d1107 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/PageRankSuite.scala
@@ -1,15 +1,14 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.scalatest.FunSuite
-import org.apache.spark.graph._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.algorithms._
+import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
-import org.apache.spark.graph.util.GraphGenerators
-
-
object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double) = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
@@ -58,8 +57,8 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
val resetProb = 0.15
val errorTol = 1.0e-5
- val staticRanks1 = PageRank.run(starGraph, numIter = 1, resetProb).vertices.cache()
- val staticRanks2 = PageRank.run(starGraph, numIter = 2, resetProb).vertices.cache()
+ val staticRanks1 = starGraph.staticPageRank(numIter = 1, resetProb).vertices
+ val staticRanks2 = starGraph.staticPageRank(numIter = 2, resetProb).vertices.cache()
// Static PageRank should only take 2 iterations to converge
val notMatching = staticRanks1.innerZipJoin(staticRanks2) { (vid, pr1, pr2) =>
@@ -74,10 +73,8 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
}
assert(staticErrors.sum === 0)
- val dynamicRanks = PageRank.runUntillConvergence(starGraph, 0, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(starGraph, 0, resetProb).cache()
+ val dynamicRanks = starGraph.pageRank(0, resetProb).vertices.cache()
assert(compareRanks(staticRanks2, dynamicRanks) < errorTol)
- assert(compareRanks(staticRanks2, standaloneRanks) < errorTol)
}
} // end of test Star PageRank
@@ -93,14 +90,12 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
val errorTol = 1.0e-5
val gridGraph = GraphGenerators.gridGraph(sc, rows, cols).cache()
- val staticRanks = PageRank.run(gridGraph, numIter, resetProb).vertices.cache()
- val dynamicRanks = PageRank.runUntillConvergence(gridGraph, tol, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(gridGraph, tol, resetProb).cache()
- val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb)))
+ val staticRanks = gridGraph.staticPageRank(numIter, resetProb).vertices.cache()
+ val dynamicRanks = gridGraph.pageRank(tol, resetProb).vertices.cache()
+ val referenceRanks = VertexRDD(sc.parallelize(GridPageRank(rows, cols, numIter, resetProb))).cache()
assert(compareRanks(staticRanks, referenceRanks) < errorTol)
assert(compareRanks(dynamicRanks, referenceRanks) < errorTol)
- assert(compareRanks(standaloneRanks, referenceRanks) < errorTol)
}
} // end of Grid PageRank
@@ -115,12 +110,10 @@ class PageRankSuite extends FunSuite with LocalSparkContext {
val numIter = 10
val errorTol = 1.0e-5
- val staticRanks = PageRank.run(chain, numIter, resetProb).vertices.cache()
- val dynamicRanks = PageRank.runUntillConvergence(chain, tol, resetProb).vertices.cache()
- val standaloneRanks = PageRank.runStandalone(chain, tol, resetProb).cache()
+ val staticRanks = chain.staticPageRank(numIter, resetProb).vertices
+ val dynamicRanks = chain.pageRank(tol, resetProb).vertices
assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
- assert(compareRanks(dynamicRanks, standaloneRanks) < errorTol)
}
}
}
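
Note: the PageRank hunks above replace the PageRank.run / runUntillConvergence / runStandalone entry points with two graph methods, staticPageRank(numIter, resetProb) and pageRank(tol, resetProb). A hedged sketch of the surviving pair (assuming the algorithms implicit is imported; the grid size, tolerance, and reset probability here are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._
    import org.apache.spark.graphx.util.GraphGenerators

    object PageRankExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "PageRankExample")
        val grid = GraphGenerators.gridGraph(sc, 10, 10).cache()
        // Fixed number of iterations vs. run-until-convergence with a tolerance.
        val staticRanks  = grid.staticPageRank(10, 0.15).vertices
        val dynamicRanks = grid.pageRank(0.0001, 0.15).vertices
        println("static mass: " + staticRanks.map(_._2).sum +
          ", dynamic mass: " + dynamicRanks.map(_._2).sum)
        sc.stop()
      }
    }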
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
index 411dd3d336..7bd93e0e6c 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/SVDPlusPlusSuite.scala
@@ -1,29 +1,30 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
-class SvdppSuite extends FunSuite with LocalSparkContext {
+class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
test("Test SVD++ with mean square error on training set") {
withSpark { sc =>
- val SvdppErr = 8.0
+ val svdppErr = 8.0
val edges = sc.textFile("mllib/data/als/test.data").map { line =>
val fields = line.split(",")
Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
}
- val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
- var (graph, u) = Svdpp.run(edges, conf)
+ val conf = new SVDPlusPlusConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+ var (graph, u) = SVDPlusPlus.run(edges, conf)
+ graph.cache()
val err = graph.vertices.collect.map{ case (vid, vd) =>
if (vid % 2 == 1) vd._4 else 0.0
}.reduce(_ + _) / graph.triplets.collect.size
- assert(err <= SvdppErr)
+ assert(err <= svdppErr)
}
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
index 4afb158a68..fee7d20161 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/StronglyConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/StronglyConnectedComponentsSuite.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
@@ -16,7 +16,7 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
val vertices = sc.parallelize((1L to 5L).map(x => (x, -1)))
val edges = sc.parallelize(Seq.empty[Edge[Int]])
val graph = Graph(vertices, edges)
- val sccGraph = StronglyConnectedComponents.run(graph, 5)
+ val sccGraph = graph.stronglyConnectedComponents(5)
for ((id, scc) <- sccGraph.vertices.collect) {
assert(id == scc)
}
@@ -27,7 +27,7 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
val graph = Graph.fromEdgeTuples(rawEdges, -1)
- val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ val sccGraph = graph.stronglyConnectedComponents(20)
for ((id, scc) <- sccGraph.vertices.collect) {
assert(0L == scc)
}
@@ -42,7 +42,7 @@ class StronglyConnectedComponentsSuite extends FunSuite with LocalSparkContext {
Array(6L -> 0L, 5L -> 7L)
val rawEdges = sc.parallelize(edges)
val graph = Graph.fromEdgeTuples(rawEdges, -1)
- val sccGraph = StronglyConnectedComponents.run(graph, 20)
+ val sccGraph = graph.stronglyConnectedComponents(20)
for ((id, scc) <- sccGraph.vertices.collect) {
if (id < 3)
assert(0L == scc)
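
Note: as with the other algorithms, StronglyConnectedComponents.run(graph, numIter) becomes graph.stronglyConnectedComponents(numIter). A sketch on a directed cycle (assuming the algorithms implicit import; the iteration bound 20 is the one used above):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._

    object SCCExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SCCExample")
        // A directed 7-cycle: all vertices fall into a single strongly connected component.
        val rawEdges = sc.parallelize((0L to 6L).map(x => (x, (x + 1) % 7)))
        val graph = Graph.fromEdgeTuples(rawEdges, -1)
        val sccGraph = graph.stronglyConnectedComponents(20)
        sccGraph.vertices.collect.foreach { case (id, scc) => println(id + " -> " + scc) }
        sc.stop()
      }
    }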
diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
index 274ab11f0c..b85b289da6 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/algorithms/TriangleCountSuite.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph.algorithms
+package org.apache.spark.graphx.algorithms
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.graph.util.GraphGenerators
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
@@ -15,7 +15,7 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val rawEdges = sc.parallelize(Array( 0L->1L, 1L->2L, 2L->0L ), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
+ val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
@@ -27,7 +27,7 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
+ val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
@@ -47,7 +47,7 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
val revTriangles = triangles.map { case (a,b) => (b,a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
- val triangleCount = TriangleCount.run(graph)
+ val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
verts.collect().foreach { case (vid, count) =>
if (vid == 0) {
@@ -64,7 +64,7 @@ class TriangleCountSuite extends FunSuite with LocalSparkContext {
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
- val triangleCount = TriangleCount.run(graph)
+ val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }
}
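
Note: TriangleCount.run(graph) likewise becomes graph.triangleCount(). A sketch counting a single triangle (assuming the algorithms implicit import; the edges are those of the first test above):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.graphx.algorithms._

    object TriangleCountExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "TriangleCountExample")
        // One directed triangle 0 -> 1 -> 2 -> 0.
        val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
        val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
        // Each vertex of the triangle should report a count of 1.
        graph.triangleCount().vertices.collect.foreach {
          case (vid, count) => println(vid + ": " + count)
        }
        sc.stop()
      }
    }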
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index f951fd7a82..eb82436f09 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -1,11 +1,11 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
import scala.util.Random
import org.scalatest.FunSuite
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
class EdgePartitionSuite extends FunSuite {
@@ -62,7 +62,7 @@ class EdgePartitionSuite extends FunSuite {
test("innerJoin") {
def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
- for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
+ for ((src, dst, attr) <- xs) { builder.add(src: VertexID, dst: VertexID, attr) }
builder.toEdgePartition
}
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index 72579a48c2..d37d64e8c8 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -1,6 +1,6 @@
-package org.apache.spark.graph.impl
+package org.apache.spark.graphx.impl
-import org.apache.spark.graph._
+import org.apache.spark.graphx._
import org.scalatest.FunSuite
class VertexPartitionSuite extends FunSuite {
diff --git a/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
index d85e877ddf..11db339750 100644
--- a/graph/src/test/scala/org/apache/spark/graph/util/BytecodeUtilsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala
@@ -1,4 +1,4 @@
-package org.apache.spark.graph.util
+package org.apache.spark.graphx.util
import org.scalatest.FunSuite
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c2cd6fb45a..c2b1c0c35c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -48,20 +48,20 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings)
- .dependsOn(core, graph, bagel, mllib)
+ .dependsOn(core, graphx, bagel, mllib)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
- lazy val graph = Project("graph", file("graph"), settings = graphSettings) dependsOn(core)
+ lazy val graphx = Project("graphx", file("graphx"), settings = graphxSettings) dependsOn(core)
lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
- .dependsOn(core, graph, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
+ .dependsOn(core, graphx, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
lazy val assembleDeps = TaskKey[Unit]("assemble-deps", "Build assembly of dependencies and packages Spark projects")
@@ -111,10 +111,10 @@ object SparkBuild extends Build {
lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, graph, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
+ .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter) dependsOn(allExternal: _*)
// Everything except assembly, tools and examples belong to packageProjects
- lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graph) ++ maybeYarnRef
+ lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx) ++ maybeYarnRef
lazy val allProjects = packageProjects ++ allExternalRefs ++ Seq[ProjectReference](examples, tools, assemblyProj)
@@ -308,7 +308,7 @@ object SparkBuild extends Build {
name := "spark-tools"
) ++ assemblySettings ++ extraAssemblySettings
- def graphSettings = sharedSettings ++ Seq(
+ def graphxSettings = sharedSettings ++ Seq(
name := "spark-graphx"
)
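
Note: with the sbt project renamed from graph/graphSettings to graphx/graphxSettings and the artifact name set to spark-graphx, a downstream build would reference the new module name. A purely illustrative sbt (Build.scala style) sketch; the group id and version string are assumptions, not part of this patch:

    import sbt._
    import Keys._

    object MyAppBuild extends Build {
      // Hypothetical downstream project depending on the renamed module.
      lazy val myApp = Project("my-app", file("."), settings = Defaults.defaultSettings ++ Seq(
        scalaVersion := "2.10.3",
        libraryDependencies += "org.apache.spark" %% "spark-graphx" % "0.9.0-incubating"
      ))
    }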