author     Joey <joseph.e.gonzalez@gmail.com>  2013-12-16 15:08:08 -0800
committer  Joey <joseph.e.gonzalez@gmail.com>  2013-12-16 15:08:08 -0800
commit     5192ef38594f8341c25f7091cd50209f64128e0f (patch)
tree       8f79bbc777a0400d40b1993b71a23473eab32dd8 /graph/src
parent     ded10ce5b082f5d8d7b39c93dffba152f7de7162 (diff)
parent     016cabceca624611b2be33c29649a78aee9c0f39 (diff)
Merge pull request #94 from ankurdave/load-edges-columnar
Load edges in columnar format
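
For context, this merge removes the partitionStrategy parameter from the loaders and constructors and makes repartitioning an explicit, opt-in step on the graph itself. A minimal sketch of the API after the change (the file path and partition count are illustrative, not from the diff):

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    val sc = new SparkContext("local", "example")
    // The loader now parses the edge list straight into columnar edge
    // partitions; it no longer accepts a PartitionStrategy.
    val unpartitioned = GraphLoader.edgeListFile(sc, "edges.txt",
      minEdgePartitions = 4).cache()
    // Repartition only when an algorithm requires it.
    val graph = unpartitioned.partitionBy(RandomVertexCut)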
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Analytics.scala                |  82
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala                  |  17
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala                    |  52
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala              |  61
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala |   4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala           | 141
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala           |   2
7 files changed, 202 insertions(+), 157 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index ac50e9a388..2012dadb2f 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -58,14 +58,14 @@ object Analytics extends Logging {
var outFname = ""
var numVPart = 4
var numEPart = 4
- var partitionStrategy: PartitionStrategy = RandomVertexCut
+ var partitionStrategy: Option[PartitionStrategy] = None
options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -75,8 +75,9 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "PageRank(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache()
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
@@ -96,44 +97,47 @@ object Analytics extends Logging {
case "cc" => {
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: PartitionStrategy = RandomVertexCut
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
- println("======================================")
- println("| Connected Components |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- println(" \tNumIter: " + numIter)
- println("======================================")
-
- val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
- val cc = ConnectedComponents.run(graph)
- println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
- sc.stop()
- }
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+ }
case "triangles" => {
var numVPart = 4
var numEPart = 4
+ // TriangleCount requires the graph to be partitioned
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
@@ -147,7 +151,7 @@ object Analytics extends Logging {
println("--------------------------------------")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
+ minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
case (vid,data) => data.toLong
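
The drivers above thread the now-optional strategy through with a foldLeft over the Option. A short sketch of why that one-liner works, using the names from the hunk above:

    // With Some(strategy), foldLeft applies partitionBy exactly once;
    // with None, the unpartitioned graph passes through unchanged.
    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))

    // Equivalent, spelled out:
    val graphExplicit = partitionStrategy match {
      case Some(strategy) => unpartitionedGraph.partitionBy(strategy)
      case None           => unpartitionedGraph
    }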
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 24844262bc..9a1ebbcdcc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -1,7 +1,6 @@
package org.apache.spark.graph
-
-import org.apache.spark.{TaskContext, Partition, OneToOneDependency}
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -13,10 +12,16 @@ class EdgeRDD[@specialized ED: ClassManifest](
partitionsRDD.setName("EdgeRDD")
- override val partitioner = partitionsRDD.partitioner
-
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+ /**
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+ * co-partitioning with partitionsRDD.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val edgePartition = partitionsRDD.compute(split, context).next()._2
edgePartition.iterator
@@ -57,4 +62,8 @@ class EdgeRDD[@specialized ED: ClassManifest](
}
}
+ def collectVids(): RDD[Vid] = {
+ partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
+ }
+
}
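
The new partitioner override matters because Spark elides shuffles between co-partitioned RDDs. A toy illustration of that mechanism, separate from this diff (assumes an ambient SparkContext sc):

    import org.apache.spark.HashPartitioner

    val part  = new HashPartitioner(4)
    val left  = sc.parallelize(Seq(1L -> "a", 2L -> "b")).partitionBy(part)
    val right = sc.parallelize(Seq(1L -> 10, 2L -> 20)).partitionBy(part)
    // Both sides report the same partitioner, so the join is a narrow
    // dependency: no extra shuffle stage is scheduled.
    val joined = left.join(right)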
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index a0907c319a..d31d9dead0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -86,6 +86,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def cache(): Graph[VD, ED]
/**
+ * Repartition the edges in the graph according to partitionStrategy.
+ */
+ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
+
+ /**
* Compute statistics describing the graph representation.
*/
def statistics: Map[String, Any]
@@ -193,18 +198,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
/**
- * This function merges multiple edges between two vertices into a
- * single Edge. See
- * [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more
- * detail.
+ * This function merges multiple edges between two vertices into a single Edge. For correct
+ * results, the graph must have been partitioned using partitionBy.
*
* @tparam ED2 the type of the resulting edge data after grouping.
*
- * @param f the user supplied commutative associative function to merge
- * edge attributes for duplicate edges.
+ * @param f the user supplied commutative associative function to merge edge attributes for
+ * duplicate edges.
*
- * @return Graph[VD,ED2] The resulting graph with a single Edge for
- * each source, dest vertex pair.
+ * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair.
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
@@ -294,26 +296,26 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
object Graph {
/**
- * Construct a graph from a collection of edges encoded as vertex id
- * pairs.
+ * Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src,dst) form.
- * @param uniqueEdges if multiple identical edges are found they are
- * combined and the edge attribute is set to the sum. Otherwise
- * duplicate edges are treated as separate.
+ * @param uniqueEdges if multiple identical edges are found they are combined and the edge
+ * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
+ * uniqueEdges, a [[PartitionStrategy]] must be provided.
*
- * @return a graph with edge attributes containing either the count
- * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex
- * attributes containing the total degree of each vertex.
+ * @return a graph with edge attributes containing either the count of duplicate edges or 1
+ * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassManifest](
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
- uniqueEdges: Boolean = false,
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, Int] = {
+ uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue, partitionStrategy)
- if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph
+ val graph = GraphImpl(edges, defaultValue)
+ uniqueEdges match {
+ case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
+ case None => graph
+ }
}
/**
@@ -327,9 +329,8 @@ object Graph {
*/
def fromEdges[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue, partitionStrategy)
+ defaultValue: VD): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue)
}
/**
@@ -350,9 +351,8 @@ object Graph {
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD = null.asInstanceOf[VD],
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr, partitionStrategy)
+ defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr)
}
/**
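
With uniqueEdges now an Option[PartitionStrategy], deduplication and partitioning are tied together, since groupEdges can only merge duplicates that land in the same partition. A sketch of both call shapes (assumes sc and the imports from the earlier sketch):

    val rawEdges = sc.parallelize(Seq(1L -> 2L, 1L -> 2L, 2L -> 3L))
    // Merge duplicates: the edge attribute becomes the multiplicity.
    val deduped = Graph.fromEdgeTuples(rawEdges, defaultValue = 1,
      uniqueEdges = Some(RandomVertexCut))
    // Keep duplicates: every edge carries attribute 1.
    val raw = Graph.fromEdgeTuples(rawEdges, defaultValue = 1)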
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 29d14452de..a69bfde532 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,9 +1,13 @@
package org.apache.spark.graph
-import org.apache.spark.SparkContext
+import java.util.{Arrays => JArrays}
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
-object GraphLoader {
+object GraphLoader extends Logging {
/**
* Load an edge list from file initializing the Graph
@@ -22,8 +26,7 @@ object GraphLoader {
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut):
+ minEdgePartitions: Int = 1):
Graph[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
@@ -40,7 +43,7 @@ object GraphLoader {
Edge(source, target, edata)
})
val defaultVertexAttr = 1
- Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+ Graph.fromEdges(edges, defaultVertexAttr)
}
/**
@@ -70,31 +73,39 @@ object GraphLoader {
* @tparam ED
* @return
*/
- def edgeListFile[ED: ClassManifest](
+ def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut):
+ minEdgePartitions: Int = 1):
Graph[Int, Int] = {
- // Parse the edge data table
- val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
- iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
- val lineArray = line.split("\\s+")
- if(lineArray.length < 2) {
- println("Invalid line: " + line)
- assert(false)
- }
- val source = lineArray(0).trim.toLong
- val target = lineArray(1).trim.toLong
- if (canonicalOrientation && target > source) {
- Edge(target, source, 1)
- } else {
- Edge(source, target, 1)
+ val startTime = System.currentTimeMillis
+
+ // Parse the edge data table directly into edge partitions
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[Int]
+ iter.foreach { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if (lineArray.length < 2) {
+ logWarning("Invalid line: " + line)
+ }
+ val srcId = lineArray(0).toLong
+ val dstId = lineArray(1).toLong
+ if (canonicalOrientation && dstId > srcId) {
+ builder.add(dstId, srcId, 1)
+ } else {
+ builder.add(srcId, dstId, 1)
+ }
}
- })
- val defaultVertexAttr = 1
- Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }.cache()
+ edges.count()
+
+ logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
} // end of edgeListFile
}
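
The loader now builds each input split directly into a columnar EdgePartition via EdgePartitionBuilder, instead of materializing one Edge object per line and shuffling it through a PartitionStrategy. The builder's internals are not shown in this diff; a simplified, hypothetical sketch of the columnar idea:

    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-in for EdgePartitionBuilder: edges accumulate
    // into parallel columns rather than n individual Edge objects.
    class ColumnarEdges[ED: ClassManifest] {
      private val srcIds = new ArrayBuffer[Long]
      private val dstIds = new ArrayBuffer[Long]
      private val attrs  = new ArrayBuffer[ED]
      def add(src: Long, dst: Long, attr: ED) {
        srcIds += src; dstIds += dst; attrs += attr
      }
      // One compact result per partition: three parallel arrays.
      def result: (Array[Long], Array[Long], Array[ED]) =
        (srcIds.toArray, dstIds.toArray, attrs.toArray)
    }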
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
index 963986d20d..b1cd3c47d0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
@@ -16,7 +16,9 @@ object TriangleCount {
* triangle is counted twice.
*
*
- * @param graph a graph with `sourceId` less than `destId`
+ * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
+ * using Graph.partitionBy.
+ *
* @return
*/
def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = {
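
Since TriangleCount.run now assumes a partitioned graph, callers partition explicitly, as the Analytics.scala hunk above does. A minimal sketch mirroring that driver (path illustrative; assumes the imports from the earlier sketches):

    import org.apache.spark.graph.algorithms.TriangleCount

    val graph = GraphLoader.edgeListFile(sc, "edges.txt",
      canonicalOrientation = true, minEdgePartitions = 4)
      .partitionBy(RandomVertexCut).cache()
    val triangles = TriangleCount.run(graph)
    // Sum the per-vertex triangle counts.
    val total = triangles.vertices.map { case (_, count) => count.toLong }
      .reduce(_ + _)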
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 771c460345..ba2ebe6497 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,5 +1,6 @@
package org.apache.spark.graph.impl
+import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.graph._
@@ -40,6 +41,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
}
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED]) = {
+ this(vertices, edges, new VertexPlacement(edges, vertices))
+ }
+
/** Return a RDD that brings edges with its source and destination vertices together. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
val vdManifest = classManifest[VD]
@@ -60,6 +67,28 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+ val numPartitions = edges.partitions.size
+ val edManifest = classManifest[ED]
+ val newEdges = new EdgeRDD(edges.map { e =>
+ val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+
+ // Should we be using 3-tuple or an optimized class
+ new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).cache())
+ new GraphImpl(vertices, newEdges)
+ }
+
override def statistics: Map[String, Any] = {
// Get the total number of vertices after replication, used to compute the replication ratio.
def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
@@ -175,10 +204,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- // Construct the VertexPlacement map
- val newVertexPlacement = new VertexPlacement(newETable, newVTable)
-
- new GraphImpl(newVTable, newETable, newVertexPlacement)
+ new GraphImpl(newVTable, newETable)
} // end of subgraph
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
@@ -246,6 +272,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
changedVerts, edges, vertexPlacement, Some(vTableReplicated))
new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
}
+
+ private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+ try {
+ BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+ } catch {
+ case _: ClassNotFoundException => true // if we don't know, be conservative
+ }
+ }
} // end of class GraphImpl
@@ -253,54 +287,35 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- val etable = createETable(edges, partitionStrategy).cache()
-
- // Get the set of all vids
- val vids = etable.flatMap { e =>
- Iterator((e.srcId, 0), (e.dstId, 0))
- }
-
- // Shuffle the vids and create the VertexRDD.
- // TODO: Consider doing map side distinct before shuffle.
- val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
- vids, new HashPartitioner(edges.partitions.size))
- shuffled.setSerializer(classOf[VidMsgSerializer].getName)
- val vtable = VertexRDD(shuffled.mapValues(x => defaultValue))
+ fromEdgeRDD(createETable(edges), defaultVertexAttr)
+ }
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement)
+ def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr)
}
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- vertices.cache()
- val etable = createETable(edges, partitionStrategy).cache()
+ val etable = createETable(edges).cache()
+
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
-
val vPartitioned = vertices.partitionBy(partitioner)
-
- val vidsFromEdges = {
- etable.partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
- .map(vid => (vid, 0))
- .partitionBy(partitioner)
- }
-
+ val vidsFromEdges = collectVidsFromEdges(etable, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr)
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement)
+ new GraphImpl(vtable, etable)
}
/**
@@ -311,37 +326,41 @@ object GraphImpl {
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
- protected def createETable[ED: ClassManifest](
- edges: RDD[Edge[ED]],
- partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
- // Get the number of partitions
- val numPartitions = edges.partitions.size
-
- val eTable = edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
-
- // Should we be using 3-tuple or an optimized class
- new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
- }
- .partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex( { (pid, iter) =>
+ private def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+ val eTable = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
- iter.foreach { message =>
- val data = message.data
- builder.add(data._1, data._2, data._3)
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
}
- val edgePartition = builder.toEdgePartition
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true).cache()
+ Iterator((pid, builder.toEdgePartition))
+ }
new EdgeRDD(eTable)
}
- private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
- try {
- BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
- } catch {
- case _: ClassNotFoundException => true // if we don't know, be conservative
- }
+ private def createETableFromEdgePartitions[ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = {
+ new EdgeRDD(edges)
}
+ private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+ edges: EdgeRDD[ED],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ edges.cache()
+ // Get the set of all vids
+ val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ // Create the VertexRDD.
+ val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vtable, edges)
+ }
+
+ /** Collects all vids mentioned in edges and partitions them by partitioner. */
+ private def collectVidsFromEdges(
+ edges: EdgeRDD[_],
+ partitioner: Partitioner): RDD[(Vid, Int)] = {
+ // TODO: Consider doing map side distinct before shuffle.
+ new ShuffledRDD[Vid, Int, (Vid, Int)](
+ edges.collectVids.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VidMsgSerializer].getName)
+ }
} // end of object GraphImpl
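
One subtle point in the relocated shuffle pipeline: preservesPartitioning = true on the rebuild step keeps the HashPartitioner from the shuffle attached to the result, which is what lets the new EdgeRDD.partitioner override report it and downstream stages co-partition without another shuffle. Schematically (msgs and buildPartition are hypothetical stand-ins for the code above):

    val shuffled = msgs.partitionBy(new HashPartitioner(numPartitions))
    val rebuilt = shuffled.mapPartitionsWithIndex({ (pid, iter) =>
      // The emitted key equals the partition id, so the keys still agree
      // with the partitioner established by the shuffle.
      Iterator((pid, buildPartition(iter)))
    }, preservesPartitioning = true)
    // rebuilt.partitioner == shuffled.partitioner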
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index 2e6b57a8ec..b413b4587e 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -251,7 +251,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }