author     Reynold Xin <rxin@apache.org>    2014-01-13 17:40:36 -0800
committer  Reynold Xin <rxin@apache.org>    2014-01-13 17:40:36 -0800
commit     02a8f54bfa4572908d2d605a85e7a5adf9a36fbc
tree       85acf6b51b3ca5c4c9ca3f53ec24acfaa5f14838 /graphx/src
parent     dc041cd3b6b3b75df42d9a74dcf95179a25ee50f
Miscellaneous doc update.
Diffstat (limited to 'graphx/src')
17 files changed, 158 insertions, 143 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 85f27d2c8d..6c396c3dbe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -13,7 +13,7 @@ case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED]
     /** The vertex id of the target vertex. */
     var dstId: VertexID = 0,
     /** The attribute associated with the edge. */
-    var attr: ED = nullValue[ED])
+    var attr: ED = null.asInstanceOf[ED])
   extends Serializable {
 
   /**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
index 057d63a0ac..4253b24b5a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala
@@ -1,7 +1,5 @@
 package org.apache.spark.graphx
 
-import org.apache.spark.graphx.impl.VertexPartition
-
 /**
  * An edge triplet represents an edge along with the vertex attributes of its neighboring vertices.
  *
@@ -47,5 +45,5 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
   def vertexAttr(vid: VertexID): VD =
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
-  override def toString() = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+  override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
 }
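For context, a minimal sketch (not part of this patch) of what the new default means in practice: casting null to a primitive-specialized type parameter yields that type's zero value, which is why `null.asInstanceOf[ED]` can replace the removed `nullValue[T]` helper. The values below are illustrative.

    // Scala sketch: default edge attributes after this change.
    val unweighted = Edge[Double](1L, 2L)  // attr defaults to null.asInstanceOf[Double] == 0.0
    val labeled = Edge(3L, 4L, "follows")  // explicit String attribute
    assert(unweighted.attr == 0.0 && labeled.attr == "follows")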
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 7d4f0de3d6..d2ba6fde4a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -45,7 +45,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with
-   * the adjacent vertices.
+   * the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
+   * if only the edge data and adjacent vertex ids are needed.
    *
    * @return an RDD containing edge triplets
    *
@@ -54,13 +55,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * different color.
    * {{{
    * type Color = Int
-   * val graph: Graph[Color, Int] = Graph.textFile("hdfs://file.tsv")
+   * val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv")
    * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
    * }}}
-   *
-   * @see `edges` if only the edge data and adjacent vertex ids are
-   * required.
-   *
    */
   val triplets: RDD[EdgeTriplet[VD, ED]]
@@ -68,9 +65,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * Caches the vertices and edges associated with this graph at the specified storage level.
    *
    * @param newLevel the level at which to cache the graph.
-
-   * @return A reference to this graph for convenience.
    *
+   * @return A reference to this graph for convenience.
    */
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
@@ -159,8 +155,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapEdges[ED2: ClassTag](
-      map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
+  def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
+    : Graph[VD, ED2]
 
   /**
    * Transforms each edge attribute using the map function, passing it the adjacent vertex attributes
@@ -203,9 +199,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @tparam ED2 the new edge data type
    *
    */
-  def mapTriplets[ED2: ClassTag](
-      map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
-    Graph[VD, ED2]
+  def mapTriplets[ED2: ClassTag](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
+    : Graph[VD, ED2]
 
   /**
    * Reverses all edges in the graph. If this graph contains an edge from a to b then the returned
@@ -233,8 +228,10 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * @return the subgraph containing only the vertices and edges that
    * satisfy the predicates
    */
-  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
-    vpred: (VertexID, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
+  def subgraph(
+      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
+      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
+    : Graph[VD, ED]
 
   /**
    * Restricts the graph to only the vertices and edges that are also in `other`, but keeps the
@@ -249,14 +246,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * Merges multiple edges between two vertices into a single edge. For correct results, the graph
    * must have been partitioned using [[partitionBy]].
    *
-   * @tparam ED2 the type of the resulting edge data after grouping.
-   *
-   * @param f the user-supplied commutative associative function to merge edge attributes for
-   * duplicate edges.
+   * @param merge the user-supplied commutative associative function to merge edge attributes
+   * for duplicate edges.
    *
    * @return The resulting graph with a single edge for each (source, dest) vertex pair.
    */
-  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
+  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
 
   /**
    * Computes statistics about the neighboring edges and vertices of each vertex. The user supplied
@@ -270,7 +265,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * more messages to neighboring vertices
    *
    * @param reduceFunc the user defined reduce function which should
-   * be commutative and assosciative and is used to combine the output
+   * be commutative and associative and is used to combine the output
    * of the map phase
    *
    * @param activeSetOpt optionally, a set of "active" vertices and a direction of edges to consider
@@ -301,21 +296,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
   /**
    * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`. The
-   * input table should contain at most one entry for each vertex. If no entry in `table` is
+   * input table should contain at most one entry for each vertex. If no entry in `other` is
    * provided for a particular vertex in the graph, the map function receives `None`.
    *
    * @tparam U the type of entry in the table of updates
    * @tparam VD2 the new vertex value type
    *
-   * @param table the table to join with the vertices in the graph.
-   * The table should contain at most one entry for each vertex.
-   *
-   * @param mapFunc the function used to compute the new vertex
-   * values. The map function is invoked for all vertices, even those
-   * that do not have a corresponding entry in the table.
+   * @param other the table to join with the vertices in the graph.
+   * The table should contain at most one entry for each vertex.
+   * @param mapFunc the function used to compute the new vertex values.
+   * The map function is invoked for all vertices, even those
+   * that do not have a corresponding entry in the table.
    *
    * @example This function is used to update the vertices with new values based on external data.
-   * For example we could add the out-degree to each vertex record:
+   * For example we could add the out-degree to each vertex record:
    *
    * {{{
    * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
@@ -324,20 +318,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] {
    * (vid, data, optDeg) => optDeg.getOrElse(0)
    * }
    * }}}
-   *
    */
-  def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(VertexID, U)])
+  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexID, U)])
       (mapFunc: (VertexID, VD, Option[U]) => VD2)
     : Graph[VD2, ED]
 
+  /**
+   * The associated [[GraphOps]] object.
+   */
   // Save a copy of the GraphOps object so there is always one unique GraphOps object
   // for a given Graph object, and thus the lazy vals in GraphOps would work as intended.
   val ops = new GraphOps(this)
 } // end of Graph
 
-
-
 /**
  * The Graph object contains a collection of routines used to construct graphs from RDDs.
  */
@@ -357,7 +351,8 @@ object Graph {
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexID, VertexID)],
       defaultValue: VD,
-      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
+      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+  {
     val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
     val graph = GraphImpl(edges, defaultValue)
     uniqueEdges match {
@@ -391,10 +386,8 @@ object Graph {
    * @tparam ED the edge attribute type
    * @param vertices the "set" of vertices and their attributes
    * @param edges the collection of edges in the graph
-   * @param defaultVertexAttr the default vertex attribute to use for
-   * vertices that are mentioned in edges but not in vertices
-   * @param partitionStrategy the partition strategy to use when
-   * partitioning the edges
+   * @param defaultVertexAttr the default vertex attribute to use for vertices that are
+   * mentioned in edges but not in vertices
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexID, VD)],
@@ -406,9 +399,9 @@ object Graph {
   /**
    * Implicitly extracts the [[GraphOps]] member from a graph.
    *
-   * To improve modularity the Graph type only contains a small set of basic operations. All the
-   * convenience operations are defined in the [[GraphOps]] class which may be shared across multiple
-   * graph implementations.
+   * To improve modularity the Graph type only contains a small set of basic operations.
+   * All the convenience operations are defined in the [[GraphOps]] class which may be
+   * shared across multiple graph implementations.
    */
   implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
 } // end of Graph object
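To make the renamed `other` parameter concrete, a hedged usage sketch of outerJoinVertices; `graph` is assumed to be an existing Graph[Int, Int], and `outDegrees` comes from the GraphOps extension:

    // Attach each vertex's out-degree as its new attribute; vertices with no
    // out-edges have no entry in the join table, so they receive the default 0.
    val degreeGraph: Graph[Int, Int] =
      graph.outerJoinVertices(graph.outDegrees) { (vid, data, optDeg) =>
        optDeg.getOrElse(0)
      }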
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 3c06a403ea..7bdb101efb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -1,12 +1,7 @@
 package org.apache.spark.graphx
 
-import java.util.{Arrays => JArrays}
-import scala.reflect.ClassTag
-
-import org.apache.spark.graphx.impl.EdgePartitionBuilder
 import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.graphx.impl.{EdgePartition, GraphImpl}
-import org.apache.spark.util.collection.PrimitiveVector
+import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
 /**
  * Provides utilities for loading [[Graph]]s from files.
@@ -31,19 +26,20 @@ object GraphLoader extends Logging {
    * 1 8
    * }}}
    *
-   * @param sc
+   * @param sc SparkContext
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
    * @param minEdgePartitions the number of partitions for the
    *        the edge RDD
-   * @tparam ED
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1): Graph[Int, Int] = {
+      minEdgePartitions: Int = 1)
+    : Graph[Int, Int] =
+  {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
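A hedged usage sketch of edgeListFile as documented above; the path is hypothetical:

    // Load a whitespace-separated edge list and count the resulting vertices.
    val sc = new SparkContext("local", "GraphLoaderExample")
    val graph = GraphLoader.edgeListFile(sc, "hdfs://data/followers.tsv",
      canonicalOrientation = true, minEdgePartitions = 4)
    println(graph.vertices.count())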
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index fc7635a033..b9ccd8765e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -5,6 +5,7 @@ package org.apache.spark.graphx
  * vertex IDs.
  */
 sealed trait PartitionStrategy extends Serializable {
+  /** Returns the partition number for a given edge. */
   def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 83e28d0ab2..ce4eb53829 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -89,14 +89,16 @@ object Pregel {
    *
    */
   def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
-     (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue,
-      activeDirection: EdgeDirection = EdgeDirection.Out)(
-      vprog: (VertexID, VD, A) => VD,
+     (graph: Graph[VD, ED],
+      initialMsg: A,
+      maxIterations: Int = Int.MaxValue,
+      activeDirection: EdgeDirection = EdgeDirection.Out)
+     (vprog: (VertexID, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
       mergeMsg: (A, A) => A)
-    : Graph[VD, ED] = {
-
-    var g = graph.mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
+    : Graph[VD, ED] =
+  {
+    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
     // compute the messages
     var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
     var activeMessages = messages.count()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index a03e73ee79..d4d71627e1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -100,10 +100,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
    */
   def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = {
     val builder = new EdgePartitionBuilder[ED]
-    var firstIter: Boolean = true
-    var currSrcId: VertexID = nullValue[VertexID]
-    var currDstId: VertexID = nullValue[VertexID]
-    var currAttr: ED = nullValue[ED]
+    var currSrcId: VertexID = null.asInstanceOf[VertexID]
+    var currDstId: VertexID = null.asInstanceOf[VertexID]
+    var currAttr: ED = null.asInstanceOf[ED]
     var i = 0
     while (i < size) {
       if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
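To illustrate the reshuffled Pregel.apply parameter lists above, a minimal single-source shortest-paths sketch (not from this patch); `graph` is assumed to be a Graph[Double, Double] and the source id is illustrative:

    val sourceId: VertexID = 42L
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = Pregel(initialGraph, Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),  // vprog: keep the shorter distance
      triplet =>                                       // sendMsg: relax each edge
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        },
      (a, b) => math.min(a, b))                        // mergeMsg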
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 6a2abc71cc..9e39519200 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -249,8 +249,8 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     // For each vertex, replicate its attribute only to partitions where it is
     // in the relevant position in an edge.
-    val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
-    val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+    val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr")
+    val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr")
     val vs = activeSetOpt match {
       case Some((activeSet, _)) =>
         replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
@@ -308,10 +308,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   } // end of mapReduceTriplets
 
   override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
-      (updates: RDD[(VertexID, U)])(updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] = {
+      (other: RDD[(VertexID, U)])
+      (updateF: (VertexID, VD, Option[U]) => VD2): Graph[VD2, ED] =
+  {
     if (classTag[VD] equals classTag[VD2]) {
       // updateF preserves type, so we can use incremental replication
-      val newVerts = vertices.leftJoin(updates)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF)
       val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
       val newReplicatedVertexView = new ReplicatedVertexView[VD2](
         changedVerts, edges, routingTable,
@@ -319,12 +321,13 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
       new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
     } else {
       // updateF does not preserve type, so we must re-replicate all vertices
-      val newVerts = vertices.leftJoin(updates)(updateF)
+      val newVerts = vertices.leftJoin(other)(updateF)
       GraphImpl(newVerts, edges, routingTable)
     }
   }
 
-  private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+  /** Test whether the closure accesses the attribute with name `attrName`. */
+  private def accessesVertexAttr(closure: AnyRef, attrName: String): Boolean = {
     try {
       BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
     } catch {
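A hedged illustration (not in the patch) of why the accessesVertexAttr bytecode check matters: in the sketch below only srcAttr is read, so GraphX can avoid shipping destination-vertex attributes to edge partitions. Vertex attributes are assumed to be ages in a Graph[Int, Int]:

    // Count, per vertex, how many in-neighbors are older than 30.
    val olderFollowers: VertexRDD[Int] = graph.mapReduceTriplets[Int](
      triplet =>
        if (triplet.srcAttr > 30) Iterator((triplet.dstId, 1)) else Iterator.empty,
      (a, b) => a + b)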
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 8c35f4206e..d5e1de1ce0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -37,14 +37,12 @@ object Analytics extends Logging {
     case "pagerank" =>
       var tol: Float = 0.001F
       var outFname = ""
-      var numVPart = 4
       var numEPart = 4
       var partitionStrategy: Option[PartitionStrategy] = None
 
       options.foreach{
         case ("tol", v) => tol = v.toFloat
         case ("output", v) => outFname = v
-        case ("numVPart", v) => numVPart = v.toInt
         case ("numEPart", v) => numEPart = v.toInt
         case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
         case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
@@ -90,16 +88,12 @@ object Analytics extends Logging {
         case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
       }
 
-      if(!isDynamic && numIter == Int.MaxValue) {
+      if (!isDynamic && numIter == Int.MaxValue) {
         println("Set number of iterations!")
         sys.exit(1)
       }
       println("======================================")
       println("|      Connected Components          |")
-      println("--------------------------------------")
-      println(" Using parameters:")
-      println(" \tDynamic: " + isDynamic)
-      println(" \tNumIter: " + numIter)
       println("======================================")
 
       val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")", conf)
@@ -112,20 +106,18 @@ object Analytics extends Logging {
       sc.stop()
 
     case "triangles" =>
-      var numVPart = 4
       var numEPart = 4
       // TriangleCount requires the graph to be partitioned
       var partitionStrategy: PartitionStrategy = RandomVertexCut
 
       options.foreach{
         case ("numEPart", v) => numEPart = v.toInt
-        case ("numVPart", v) => numVPart = v.toInt
         case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
         case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
       }
       println("======================================")
       println("|      Triangle Count                |")
-      println("--------------------------------------")
+      println("======================================")
       val sc = new SparkContext(host, "TriangleCount(" + fname + ")", conf)
       val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
         minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index d078d2acdb..da03d99264 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -4,7 +4,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
-
+/** Connected components algorithm. */
 object ConnectedComponents {
   /**
    * Compute the connected component membership of each vertex and return a graph with the vertex
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index cf95267e77..853ef38712 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -5,7 +5,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.Logging
 import org.apache.spark.graphx._
 
-
+/** PageRank algorithm implementation. */
 object PageRank extends Logging {
 
   /**
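These algorithms can also be invoked as library calls rather than through the Analytics driver; a hedged sketch, with the iteration count and `graph` purely illustrative:

    // Ten PageRank iterations, then connected-component labels per vertex.
    val ranks = PageRank.run(graph, 10).vertices
    val components = ConnectedComponents.run(graph).vertices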
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index f5570daec1..fa6b1db29b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -1,11 +1,12 @@
 package org.apache.spark.graphx.lib
 
-import org.apache.spark.rdd._
-import org.apache.spark.graphx._
 import scala.util.Random
 import org.apache.commons.math.linear._
+import org.apache.spark.rdd._
+import org.apache.spark.graphx._
 
-class SVDPlusPlusConf( // SVDPlusPlus parameters
+/** Configuration parameters for SVDPlusPlus. */
+class SVDPlusPlusConf(
     var rank: Int,
     var maxIters: Int,
     var minVal: Double,
@@ -15,11 +16,15 @@ class SVDPlusPlusConf( // SVDPlusPlus parameters
     var gamma6: Double,
     var gamma7: Double) extends Serializable
 
+/** Implementation of SVD++ algorithm. */
 object SVDPlusPlus {
   /**
-   * Implement SVD++ based on "Factorization Meets the Neighborhood: a Multifaceted Collaborative Filtering Model",
-   * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
-   * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6.
+   * Implement SVD++ based on "Factorization Meets the Neighborhood:
+   * a Multifaceted Collaborative Filtering Model",
+   * available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
+   *
+   * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
+   * see the details on page 6.
    *
    * @param edges edges for constructing the graph
    *
@@ -27,16 +32,16 @@ object SVDPlusPlus {
    *
    * @return a graph with vertex attributes containing the trained model
    */
-
-  def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
-
-    // generate default vertex attribute
+  def run(edges: RDD[Edge[Double]], conf: SVDPlusPlusConf)
+    : (Graph[(RealVector, RealVector, Double, Double), Double], Double) =
+  {
+    // Generate default vertex attribute
     def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
       val v1 = new ArrayRealVector(rank)
       val v2 = new ArrayRealVector(rank)
       for (i <- 0 until rank) {
-        v1.setEntry(i, Random.nextDouble)
-        v2.setEntry(i, Random.nextDouble)
+        v1.setEntry(i, Random.nextDouble())
+        v2.setEntry(i, Random.nextDouble())
       }
       (v1, v2, 0.0, 0.0)
     }
@@ -49,14 +54,18 @@ object SVDPlusPlus {
     // construct graph
     var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
 
-    // calculate initial bias and norm
-    var t0 = g.mapReduceTriplets(et =>
-      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
-    g = g.outerJoinVertices(t0) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
-      (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
+    // Calculate initial bias and norm
+    val t0 = g.mapReduceTriplets(
+      et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
+      (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+
+    g = g.outerJoinVertices(t0) {
+      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+        (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
     }
 
-    def mapTrainF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+    def mapTrainF(conf: SVDPlusPlusConf, u: Double)
+        (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
       : Iterator[(VertexID, (RealVector, RealVector, Double))] = {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)
@@ -64,31 +73,49 @@ object SVDPlusPlus {
       pred = math.max(pred, conf.minVal)
       pred = math.min(pred, conf.maxVal)
       val err = et.attr - pred
-      val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateP = q.mapMultiply(err)
+        .subtract(p.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
+      val updateQ = usr._2.mapMultiply(err)
+        .subtract(q.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
+      val updateY = q.mapMultiply(err * usr._4)
+        .subtract(itm._2.mapMultiply(conf.gamma7))
+        .mapMultiply(conf.gamma2)
       Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
         (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
     }
 
     for (i <- 0 until conf.maxIters) {
-      // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
+      // Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
       g.cache()
-      var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
-      g = g.outerJoinVertices(t1) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
-        if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
+      val t1 = g.mapReduceTriplets(
+        et => Iterator((et.srcId, et.dstAttr._2)),
+        (g1: RealVector, g2: RealVector) => g1.add(g2))
+      g = g.outerJoinVertices(t1) {
+        (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+          if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
       }
-      // phase 2, update p for user nodes and q, y for item nodes
+
+      // Phase 2, update p for user nodes and q, y for item nodes
       g.cache()
-      val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
-        (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
-      g = g.outerJoinVertices(t2) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
-        (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
+      val t2 = g.mapReduceTriplets(
+        mapTrainF(conf, u),
+        (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+          (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+      g = g.outerJoinVertices(t2) {
+        (vid: VertexID,
+         vd: (RealVector, RealVector, Double, Double),
+         msg: Option[(RealVector, RealVector, Double)]) =>
+          (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
       }
     }
 
     // calculate error on training set
-    def mapTestF(conf: SVDPlusPlusConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(VertexID, Double)] = {
+    def mapTestF(conf: SVDPlusPlusConf, u: Double)
+        (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+      : Iterator[(VertexID, Double)] =
+    {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr._1, itm._1)
       var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
@@ -99,9 +126,11 @@ object SVDPlusPlus {
     }
     g.cache()
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
-    g = g.outerJoinVertices(t3) { (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
-      if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
+    g = g.outerJoinVertices(t3) {
+      (vid: VertexID, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+        if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }
+
     (g, u)
   }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
index 43c4b9cf2d..11847509da 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -4,6 +4,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
+/** Strongly connected components algorithm implementation. */
 object StronglyConnectedComponents {
 
   /**
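For the two library entry points above, a hedged usage sketch; the rating source and every hyperparameter value are assumptions, not recommendations from this patch:

    // Strongly connected components with a fixed iteration budget.
    val scc = StronglyConnectedComponents.run(graph, 10)

    // SVD++ on rating edges parsed from a hypothetical tab-separated file.
    val ratings: RDD[Edge[Double]] = sc.textFile("hdfs://data/ratings.tsv").map { line =>
      val fields = line.split('\t')
      Edge(fields(0).toLong, fields(1).toLong, fields(2).toDouble)
    }
    val conf = new SVDPlusPlusConf(rank = 10, maxIters = 20, minVal = 0.0, maxVal = 5.0,
      gamma1 = 0.007, gamma2 = 0.007, gamma6 = 0.005, gamma7 = 0.015)
    val (model, mean) = SVDPlusPlus.run(ratings, conf)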
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 58da9e3aed..f87eab9505 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -4,27 +4,26 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
 
-
+/**
+ * Compute the number of triangles passing through each vertex.
+ *
+ * The algorithm is relatively straightforward and can be computed in three steps:
+ *
+ * 1) Compute the set of neighbors for each vertex
+ * 2) For each edge compute the intersection of the sets and send the
+ *    count to both vertices.
+ * 3) Compute the sum at each vertex and divide by two since each
+ *    triangle is counted twice.
+ *
+ * Note that the input graph should have its edges in canonical direction
+ * (i.e. the `sourceId` less than `destId`). Also the graph must have been partitioned
+ * using [[org.apache.spark.graphx.Graph#partitionBy]].
+ */
 object TriangleCount {
-  /**
-   * Compute the number of triangles passing through each vertex.
-   *
-   * The algorithm is relatively straightforward and can be computed in three steps:
-   *
-   * 1) Compute the set of neighbors for each vertex
-   * 2) For each edge compute the intersection of the sets and send the
-   *    count to both vertices.
-   * 3) Compute the sum at each vertex and divide by two since each
-   *    triangle is counted twice.
-   *
-   *
-   * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
-   * using [[org.apache.spark.graphx.Graph#partitionBy]], and its edges must be in canonical
-   * orientation (srcId < dstId).
-   */
+
   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
     // Remove redundant edges
-    val g = graph.groupEdges((a, b) => a).cache
+    val g = graph.groupEdges((a, b) => a).cache()
 
     // Construct set representations of the neighborhoods
     val nbrSets: VertexRDD[VertexSet] =
@@ -56,8 +55,10 @@ object TriangleCount {
         val iter = smallSet.iterator
         var counter: Int = 0
         while (iter.hasNext) {
-          val vid = iter.next
-          if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) { counter += 1 }
+          val vid = iter.next()
+          if (vid != et.srcId && vid != et.dstId && largeSet.contains(vid)) {
+            counter += 1
+          }
         }
         Iterator((et.srcId, counter), (et.dstId, counter))
       }
@@ -71,7 +72,5 @@ object TriangleCount {
       assert((dblCount & 1) == 0)
       dblCount / 2
     }
-
   } // end of TriangleCount
-
 }
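A hedged end-to-end sketch matching the requirements in the new scaladoc above; the path and strategy choice are illustrative:

    // Edges must be canonically oriented and the graph partitioned before counting.
    val graph = GraphLoader.edgeListFile(sc, "hdfs://data/edges.tsv",
      canonicalOrientation = true).partitionBy(RandomVertexCut)
    val triCounts: Graph[Int, Int] = TriangleCount.run(graph)
    // Each triangle touches three vertices, so the global total divides by 3.
    val totalTriangles = triCounts.vertices.map(_._2.toLong).reduce(_ + _) / 3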
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/package.scala b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
index e70d2fd09f..60dfc1dc37 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/package.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/package.scala
@@ -2,6 +2,7 @@ package org.apache.spark
 
 import org.apache.spark.util.collection.OpenHashSet
 
+/** GraphX is a graph processing framework built on top of Spark. */
 package object graphx {
   /**
    * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
@@ -9,11 +10,9 @@ package object graphx {
    */
   type VertexID = Long
 
+  /** Integer identifier of a graph partition. */
   // TODO: Consider using Char.
   type PartitionID = Int
 
   private[graphx] type VertexSet = OpenHashSet[VertexID]
-
-  /** Returns the default null-like value for a data type T. */
-  private[graphx] def nullValue[T] = null.asInstanceOf[T]
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index ec8d534333..1c5b234d74 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -10,8 +10,11 @@ import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor}
 import org.objectweb.asm.Opcodes._
 
-
-private[spark] object BytecodeUtils {
+/**
+ * Includes a utility function to test whether a function accesses a specific attribute
+ * of an object.
+ */
+private[graphx] object BytecodeUtils {
 
   /**
    * Test whether the given closure invokes the specified method in the specified class.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
index 1088944cd3..7b02e2ed1a 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -28,7 +28,7 @@ import scala.reflect._
  *
  * Under the hood, it uses our OpenHashSet implementation.
  */
-private[spark]
+private[graphx]
 class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
     @specialized(Long, Int, Double) V: ClassTag](
   val keySet: OpenHashSet[K],
   var _values: Array[V])