From 638088923dbfe94215c4e0edfac8beb2e7b483f8 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Tue, 3 Jun 2014 02:28:23 +0000 Subject: Suggest workarounds for partitionBy in Spark 1.0.0 due to SPARK-1931 Applied PR #908 to the generated docs: https://github.com/apache/spark/pull/908 --- .../api/scala/org/apache/spark/graphx/Graph.html | 25 ++- site/docs/1.0.0/graphx-programming-guide.html | 174 +++++++++++---------- 2 files changed, 111 insertions(+), 88 deletions(-) (limited to 'site/docs') diff --git a/site/docs/1.0.0/api/scala/org/apache/spark/graphx/Graph.html b/site/docs/1.0.0/api/scala/org/apache/spark/graphx/Graph.html index 0c261ea00..c04282e7f 100644 --- a/site/docs/1.0.0/api/scala/org/apache/spark/graphx/Graph.html +++ b/site/docs/1.0.0/api/scala/org/apache/spark/graphx/Graph.html @@ -316,7 +316,7 @@ provided for a particular vertex in the graph, the map function receives N (vid, data, optDeg) => optDeg.getOrElse(0) } -
  • +
  • @@ -328,7 +328,28 @@ provided for a particular vertex in the graph, the map function receives N partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

    -

    Repartitions the edges in the graph according to partitionStrategy.

    +

Repartitions the edges in the graph according to partitionStrategy (WARNING: broken in +Spark 1.0.0).

Repartitions the edges in the graph according to partitionStrategy (WARNING: broken in +Spark 1.0.0).

    To use this function in Spark 1.0.0, either build the latest version of Spark from the master +branch, or apply the following workaround:

    // Define our own version of partitionBy to work around SPARK-1931
    +import org.apache.spark.HashPartitioner
    +def partitionBy[ED](
    +    edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
    +  val numPartitions = edges.partitions.size
    +  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    +    .partitionBy(new HashPartitioner(numPartitions))
    +    .mapPartitions(_.map(_._2), preservesPartitioning = true)
    +}
    +
    +val vertices = ...
    +val edges = ...
    +
    +// Instead of:
    +val g = Graph(vertices, edges)
    +  .partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
    +
    +// Use:
    +val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
  • diff --git a/site/docs/1.0.0/graphx-programming-guide.html b/site/docs/1.0.0/graphx-programming-guide.html index d77c4b229..8fea9a5d5 100644 --- a/site/docs/1.0.0/graphx-programming-guide.html +++ b/site/docs/1.0.0/graphx-programming-guide.html @@ -126,6 +126,7 @@
  • Background on Graph-Parallel Computation
  • GraphX Replaces the Spark Bagel API
  • Migrating from Spark 0.9.1
  • +
  • Workaround for Graph.partitionBy in Spark 1.0.0
  • Getting Started
  • @@ -238,6 +239,29 @@ explore the new GraphX API and comment on issues that may complicate the transit

    GraphX in Spark 1.0.0 contains one user-facing interface change from Spark 0.9.1. EdgeRDD may now store adjacent vertex attributes to construct the triplets, so it has gained a type parameter. The edges of a graph of type Graph[VD, ED] are of type EdgeRDD[ED, VD] rather than EdgeRDD[ED].
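As a concrete illustration (a sketch; the graph value and its type are hypothetical), code that named the edge RDD type explicitly in Spark 0.9.1 picks up the extra vertex-attribute type parameter in 1.0.0:

// For a hypothetical graph of type Graph[(String, String), String]:
// Spark 0.9.1:  val edges: EdgeRDD[String] = graph.edges
// Spark 1.0.0:  the vertex attribute type appears as a second type parameter
val edges: EdgeRDD[String, (String, String)] = graph.edges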

    +

    Workaround for Graph.partitionBy in Spark 1.0.0

    +

    + +

    The Graph.partitionBy operator allows users to choose the graph partitioning strategy, but due to SPARK-1931, this method is broken in Spark 1.0.0. We encourage users to build the latest version of Spark from the master branch, which contains a fix. Alternatively, a workaround is to partition the edges before constructing the graph, as follows:

    + +
    // Define our own version of partitionBy to work around SPARK-1931
    +import org.apache.spark.HashPartitioner
    +def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
    +  val numPartitions = edges.partitions.size
    +  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    +    .partitionBy(new HashPartitioner(numPartitions))
    +    .mapPartitions(_.map(_._2), preservesPartitioning = true)
    +}
    +
    +val vertices = ...
    +val edges = ...
    +
    +// Instead of:
    +val g = Graph(vertices, edges).partitionBy(PartitionStrategy.EdgePartition2D) // broken in Spark 1.0.0
    +
    +// Use:
    +val g = Graph(vertices, partitionBy(edges, PartitionStrategy.EdgePartition2D))
    +

    Getting Started

    To get started you first need to import Spark and GraphX into your project, as follows:

    @@ -245,8 +269,7 @@ explore the new GraphX API and comment on issues that may complicate the transit
    import org.apache.spark._
     import org.apache.spark.graphx._
     // To make some of the examples work we will also need RDD
    -import org.apache.spark.rdd.RDD
    -
    +import org.apache.spark.rdd.RDD

    If you are not using the Spark shell you will also need a SparkContext. To learn more about getting started with Spark refer to the Spark Quick Start Guide.
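For a standalone application, a minimal sketch of creating the SparkContext might look like the following (the application name and master URL are illustrative placeholders):

// Also covered by the wildcard import above
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("GraphXApp").setMaster("local[*]")
val sc = new SparkContext(conf)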

    @@ -268,7 +291,7 @@ are the types of the objects associated with each vertex and edge respectively.<

    GraphX optimizes the representation of vertex and edge types when they are plain old data-types -(e.g., int, double, etc…) reducing the in memory footprint by storing them in specialized +(e.g., int, double, etc…) reducing the in memory footprint by storing them in specialized arrays.

    @@ -280,8 +303,7 @@ bipartite graph we might do the following:

    case class UserProperty(val name: String) extends VertexProperty case class ProductProperty(val name: String, val price: Double) extends VertexProperty // The graph might then have the type: -var graph: Graph[VertexProperty, String] = null -
    +var graph: Graph[VertexProperty, String] = null

    Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note @@ -297,8 +319,7 @@ the vertices and edges of the graph:

    class Graph[VD, ED] {
       val vertices: VertexRDD[VD]
       val edges: EdgeRDD[ED, VD]
    -}
    -
    +}

    The classes VertexRDD[VD] and EdgeRDD[ED, VD] extend and are optimized versions of RDD[(VertexID, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED, VD] provide additional @@ -320,8 +341,7 @@ with a string describing the relationships between collaborators:

    The resulting graph would have the type signature:

    -
    val userGraph: Graph[(String, String), String]
    -
    +
    val userGraph: Graph[(String, String), String]

    There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on @@ -342,8 +362,7 @@ code constructs a graph from a collection of RDDs:

// Define a default user in case there are relationships with missing users val defaultUser = ("John Doe", "Missing") // Build the initial Graph -val graph = Graph(users, relationships, defaultUser) - +val graph = Graph(users, relationships, defaultUser)

    In the above example we make use of the Edge case class. Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge @@ -356,8 +375,7 @@ and graph.edges members respectively.

    // Count all users which are postdocs graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count // Count all the edges where src > dst -graph.edges.filter(e => e.srcId > e.dstId).count - +graph.edges.filter(e => e.srcId > e.dstId).count

Note that graph.vertices returns a VertexRDD[(String, String)] which extends @@ -365,8 +383,7 @@ and graph.edges members respectively.

    tuple. On the other hand, graph.edges returns an EdgeRDD containing Edge[String] objects. We could have also used the case class type constructor as in the following:

    -
    graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
    -
    +
    graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

    In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. @@ -376,8 +393,7 @@ The triplet view logically joins the vertex and edge properties yielding an

    SELECT src.id, dst.id, src.attr, e.attr, dst.attr
     FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
    -ON e.srcId = src.Id AND e.dstId = dst.Id
    -
    +ON e.srcId = src.Id AND e.dstId = dst.Id

    or graphically as:

    @@ -395,8 +411,7 @@ triplet view of a graph to render a collection of strings describing relationshi val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) -facts.collect.foreach(println(_)) - +facts.collect.foreach(println(_))

    Graph Operators

    @@ -410,8 +425,7 @@ compute the in-degree of each vertex (defined in GraphOps) by the f
    val graph: Graph[(String, String), String]
     // Use the implicit GraphOps.inDegrees operator
    -val inDegrees: VertexRDD[Int] = graph.inDegrees
    -
    +val inDegrees: VertexRDD[Int] = graph.inDegrees

    The reason for differentiating between core graph operations and GraphOps is to be able to support different graph representations in the future. Each graph representation must @@ -442,6 +456,8 @@ operations.

    def cache(): Graph[VD, ED] def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] // Change the partitioning heuristic ============================================================ + // - WARNING: partitionBy is broken in Spark 1.0.0 due to SPARK-1931. + // See the section "Workaround for Graph.partitionBy in Spark 1.0.0" above. def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] // Transform vertex and edge attributes ========================================================== def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] @@ -482,8 +498,7 @@ operations.

    def connectedComponents(): Graph[VertexID, ED] def triangleCount(): Graph[Int, ED] def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] -} - +}

    Property Operators

    @@ -494,8 +509,7 @@ graph contains the following:

    def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] -} - +}

    Each of these operators yields a new graph with the vertex or edge properties modified by the user defined map function.

    @@ -507,15 +521,13 @@ following snippets are logically equivalent, but the first one does not preserve indices and would not benefit from the GraphX system optimizations:

    val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
    -val newGraph = Graph(newVertices, graph.edges)
    -
    +val newGraph = Graph(newVertices, graph.edges)

    Instead, use mapVertices to preserve the indices:

    -
    val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
    -
    +
    val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

    These operators are often used to initialize the graph for a particular computation or project away @@ -528,8 +540,7 @@ unnecessary properties. For example, given a graph with the out-degrees as the // Construct a graph where each edge contains the weight // and each vertex is the initial PageRank val outputGraph: Graph[Double, Double] = - inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0) - + inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

    Structural Operators

    @@ -543,8 +554,7 @@ add more in the future. The following is a list of the basic structural operato vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] -} - +}

    The reverse operator returns a new graph with all the edge directions reversed. This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse @@ -582,8 +592,7 @@ interest or eliminate broken links. For example in the following code we remove validGraph.vertices.collect.foreach(println(_)) validGraph.triplets.map( triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 - ).collect.foreach(println(_)) - + ).collect.foreach(println(_))

    Note in the above example only the vertex predicate is provided. The subgraph operator defaults @@ -601,8 +610,7 @@ the answer to the valid subgraph.

// Remove missing vertices as well as the edges connected to them val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") // Restrict the answer to the valid subgraph -val validCCGraph = ccGraph.mask(validGraph) - +val validCCGraph = ccGraph.mask(validGraph)

    The groupEdges operator merges parallel edges (i.e., duplicate edges between pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be added @@ -621,8 +629,7 @@ using the join operators. Below we list the key join operators:

    : Graph[VD, ED] def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] -} - +}

    The joinVertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined map function @@ -638,8 +645,7 @@ also pre-index the resulting values to substantially accelerate the sub val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b) val joinedGraph = graph.joinVertices(uniqueCosts)( - (id, oldCost, extraCost) => oldCost + extraCost) - + (id, oldCost, extraCost) => oldCost + extraCost)

    The more general outerJoinVertices behaves similarly to joinVertices @@ -654,8 +660,7 @@ vertex properties with their outDegree.

    case Some(outDeg) => outDeg case None => 0 // No outDegree means zero outDegree } -} - +}

You may have noticed the multiple parameter lists (e.g., f(a)(b)) curried function pattern used @@ -664,8 +669,7 @@ that type inference on b would not depend on a. As a consequence, the user would need to provide type annotation for the user defined function:

    val joinedGraph = graph.joinVertices(uniqueCosts,
    -  (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
    -
    + (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)

    Neighborhood Aggregation

    @@ -687,8 +691,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id).< map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduce: (A, A) => A) : VertexRDD[A] -} - +}

    The mapReduceTriplets operator takes a user defined map function which is applied to each triplet and can yield messages destined to either (none or both) vertices in @@ -705,8 +708,7 @@ receive a message are not included in the returned VertexRDD.

    vertices in the provided VertexRDD:

    -
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
    -
    +
    activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None

    The EdgeDirection specifies which edges adjacent to the vertex set are included in the map @@ -748,8 +750,7 @@ more senior followers of each user.

    val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } ) // Display the results -avgAgeOfOlderFollowers.collect.foreach(println(_)) - +avgAgeOfOlderFollowers.collect.foreach(println(_))

    Note that the mapReduceTriplets operation performs optimally when the messages (and the sums of @@ -773,8 +774,7 @@ compute the max in, out, and total degrees:

    // Compute the max degrees val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) -val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) - +val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
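The reduce calls above rely on a max helper that is defined earlier in the guide but falls outside this hunk; a sketch consistent with its use here:

// Picks the (VertexId, degree) pair with the larger degree
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}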

    Collecting Neighbors

    @@ -786,8 +786,7 @@ attributes at each vertex. This can be easily accomplished using the
    class GraphOps[VD, ED] {
       def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
       def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
    -}
    -
    +}
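For illustration, a small usage sketch (the value name is hypothetical):

// Collect the ids of each vertex's in-neighbors
val inNeighborIds: VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.In)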

    Note that these operators can be quite costly as they duplicate information and require @@ -862,8 +861,7 @@ of its implementation (note calls to graph.cache have been removed):

    } g } -} - +}

    Notice that Pregel takes two argument lists (i.e., graph.pregel(list1)(list2)). The first argument list contains configuration parameters including the initial message, the maximum number of @@ -894,13 +892,12 @@ shortest path in the following example.

    }, (a,b) => math.min(a,b) // Merge Message ) -println(sssp.vertices.collect.mkString("\n")) - +println(sssp.vertices.collect.mkString("\n"))

    Graph Builders

    -

    GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph’s edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). Graph.groupEdges requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call Graph.partitionBy before calling groupEdges.

    +

    GraphX provides several ways of building a graph from a collection of vertices and edges in an RDD or on disk. None of the graph builders repartitions the graph’s edges by default; instead, edges are left in their default partitions (such as their original blocks in HDFS). Graph.groupEdges requires the graph to be repartitioned because it assumes identical edges will be colocated on the same partition, so you must call Graph.partitionBy before calling groupEdges. However, note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.

    object GraphLoader {
       def edgeListFile(
    @@ -909,8 +906,7 @@ shortest path in the following example.

    canonicalOrientation: Boolean = false, minEdgePartitions: Int = 1) : Graph[Int, Int] -} -
    +}

    GraphLoader.edgeListFile provides a way to load a graph from a list of edges on disk. It parses an adjacency list of (source vertex ID, destination vertex ID) pairs of the following form, skipping comment lines that begin with #:
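For reference, such a file holds one whitespace-separated (source ID, destination ID) pair per line; an illustrative sketch:

# This is a comment
2 1
4 1
1 2

A file in this form can then be loaded with a call like GraphLoader.edgeListFile(sc, "graphx/data/followers.txt").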

    @@ -938,8 +934,7 @@ shortest path in the following example.

    defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] -} - +}

    Graph.apply allows creating a graph from RDDs of vertices and edges. Duplicate vertices are picked arbitrarily and vertices found in the edge RDD but not the vertex RDD are assigned the default attribute.
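A minimal sketch of Graph.apply (the data and names are illustrative; vertex 3L appears only in the edge RDD, so it receives the default attribute):

val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 4)))
// Vertex 3L is missing from `vertices`, so it is assigned "unknown"
val graph: Graph[String, Int] = Graph(vertices, edges, "unknown")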

    @@ -978,8 +973,7 @@ additional functionality:

    def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] -} - +}

Notice, for example, how the filter operator returns a VertexRDD. Filter is actually implemented using a BitSet thereby reusing the index and preserving the ability to do fast joins @@ -1001,8 +995,7 @@ both aggregate and then subsequently index the RDD[(VertexID, A)]. // There should be 100 entries in setB setB.count // Joining A and B should now be fast! -val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b) - +val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

    EdgeRDDs

    @@ -1018,8 +1011,7 @@ reuse when changing attribute values.

// Reverse the edges reusing both attributes and structure def reverse: EdgeRDD[ED, VD] // Join two `EdgeRDD`s partitioned using the same partitioning strategy. -def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] - +def innerJoin[ED2, ED3](other: EdgeRDD[ED2, VD])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]

    In most applications we have found that operations on the EdgeRDD are accomplished through the graph operators or rely on operations defined in the base RDD class.

    @@ -1040,10 +1032,12 @@ distributed graph partitioning:

    reduce both the communication and storage overhead. Logically, this corresponds to assigning edges to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the PartitionStrategy and there are several tradeoffs to the -various heuristics. Users can choose between different strategies by repartitioning the graph with -the Graph.partitionBy operator. The default partitioning strategy is to use -the initial partitioning of the edges as provided on graph construction. However, users can easily -switch to 2D-partitioning or other heuristics included in GraphX.

    +various heuristics. The default partitioning strategy is to use the initial partitioning of the +edges as provided on graph construction. However, users can easily switch to 2D-partitioning or +other heuristics included in GraphX.

    + +

    Users can choose between different strategies by repartitioning the graph with +the Graph.partitionBy operator. However, note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.

    RDD Graph Representation @@ -1065,7 +1059,7 @@ to broadcast vertices when implementing the join required for operations like PageRank

    -

    PageRank measures the importance of each vertex in a graph, assuming an edge from u to v represents an endorsement of v’s importance by u. For example, if a Twitter user is followed by many others, the user will be ranked highly.

    +

    PageRank measures the importance of each vertex in a graph, assuming an edge from u to v represents an endorsement of v’s importance by u. For example, if a Twitter user is followed by many others, the user will be ranked highly.

    GraphX comes with static and dynamic implementations of PageRank as methods on the PageRank object. Static PageRank runs for a fixed number of iterations, while dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance). GraphOps allows calling these algorithms directly as methods on Graph.
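For example, a sketch of calling both variants on a previously constructed graph (the tolerance and iteration count are illustrative):

// Dynamic PageRank: iterate until ranks change by less than the tolerance
val ranks = graph.pageRank(0.0001).vertices
// Static PageRank: run a fixed number of iterations
val staticRanks = graph.staticPageRank(10).vertices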

    @@ -1084,8 +1078,7 @@ to broadcast vertices when implementing the join required for operations like case (id, (username, rank)) => (username, rank) } // Print the result -println(ranksByUsername.collect().mkString("\n")) - +println(ranksByUsername.collect().mkString("\n"))

    Connected Components

    @@ -1104,15 +1097,26 @@ to broadcast vertices when implementing the join required for operations like case (id, (username, cc)) => (username, cc) } // Print the result -println(ccByUsername.collect().mkString("\n")) - +println(ccByUsername.collect().mkString("\n"))

    Triangle Counting

    -

    A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the PageRank section. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy.

    +

    A vertex is part of a triangle when it has two adjacent vertices with an edge between them. GraphX implements a triangle counting algorithm in the TriangleCount object that determines the number of triangles passing through each vertex, providing a measure of clustering. We compute the triangle count of the social network dataset from the PageRank section. Note that TriangleCount requires the edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy. Also note that Graph.partitionBy is broken in Spark 1.0.0 due to SPARK-1931; see the suggested workarounds above.

    + +
    // Define our own version of partitionBy to work around SPARK-1931
    +import org.apache.spark.HashPartitioner
    +def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: PartitionStrategy): RDD[Edge[ED]] = {
    +  val numPartitions = edges.partitions.size
    +  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions), e))
    +    .partitionBy(new HashPartitioner(numPartitions))
    +    .mapPartitions(_.map(_._2), preservesPartitioning = true)
    +}
     
    -
    // Load the edges in canonical order and partition the graph for triangle count
    -val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
    +// Load the edges in canonical order and partition the graph for triangle count
    +val unpartitionedGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true)
    +val graph = Graph(
    +  partitionBy(unpartitionedGraph.edges, PartitionStrategy.RandomVertexCut),
    +  unpartitionedGraph.vertices)
     // Find the triangle count for each vertex
     val triCounts = graph.triangleCount().vertices
     // Join the triangle counts with the usernames
    @@ -1124,8 +1128,7 @@ to broadcast vertices when implementing the join required for operations like (username, tc)
     }
     // Print the result
    -println(triCountByUsername.collect().mkString("\n"))
    -
    +println(triCountByUsername.collect().mkString("\n"))

    Examples

    @@ -1163,8 +1166,7 @@ all of this in just a few lines with GraphX:

case (uid, attrList, None) => (0.0, attrList.toList) } -println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n")) - +println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))