From 638088923dbfe94215c4e0edfac8beb2e7b483f8 Mon Sep 17 00:00:00 2001
From: Ankur Dave Repartitions the edges in the graph according to Repartitions the edges in the graph according to GraphX in Spark 1.0.0 contains one user-facing interface change from Spark 0.9.1. The To get started you first need to import Spark and GraphX into your project, as follows: If you are not using the Spark shell you will also need a GraphX optimizes the representation of vertex and edge types when they are plain old data-types
-(e.g., int, double, etc…) reducing the in memory footprint by storing them in specialized
+(e.g., int, double, etc…) reducing the in memory footprint by storing them in specialized
arrays.N
(vid, data, optDeg) => optDeg.getOrElse(0)
}
-
@@ -328,7 +328,28 @@ provided for a particular vertex in the graph, the map function receives
- N
partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
partitionStrategy
.partitionStrategy
(WARNING: broken in
+Spark 1․0․0).Graph.partitionBy
in Spark 1.0.0EdgeRDD
may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type Graph[VD, ED]
are of type EdgeRDD[ED, VD]
rather than EdgeRDD[ED]
.Workaround for
+
+
+Graph.partitionBy
in Spark 1.0.0
The Graph.partitionBy
operator allows users to choose the graph partitioning strategy, but due to SPARK-1931, this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from the master branch, which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+ val numPartitions = edges.partitions.size
+ edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
+val vertices = ...
+val edges = ...
+
+// Instead of:
+val g = Graph(vertices, edges).partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
+
+// Use:
+val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
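The logic of the workaround above (key each edge by the partition its strategy picks, then shuffle on that key) can be sketched without Spark. This is only a toy model under stated assumptions: `Edge`, `pairHash`, and `partitionEdges` are hypothetical stand-ins written for this illustration, plain Scala collections replace RDDs, and `groupBy` stands in for the shuffle performed by `HashPartitioner`.

```scala
object EdgePartitionSketch {
  // Hypothetical stand-in for GraphX's Edge; not the real class.
  final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

  // Hypothetical strategy with the same shape as the getPartition call in the
  // workaround: (srcId, dstId, numPartitions) => partition ID.
  // floorMod keeps the result in [0, numPartitions) even for negative hashes.
  def pairHash(srcId: Long, dstId: Long, numPartitions: Int): Int =
    java.lang.Math.floorMod((srcId, dstId).hashCode, numPartitions)

  // Key every edge by its target partition, then group on that key.
  // groupBy plays the role of the shuffle in the real workaround.
  def partitionEdges[ED](
      edges: Seq[Edge[ED]],
      numPartitions: Int,
      strategy: (Long, Long, Int) => Int): Map[Int, Seq[Edge[ED]]] =
    edges.groupBy(e => strategy(e.srcId, e.dstId, numPartitions))

  // Two parallel edges between vertices 1 and 2, plus one other edge.
  val edges: Seq[Edge[String]] =
    Seq(Edge(1L, 2L, "a"), Edge(1L, 2L, "b"), Edge(2L, 3L, "c"))

  val parts: Map[Int, Seq[Edge[String]]] = partitionEdges(edges, 4, pairHash)
}
```

Because the partition ID depends only on the endpoint IDs, the two parallel edges between vertices 1 and 2 always land in the same group, whichever partition the hash happens to select.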
Getting Started
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
-import org.apache.spark.rdd.RDD
-
SparkContext
. To learn more about
getting started with Spark refer to the Spark Quick Start Guide.
@@ -280,8 +303,7 @@ bipartite graph we might do the following:
Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note
@@ -297,8 +319,7 @@ the vertices and edges of the graph:
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED, VD]
-}
-
The classes VertexRDD[VD]
and EdgeRDD[ED, VD]
extend and are optimized versions of RDD[(VertexID,
VD)]
and RDD[Edge[ED]]
respectively. Both VertexRDD[VD]
and EdgeRDD[ED, VD]
provide additional
@@ -320,8 +341,7 @@ with a string describing the relationships between collaborators:
The resulting graph would have the type signature:
-val userGraph: Graph[(String, String), String]
-
val userGraph: Graph[(String, String), String]
There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on
@@ -342,8 +362,7 @@ code constructs a graph from a collection of RDDs:
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
-val graph = Graph(users, relationships, defaultUser)
-
+val graph = Graph(users, relationships, defaultUser)
In the above example we make use of the Edge
case class. Edges have a srcId
and a
dstId
corresponding to the source and destination vertex identifiers. In addition, the Edge
@@ -356,8 +375,7 @@ and graph.edges
members respectively.
Note that graph.vertices returns a VertexRDD[(String, String)] which extends
@@ -365,8 +383,7 @@ and graph.edges members respectively.
tuple. On the other hand, graph.edges returns an EdgeRDD containing Edge[String] objects. We could have also used the case class type constructor as in the following:
-graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
-
+graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view.
@@ -376,8 +393,7 @@ The triplet view logically joins the vertex and edge properties yielding an
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
-ON e.srcId = src.Id AND e.dstId = dst.Id
-
or graphically as:
@@ -395,8 +411,7 @@ triplet view of a graph to render a collection of strings describing relationshi
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
-facts.collect.foreach(println(_))
-
+facts.collect.foreach(println(_))
GraphOps
) by the f
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
-val inDegrees: VertexRDD[Int] = graph.inDegrees
-
The reason for differentiating between core graph operations and GraphOps
is to be
able to support different graph representations in the future. Each graph representation must
@@ -442,6 +456,8 @@ operations.
Each of these operators yields a new graph with the vertex or edge properties modified by the user
defined map
function.
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
-val newGraph = Graph(newVertices, graph.edges)
-
Instead, use mapVertices to preserve the indices:
-val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
-
+val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
These operators are often used to initialize the graph for a particular computation or project away
@@ -528,8 +540,7 @@ unnecessary properties. For example, given a graph with the out-degrees as the
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
-  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
-
+  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
The reverse
operator returns a new graph with all the edge directions reversed.
This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse
@@ -582,8 +592,7 @@ interest or eliminate broken links. For example in the following code we remove
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
- ).collect.foreach(println(_))
-
+ ).collect.foreach(println(_))
Note in the above example only the vertex predicate is provided. The subgraph operator defaults
@@ -601,8 +610,7 @@ the answer to the valid subgraph.
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
-val validCCGraph = ccGraph.mask(validGraph)
-
+val validCCGraph = ccGraph.mask(validGraph)
The groupEdges operator merges parallel edges (i.e., duplicate edges between pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be added
@@ -621,8 +629,7 @@ using the join operators. Below we list the key join operators:
  : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
  : Graph[VD2, ED]
-}
-
+}
The joinVertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined map function
@@ -638,8 +645,7 @@ also pre-index the resulting values to substantially accelerate the sub
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
-  (id, oldCost, extraCost) => oldCost + extraCost)
-
+  (id, oldCost, extraCost) => oldCost + extraCost)
The more general outerJoinVertices
behaves similarly to joinVertices
@@ -654,8 +660,7 @@ vertex properties with their outDegree
.
You may have noticed the multiple parameter lists (e.g., f(a)(b)) curried function pattern used
@@ -664,8 +669,7 @@ that type inference on b would not depend on a. As a
provide type annotation for the user defined function:
val joinedGraph = graph.joinVertices(uniqueCosts,
-  (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
-
+  (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
The mapReduceTriplets
operator takes a user defined map function which
is applied to each triplet and can yield messages destined to either (none or both) vertices in
@@ -705,8 +708,7 @@ receive a message are not included in the returned VertexRDD
.
VertexRDD
:
- activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
-
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
The EdgeDirection specifies which edges adjacent to the vertex set are included in the map
@@ -748,8 +750,7 @@ more senior followers of each user.
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
// Display the results
-avgAgeOfOlderFollowers.collect.foreach(println(_))
-
+avgAgeOfOlderFollowers.collect.foreach(println(_))
Note that the mapReduceTriplets operation performs optimally when the messages (and the sums of
@@ -773,8 +774,7 @@ compute the max in, out, and total degrees:
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
-val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
-
+val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
Collecting Neighbors
@@ -786,8 +786,7 @@ attributes at each vertex. This can be easily accomplished using the
class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
-}
-
+}
Note that these operators can be quite costly as they duplicate information and require
@@ -862,8 +861,7 @@ of its implementation (note calls to graph.cache have been removed):
    }
    g
  }
-}
-
+}
Notice that Pregel takes two argument lists (i.e., graph.pregel(list1)(list2)). The first argument list contains configuration parameters including the initial message, the maximum number of
@@ -894,13 +892,12 @@ shortest path in the following example.
  },
  (a,b) => math.min(a,b) // Merge Message
  )
-println(sssp.vertices.collect.mkString("\n"))
-
+println(sssp.vertices.collect.mkString("\n"))
Graph Builders
-GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph’s edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). Graph.groupEdges requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call Graph.partitionBy before calling groupEdges.
+GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph’s edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). Graph.groupEdges requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call Graph.partitionBy before calling groupEdges. However, note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.
object GraphLoader {
  def edgeListFile(
@@ -909,8 +906,7 @@ shortest path in the following example.
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
-}
-
+}
GraphLoader.edgeListFile provides a way to load a graph from a list of edges on disk. It parses an adjacency list of (source vertex ID, destination vertex ID) pairs of the following form, skipping comment lines that begin with #:
@@ -938,8 +934,7 @@ shortest path in the following example.
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
-}
-
+}
Graph.apply allows creating a graph from RDDs of vertices and edges. Duplicate vertices are picked arbitrarily and vertices found in the edge RDD but not the vertex RDD are assigned the default attribute.
@@ -978,8 +973,7 @@ additional functionality:
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
-}
-
+}
Notice, for example, how the
filter operator returns a VertexRDD. Filter is actually implemented using a BitSet thereby reusing the index and preserving the ability to do fast joins
@@ -1001,8 +995,7 @@ both aggregate and then subsequently index the RDD[(VertexID, A)].
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
-val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
-
+val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
EdgeRDDs
@@ -1018,8 +1011,7 @@ reuse when changing attribute values.
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED, VD]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
-def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
-
+def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
In most applications we have found that operations on the EdgeRDD are accomplished through the graph operators or rely on operations defined in the base RDD class.
@@ -1040,10 +1032,12 @@ distributed graph partitioning:
reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the
-various heuristics. Users can choose between different strategies by repartitioning the graph with
-the Graph.partitionBy operator. The default partitioning strategy is to use
-the initial partitioning of the edges as provided on graph construction. However, users can easily
-switch to 2D-partitioning or other heuristics included in GraphX.
+various heuristics. The default partitioning strategy is to use the initial partitioning of the
+edges as provided on graph construction. However, users can easily switch to 2D-partitioning or
+other heuristics included in GraphX.
+
+Users can choose between different strategies by repartitioning the graph with
+the Graph.partitionBy operator. However, note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.
@@ -1065,7 +1059,7 @@ to broadcast vertices when implementing the join required for operations like
PageRank
-PageRank measures the importance of each vertex in a graph, assuming an edge from u to v represents an endorsement of v’s importance by u. For example, if a Twitter user is followed by many others, the user will be ranked highly.
+PageRank measures the importance of each vertex in a graph, assuming an edge from u to v represents an endorsement of v’s importance by u. For example, if a Twitter user is followed by many others, the user will be ranked highly.
GraphX comes with static and dynamic implementations of PageRank as methods on the PageRank object. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance). GraphOps allows calling these algorithms directly as methods on Graph.
@@ -1084,8 +1078,7 @@ to broadcast vertices when implementing the join required for operations like
  case (id, (username, rank)) => (username, rank)
}
// Print the result
-println(ranksByUsername.collect().mkString("\n"))
-
+println(ranksByUsername.collect().mkString("\n"))
Connected Components
@@ -1104,15 +1097,26 @@ to broadcast vertices when implementing the join required for operations like
  case (id, (username, cc)) => (username, cc)
}
// Print the result
-println(ccByUsername.collect().mkString("\n"))
-
+println(ccByUsername.collect().mkString("\n"))
Triangle Counting
-A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the PageRank section. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy.
+A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the PageRank section. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy. Also note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.
-// Load the edges in canonical order and partition the graph for triangle count
-val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
+// Define our own version of partitionBy to work around SPARK-1931
+import org.apache.spark.HashPartitioner
+def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
+  val numPartitions = edges.partitions.size
+  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitions(_.map(_._2), preservesPartitioning = true)
+}
+
+// Load the edges in canonical order and partition the graph for triangle count
+val unpartitionedGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true)
+val graph = Graph(
+  unpartitionedGraph.vertices,
+  partitionBy(unpartitionedGraph.edges, PartitionStrategy.RandomVertexCut))
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
@@ -1124,8 +1128,7 @@ to broadcast vertices when implementing the join required for operations like
  (username, tc)
}
// Print the result
-println(triCountByUsername.collect().mkString("\n"))
-
+println(triCountByUsername.collect().mkString("\n"))
Examples
@@ -1163,8 +1166,7 @@ all of this in just a few lines with GraphX:
  case (uid, attrList, None) => (0.0, attrList.toList)
}
-println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
-
+println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
--
cgit v1.2.3
Repartitions the edges in the graph according to
partitionStrategy
(WARNING: broken in +Spark 1․0․0).To use this function in Spark 1.0.0, either build the latest version of Spark from the master +branch, or apply the following workaround: