author    | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-03 00:42:33 +0800
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-04-03 00:42:33 +0800
commit    | d63c895945cc41b0bad7e9be96051489d0098c60 (patch)
tree      | fbb5b4597077dde76a81fd51294ef29582cc56d1
parent    | 25c71b185de330074674fbc390dab11de108e7ff (diff)
download  | spark-d63c895945cc41b0bad7e9be96051489d0098c60.tar.gz
          | spark-d63c895945cc41b0bad7e9be96051489d0098c60.tar.bz2
          | spark-d63c895945cc41b0bad7e9be96051489d0098c60.zip
Partial checkin of graphlab module.
-rw-r--r-- | graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala | 11
-rw-r--r-- | graph/src/main/scala/spark/graph/Graph.scala               | 67
-rw-r--r-- | graph/src/main/scala/spark/graph/GraphLab.scala            | 45
-rw-r--r-- | graph/src/main/scala/spark/graph/package.scala             |  7
4 files changed, 93 insertions, 37 deletions
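
For orientation before the diff: the central API change is that EdgeWithVerticesRDD now yields a single (vertex hash map, edge iterator) pair per partition, and Graph gains a mapPartitions hook that unpacks that pair. Below is a minimal sketch of the intended calling pattern; the SparkContext setup and the toy graph are hypothetical, and since this is a partial checkin the sketch is illustrative rather than a known-working program.

    import spark.SparkContext
    import spark.graph._

    object PartitionContractSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "partition-contract")
        // Hypothetical toy graph: two vertices joined by one edge.
        val graph = new Graph(
          sc.parallelize(Seq(Vertex(1, "a"), Vertex(2, "b"))),
          sc.parallelize(Seq(Edge(1, 2, 1.0))))

        // Each partition exposes the fetched vertex map plus an iterator of
        // edges joined with their endpoint vertices; here we simply report
        // how many of each the partition holds.
        val sizes = graph.mapPartitions { (vmap, edges) =>
          Iterator((vmap.size, edges.size))
        }
        sizes.collect().foreach(println)
      }
    }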
diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
index 3bc73e1946..e787fe4e2c 100644
--- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
+++ b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
@@ -20,7 +20,7 @@ private[graph] class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
     @transient vTable: RDD[(Vid, (VD, Array[Pid]))],
     eTable: RDD[(Pid, EdgePartition[ED])])
-  extends RDD[EdgeWithVertices[VD, ED]](eTable.context, Nil) {
+  extends RDD[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])](eTable.context, Nil) {
 
   @transient
   private val shuffleDependency = {
@@ -47,12 +47,14 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
   override def getPreferredLocations(s: Partition) =
     eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[EdgeWithVertices[VD, ED]] = {
+  override def compute(s: Partition, context: TaskContext)
+    : Iterator[(VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]])] = {
+
     val split = s.asInstanceOf[EdgeWithVerticesPartition]
     // Fetch the vertices and put them in a hashmap.
     // TODO: use primitive hashmaps for primitive VD types.
-    val vmap = new it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[VD]//(1000000)
+    val vmap = new VertexHashMap[VD]//(1000000)
     val fetcher = SparkEnv.get.shuffleFetcher
     fetcher.fetch[Pid, (Vid, VD)](shuffleId, split.index, context.taskMetrics).foreach {
       case (pid, (vid, vdata)) => vmap.put(vid, vdata)
@@ -62,7 +64,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
       .asInstanceOf[(Pid, EdgePartition[ED])]
 
     // Return an iterator that looks up the hash map to find matching vertices for each edge.
-    new Iterator[EdgeWithVertices[VD, ED]] {
+    val iter = new Iterator[EdgeWithVertices[VD, ED]] {
       private var pos = 0
       private val e = new EdgeWithVertices[VD, ED]
       e.src = new Vertex[VD]
@@ -79,5 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
         e
       }
     }
+    Iterator((vmap, iter))
   }
 }
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 1bf36c2e0d..4f1b3b44d5 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -3,38 +3,27 @@ package spark.graph
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
-import com.esotericsoftware.kryo._
-
 import it.unimi.dsi.fastutil.ints.IntArrayList
 
-import spark.{ClosureCleaner, HashPartitioner, KryoRegistrator, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
 import spark.SparkContext._
+import spark.graph.Graph.EdgePartition
 import spark.storage.StorageLevel
 
 
-class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] {
-  var id: Vid = _
-  var data: VD = _
-
-  def this(id: Int, data: VD) { this(); this.id = id; this.data = data; }
-}
+case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
+  var id: Vid = 0,
+  var data: VD = nullValue[VD])
 
 
-class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
-  var src: Vid = _
-  var dst: Vid = _
-  var data: ED = _
-  def this(src: Vid, dst: Vid, data: ED) {
-    this(); this.src = src; this.dst = dst; this.data = data;
-  }
-}
+case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
+  var src: Vid = 0,
+  var dst: Vid = 0,
+  var data: ED = nullValue[ED])
 
 
 class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
                        @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
-
   var src: Vertex[VD] = _
   var dst: Vertex[VD] = _
   var data: ED = _
@@ -49,25 +38,38 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
  * A Graph RDD that supports computation on graphs.
  */
 class Graph[VD: Manifest, ED: Manifest](
-    private val _vertices: RDD[Vertex[VD]],
-    private val _edges: RDD[Edge[ED]]) {
-
-  import Graph.EdgePartition
+    val rawVertices: RDD[Vertex[VD]],
+    val rawEdges: RDD[Edge[ED]]) {
 
   var numEdgePartitions = 5
   var numVertexPartitions = 5
 
-  private val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
-    _edges, numEdgePartitions)
+  val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+
+  val edgePartitioner = new HashPartitioner(numEdgePartitions)
+
+  lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
+    rawEdges, numEdgePartitions)
 
-  private val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
-    _vertices, eTable, numVertexPartitions)
+  lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
+    rawVertices, eTable, numVertexPartitions)
 
-  def vertices: RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+  def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
 
-  def edges: RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+  def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
 
-  def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = new EdgeWithVerticesRDD(vTable, eTable)
+  def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
+    (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions(_.flatMap { case (vmap, iter) => iter })
+  }
+
+  def mapPartitions[U: ClassManifest](
+      f: (VertexHashMap[VD], Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
+      val (vmap, iter) = part.next()
+      f(vmap, iter)
+    }, preservesPartitioning)
+  }
 }
@@ -75,8 +77,7 @@ class Graph[VD: Manifest, ED: Manifest](
 object Graph {
 
   /**
-   * A partition of edges. This is created so we can store edge data in columnar format so it is
-   * more efficient to store the data in memory.
+   * A partition of edges in 3 large columnar arrays.
    */
   private[graph] class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
new file mode 100644
index 0000000000..ebf001937b
--- /dev/null
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -0,0 +1,45 @@
+package spark.graph
+
+
+class GraphLab {
+
+  def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+      graph: Graph[VD, ED],
+      gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+      merge: (A, A) => A,
+      default: A,
+      apply: (Vertex[VD], A) => VD,
+      numIter: Int,
+      gatherEdges: EdgeDirection = EdgeDirection.In) = {
+
+    val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => Vertex(v.id, (v.data, default))), graph.rawEdges)
+
+    var i = 0
+    while (i < numIter) {
+
+      val gatherTable = g.mapPartitions { (vmap, iter) =>
+        val edgeSansAcc = new EdgeWithVertices[VD, ED]()
+        iter.foreach { edge: EdgeWithVertices[(VD, A), ED] =>
+          edgeSansAcc.data = edge.data
+          edgeSansAcc.src.data = edge.src.data._1
+          edgeSansAcc.dst.data = edge.dst.data._1
+          edgeSansAcc.src.id = edge.src.id
+          edgeSansAcc.dst.id = edge.dst.id
+          if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
+            edge.dst.data = (edge.dst.data._1, merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc)))
+          }
+          if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
+            edge.src.data = (edge.src.data._1, merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc)))
+          }
+        }
+
+        vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
+      }.reduceByKey(graph.vertexPartitioner, false)
+
+      gatherTable
+
+      i += 1
+    }
+  }
+
+}
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index 90ccba431c..e900a27b27 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -5,4 +5,11 @@ package object graph {
   type Vid = Int
   type Pid = Int
   type Status = Boolean
+  type VertexHashMap[T] = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[T]
+
+  /**
+   * Return the default null-like value for a data type T.
+   */
+  def nullValue[T] = null.asInstanceOf[T]
+
 }
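
The new GraphLab.iterateGAS entry point is the heart of this checkin. Below is a hedged sketch of how it might be invoked once the module compiles: the gather/merge/apply functions compute in-degrees, but the data and the call site are assumptions rather than part of this commit, and the apply/scatter phases are not wired up yet. EdgeDirection is referenced by the new file but not touched by this diff, so it is assumed to be defined elsewhere in the module.

    import spark.SparkContext
    import spark.graph._

    object InDegreeSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "in-degree")
        // Hypothetical toy graph: three vertices, three directed edges.
        val vertices = sc.parallelize(Seq(Vertex(1, 0.0), Vertex(2, 0.0), Vertex(3, 0.0)))
        val edges = sc.parallelize(Seq(Edge(1, 2, 1.0), Edge(1, 3, 1.0), Edge(2, 3, 1.0)))
        val graph = new Graph(vertices, edges)

        // Gather a 1.0 from every in-edge and sum the contributions, i.e.
        // compute in-degrees; apply would install the accumulated value as
        // the new vertex data once the apply phase is implemented.
        new GraphLab().iterateGAS[Double, Double, Double](
          graph,
          (vid, edge) => 1.0,   // gather: each in-edge contributes 1
          (a, b) => a + b,      // merge: sum the contributions
          0.0,                  // default accumulator value
          (vertex, acc) => acc, // apply: replace vertex data with the count
          1)                    // numIter; gatherEdges defaults to EdgeDirection.In
      }
    }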