author     Ankur Dave <ankurdave@gmail.com>    2013-12-16 16:24:43 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2013-12-16 16:24:43 -0800
commit     9df565007b3d9cc167b0b55d3d9bb5fc3df0e225 (patch)
tree       ab6559074cf2fbbd0a31c044c584f04aa707fd6b /graph/src
parent     0459747c1c146f036f99b38a0db75372257e41a1 (diff)
parent     5192ef38594f8341c25f7091cd50209f64128e0f (diff)
Merge remote-tracking branch 'upstream/master' into mrTriplets-active-set
Conflicts: graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Analytics.scala                  82
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala                    17
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala                      52
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala                   13
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala                61
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala    4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala             141
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala            44
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala               2
9 files changed, 244 insertions, 172 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index ac50e9a388..2012dadb2f 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -58,14 +58,14 @@ object Analytics extends Logging {
var outFname = ""
var numVPart = 4
var numEPart = 4
- var partitionStrategy: PartitionStrategy = RandomVertexCut
+ var partitionStrategy: Option[PartitionStrategy] = None
options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -75,8 +75,9 @@ object Analytics extends Logging {
val sc = new SparkContext(host, "PageRank(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy = partitionStrategy).cache()
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
println("GRAPHX: Number of vertices " + graph.vertices.count)
println("GRAPHX: Number of edges " + graph.edges.count)
@@ -96,44 +97,47 @@ object Analytics extends Logging {
case "cc" => {
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: PartitionStrategy = RandomVertexCut
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
- case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
- }
-
- if(!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
- println("======================================")
- println("| Connected Components |")
- println("--------------------------------------")
- println(" Using parameters:")
- println(" \tDynamic: " + isDynamic)
- println(" \tNumIter: " + numIter)
- println("======================================")
-
- val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
- val graph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
- val cc = ConnectedComponents.run(graph)
- println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
- sc.stop()
- }
+ var numIter = Int.MaxValue
+ var numVPart = 4
+ var numEPart = 4
+ var isDynamic = false
+ var partitionStrategy: Option[PartitionStrategy] = None
+
+ options.foreach{
+ case ("numIter", v) => numIter = v.toInt
+ case ("dynamic", v) => isDynamic = v.toBoolean
+ case ("numEPart", v) => numEPart = v.toInt
+ case ("numVPart", v) => numVPart = v.toInt
+ case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+ }
+
+ if(!isDynamic && numIter == Int.MaxValue) {
+ println("Set number of iterations!")
+ sys.exit(1)
+ }
+ println("======================================")
+ println("| Connected Components |")
+ println("--------------------------------------")
+ println(" Using parameters:")
+ println(" \tDynamic: " + isDynamic)
+ println(" \tNumIter: " + numIter)
+ println("======================================")
+
+ val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")")
+ val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
+ minEdgePartitions = numEPart).cache()
+ val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
+
+ val cc = ConnectedComponents.run(graph)
+ println("Components: " + cc.vertices.map{ case (vid,data) => data}.distinct())
+ sc.stop()
+ }
case "triangles" => {
var numVPart = 4
var numEPart = 4
+ // TriangleCount requires the graph to be partitioned
var partitionStrategy: PartitionStrategy = RandomVertexCut
options.foreach{
@@ -147,7 +151,7 @@ object Analytics extends Logging {
println("--------------------------------------")
val sc = new SparkContext(host, "TriangleCount(" + fname + ")")
val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart, partitionStrategy=partitionStrategy).cache()
+ minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
case (vid,data) => data.toLong
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 126379d1ce..ee368ebb41 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -1,7 +1,6 @@
package org.apache.spark.graph
-
-import org.apache.spark.{TaskContext, Partition, OneToOneDependency}
+import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -13,10 +12,16 @@ class EdgeRDD[@specialized ED: ClassManifest](
partitionsRDD.setName("EdgeRDD")
- override val partitioner = partitionsRDD.partitioner
-
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+ /**
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+ * co-partitioning with partitionsRDD.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val edgePartition = partitionsRDD.compute(split, context).next()._2
edgePartition.iterator
@@ -55,4 +60,8 @@ class EdgeRDD[@specialized ED: ClassManifest](
}
}
+ def collectVids(): RDD[Vid] = {
+ partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
+ }
+
}
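
Note: declaring a partitioner on EdgeRDD matters because Spark elides the shuffle when two RDDs report the same partitioner, which keeps downstream joins against the vertex side narrow. A minimal sketch of that effect in plain Spark (local mode, invented data; the SparkContext._ import supplies the pair-RDD operations in this era of Spark):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._

object CoPartitionDemo extends App {
  val sc = new SparkContext("local", "copartition-demo")
  val part = new HashPartitioner(4)
  val left  = sc.parallelize(Seq(1L -> "a", 2L -> "b")).partitionBy(part)
  val right = sc.parallelize(Seq(1L -> "x", 2L -> "y")).partitionBy(part)
  // Both sides expose Some(part), so this join adds no shuffle stage.
  left.join(right).collect().foreach(println)
  sc.stop()
}
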
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index f824b75af9..0dc5ec8b24 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -86,6 +86,11 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
def cache(): Graph[VD, ED]
/**
+ * Repartition the edges in the graph according to partitionStrategy.
+ */
+ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
+
+ /**
* Compute statistics describing the graph representation.
*/
def statistics: Map[String, Any]
@@ -193,18 +198,15 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
vpred: (Vid, VD) => Boolean = ((v,d) => true) ): Graph[VD, ED]
/**
- * This function merges multiple edges between two vertices into a
- * single Edge. See
- * [[org.apache.spark.graph.Graph.groupEdgeTriplets]] for more
- * detail.
+ * This function merges multiple edges between two vertices into a single Edge. For correct
+ * results, the graph must have been partitioned using partitionBy.
*
* @tparam ED2 the type of the resulting edge data after grouping.
*
- * @param f the user supplied commutative associative function to merge
- * edge attributes for duplicate edges.
+ * @param f the user supplied commutative associative function to merge edge attributes for
+ * duplicate edges.
*
- * @return Graph[VD,ED2] The resulting graph with a single Edge for
- * each source, dest vertex pair.
+ * @return Graph[VD,ED2] The resulting graph with a single Edge for each source, dest vertex pair.
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
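
Note: because groupEdges merges duplicates within each edge partition, correctness depends on calling partitionBy first so all copies of an edge are co-located. A hedged usage sketch against the API as it appears in this diff (local mode, invented edges):

import org.apache.spark.SparkContext
import org.apache.spark.graph._

object GroupEdgesDemo extends App {
  val sc = new SparkContext("local", "groupEdges-demo")
  val rawEdges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
  val graph = Graph.fromEdges(rawEdges, defaultValue = 0)
  // partitionBy first: every copy of a duplicate edge lands in one partition.
  val deduped = graph.partitionBy(RandomVertexCut).groupEdges(_ + _)
  // deduped now carries a single 1->2 edge whose attribute is the sum, 2.
  sc.stop()
}
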
@@ -298,26 +300,26 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
object Graph {
/**
- * Construct a graph from a collection of edges encoded as vertex id
- * pairs.
+ * Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src,dst) form.
- * @param uniqueEdges if multiple identical edges are found they are
- * combined and the edge attribute is set to the sum. Otherwise
- * duplicate edges are treated as separate.
+ * @param uniqueEdges if multiple identical edges are found they are combined and the edge
+ * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
+ * uniqueEdges, a [[PartitionStrategy]] must be provided.
*
- * @return a graph with edge attributes containing either the count
- * of duplicate edges or 1 (if `uniqueEdges=false`) and vertex
- * attributes containing the total degree of each vertex.
+ * @return a graph with edge attributes containing either the count of duplicate edges or 1
+ * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassManifest](
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
- uniqueEdges: Boolean = false,
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, Int] = {
+ uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue, partitionStrategy)
- if (uniqueEdges) graph.groupEdges((a, b) => a + b) else graph
+ val graph = GraphImpl(edges, defaultValue)
+ uniqueEdges match {
+ case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
+ case None => graph
+ }
}
/**
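
Note: the uniqueEdges change surfaces the same contract at construction time; since merging duplicates needs a partitioning, the old boolean flag and the strategy collapse into one Option. A usage sketch (local mode, invented tuples):

import org.apache.spark.SparkContext
import org.apache.spark.graph._

object FromEdgeTuplesDemo extends App {
  val sc = new SparkContext("local", "fromEdgeTuples-demo")
  val rawEdges = sc.parallelize(Seq(1L -> 2L, 1L -> 2L, 2L -> 3L))
  // Some(strategy): duplicates collapse and each edge attr becomes a count.
  val deduped = Graph.fromEdgeTuples(rawEdges, defaultValue = 0,
    uniqueEdges = Some(RandomVertexCut))
  // Default None: the duplicate 1->2 edges stay separate, each with attr 1.
  val multi = Graph.fromEdgeTuples(rawEdges, defaultValue = 0)
  sc.stop()
}
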
@@ -331,9 +333,8 @@ object Graph {
*/
def fromEdges[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue, partitionStrategy)
+ defaultValue: VD): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue)
}
/**
@@ -354,9 +355,8 @@ object Graph {
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD = null.asInstanceOf[VD],
- partitionStrategy: PartitionStrategy = RandomVertexCut): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr, partitionStrategy)
+ defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr)
}
/**
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 856a9aca37..5618ce6272 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -24,6 +24,8 @@ object GraphLab {
* @param scatterFunc Executed after the apply function, the scatter function takes
* a triplet and signals whether the neighboring vertex program
* must be recomputed.
* @param startVertices a predicate determining which vertices to start the computation on;
* these will be the active vertices in the first iteration.
* @param numIter The maximum number of iterations to run.
* @param gatherDirection The direction of edges to consider during the gather phase
* @param scatterDirection The direction of edges to consider during the scatter phase
@@ -40,12 +42,13 @@ object GraphLab {
(gatherFunc: (Vid, EdgeTriplet[VD, ED]) => A,
mergeFunc: (A, A) => A,
applyFunc: (Vid, VD, Option[A]) => VD,
- scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean): Graph[VD, ED] = {
+ scatterFunc: (Vid, EdgeTriplet[VD, ED]) => Boolean,
+ startVertices: (Vid, VD) => Boolean = (vid: Vid, data: VD) => true): Graph[VD, ED] = {
// Add an active attribute to all vertices to track convergence.
var activeGraph: Graph[(Boolean, VD), ED] = graph.mapVertices {
- case (id, data) => (true, data)
+ case (id, data) => (startVertices(id, data), data)
}.cache()
// The gather function wrapper strips the active attribute and
@@ -86,9 +89,9 @@ object GraphLab {
}
// Used to set the active status of vertices for the next round
- def applyActive(vid: Vid, data: (Boolean, VD), newActive: Boolean): (Boolean, VD) = {
+ def applyActive(vid: Vid, data: (Boolean, VD), newActiveOpt: Option[Boolean]): (Boolean, VD) = {
val (prevActive, vData) = data
- (newActive, vData)
+ (newActiveOpt.getOrElse(false), vData)
}
// Main Loop ---------------------------------------------------------------------
@@ -110,7 +113,7 @@ object GraphLab {
val scattered: RDD[(Vid, Boolean)] =
activeGraph.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse)
- activeGraph = activeGraph.joinVertices(scattered)(applyActive).cache()
+ activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache()
// Calculate the number of active vertices
numActive = activeGraph.vertices.map{
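
Note: the move from joinVertices to outerJoinVertices is what lets silence mean convergence; a vertex that receives no scatter message sees None in applyActive and is deactivated, where joinVertices would have skipped it and left it active. A standalone sketch of that Option handling (plain Scala, names invented):

object ApplyActiveSketch extends App {
  type VData = String
  def applyActive(data: (Boolean, VData), newActiveOpt: Option[Boolean]): (Boolean, VData) = {
    val (_, vData) = data
    (newActiveOpt.getOrElse(false), vData)
  }
  println(applyActive((true, "v1"), Some(true))) // (true,v1): a neighbor signaled it
  println(applyActive((true, "v2"), None))       // (false,v2): silence deactivates
}
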
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 29d14452de..a69bfde532 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,9 +1,13 @@
package org.apache.spark.graph
-import org.apache.spark.SparkContext
+import java.util.{Arrays => JArrays}
+import org.apache.spark.graph.impl.EdgePartitionBuilder
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
-object GraphLoader {
+object GraphLoader extends Logging {
/**
* Load an edge list from file initializing the Graph
@@ -22,8 +26,7 @@ object GraphLoader {
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut):
+ minEdgePartitions: Int = 1):
Graph[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
@@ -40,7 +43,7 @@ object GraphLoader {
Edge(source, target, edata)
})
val defaultVertexAttr = 1
- Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+ Graph.fromEdges(edges, defaultVertexAttr)
}
/**
@@ -70,31 +73,39 @@ object GraphLoader {
* @tparam ED
* @return
*/
- def edgeListFile[ED: ClassManifest](
+ def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1,
- partitionStrategy: PartitionStrategy = RandomVertexCut):
+ minEdgePartitions: Int = 1):
Graph[Int, Int] = {
- // Parse the edge data table
- val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
- iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
- val lineArray = line.split("\\s+")
- if(lineArray.length < 2) {
- println("Invalid line: " + line)
- assert(false)
- }
- val source = lineArray(0).trim.toLong
- val target = lineArray(1).trim.toLong
- if (canonicalOrientation && target > source) {
- Edge(target, source, 1)
- } else {
- Edge(source, target, 1)
+ val startTime = System.currentTimeMillis
+
+ // Parse the edge data table directly into edge partitions
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[Int]
+ iter.foreach { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if (lineArray.length < 2) {
+ logWarning("Invalid line: " + line)
+ }
+ val srcId = lineArray(0).toLong
+ val dstId = lineArray(1).toLong
+ if (canonicalOrientation && dstId > srcId) {
+ builder.add(dstId, srcId, 1)
+ } else {
+ builder.add(srcId, dstId, 1)
+ }
}
- })
- val defaultVertexAttr = 1
- Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }.cache()
+ edges.count()
+
+ logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
} // end of edgeListFile
}
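
Note: the loader now parses text straight into columnar edge partitions instead of materializing an RDD of Edge objects first. Below is a sketch of the builder pattern it relies on; SimpleEdgeBuilder is a hypothetical stand-in for the internal EdgePartitionBuilder, and the length guard shows the skip that the logWarning in the hunk above would need so a malformed line does not fall through to lineArray(1):

import scala.collection.mutable.ArrayBuffer

// Accumulate edges into parallel arrays (a columnar layout), then freeze
// them into one partition-local structure.
class SimpleEdgeBuilder {
  private val srcIds = new ArrayBuffer[Long]
  private val dstIds = new ArrayBuffer[Long]
  private val attrs  = new ArrayBuffer[Int]

  def add(src: Long, dst: Long, attr: Int): Unit = {
    srcIds += src; dstIds += dst; attrs += attr
  }

  def toArrays: (Array[Long], Array[Long], Array[Int]) =
    (srcIds.toArray, dstIds.toArray, attrs.toArray)
}

object ParseSketch extends App {
  val builder = new SimpleEdgeBuilder
  Seq("1 2", "# comment", "3 4", "bad").foreach { line =>
    if (!line.isEmpty && line(0) != '#') {
      val fields = line.split("\\s+")
      // Guard before indexing, rather than only warning.
      if (fields.length >= 2) builder.add(fields(0).toLong, fields(1).toLong, 1)
      else Console.err.println("Invalid line: " + line)
    }
  }
  println(builder.toArrays._1.mkString(","))  // 1,3
}
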
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
index 963986d20d..b1cd3c47d0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
@@ -16,7 +16,9 @@ object TriangleCount {
* triangle is counted twice.
*
*
- * @param graph a graph with `sourceId` less than `destId`
+ * @param graph a graph with `sourceId` less than `destId`. The graph must have been partitioned
+ * using Graph.partitionBy.
+ *
* @return
*/
def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = {
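
Note: a hedged sketch of the call sequence this precondition implies, mirroring the triangles case in Analytics.scala (local mode; the input path is hypothetical). canonicalOrientation = true gives every edge a consistent direction at load time, and partitionBy satisfies the new requirement:

import org.apache.spark.SparkContext
import org.apache.spark.graph._
import org.apache.spark.graph.algorithms.TriangleCount

object TriangleDemo extends App {
  val sc = new SparkContext("local", "triangles-demo")
  val graph = GraphLoader
    .edgeListFile(sc, "data/edges.txt", canonicalOrientation = true)
    .partitionBy(RandomVertexCut)
    .cache()
  val counts = TriangleCount.run(graph).vertices
  counts.collect().foreach { case (vid, count) => println(vid + ": " + count) }
  sc.stop()
}
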
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e3d5e37c8d..08bef82150 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,5 +1,6 @@
package org.apache.spark.graph.impl
+import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
import org.apache.spark.graph._
@@ -40,6 +41,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
}
+ def this(
+ vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED]) = {
+ this(vertices, edges, new VertexPlacement(edges, vertices))
+ }
+
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
val vdManifest = classManifest[VD]
@@ -59,6 +66,28 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
+ val numPartitions = edges.partitions.size
+ val edManifest = classManifest[ED]
+ val newEdges = new EdgeRDD(edges.map { e =>
+ val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+
+ // Should we be using a 3-tuple or an optimized class?
+ new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).cache())
+ new GraphImpl(vertices, newEdges)
+ }
+
override def statistics: Map[String, Any] = {
// Get the total number of vertices after replication, used to compute the replication ratio.
def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
@@ -183,10 +212,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- // Construct the VertexPlacement map
- val newVertexPlacement = new VertexPlacement(newETable, newVTable)
-
- new GraphImpl(newVTable, newETable, newVertexPlacement)
+ new GraphImpl(newVTable, newETable)
} // end of subgraph
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
@@ -272,6 +298,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
new GraphImpl(newVerts, edges, vertexPlacement)
}
}
+
+ private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
+ try {
+ BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+ } catch {
+ case _: ClassNotFoundException => true // if we don't know, be conservative
+ }
+ }
} // end of class GraphImpl
@@ -279,54 +313,35 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- val etable = createETable(edges, partitionStrategy).cache()
-
- // Get the set of all vids
- val vids = etable.flatMap { e =>
- Iterator((e.srcId, 0), (e.dstId, 0))
- }
-
- // Shuffle the vids and create the VertexRDD.
- // TODO: Consider doing map side distinct before shuffle.
- val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
- vids, new HashPartitioner(edges.partitions.size))
- shuffled.setSerializer(classOf[VidMsgSerializer].getName)
- val vtable = VertexRDD(shuffled.mapValues(x => defaultValue))
+ fromEdgeRDD(createETable(edges), defaultVertexAttr)
+ }
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement)
+ def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr)
}
def apply[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD,
- partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
+ defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- vertices.cache()
- val etable = createETable(edges, partitionStrategy).cache()
+ val etable = createETable(edges).cache()
+
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
-
val vPartitioned = vertices.partitionBy(partitioner)
-
- val vidsFromEdges = {
- etable.partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
- .map(vid => (vid, 0))
- .partitionBy(partitioner)
- }
-
+ val vidsFromEdges = collectVidsFromEdges(etable, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr)
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement)
+ new GraphImpl(vtable, etable)
}
/**
@@ -337,37 +352,41 @@ object GraphImpl {
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
- protected def createETable[ED: ClassManifest](
- edges: RDD[Edge[ED]],
- partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
- // Get the number of partitions
- val numPartitions = edges.partitions.size
-
- val eTable = edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
-
- // Should we be using 3-tuple or an optimized class
- new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
- }
- .partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex( { (pid, iter) =>
+ private def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
+ val eTable = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
- iter.foreach { message =>
- val data = message.data
- builder.add(data._1, data._2, data._3)
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
}
- val edgePartition = builder.toEdgePartition
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true).cache()
+ Iterator((pid, builder.toEdgePartition))
+ }
new EdgeRDD(eTable)
}
- private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
- try {
- BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
- } catch {
- case _: ClassNotFoundException => true // if we don't know, be conservative
- }
+ private def createETableFromEdgePartitions[ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = {
+ new EdgeRDD(edges)
}
+ private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+ edges: EdgeRDD[ED],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ edges.cache()
+ // Get the set of all vids
+ val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
+ // Create the VertexRDD.
+ val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vtable, edges)
+ }
+
+ /** Collects all vids mentioned in edges and partitions them by partitioner. */
+ private def collectVidsFromEdges(
+ edges: EdgeRDD[_],
+ partitioner: Partitioner): RDD[(Vid, Int)] = {
+ // TODO: Consider doing map side distinct before shuffle.
+ new ShuffledRDD[Vid, Int, (Vid, Int)](
+ edges.collectVids.map(vid => (vid, 0)), partitioner)
+ .setSerializer(classOf[VidMsgSerializer].getName)
+ }
} // end of object GraphImpl
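
Note: partitionBy and the old createETable share one mechanism: key each edge by the partition the strategy picks, shuffle with a HashPartitioner, then rebuild partition-local structures with mapPartitionsWithIndex while preserving the partitioning. A self-contained sketch of that pattern in plain Spark (local mode, invented edges; the modulo hash stands in for PartitionStrategy.getPartition, assuming non-negative ids):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._

object PartitionByDemo extends App {
  val sc = new SparkContext("local", "partitionBy-demo")
  val numPartitions = 4
  val edges = sc.parallelize(Seq((1L, 2L, 1), (2L, 3L, 1), (3L, 1L, 1)))
  val rebuilt = edges
    .map { case (src, dst, attr) =>
      val pid = ((src ^ dst) % numPartitions).toInt  // stand-in for getPartition
      (pid, (src, dst, attr))
    }
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitionsWithIndex({ (pid, iter) =>
      // In GraphX this is where EdgePartitionBuilder emits a columnar block.
      Iterator((pid, iter.map(_._2).toArray))
    }, preservesPartitioning = true)
  println(rebuilt.count())
  sc.stop()
}
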
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
index 68b38de2b8..e4fa4a4421 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
@@ -295,7 +295,7 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
var i: Int = 0
def readOrThrow(): Int = {
val in = s.read()
- if (in < 0) throw new java.io.EOFException
+ if (in < 0) throw new EOFException
in & 0xFF
}
var b: Int = readOrThrow()
@@ -309,21 +309,45 @@ abstract class ShuffleDeserializationStream(s: InputStream) extends Deserializat
}
def readVarLong(optimizePositive: Boolean): Long = {
- // TODO: unroll the while loop.
- var value: Long = 0L
def readOrThrow(): Int = {
val in = s.read()
- if (in < 0) throw new java.io.EOFException
+ if (in < 0) throw new EOFException
in & 0xFF
}
- var i: Int = 0
- var b: Int = readOrThrow()
- while (i < 56 && (b & 0x80) != 0) {
- value |= (b & 0x7F).toLong << i
- i += 7
+ var b = readOrThrow()
+ var ret: Long = b & 0x7F
+ if ((b & 0x80) != 0) {
b = readOrThrow()
+ ret |= (b & 0x7F) << 7
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 14
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F) << 21
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 28
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 35
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 42
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= (b & 0x7F).toLong << 49
+ if ((b & 0x80) != 0) {
+ b = readOrThrow()
+ ret |= b.toLong << 56
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
- val ret = value | (b.toLong << i)
if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret
}
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index 2e6b57a8ec..b413b4587e 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -251,7 +251,7 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
withSpark(new SparkContext("local", "test")) { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
- val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
+ val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = TriangleCount.run(graph)
val verts = triangleCount.vertices
verts.collect.foreach { case (vid, count) => assert(count === 1) }