author     Reynold Xin <rxin@apache.org>  2014-01-13 17:40:36 -0800
committer  Reynold Xin <rxin@apache.org>  2014-01-13 17:40:36 -0800
commit     02a8f54bfa4572908d2d605a85e7a5adf9a36fbc (patch)
tree       85acf6b51b3ca5c4c9ca3f53ec24acfaa5f14838 /graphx
parent     dc041cd3b6b3b75df42d9a74dcf95179a25ee50f (diff)
Miscellaneous doc update.
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Edge.scala                                    |  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala                             |  4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala                                   | 75
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala                             | 14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala                       |  1
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala                                  | 14
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala                      |  7
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala                          | 15
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala                           | 12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala                 |  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala                            |  2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala                         | 95
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala         |  1
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala                       | 43
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/package.scala                                 |  5
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala                      |  7
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala |  2
17 files changed, 158 insertions(+), 143 deletions(-)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 85f27d2c8d..6c396c3dbe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -13,7 +13,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
/** The vertex id of the target vertex. */
var dstId: VertexID = 0,
/** The attribute associated with the edge. */
- var attr: ED = nullValue[ED])
+ var attr: ED = null.asInstanceOf[ED])
extends Serializable {
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 057d63a0ac..4253b24b5a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -1,7 +1,5 @@
package org.apache.spark.graphx
-import org.apache.spark.graphx.impl.VertexPartition
-
/**
* An edge triplet represents an edge along with the vertex attributes of its neighboring vertices.
*
@@ -47,5 +45,5 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
def vertexAttr(vid: VertexID): VD =
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
- override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+ override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 7d4f0de3d6..d2ba6fde4a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -45,7 +45,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
- * the adjacent vertices.
+ * the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
+ * if only the edge data and adjacent vertex ids are needed.
*
* @return an RDD containing edge triplets
*
@@ -54,13 +55,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* different color.
* {{{
* type Color = Int
- * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
+ * val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv")
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}}
- *
- * @see `edges` if only the edge data and adjacent vertex ids are
- * required.
- *
*/
val triplets: RDD[EdgeTriplet[VD, ED]]
@@ -68,9 +65,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* Caches the vertices and edges associated with this graph at the specified storage level.
*
* @param newLevel the level at which to cache the graph.
-
- * @return A reference to this graph for convenience.
*
+ * @return A reference to this graph for convenience.
*/
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -159,8 +155,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* @tparam ED2 the new edge data type
*
*/
- def mapEdges[ED2: ClassTag](
- map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+ def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
+ : Graph[VD, ED2]
/**
* Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
@@ -203,9 +199,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* @tparam ED2 the new edge data type
*
*/
- def mapTriplets[ED2: ClassTag](
- map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
- Graph[VD, ED2]
+ def mapTriplets[ED2: ClassTag](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
+ : Graph[VD, ED2]
/**
* Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
@@ -233,8 +228,10 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* @return the subgraph containing only the vertices and edges that
* satisfy the predicates
*/
- def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
- vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+ def subgraph(
+ epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+ vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+ : Graph[VD, ED]
/**
* Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
@@ -249,14 +246,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* Merges multiple edges between two vertices into a single edge. For correct results, the graph
* must have been partitioned using [[partitionBy]].
*
- * @tparam ED2 the type of the resulting edge data after grouping.
- *
- * @param f the user-supplied commutative associative function to merge edge attributes for
- * duplicate edges.
+ * @param merge the user-supplied commutative associative function to merge edge attributes
+ * for duplicate edges.
*
* @return The resulting graph with a single edge for each (source, dest) vertex pair.
*/
- def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+ def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/**
* Computes statistics about the neighboring edges and vertices of each vertex. The user supplied
@@ -270,7 +265,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* more messages to neighboring vertices
*
* @param reduceFunc the user defined reduce function which should
- * be commutative and assosciative and is used to combine the output
+ * be commutative and associative and is used to combine the output
* of the map phase
*
* @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
@@ -301,21 +296,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
/**
* Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
- * input table should contain at most one entry for each vertex. If no entry in `table` is
+ * input table should contain at most one entry for each vertex. If no entry in `other` is
* provided for a particular vertex in the graph, the map function receives `None`.
*
* @tparam U the type of entry in the table of updates
* @tparam VD2 the new vertex value type
*
- * @param table the table to join with the vertices in the graph.
- * The table should contain at most one entry for each vertex.
- *
- * @param mapFunc the function used to compute the new vertex
- * values. The map function is invoked for all vertices, even those
- * that do not have a corresponding entry in the table.
+ * @param other the table to join with the vertices in the graph.
+ * The table should contain at most one entry for each vertex.
+ * @param mapFunc the function used to compute the new vertex values.
+ * The map function is invoked for all vertices, even those
+ * that do not have a corresponding entry in the table.
*
* @example This function is used to update the vertices with new values based on external data.
- * For example we could add the out-degree to each vertex record:
+ * For example we could add the out-degree to each vertex record:
*
* {{{
* val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
@@ -324,20 +318,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
* (vid, data, optDeg) => optDeg.getOrElse(0)
* }
* }}}
- *
*/
- def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
(mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
+ /**
+ * The associated [[GraphOps]] object.
+ */
// Save a copy of the GraphOps object so there is always one unique GraphOps object
// for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
val ops = new GraphOps(this)
} // end of Graph
-
-
/**
* The Graph object contains a collection of routines used to construct graphs from RDDs.
*/
@@ -357,7 +351,8 @@ object Graph {
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexID, VertexID)],
defaultValue: VD,
- uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
+ uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+ {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue)
uniqueEdges match {
@@ -391,10 +386,8 @@ object Graph {
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
- * @param defaultVertexAttr the default vertex attribute to use for
- * vertices that are mentioned in edges but not in vertices
- * @param partitionStrategy the partition strategy to use when
- * partitioning the edges
+ * @param defaultVertexAttr the default vertex attribute to use for vertices that are
+ * mentioned in edges but not in vertices
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexID, VD)],
@@ -406,9 +399,9 @@ object Graph {
/**
* Implicitly extracts the [[GraphOps]] member from a graph.
*
- * To improve modularity the Graph type only contains a small set of basic operations. All the
- * convenience operations are defined in the [[GraphOps]] class which may be shared across multiple
- * graph implementations.
+ * To improve modularity the Graph type only contains a small set of basic operations.
+ * All the convenience operations are defined in the [[GraphOps]] class which may be
+ * shared across multiple graph implementations.
*/
implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
} // end of Graph object
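For reference, a minimal sketch (not part of this patch) of the mapReduceTriplets / outerJoinVertices pattern described in the Scaladoc above: compute in-degrees and attach them to each vertex. `graph` is assumed to be an existing Graph[Double, Int].
{{{
// Emit one message per edge to its destination vertex and sum the messages.
val inDegrees: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  et => Iterator((et.dstId, 1)),   // map: 1 for the destination of every edge
  (a, b) => a + b)                 // reduce: commutative, associative sum
// Join the degrees back; vertices with no incoming edges receive None.
val degreeGraph: Graph[Int, Int] = graph.outerJoinVertices(inDegrees) {
  (vid, oldAttr, inDegOpt) => inDegOpt.getOrElse(0)
}
}}}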
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 3c06a403ea..7bdb101efb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -1,12 +1,7 @@
package org.apache.spark.graphx
-import java.util.{Arrays => JArrays}
-import scala.reflect.ClassTag
-
-import org.apache.spark.graphx.impl.EdgePartitionBuilder
import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl}
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
/**
* Provides utilities for loading [[Graph]]s from files.
@@ -31,19 +26,20 @@ object GraphLoader extends Logging {
* 1 8
* }}}
*
- * @param sc
+ * @param sc SparkContext
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for
* the edge RDD
- * @tparam ED
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1): Graph[Int, Int] = {
+ minEdgePartitions: Int = 1)
+ : Graph[Int, Int] =
+ {
val startTime = System.currentTimeMillis
// Parse the edge data table directly into edge partitions
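For reference, a hypothetical invocation of the loader documented above; the path and partition count are placeholders.
{{{
import org.apache.spark.SparkContext
val sc = new SparkContext("local", "edge-list-example")
// Each input line is "srcId dstId"; lines beginning with # are skipped as comments.
val graph: Graph[Int, Int] = GraphLoader.edgeListFile(
  sc, "hdfs://path/to/edges.tsv",
  canonicalOrientation = true, minEdgePartitions = 4)
println("edges: " + graph.edges.count() + ", vertices: " + graph.vertices.count())
}}}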
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index fc7635a033..b9ccd8765e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -5,6 +5,7 @@ package org.apache.spark.graphx
* vertex IDs.
*/
sealed trait PartitionStrategy extends Serializable {
+ /** Returns the partition number for a given edge. */
def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
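To illustrate the getPartition contract (not part of this patch): assuming an existing Graph[Int, Int] named `graph`, a strategy such as RandomVertexCut hashes the (src, dst) pair, so all edges between the same vertex pair land in the same partition, which is what groupEdges and TriangleCount rely on.
{{{
val strategy: PartitionStrategy = RandomVertexCut
// Which of the 4 partitions the edge 3 -> 7 would be assigned to.
val part: PartitionID = strategy.getPartition(3L, 7L, numParts = 4)
// Partition first, then merge parallel edges by summing their weights.
val deduped: Graph[Int, Int] = graph.partitionBy(strategy).groupEdges(_ + _)
}}}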
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 83e28d0ab2..ce4eb53829 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -89,14 +89,16 @@ object Pregel {
*
*/
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
- (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
- activeDirection: EdgeDirection = EdgeDirection.Out)(
- vprog: (VertexID, VD, A) => VD,
+ (graph: Graph[VD, ED],
+ initialMsg: A,
+ maxIterations: Int = Int.MaxValue,
+ activeDirection: EdgeDirection = EdgeDirection.Out)
+ (vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
- : Graph[VD, ED] = {
-
- var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+ : Graph[VD, ED] =
+ {
+ var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
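For reference, a minimal single-source shortest-paths sketch using the Pregel operator whose signature is reformatted above; `graph` is assumed to be an existing Graph[Int, Double] and `sourceId` a vertex it contains.
{{{
val sourceId: VertexID = 42L
// Initialize distances: 0 at the source, infinity everywhere else.
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),        // vertex program
  triplet => {                                           // send messages
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b))                              // merge messages
}}}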
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a03e73ee79..d4d71627e1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -100,10 +100,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
*/
def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
val builder = new EdgePartitionBuilder[ED]
- var firstIter: Boolean = true
- var currSrcId: VertexID = nullValue[VertexID]
- var currDstId: VertexID = nullValue[VertexID]
- var currAttr: ED = nullValue[ED]
+ var currSrcId: VertexID = null.asInstanceOf[VertexID]
+ var currDstId: VertexID = null.asInstanceOf[VertexID]
+ var currAttr: ED = null.asInstanceOf[ED]
var i = 0
while (i < size) {
if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 6a2abc71cc..9e39519200 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -249,8 +249,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
// For each vertex, replicate its attribute only to partitions where it is
// in the relevant position in an edge.
- val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
- val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+ val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
+ val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
val vs = activeSetOpt match {
case Some((activeSet, _)) =>
replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
@@ -308,10 +308,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
} // end of mapReduceTriplets
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
- (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+ (other: RDD[(VertexID, U)])
+ (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+ {
if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
- val newVerts = vertices.leftJoin(updates)(updateF)
+ val newVerts = vertices.leftJoin(other)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = new ReplicatedVertexView[VD2](
changedVerts, edges, routingTable,
@@ -319,12 +321,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
- val newVerts = vertices.leftJoin(updates)(updateF)
+ val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, edges, routingTable)
}
}
- private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+  /** Test whether the closure accesses the attribute with name `attrName`. */
+ private def accessesVertexAttr(closure: AnyRef, attrName: String): Boolean = {
try {
BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
} catch {
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 8c35f4206e..d5e1de1ce0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -37,14 +37,12 @@ object Analytics extends Logging {
case "pagerank" =>
var tol: Float = 0.001F
var outFname = ""
- var numVPart = 4
var numEPart = 4
var partitionStrategy: Option[PartitionStrategy] = None
options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
- case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
@@ -90,16 +88,12 @@ object Analytics extends Logging {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- if(!isDynamic && numIter == Int.MaxValue) {
+ if (!isDynamic && numIter == Int.MaxValue) {
println("Set number of iterations!")
sys.exit(1)
}
println("======================================")
println("| Connected Components |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- println(" \tNumIter: " + numIter)
println("======================================")
val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf)
@@ -112,20 +106,18 @@ object Analytics extends Logging {
sc.stop()
case "triangles" =>
- var numVPart = 4
var numEPart = 4
// TriangleCount requires the graph to be partitioned
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
println("======================================")
println("| Triangle Count |")
- println("--------------------------------------")
+ println("======================================")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index d078d2acdb..da03d99264 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -4,7 +4,7 @@ import scala.reflect.ClassTag
import org.apache.spark.graphx._
-
+/** Connected components algorithm. */
object ConnectedComponents {
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index cf95267e77..853ef38712 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -5,7 +5,7 @@ import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.graphx._
-
+/** PageRank algorithm implementation. */
object PageRank extends Logging {
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index f5570daec1..fa6b1db29b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -1,11 +1,12 @@
package org.apache.spark.graphx.lib
-import org.apache.spark.rdd._
-import org.apache.spark.graphx._
import scala.util.Random
import org.apache.commons.math.linear._
+import org.apache.spark.rdd._
+import org.apache.spark.graphx._
-class SVDPlusPlusConf( // SVDPlusPlus parameters
+/** Configuration parameters for SVDPlusPlus. */
+class SVDPlusPlusConf(
var rank: Int,
var maxIters: Int,
var minVal: Double,
@@ -15,11 +16,15 @@ class SVDPlusPlusConf( // SVDPlusPlus parameters
var gamma6: Double,
var gamma7: Double) extends Serializable
+/** Implementation of SVD++ algorithm. */
object SVDPlusPlus {
/**
- * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model",
- * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
- * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6.
+ * Implement SVD++ based on "Factorization Meets the Neighborhood:
+ * a Multifaceted Collaborative Filtering Model",
+ * available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
+ *
+ * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
+ * see the details on page 6.
*
* @param edges edges for constructing the graph
*
@@ -27,16 +32,16 @@ object SVDPlusPlus {
*
* @return a graph with vertex attributes containing the trained model
*/
-
- def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
-
- // generate default vertex attribute
+ def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf)
+ : (Graph[(RealVector, RealVector, Double, Double), Double], Double) =
+ {
+ // Generate default vertex attribute
def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
val v1 = new ArrayRealVector(rank)
val v2 = new ArrayRealVector(rank)
for (i <- 0 until rank) {
- v1.setEntry(i, Random.nextDouble)
- v2.setEntry(i, Random.nextDouble)
+ v1.setEntry(i, Random.nextDouble())
+ v2.setEntry(i, Random.nextDouble())
}
(v1, v2, 0.0, 0.0)
}
@@ -49,14 +54,18 @@ object SVDPlusPlus {
// construct graph
var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
- // calculate initial bias and norm
- var t0 = g.mapReduceTriplets(et =>
- Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
- g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
- (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
+ // Calculate initial bias and norm
+ val t0 = g.mapReduceTriplets(
+ et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
+ (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+
+ g = g.outerJoinVertices(t0) {
+ (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
- def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ def mapTrainF(conf: SVDPlusPlusConf, u: Double)
+ (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
: Iterator[(VertexID, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
@@ -64,31 +73,49 @@ object SVDPlusPlus {
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = et.attr - pred
- val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
- val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
- val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ val updateP = q.mapMultiply(err)
+ .subtract(p.mapMultiply(conf.gamma7))
+ .mapMultiply(conf.gamma2)
+ val updateQ = usr._2.mapMultiply(err)
+ .subtract(q.mapMultiply(conf.gamma7))
+ .mapMultiply(conf.gamma2)
+ val updateY = q.mapMultiply(err * usr._4)
+ .subtract(itm._2.mapMultiply(conf.gamma7))
+ .mapMultiply(conf.gamma2)
Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
(et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
}
for (i <- 0 until conf.maxIters) {
- // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
+ // Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
g.cache()
- var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
- g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
- if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
+ val t1 = g.mapReduceTriplets(
+ et => Iterator((et.srcId, et.dstAttr._2)),
+ (g1: RealVector, g2: RealVector) => g1.add(g2))
+ g = g.outerJoinVertices(t1) {
+ (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
- // phase 2, update p for user nodes and q, y for item nodes
+
+ // Phase 2, update p for user nodes and q, y for item nodes
g.cache()
- val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
- (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
- g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
- (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
+ val t2 = g.mapReduceTriplets(
+ mapTrainF(conf, u),
+ (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+ (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+ g = g.outerJoinVertices(t2) {
+ (vid: VertexID,
+ vd: (RealVector, RealVector, Double, Double),
+ msg: Option[(RealVector, RealVector, Double)]) =>
+ (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
}
}
// calculate error on training set
- def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
+ def mapTestF(conf: SVDPlusPlusConf, u: Double)
+ (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ : Iterator[(VertexID, Double)] =
+ {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -99,9 +126,11 @@ object SVDPlusPlus {
}
g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
- g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
- if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
+ g = g.outerJoinVertices(t3) {
+ (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
+
(g, u)
}
}
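For reference, a hypothetical invocation of the refactored code above; the parameter values are illustrative only, the full SVDPlusPlusConf constructor is assumed to be (rank, maxIters, minVal, maxVal, gamma1, gamma2, gamma6, gamma7), and `sc` is an existing SparkContext.
{{{
val conf = new SVDPlusPlusConf(10, 20, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
// Edges encode (user, item, rating) triples.
val ratings: RDD[Edge[Double]] = sc.parallelize(Seq(
  Edge(1L, 101L, 4.0),
  Edge(1L, 102L, 3.0),
  Edge(2L, 101L, 5.0)))
val (model, meanRating) = SVDPlusPlus.run(ratings, conf)
// model: Graph[(RealVector, RealVector, Double, Double), Double]
}}}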
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 43c4b9cf2d..11847509da 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -4,6 +4,7 @@ import scala.reflect.ClassTag
import org.apache.spark.graphx._
+/** Strongly connected components algorithm implementation. */
object StronglyConnectedComponents {
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 58da9e3aed..f87eab9505 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -4,27 +4,26 @@ import scala.reflect.ClassTag
import org.apache.spark.graphx._
-
+/**
+ * Compute the number of triangles passing through each vertex.
+ *
+ * The algorithm is relatively straightforward and can be computed in three steps:
+ *
+ * 1) Compute the set of neighbors for each vertex
+ * 2) For each edge compute the intersection of the sets and send the
+ * count to both vertices.
+ * 3) Compute the sum at each vertex and divide by two since each
+ * triangle is counted twice.
+ *
+ * Note that the input graph should have its edges in canonical direction
+ * (i.e. the `sourceId` is less than the `destId`). Also the graph must have been partitioned
+ * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ */
object TriangleCount {
- /**
- * Compute the number of triangles passing through each vertex.
- *
- * The algorithm is relatively straightforward and can be computed in three steps:
- *
- * 1) Compute the set of neighbors for each vertex
- * 2) For each edge compute the intersection of the sets and send the
- * count to both vertices.
- * 3) Compute the sum at each vertex and divide by two since each
- * triangle is counted twice.
- *
- *
- * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
- * using [[org.apache.spark.graphx.Graph#partitionBy]], and its edges must be in canonical
- * orientation (srcId < dstId).
- */
+
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
// Remove redundant edges
- val g = graph.groupEdges((a, b) => a).cache
+ val g = graph.groupEdges((a, b) => a).cache()
// Construct set representations of the neighborhoods
val nbrSets: VertexRDD[VertexSet] =
@@ -56,8 +55,10 @@ object TriangleCount {
val iter = smallSet.iterator
var counter: Int = 0
while (iter.hasNext) {
- val vid = iter.next
- if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
+ val vid = iter.next()
+ if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) {
+ counter += 1
+ }
}
Iterator((et.srcId, counter), (et.dstId, counter))
}
@@ -71,7 +72,5 @@ object TriangleCount {
assert((dblCount & 1) == 0)
dblCount / 2
}
-
} // end of TriangleCount
-
}
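For reference, a hypothetical run of the algorithm above; `graph` is assumed to be a Graph[Int, Int] loaded with canonicalOrientation = true and already partitioned, e.g. via partitionBy(RandomVertexCut).
{{{
val triCounts: Graph[Int, Int] = TriangleCount.run(graph)
// Each triangle is reported at all three of its vertices, so the global count
// is the per-vertex sum divided by three.
val totalTriangles = triCounts.vertices.map(_._2).reduce(_ + _) / 3
}}}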
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index e70d2fd09f..60dfc1dc37 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -2,6 +2,7 @@ package org.apache.spark
import org.apache.spark.util.collection.OpenHashSet
+/** GraphX is a graph processing framework built on top of Spark. */
package object graphx {
/**
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
@@ -9,11 +10,9 @@ package object graphx {
*/
type VertexID = Long
+  /** Integer identifier of a graph partition. */
// TODO: Consider using Char.
type PartitionID = Int
private[graphx] type VertexSet = OpenHashSet[VertexID]
-
- /** Returns the default null-like value for a data type T. */
- private[graphx] def nullValue[T] = null.asInstanceOf[T]
}
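Illustration only: since VertexID is an alias for Long, vertex ids are written as plain Long literals.
{{{
val src: VertexID = 1L
val dst: VertexID = 2L
val e = Edge(src, dst, "follows")   // an Edge[String]
}}}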
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index ec8d534333..1c5b234d74 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -10,8 +10,11 @@ import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
import org.objectweb.asm.Opcodes._
-
-private[spark] object BytecodeUtils {
+/**
* Includes a utility function to test whether a function accesses a specific attribute
+ * of an object.
+ */
+private[graphx] object BytecodeUtils {
/**
* Test whether the given closure invokes the specified method in the specified class.
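For reference, a hypothetical check mirroring how GraphImpl.accessesVertexAttr (above) uses this utility; note that after this change the object is private[graphx], so the call must come from inside that package, and type arguments are erased, so any instantiation of EdgeTriplet works as the target class.
{{{
val mapFunc = (et: EdgeTriplet[Int, Int]) => Iterator((et.dstId, et.srcAttr))
// Does the closure ever read EdgeTriplet.srcAttr? Here it does, so this is true.
val usesSrcAttr: Boolean =
  BytecodeUtils.invokedMethod(mapFunc, classOf[EdgeTriplet[Int, Int]], "srcAttr")
}}}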
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
index 1088944cd3..7b02e2ed1a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -28,7 +28,7 @@ import scala.reflect._
*
* Under the hood, it uses our OpenHashSet implementation.
*/
-private[spark]
+private[graphx]
class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
@specialized(Long, Int, Double) V: ClassTag](
val keySet: OpenHashSet[K], var _values: Array[V])