author     Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>   2013-04-03 08:47:49 -0700
committer  Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>   2013-04-03 08:47:49 -0700
commit     c649073b5f8211070e6b043a9a03d18b7485caa4 (patch)
tree       6b4a8cace15f0a2f9ca44b0be531747689e1a0c7 /graph
parent     0123c9d6a123f3e2c3e416ce402e7cf4c9900042 (diff)
parent     fe42ad41bb6c424a97e3ce8bc185b2c7a7b24e86 (diff)
merged with trunk
Diffstat (limited to 'graph')
-rw-r--r--  graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala |   8
-rw-r--r--  graph/src/main/scala/spark/graph/Graph.scala                | 150
-rw-r--r--  graph/src/main/scala/spark/graph/GraphLab.scala             |  36
-rw-r--r--  graph/src/main/scala/spark/graph/package.scala              |   4
4 files changed, 157 insertions, 41 deletions
diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
index e787fe4e2c..f200f7cbf6 100644
--- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
+++ b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
@@ -17,10 +17,10 @@ class EdgeWithVerticesPartition(idx: Int, val eTablePartition: Partition) extend
* A RDD that brings together edge data with its associated vertex data.
*/
private[graph]
-class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
+class EdgeWithVerticesRDD[VD: ClassManifest, ED: ClassManifest](
@transient vTable: RDD[(Vid, (VD, Array[Pid]))],
eTable: RDD[(Pid, EdgePartition[ED])])
- extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
+ extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
@transient
private val shuffleDependency = {
@@ -48,7 +48,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
override def compute(s: Partition, context: TaskContext)
- : Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
+ : Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
val split = s.asInstanceOf[EdgeWithVerticesPartition]
@@ -81,6 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
e
}
}
- (vmap, iter)
+ Iterator((vmap, iter))
}
}
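
The fix in this file comes down to one rule: compute() on an RDD[T] must return an Iterator[T], so the partition's single (vertex map, edge iterator) pair is now wrapped in a one-element Iterator rather than returned as a bare tuple. A minimal sketch of how a caller consumes such a partition, assuming a vTable and eTable of the types above are in scope; the Double/Int attributes and the aggregation are illustrative, not from this commit:

    import spark.RDD
    import spark.graph._

    // Each partition of EdgeWithVerticesRDD carries exactly one
    // (vmap, iter) element, so destructure it with a single next()
    // call, then stream the edges it wraps.
    val ewv = new EdgeWithVerticesRDD[Double, Int](vTable, eTable)
    val edgeData: RDD[Int] = ewv.mapPartitions { part =>
      val (vmap, iter) = part.next()   // the partition's single pair
      iter.map(_.data)                 // then walk its edges
    }
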
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 6e662ab1fb..5a1e06acdd 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -5,7 +5,8 @@ import scala.collection.mutable.ArrayBuffer
import it.unimi.dsi.fastutil.ints.IntArrayList
-import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, RDD}
+import spark.SparkContext
import spark.SparkContext._
import spark.graph.Graph.EdgePartition
import spark.storage.StorageLevel
@@ -13,7 +14,10 @@ import spark.storage.StorageLevel
case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
var id: Vid = 0,
- var data: VD = nullValue[VD])
+ var data: VD = nullValue[VD]) {
+
+ def this(tuple: Tuple2[Vid, VD]) = this(tuple._1, tuple._2)
+}
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
@@ -37,37 +41,102 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
/**
* A Graph RDD that supports computation on graphs.
*/
-class Graph[VD: Manifest, ED: Manifest](
- val rawVertices: RDD[Vertex[VD]],
- val rawEdges: RDD[Edge[ED]]) {
+class Graph[VD: ClassManifest, ED: ClassManifest] protected (
+ _rawVertices: RDD[Vertex[VD]],
+ _rawEdges: RDD[Edge[ED]],
+ _rawVTable: RDD[(Vid, (VD, Array[Pid]))],
+ _rawETable: RDD[(Pid, EdgePartition[ED])]) {
+
+ def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = this(vertices, edges, null, null)
+
+ protected var _cached = false
+
+ def cache(): Graph[VD, ED] = {
+ eTable.cache()
+ vTable.cache()
+ _cached = true
+ this
+ }
var numEdgePartitions = 5
var numVertexPartitions = 5
- val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+ protected val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+
+ protected val edgePartitioner = new HashPartitioner(numEdgePartitions)
+
+ protected lazy val eTable: RDD[(Pid, EdgePartition[ED])] = {
+ if (_rawETable == null) {
+ Graph.createETable(_rawEdges, numEdgePartitions)
+ } else {
+ _rawETable
+ }
+ }
+
+ protected lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = {
+ if (_rawVTable == null) {
+ Graph.createVTable(_rawVertices, eTable, numVertexPartitions)
+ } else {
+ _rawVTable
+ }
+ }
+
+ def vertices: RDD[Vertex[VD]] = {
+ if (!_cached && _rawVertices != null) {
+ _rawVertices
+ } else {
+ vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+ }
+ }
+
+ def edges: RDD[Edge[ED]] = {
+ if (!_cached && _rawEdges != null) {
+ _rawEdges
+ } else {
+ eTable.mapPartitions { iter => iter.next._2.iterator }
+ }
+ }
+
+ def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = {
+ (new EdgeWithVerticesRDD[VD, ED](vTable, eTable)).mapPartitions { part => part.next._2 }
+ }
+
+ def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]) = {
+ ClosureCleaner.clean(f)
+ new Graph(vertices.map(f), edges)
+ }
- val edgePartitioner = new HashPartitioner(numEdgePartitions)
+ def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]) = {
+ ClosureCleaner.clean(f)
+ new Graph(vertices, edges.map(f))
+ }
- lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
- rawEdges, numEdgePartitions)
+ def updateVertices[U: ClassManifest](
+ updates: RDD[(Vid, U)],
+ updateFunc: (Vertex[VD], Seq[U]) => VD)
+ : Graph[VD, ED] = {
- lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
- rawVertices, eTable, numVertexPartitions)
+ ClosureCleaner.clean(updateFunc)
- def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+ val joined: RDD[(Vid, ((VD, Array[Pid]), Option[Seq[U]]))] =
+ vTable.leftOuterJoin(updates.groupByKey(vertexPartitioner))
- def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+ val newVTable = (joined.mapPartitions({ iter =>
+ iter.map { case (vid, ((vdata, pids), updates)) =>
+ val newVdata = if (updates.isDefined) updateFunc(Vertex(vid, vdata), updates.get) else vdata
+ (vid, (newVdata, pids))
+ }
+ }, preservesPartitioning = true)).cache()
- def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
- (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
+ new Graph(null, null, newVTable, eTable)
}
def mapPartitions[U: ClassManifest](
- f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+ f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
(new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
val (vmap, iter) = part.next()
- iter.mapPartitions(f)
+ f(vmap, iter)
}, preservesPartitioning)
}
@@ -77,10 +146,47 @@ class Graph[VD: Manifest, ED: Manifest](
object Graph {
/**
+ * Load an edge list from file initializing the Graph RDD
+ */
+ def textFile[ED: ClassManifest](sc: SparkContext,
+ fname: String, edgeParser: Array[String] => ED) = {
+
+ // Parse the edge data table
+ val edges = sc.textFile(fname).map { line =>
+ val lineArray = line.split("\\s+")
+ if(lineArray.length < 2) {
+ println("Invalid line: " + line)
+ assert(false)
+ }
+ val source = lineArray(0)
+ val target = lineArray(1)
+ val tail = lineArray.drop(2)
+ val edata = edgeParser(tail)
+ Edge(source.trim.toInt, target.trim.toInt, edata)
+ }.cache()
+
+ // Parse the vertex data table
+ val vertices = edges.flatMap { edge => List((edge.src, 1), (edge.dst, 1)) }
+ .reduceByKey(_ + _)
+ .map(new Vertex(_))
+
+ val graph = new Graph[Int, ED](vertices, edges)
+ graph.cache()
+
+ println("Loaded graph:" +
+ "\n\t#edges: " + graph.edges.count +
+ "\n\t#vertices: " + graph.vertices.count)
+
+ graph
+ }
+
+
+ /**
* A partition of edges in 3 large columnar arrays.
*/
private[graph]
- class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
+ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED:ClassManifest]
+ {
val srcIds: IntArrayList = new IntArrayList
val dstIds: IntArrayList = new IntArrayList
// TODO: Specialize data.
@@ -104,7 +210,7 @@ object Graph {
private var edge = new Edge[ED]
private var pos = 0
- override def hasNext: Boolean = pos < size
+ override def hasNext: Boolean = pos < EdgePartition.this.size
override def next(): Edge[ED] = {
edge.src = srcIds.get(pos)
@@ -117,7 +223,7 @@ object Graph {
}
private[graph]
- def createVTable[VD: Manifest, ED: Manifest](
+ def createVTable[VD: ClassManifest, ED: ClassManifest](
vertices: RDD[Vertex[VD]],
eTable: RDD[(Pid, EdgePartition[ED])],
numPartitions: Int) = {
@@ -145,7 +251,6 @@ object Graph {
case (vdata, None) => (vdata, Array.empty[Pid])
case (vdata, Some(pids)) => (vdata, pids.toArray)
}
- .cache()
}
/**
@@ -157,7 +262,7 @@ object Graph {
* containing all the edges in a partition.
*/
private[graph]
- def createETable[ED: Manifest](edges: RDD[Edge[ED]], numPartitions: Int)
+ def createETable[ED: ClassManifest](edges: RDD[Edge[ED]], numPartitions: Int)
: RDD[(Pid, EdgePartition[ED])] = {
edges.map { e =>
@@ -170,7 +275,6 @@ object Graph {
iter.foreach { case (_, (src, dst, data)) => edgePartition.add(src, dst, data) }
Iterator((pid, edgePartition))
}, preservesPartitioning = true)
- .cache()
}
/**
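
Taken together, the rework leaves Graph with a lazily built vTable/eTable pair behind a small functional surface: vertices, edges, mapVertices, mapEdges, and updateVertices. A hedged usage sketch, assuming a live SparkContext named sc; the input path, the weight parser, and the update rule are illustrative:

    import spark.RDD
    import spark.SparkContext._
    import spark.graph._

    // Load an edge list; textFile counts degrees into Int vertex data and
    // hands the trailing columns to the parser for the edge attribute.
    val graph = Graph.textFile(sc, "hdfs:///data/edges.tsv",
                               tail => tail.head.toDouble).cache()

    // Attribute-only transforms; partitioning is left alone.
    val halved = graph.mapEdges(e => Edge(e.src, e.dst, e.data / 2.0))

    // Keyed messages join against vTable; the eTable is reused as-is,
    // which is what updateVertices's new Graph(null, null, ...) achieves.
    val msgs: RDD[(Vid, Int)] = graph.vertices.map(v => (v.id, 1))
    val updated = graph.updateVertices(msgs, (v, us) => v.data + us.sum)
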
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
index d90fdc3369..ce5b3f6e19 100644
--- a/graph/src/main/scala/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -1,45 +1,57 @@
package spark.graph
+import scala.collection.JavaConversions._
+import spark.RDD
+
object GraphLab {
- def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+ def iterateGAS[A: ClassManifest, VD: ClassManifest, ED: ClassManifest](
graph: Graph[VD, ED],
gather: (Vid, EdgeWithVertices[VD, ED]) => A,
merge: (A, A) => A,
default: A,
apply: (Vertex[VD], A) => VD,
numIter: Int,
- gatherEdges: EdgeDirection = EdgeDirection.In) = {
+ gatherEdges: EdgeDirection.EdgeDirection = EdgeDirection.In) = {
- val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
+ var g = graph.mapVertices(v => Vertex(v.id, VDataWithAcc(v.data, default))).cache()
var i = 0
while (i < numIter) {
- val gatherTable = g.mapPartitions { case(vmap, iter) =>
+ val accUpdates: RDD[(Vid, A)] = g.mapPartitions({ case(vmap, iter) =>
val edgeSansAcc = new EdgeWithVertices[VD, ED]()
- iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
+ iter.map { edge: EdgeWithVertices[VDataWithAcc[VD, A], ED] =>
edgeSansAcc.data = edge.data
- edgeSansAcc.src.data = edge.src.data._1
- edgeSansAcc.dst.data = edge.dst.data._1
+ edgeSansAcc.src.data = edge.src.data.vdata
+ edgeSansAcc.dst.data = edge.dst.data.vdata
edgeSansAcc.src.id = edge.src.id
edgeSansAcc.dst.id = edge.dst.id
if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
- edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
+ edge.dst.data.acc = merge(edge.dst.data.acc, gather(edgeSansAcc.dst.id, edgeSansAcc))
}
if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
- edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
+ edge.src.data.acc = merge(edge.src.data.acc, gather(edgeSansAcc.src.id, edgeSansAcc))
}
}
- vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
- }.reduceByKey(graph.vertexPartitioner, false)
+ vmap.int2ObjectEntrySet().fastIterator().map{ entry =>
+ (entry.getIntKey(), entry.getValue().acc)
+ }
+ })(classManifest[(Int, A)])
- gatherTable
+ def applyFunc(v: Vertex[VDataWithAcc[VD, A]], updates: Seq[A]): VDataWithAcc[VD, A] = {
+ VDataWithAcc(apply(Vertex(v.id, v.data.vdata), updates.reduce(merge)), default)
+ }
+ g = g.updateVertices(accUpdates, applyFunc).cache()
i += 1
}
}
}
+
+
+private[graph]
+sealed case class VDataWithAcc[VD: ClassManifest, A](var vdata: VD, var acc: A)
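
With updateVertices available, the GAS loop threads its accumulator through VDataWithAcc instead of a tuple and feeds the gathered sums back as keyed updates. A sketch of driving it, repeatedly replacing each vertex's Double attribute with the sum of its in-edge weights; the SparkContext sc, the path, and the parser are assumed for illustration:

    import spark.graph._

    val g = Graph.textFile(sc, "hdfs:///data/edges.tsv",
                           tail => tail.head.toDouble)
      .mapVertices(v => Vertex(v.id, 0.0))   // VD = Double, start at 0.0

    GraphLab.iterateGAS[Double, Double, Double](
      g,
      (vid, e) => e.data,    // gather: contribute the in-edge weight
      (a, b) => a + b,       // merge: sum contributions
      0.0,                   // default accumulator
      (v, acc) => acc,       // apply: adopt the gathered sum
      numIter = 10,
      gatherEdges = EdgeDirection.In)
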
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index e900a27b27..6db050e6e1 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -4,8 +4,8 @@ package object graph {
type Vid = Int
type Pid = Int
- type Status = Boolean
- type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
+
+ type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]
/**
* Return the default null-like value for a data type T.
*/
def nullValue[T] = null.asInstanceOf[T]
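
The type parameter on VertexHashMap is what lets VertexHashMap[VD] appear in the signatures above. A small sketch of the difference it makes; the values are illustrative:

    import spark.graph._

    // With the parameter, fastutil lookups are typed end to end:
    // get() returns the value type directly, with no asInstanceOf cast.
    val vmap: VertexHashMap[String] =
      new it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[String]
    vmap.put(1, "vertex-one")
    val label: String = vmap.get(1)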