diff options
author | Semih Salihoglu <semihsalihoglu@gmail.com> | 2014-02-24 22:42:30 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-02-24 22:42:30 -0800 |
commit | 1f4c7f7ecc9d2393663fc4d059e71fe4c70bad84 (patch) | |
tree | adb39bfef3d08da56b8e33c882070d911ebaf1bd /graphx/src/main/scala | |
parent | a4f4fbc8fa5886a8c6ee58ee614de0cc6e67dcd7 (diff) | |
download | spark-1f4c7f7ecc9d2393663fc4d059e71fe4c70bad84.tar.gz spark-1f4c7f7ecc9d2393663fc4d059e71fe4c70bad84.tar.bz2 spark-1f4c7f7ecc9d2393663fc4d059e71fe4c70bad84.zip |
Graph primitives2
Hi guys,
I'm following Joey and Ankur's suggestions to add collectEdges and pickRandomVertex. I'm also adding the tests for collectEdges and refactoring one method getCycleGraph in GraphOpsSuite.scala.
Thank you,
semih
Author: Semih Salihoglu <semihsalihoglu@gmail.com>
Closes #580 from semihsalihoglu/GraphPrimitives2 and squashes the following commits:
937d3ec [Semih Salihoglu] - Fixed the scalastyle errors.
a69a152 [Semih Salihoglu] - Adding collectEdges and pickRandomVertices. - Adding tests for collectEdges. - Refactoring a getCycle utility function for GraphOpsSuite.scala.
41265a6 [Semih Salihoglu] - Adding collectEdges and pickRandomVertex. - Adding tests for collectEdges. - Recycling a getCycle utility test file.
Diffstat (limited to 'graphx/src/main/scala')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 59 |
1 files changed, 58 insertions, 1 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 0fc1e4df68..377d9d6bd5 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -18,11 +18,11 @@ package org.apache.spark.graphx import scala.reflect.ClassTag - import org.apache.spark.SparkContext._ import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD +import scala.util.Random /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -138,6 +138,42 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } // end of collectNeighbor /** + * Returns an RDD that contains for each vertex v its local edges, + * i.e., the edges that are incident on v, in the user-specified direction. + * Warning: note that singleton vertices, those with no edges in the given + * direction will not be part of the return value. + * + * @note This function could be highly inefficient on power-law + * graphs where high degree vertices may force a large amount of + * information to be collected to a single location. + * + * @param edgeDirection the direction along which to collect + * the local edges of vertices + * + * @return the local edges for each vertex + */ + def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]] = { + edgeDirection match { + case EdgeDirection.Either => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr))), + (edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.In => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.Out => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.Both => + throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" + + "EdgeDirection.Either instead.") + } + } + + /** * Join the vertices with an RDD and then apply a function from the * the vertex and RDD entry to a new vertex value. The input table * should contain at most one entry for each vertex. If no entry is @@ -210,6 +246,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** + * Picks a random vertex from the graph and returns its ID. + */ + def pickRandomVertex(): VertexId = { + val probability = 50 / graph.numVertices + var found = false + var retVal: VertexId = null.asInstanceOf[VertexId] + while (!found) { + val selectedVertices = graph.vertices.flatMap { vidVvals => + if (Random.nextDouble() < probability) { Some(vidVvals._1) } + else { None } + } + if (selectedVertices.count > 1) { + found = true + val collectedVertices = selectedVertices.collect() + retVal = collectedVertices(Random.nextInt(collectedVertices.size)) + } + } + retVal + } + + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The * user-defined vertex-program `vprog` is executed in parallel on * each vertex receiving any inbound messages and computing a new |