about | summary | refs | log | tree | commit | diff
path: root/graph/src
diff options
context:
space:
mode:
author: Ankur Dave <ankurdave@gmail.com>, 2013-12-20 15:37:33 -0800
committer: Ankur Dave <ankurdave@gmail.com>, 2013-12-20 15:37:33 -0800
commit: 32f957f3317bd62768b415da0c0cd9114f59782c (patch)
tree: 2924f25ff4854caa19247d05dcf9ea535327a5f8 /graph/src
parent: 6d1bf0d78d150925c0dbd9adfe6294c89846f9b7 (diff)
download: spark-32f957f3317bd62768b415da0c0cd9114f59782c.tar.gz
spark-32f957f3317bd62768b415da0c0cd9114f59782c.tar.bz2
spark-32f957f3317bd62768b415da0c0cd9114f59782c.zip
Create VertexPlacement on demand in VTableReplicated
Diffstat (limited to 'graph/src')
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala | 59
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala | 65
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala | 64
3 files changed, 79 insertions, 109 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 9e44f49113..ebeebd4c65 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -30,21 +30,13 @@ import org.apache.spark.util.ClosureCleaner
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vertices: VertexRDD[VD],
@transient val edges: EdgeRDD[ED],
- @transient val vertexPlacement: VertexPlacement,
@transient val vTableReplicated: VTableReplicated[VD])
extends Graph[VD, ED] {
def this(
vertices: VertexRDD[VD],
- edges: EdgeRDD[ED],
- vertexPlacement: VertexPlacement) = {
- this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
- }
-
- def this(
- vertices: VertexRDD[VD],
edges: EdgeRDD[ED]) = {
- this(vertices, edges, new VertexPlacement(edges, vertices))
+ this(vertices, edges, new VTableReplicated(vertices, edges))
}
/** Return a RDD that brings edges together with their source and destination vertices. */
@@ -89,16 +81,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
override def statistics: Map[String, Any] = {
- // Get the total number of vertices after replication, used to compute the replication ratio.
- def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
- vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
- }
-
val numVertices = this.ops.numVertices
val numEdges = this.ops.numEdges
- val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices
- val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices
- val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices
// One entry for each partition, indicate the total number of edges on that partition.
val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
@@ -106,9 +90,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Map(
"Num Vertices" -> numVertices,
"Num Edges" -> numEdges,
- "Replication (both)" -> replicationRatioBoth,
- "Replication (src only)" -> replicationRatioSrcOnly,
- "Replication (dest only)" -> replicationRatioDstOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad,
"Max Load" -> maxLoad)
@@ -145,16 +126,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
println("\n\nvTable ------------------------------------------")
traverseLineage(vertices, " ", visited)
visited += (vertices.id -> "vTable")
- println("\n\nvertexPlacement.bothAttrs -------------------------------")
- traverseLineage(vertexPlacement.bothAttrs, " ", visited)
- visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vTableReplicated)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
@@ -162,17 +140,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, vertexPlacement,
- Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
+ new GraphImpl(newVerts, edges, newVTableReplicated)
} else {
// The map does not preserve type, so we must re-replicate all vertices
- new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement)
+ new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges)
}
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vTableReplicated)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
@@ -190,7 +167,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
Iterator((pid, newEdgePartition))
}
- new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, new EdgeRDD(newETable), vTableReplicated)
}
override def subgraph(
@@ -210,23 +187,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is
- // fine.
- new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated)
+ // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, vTableReplicated)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
- new GraphImpl(newVerts, newEdges)
-
+ // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, vTableReplicated)
}
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
val newETable = edges.mapEdgePartitions(_.groupEdges(merge))
- new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, newETable, vTableReplicated)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -304,13 +284,12 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, vertexPlacement,
- Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
+ new GraphImpl(newVerts, edges, newVTableReplicated)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
- new GraphImpl(newVerts, edges, vertexPlacement)
+ new GraphImpl(newVerts, edges)
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index b9b2a4705b..fc708da3d9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -1,11 +1,13 @@
package org.apache.spark.graph.impl
+import org.apache.spark.Partitioner
+import scala.collection.mutable
+
import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
-import org.apache.spark.graph._
-
/**
* A view of the vertices after they are shipped to the join sites specified in
* `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated`
@@ -20,7 +22,6 @@ private[impl]
class VTableReplicated[VD: ClassManifest](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
- vertexPlacement: VertexPlacement,
prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
/**
@@ -49,6 +50,9 @@ class VTableReplicated[VD: ClassManifest](
private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+ private val routingTables: mutable.Map[(Boolean, Boolean), RDD[Array[Array[Vid]]]] =
+ new mutable.HashMap[(Boolean, Boolean), RDD[Array[Array[Vid]]]]
+
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
(includeSrc, includeDst) match {
case (true, true) => bothAttrs
@@ -67,7 +71,7 @@ class VTableReplicated[VD: ClassManifest](
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
// also shipped there.
- val shippedActives = vertexPlacement.get(true, true)
+ val shippedActives = getRoutingTable(true, true)
.zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
.partitionBy(edges.partitioner.get)
// Update vTableReplicated with shippedActives, setting activeness flags in the resulting
@@ -85,7 +89,7 @@ class VTableReplicated[VD: ClassManifest](
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
- val shippedVerts = vertexPlacement.get(includeSrc, includeDst)
+ val shippedVerts = getRoutingTable(includeSrc, includeDst)
.zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
@@ -125,6 +129,19 @@ class VTableReplicated[VD: ClassManifest](
}.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
}
}
+
+ /**
+ * Returns an RDD with the locations of edge-partition join sites for each vertex attribute in
+ * `vertices`; that is, the routing information for shipping vertex attributes to edge
+ * partitions. The routing information is stored as a compressed bitmap for each vertex partition.
+ */
+ private def getRoutingTable(
+ includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = {
+ routingTables.getOrElseUpdate(
+ (includeSrc, includeDst),
+ VTableReplicated.createRoutingTable(
+ edges, updatedVerts.partitioner.get, includeSrc, includeDst))
+ }
}
object VTableReplicated {
@@ -174,6 +191,44 @@ object VTableReplicated {
(pid, actives.trim().array)
}
}
+
+ private def createRoutingTable(
+ edges: EdgeRDD[_],
+ vertexPartitioner: Partitioner,
+ includeSrc: Boolean,
+ includeDst: Boolean): RDD[Array[Array[Vid]]] = {
+ // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+ val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
+ val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+ val numEdges = edgePartition.size
+ val vSet = new VertexSet
+ if (includeSrc) { // Add src vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.srcIds(i))
+ i += 1
+ }
+ }
+ if (includeDst) { // Add dst vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.dstIds(i))
+ i += 1
+ }
+ }
+ vSet.iterator.map { vid => (vid, pid) }
+ }
+
+ val numPartitions = vertexPartitioner.numPartitions
+ vid2pid.partitionBy(vertexPartitioner).mapPartitions { iter =>
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
+ for ((vid, pid) <- iter) {
+ pid2vid(pid) += vid
+ }
+
+ Iterator(pid2vid.map(_.trim().array))
+ }.cache().setName("VertexPlacement %s %s".format(includeSrc, includeDst))
+ }
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
deleted file mode 100644
index 44a0a05f74..0000000000
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.spark.graph.impl
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.collection.PrimitiveVector
-
-/**
- * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is,
- * the routing information for shipping vertex attributes to edge partitions. This is always cached
- * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes
- * and (possibly) once to ship the active-set information.
- */
-class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
-
- val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
- val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
- val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true)
- val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false)
-
- def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
- (includeSrcAttr, includeDstAttr) match {
- case (true, true) => bothAttrs
- case (true, false) => srcAttrOnly
- case (false, true) => dstAttrOnly
- case (false, false) => noAttrs
- }
-
- private def createPid2Vid(
- includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
- // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(Vid, Pid)] = eTable.partitionsRDD.mapPartitions { iter =>
- val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
- val numEdges = edgePartition.size
- val vSet = new VertexSet
- if (includeSrcAttr) { // Add src vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.srcIds(i))
- i += 1
- }
- }
- if (includeDstAttr) { // Add dst vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.dstIds(i))
- i += 1
- }
- }
- vSet.iterator.map { vid => (vid, pid) }
- }
-
- val numPartitions = vTable.partitions.size
- vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
- for ((vid, pid) <- iter) {
- pid2vid(pid) += vid
- }
-
- Iterator(pid2vid.map(_.trim().array))
- }.cache().setName("VertexPlacement %s %s".format(includeSrcAttr, includeDstAttr))
- }
-}