author     Ankur Dave <ankurdave@gmail.com>    2013-12-16 23:52:34 -0800
committer  Ankur Dave <ankurdave@gmail.com>    2013-12-18 13:01:13 -0800
commit     0f137e8b75497e61f8d9fec98896cd912f27c3ed (patch)
tree       14b693b6a6604b865652923c95c17779713314da /graph/src
parent     9193a8f7887b919cc62dc308fbdd2d3d92d8a746 (diff)
download   spark-0f137e8b75497e61f8d9fec98896cd912f27c3ed.tar.gz
           spark-0f137e8b75497e61f8d9fec98896cd912f27c3ed.tar.bz2
           spark-0f137e8b75497e61f8d9fec98896cd912f27c3ed.zip
Reimplement Graph.mask using innerJoin
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala            | 23
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala              |  2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala | 34
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala     | 39
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala         | 23
5 files changed, 83 insertions, 38 deletions
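
For orientation: Graph.mask keeps only the vertices and edges of a graph that also appear in a second graph, while retaining the first graph's attributes. A minimal usage sketch in the spirit of the GraphSuite test below (the SparkContext setup and the concrete values are illustrative, not part of this patch):

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._

    val sc = new SparkContext("local", "mask-example")
    val n = 5
    val vertices = sc.parallelize((0 to n).map(x => (x: Vid, x)))
    val edges = sc.parallelize((0 to n).map(x => Edge(0: Vid, x: Vid, x)))
    val graph: Graph[Int, Int] = Graph(vertices, edges)

    // Derive some restricted view of the graph...
    val projected = graph.subgraph(vpred = (vid, attr) => vid != 3L)
    // ...then mask pulls graph's own attributes back onto that structure.
    val masked: Graph[Int, Int] = graph.mask(projected)

Before this commit, mask rebuilt the vertex set with a leftJoin/filter/map chain and compared edges through a per-partition Set; after it, mask is a pair of inner joins, as the GraphImpl diff below shows.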
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index ee368ebb41..63858db2ef 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -44,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
override def cache(): EdgeRDD[ED] = persist()
def mapEdgePartitions[ED2: ClassManifest](f: EdgePartition[ED] => EdgePartition[ED2])
- : EdgeRDD[ED2]= {
+ : EdgeRDD[ED2] = {
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(ep)))
@@ -60,6 +60,27 @@ class EdgeRDD[@specialized ED: ClassManifest](
}
}
+ def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest]
+ (other: EdgeRDD[ED2])
+ (f: (EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = {
+ new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) {
+ (thisIter, otherIter) =>
+ val (pid, thisEPart) = thisIter.next()
+ val (_, otherEPart) = otherIter.next()
+ Iterator(Tuple2(pid, f(thisEPart, otherEPart)))
+ })
+ }
+
+ def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
+ (other: EdgeRDD[ED2])
+ (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
+ val ed2Manifest = classManifest[ED2]
+ val ed3Manifest = classManifest[ED3]
+ zipEdgePartitions(other) { (thisEPart, otherEPart) =>
+ thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest)
+ }
+ }
+
def collectVids(): RDD[Vid] = {
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
}
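
Taken together, the two new methods give EdgeRDD a structural join: zipEdgePartitions pairs up corresponding EdgePartitions of two identically partitioned EdgeRDDs, and innerJoin delegates the per-partition work to EdgePartition.innerJoin, added in the next file. A call-shape sketch, reusing graph and projected from the sketch above (subgraph preserves edge partitioning, so the two EdgeRDDs line up):

    // Combine the attributes of edges present in both RDDs; edges that
    // appear on only one side are dropped from the result.
    val a: EdgeRDD[Int] = graph.edges
    val b: EdgeRDD[Int] = projected.edges
    val joined: EdgeRDD[Int] = a.innerJoin(b) { (src, dst, x, y) => x + y }

Note that innerJoin binds the class manifests to local vals (ed2Manifest, ed3Manifest) and passes them explicitly, which keeps the serialized closure's captures explicit instead of resolving the implicits inside it.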
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index e544650963..e8fa8e611c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -48,7 +48,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* along with their vertex data.
*
*/
- val edges: RDD[Edge[ED]]
+ val edges: EdgeRDD[ED]
/**
* Get the edges with the vertex data associated with the adjacent
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index bfdafcc542..e97522feae 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -99,6 +99,40 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
}
/**
+ * Apply `f` to all edges present in both `this` and `other` and return a new EdgePartition
+ * containing the resulting edges.
+ *
+ * If there are multiple edges with the same src and dst in `this`, `f` will be invoked once for
+ * each edge, but each time it may be invoked on any corresponding edge in `other`.
+ *
+ * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked
+ * once.
+ */
+ def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
+ (other: EdgePartition[ED2])
+ (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = {
+ val builder = new EdgePartitionBuilder[ED3]
+ var i = 0
+ var j = 0
+ // For i = index of each edge in `this`...
+ while (i < size && j < other.size) {
+ val srcId = this.srcIds(i)
+ val dstId = this.dstIds(i)
+ // ... forward j to the index of the corresponding edge in `other`, and...
+ while (j < other.size && other.srcIds(j) < srcId) { j += 1 }
+ if (j < other.size && other.srcIds(j) == srcId) {
+ while (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) < dstId) { j += 1 }
+ if (j < other.size && other.srcIds(j) == srcId && other.dstIds(j) == dstId) {
+ // ... run `f` on the matching edge
+ builder.add(srcId, dstId, f(srcId, dstId, this.data(i), other.data(j)))
+ }
+ }
+ i += 1
+ }
+ builder.toEdgePartition
+ }
+
+ /**
* The number of edges in this partition
*
* @return size of the partition
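
EdgePartition.innerJoin relies on an invariant the diff does not restate: edges in a partition are stored sorted by srcId and then dstId (the order EdgePartitionBuilder produces), so both sides can be consumed in one forward scan, O(|this| + |other|) per partition. Stripped of the GraphX types, the same two-pointer merge join looks like this (a standalone sketch for illustration, not code from the patch):

    // Inner join of two edge lists, both sorted by (src, dst).
    def innerJoinSorted[A, B, C](
        left: IndexedSeq[(Long, Long, A)],
        right: IndexedSeq[(Long, Long, B)])(
        f: (Long, Long, A, B) => C): IndexedSeq[(Long, Long, C)] = {
      val out = Vector.newBuilder[(Long, Long, C)]
      var i = 0
      var j = 0
      while (i < left.length && j < right.length) {
        val (src, dst, a) = left(i)
        // Advance j until right(j) is at or past (src, dst)...
        while (j < right.length &&
               (right(j)._1 < src || (right(j)._1 == src && right(j)._2 < dst))) {
          j += 1
        }
        // ...and emit a joined edge on an exact match. j is deliberately not
        // advanced here, so duplicate (src, dst) edges on the left all pair
        // with the same right edge, as the doc comment above describes.
        if (j < right.length && right(j)._1 == src && right(j)._2 == dst) {
          out += ((src, dst, f(src, dst, a, right(j)._3)))
        }
        i += 1
      }
      out.result()
    }

For example, innerJoinSorted(Vector((0L, 1L, "x"), (0L, 1L, "y")), Vector((0L, 1L, 7)))((s, d, a, b) => a + b) invokes f twice against the single right edge, yielding two joined edges.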
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 2fe02718e9..e7f975253a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -216,7 +216,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
- other: Graph[VD2, ED2]) : Graph[VD, ED] = GraphImpl.mask(this, other)
+ other: Graph[VD2, ED2]): Graph[VD, ED] = {
+ val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
+ val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
+ new GraphImpl(newVerts, newEdges)
+
+ }
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
@@ -379,38 +384,6 @@ object GraphImpl {
new EdgeRDD(edges)
}
- def mask[VD: ClassManifest, ED: ClassManifest, VD2: ClassManifest, ED2: ClassManifest] (
- thisGraph: Graph[VD, ED], otherGraph: Graph[VD2, ED2]) : Graph[VD, ED] = {
- // basically vertices.join(other.vertices)
- // written this way to take advantage of fast join in VertexSetRDDs
- val newVTable = VertexSetRDD(
- thisGraph.vertices.leftJoin(otherGraph.vertices)((vid, v, w) => if (w.isEmpty) None else Some(v))
- .filter{case (vid, opt) => !opt.isEmpty}
- .map{case (vid, opt) => (vid, opt.get)}
- )
-
- // TODO(amatsukawa): safer way to downcast? case matching perhaps?
- val thisImpl = thisGraph.asInstanceOf[GraphImpl[VD, ED]]
- val otherImpl = otherGraph.asInstanceOf[GraphImpl[VD2, ED2]]
- val newETable = thisImpl.eTable.zipPartitions(otherImpl.eTable) {
- // extract two edge partitions, keep all edges in in this partition that is
- // also in the other partition
- (thisIter, otherIter) =>
- val (_, otherEPart) = otherIter.next()
- val otherEdges = otherEPart.iterator.map(e => (e.srcId, e.dstId)).toSet
- val (pid, thisEPart) = thisIter.next()
- val newEPartBuilder = new EdgePartitionBuilder[ED]
- thisEPart.foreach { e =>
- if (otherEdges.contains((e.srcId, e.dstId)))
- newEPartBuilder.add(e.srcId, e.dstId, e.attr)
- }
- Iterator((pid, newEPartBuilder.toEdgePartition))
- }.partitionBy(thisImpl.eTable.partitioner.get)
-
- val newVertexPlacement = new VertexPlacement(newETable, newVTable)
- new GraphImpl(newVTable, newETable, newVertexPlacement, thisImpl.partitioner)
- }
-
private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
edges: EdgeRDD[ED],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
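
The rewrite above collapses mask to a few lines: the leftJoin/filter/map chain over vertices becomes an innerJoin on the vertex RDDs, the per-partition Set of (srcId, dstId) pairs becomes the sorted merge join, and the asInstanceOf downcasts disappear because edges is now typed as EdgeRDD in the public Graph interface. The typical reason to call mask at all, as a hedged sketch (ConnectedComponents.run is a hypothetical stand-in for any structure-preserving algorithm; it is not defined in this patch):

    // Run an expensive computation on the full graph, then restrict the
    // result to a previously derived subgraph instead of recomputing it.
    val valid = graph.subgraph(vpred = (vid, attr) => attr >= 0)
    val components = ConnectedComponents.run(graph)  // hypothetical helper
    val componentsOnValid = components.mask(valid)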
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index fae6eb5525..f6bb201a83 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -5,7 +5,9 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
+import org.apache.spark.graph.Graph._
import org.apache.spark.graph.LocalSparkContext._
+import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.rdd._
@@ -183,7 +185,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
- test("projectGraph") {
+ test("mask") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 5
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
@@ -207,7 +209,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
- test ("filterGraph") {
+ test ("filter") {
withSpark(new SparkContext("local", "test")) { sc =>
val n = 5
val vertices = sc.parallelize((0 to n).map(x => (x:Vid, x)))
@@ -215,7 +217,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
val graph: Graph[Int, Int] = Graph(vertices, edges)
val filteredGraph = graph.filter(
graph => {
- val degrees: VertexSetRDD[Int] = graph.outDegrees
+ val degrees: VertexRDD[Int] = graph.outDegrees
graph.outerJoinVertices(degrees) {(vid, data, deg) => deg.getOrElse(0)}
},
vpred = (vid: Vid, deg:Int) => deg > 0
@@ -278,4 +280,19 @@ class GraphSuite extends FunSuite with LocalSparkContext {
assert(edgePartition.indexIterator(_ == 0).map(_.copy()).toList === edgesFrom0)
assert(edgePartition.indexIterator(_ == 1).map(_.copy()).toList === edgesFrom1)
}
+
+ test("EdgePartition.innerJoin") {
+ def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+ val builder = new EdgePartitionBuilder[A]
+ for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
+ builder.toEdgePartition
+ }
+ val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+ val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0))
+ val a = makeEdgePartition(aList)
+ val b = makeEdgePartition(bList)
+
+ assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList ===
+ List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0)))
+ }
}