aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Reynold Xin <rxin@cs.berkeley.edu> 2013-04-03 00:42:33 +0800
committer Reynold Xin <rxin@cs.berkeley.edu> 2013-04-03 00:42:33 +0800
commit d63c895945cc41b0bad7e9be96051489d0098c60 (patch)
tree fbb5b4597077dde76a81fd51294ef29582cc56d1
parent 25c71b185de330074674fbc390dab11de108e7ff (diff)
download spark-d63c895945cc41b0bad7e9be96051489d0098c60.tar.gz
spark-d63c895945cc41b0bad7e9be96051489d0098c60.tar.bz2
spark-d63c895945cc41b0bad7e9be96051489d0098c60.zip
Partial checkin of graphlab module.
-rw-r--r-- graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala 11
-rw-r--r-- graph/src/main/scala/spark/graph/Graph.scala 67
-rw-r--r-- graph/src/main/scala/spark/graph/GraphLab.scala 45
-rw-r--r-- graph/src/main/scala/spark/graph/package.scala 7
4 files changed, 93 insertions, 37 deletions
diff --git a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
index 3bc73e1946..e787fe4e2c 100644
--- a/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
+++ b/graph/src/main/scala/spark/graph/EdgeWithVerticesRDD.scala
@@ -20,7 +20,7 @@ private[graph]
class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
@transient vTable: RDD[(Vid, (VD, Array[Pid]))],
eTable: RDD[(Pid, EdgePartition[ED])])
- extends RDD[EdgeWithVertices[VD, ED]](eTable.context, Nil) {
+ extends RDD[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]](eTable.context, Nil) {
@transient
private val shuffleDependency = {
@@ -47,12 +47,14 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
override def getPreferredLocations(s: Partition) =
eTable.preferredLocations(s.asInstanceOf[EdgeWithVerticesPartition].eTablePartition)
- override def compute(s: Partition, context: TaskContext): Iterator[EdgeWithVertices[VD, ED]] = {
+ override def compute(s: Partition, context: TaskContext)
+ : Iterator[VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]] = {
+
val split = s.asInstanceOf[EdgeWithVerticesPartition]
// Fetch the vertices and put them in a hashmap.
// TODO: use primitive hashmaps for primitive VD types.
- val vmap = new it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap[VD]//(1000000)
+ val vmap = new VertexHashMap[VD]//(1000000)
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[Pid, (Vid, VD)](shuffleId, split.index, context.taskMetrics).foreach {
case (pid, (vid, vdata)) => vmap.put(vid, vdata)
@@ -62,7 +64,7 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
.asInstanceOf[(Pid, EdgePartition[ED])]
// Return an iterator that looks up the hash map to find matching vertices for each edge.
- new Iterator[EdgeWithVertices[VD, ED]] {
+ val iter = new Iterator[EdgeWithVertices[VD, ED]] {
private var pos = 0
private val e = new EdgeWithVertices[VD, ED]
e.src = new Vertex[VD]
@@ -79,5 +81,6 @@ class EdgeWithVerticesRDD[VD: Manifest, ED: Manifest](
e
}
}
+ (vmap, iter)
}
}
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 1bf36c2e0d..4f1b3b44d5 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -3,38 +3,27 @@ package spark.graph
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
-import com.esotericsoftware.kryo._
-
import it.unimi.dsi.fastutil.ints.IntArrayList
-import spark.{ClosureCleaner, HashPartitioner, KryoRegistrator, SparkContext, RDD}
+import spark.{ClosureCleaner, HashPartitioner, SparkContext, RDD}
import spark.SparkContext._
+import spark.graph.Graph.EdgePartition
import spark.storage.StorageLevel
+case class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] (
+ var id: Vid = 0,
+ var data: VD = nullValue[VD])
-class Vertex[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD] {
- var id: Vid = _
- var data: VD = _
-
- def this(id: Int, data: VD) { this(); this.id = id; this.data = data; }
-}
-
-
-class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
- var src: Vid = _
- var dst: Vid = _
- var data: ED = _
- def this(src: Vid, dst: Vid, data: ED) {
- this(); this.src = src; this.dst = dst; this.data = data;
- }
-}
+case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
+ var src: Vid = 0,
+ var dst: Vid = 0,
+ var data: ED = nullValue[ED])
class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD,
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] {
-
var src: Vertex[VD] = _
var dst: Vertex[VD] = _
var data: ED = _
@@ -49,25 +38,38 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
* A Graph RDD that supports computation on graphs.
*/
class Graph[VD: Manifest, ED: Manifest](
- private val _vertices: RDD[Vertex[VD]],
- private val _edges: RDD[Edge[ED]]) {
-
- import Graph.EdgePartition
+ val rawVertices: RDD[Vertex[VD]],
+ val rawEdges: RDD[Edge[ED]]) {
var numEdgePartitions = 5
var numVertexPartitions = 5
- private val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
- _edges, numEdgePartitions)
+ val vertexPartitioner = new HashPartitioner(numVertexPartitions)
+
+ val edgePartitioner = new HashPartitioner(numEdgePartitions)
+
+ lazy val eTable: RDD[(Pid, EdgePartition[ED])] = Graph.createETable(
+ rawEdges, numEdgePartitions)
- private val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
- _vertices, eTable, numVertexPartitions)
+ lazy val vTable: RDD[(Vid, (VD, Array[Pid]))] = Graph.createVTable(
+ rawVertices, eTable, numVertexPartitions)
- def vertices: RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
+ def vertices(): RDD[Vertex[VD]] = vTable.map { case(vid, (data, pids)) => new Vertex(vid, data) }
- def edges: RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
+ def edges(): RDD[Edge[ED]] = eTable.mapPartitions { iter => iter.next._2.iterator }
- def edgesWithVertices: RDD[EdgeWithVertices[VD, ED]] = new EdgeWithVerticesRDD(vTable, eTable)
+ def edgesWithVertices(): RDD[EdgeWithVertices[VD, ED]] = {
+ (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions { case(vmap, iter) => iter }
+ }
+
+ def mapPartitions[U: ClassManifest](
+ f: (VertexHashMap, Iterator[EdgeWithVertices[VD, ED]]) => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] = {
+ (new EdgeWithVerticesRDD(vTable, eTable)).mapPartitions({ part =>
+ val (vmap, iter) = part.next()
+ iter.mapPartitions(f)
+ }, preservesPartitioning)
+ }
}
@@ -75,8 +77,7 @@ class Graph[VD: Manifest, ED: Manifest](
object Graph {
/**
- * A partition of edges. This is created so we can store edge data in columnar format so it is
- * more efficient to store the data in memory.
+ * A partition of edges in 3 large columnar arrays.
*/
private[graph]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: Manifest] {
diff --git a/graph/src/main/scala/spark/graph/GraphLab.scala b/graph/src/main/scala/spark/graph/GraphLab.scala
new file mode 100644
index 0000000000..ebf001937b
--- /dev/null
+++ b/graph/src/main/scala/spark/graph/GraphLab.scala
@@ -0,0 +1,45 @@
+package spark.graph
+
+
+class GraphLab {
+
+ def iterateGAS[A: Manifest, VD: Manifest, ED: Manifest](
+ graph: Graph[VD, ED],
+ gather: (Vid, EdgeWithVertices[VD, ED]) => A,
+ merge: (A, A) => A,
+ default: A,
+ apply: (Vertex[VD], A) => VD,
+ numIter: Int,
+ gatherEdges: EdgeDirection = EdgeDirection.In) = {
+
+ val g = new Graph[(VD, A), ED](graph.rawVertices.map(v => (v, default)), graph.rawEdges)
+
+ var i = 0
+ while (i < numIter) {
+
+ val gatherTable = g.mapPartitions { case(vmap, iter) =>
+ val edgeSansAcc = new EdgeWithVertices[VD, ED]()
+ iter.map { edge: EdgeWithVertices[(VD, A), ED] =>
+ edgeSansAcc.data = edge.data
+ edgeSansAcc.src.data = edge.src.data._1
+ edgeSansAcc.dst.data = edge.dst.data._1
+ edgeSansAcc.src.id = edge.src.id
+ edgeSansAcc.dst.id = edge.dst.id
+ if (gatherEdges == EdgeDirection.In || gatherEdges == EdgeDirection.Both) {
+ edge.dst.data._2 = merge(edge.dst.data._2, gather(edgeSansAcc.dst.id, edgeSansAcc))
+ }
+ if (gatherEdges == EdgeDirection.Out || gatherEdges == EdgeDirection.Both) {
+ edge.src.data._2 = merge(edge.src.data._2, gather(edgeSansAcc.src.id, edgeSansAcc))
+ }
+ }
+
+ vmap.int2ObjectEntrySet().fastIterator().map{ case (vid, (vdata, acc)) => (vid, acc) }
+ }.reduceByKey(graph.vertexPartitioner, false)
+
+ gatherTable
+
+ i += 1
+ }
+ }
+
+}
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index 90ccba431c..e900a27b27 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -5,4 +5,11 @@ package object graph {
type Vid = Int
type Pid = Int
type Status = Boolean
+ type VertexHashMap = it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap
+
+ /**
+ * Return the default null-like value for a data type T.
+ */
+ def nullValue[T] = null.asInstanceOf[T]
+
}