author    Ankur Dave <ankurdave@gmail.com>  2014-01-03 17:55:58 -0700
committer Ankur Dave <ankurdave@gmail.com>  2014-01-03 18:01:13 -0700
commit    cfab8f2062fad9ea400716afce28d200dd714c2b
tree      cef8dfbc0645a7c690e24e83daa1b4d0278c93c7
parent    17311359c44a99541e16489a8842f3013919e832
Revert "Create VertexPlacement on demand in VTableReplicated"
This reverts commit 32f957f3317bd62768b415da0c0cd9114f59782c.

Conflicts:
    graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
    graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala             | 55
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala  | 65
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala          | 64
3 files changed, 109 insertions(+), 75 deletions(-)
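Taken together, the three files below restore a single construction chain: a GraphImpl built from just (vertices, edges) first derives a RoutingTable and then hands it to the ReplicatedVertexView, so the routing information is computed once per graph and shared rather than rebuilt inside each view. A minimal, self-contained sketch of that chain, using stub case classes in place of the real GraphX types (VertexRDD, EdgeRDD, and so on are not reproduced here):

    // Stub stand-ins for the real classes; only the wiring is illustrated.
    object ConstructionChainSketch {
      case class Vertices(ids: Seq[Long])
      case class Edges(pairs: Seq[(Long, Long)])
      case class Routing(edges: Edges, vertices: Vertices)              // cf. new RoutingTable(edges, vertices)
      case class View(verts: Vertices, edges: Edges, routing: Routing)  // cf. new ReplicatedVertexView(...)

      case class Graph(vertices: Vertices, edges: Edges, routing: Routing, view: View)

      object Graph {
        // Mirrors the restored auxiliary constructors of GraphImpl.
        def apply(vertices: Vertices, edges: Edges): Graph = {
          val routing = Routing(edges, vertices)
          val view = View(vertices, edges, routing)
          Graph(vertices, edges, routing, view)
        }
      }

      def main(args: Array[String]): Unit = {
        val g = Graph(Vertices(Seq(1L, 2L, 3L)), Edges(Seq((1L, 2L), (2L, 3L))))
        println(g.routing eq g.view.routing)  // true: graph and view share one routing table
      }
    }

Hypothetical names aside, this is the dependency direction the diff establishes: RoutingTable depends only on the edge and vertex RDDs, and ReplicatedVertexView receives it as a constructor argument.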
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index e7c4b5db82..16d73820f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -19,19 +19,28 @@ import org.apache.spark.util.ClosureCleaner
* edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
* contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
* vertex attributes are replicated to the edge partitions where they appear as sources or
- * destinations. `replicatedVertexView` stores a view of the replicated vertex attributes, which are
- * co-partitioned with the relevant edges.
+ * destinations. `routingTable` stores the routing information for shipping vertex attributes to
+ * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
+ * using the routing table.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vertices: VertexRDD[VD],
@transient val edges: EdgeRDD[ED],
+ @transient val routingTable: RoutingTable,
@transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] {
def this(
vertices: VertexRDD[VD],
+ edges: EdgeRDD[ED],
+ routingTable: RoutingTable) = {
+ this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
+ }
+
+ def this(
+ vertices: VertexRDD[VD],
edges: EdgeRDD[ED]) = {
- this(vertices, edges, new ReplicatedVertexView(vertices, edges))
+ this(vertices, edges, new RoutingTable(edges, vertices))
}
/** Return a RDD that brings edges together with their source and destination vertices. */
@@ -76,8 +85,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
override def statistics: Map[String, Any] = {
+ // Get the total number of vertices after replication, used to compute the replication ratio.
+ def numReplicatedVertices(vid2pids: RDD[Array[Array[Vid]]]): Double = {
+ vid2pids.map(_.map(_.size).sum.toLong).reduce(_ + _).toDouble
+ }
+
val numVertices = this.ops.numVertices
val numEdges = this.ops.numEdges
+ val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
+ val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
+ val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
// One entry for each partition, indicate the total number of edges on that partition.
val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
@@ -85,6 +102,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Map(
"Num Vertices" -> numVertices,
"Num Edges" -> numEdges,
+ "Replication (both)" -> replicationRatioBoth,
+ "Replication (src only)" -> replicationRatioSrcOnly,
+ "Replication (dest only)" -> replicationRatioDstOnly,
"Load Array" -> loadArray,
"Min Load" -> minLoad,
"Max Load" -> maxLoad)
@@ -121,13 +141,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
println("\n\nvertices ------------------------------------------")
traverseLineage(vertices, " ", visited)
visited += (vertices.id -> "vertices")
+ println("\n\nroutingTable.bothAttrs -------------------------------")
+ traverseLineage(routingTable.bothAttrs, " ", visited)
+ visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), replicatedVertexView)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), routingTable, replicatedVertexView)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
@@ -135,16 +158,17 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = new ReplicatedVertexView[VD2](
- changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
- new GraphImpl(newVerts, edges, newReplicatedVertexView)
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
- new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges)
+ new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
}
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), replicatedVertexView)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), routingTable, replicatedVertexView)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
@@ -162,7 +186,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
Iterator((pid, newEdgePartition))
}
- new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), replicatedVertexView)
+ new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
}
override def subgraph(
@@ -185,7 +209,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
// removed will be ignored, since we only refer to replicated vertices when they are adjacent to
// an edge.
- new GraphImpl(newVerts, newEdges, replicatedVertexView)
+ new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
@@ -195,13 +219,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
// removed will be ignored, since we only refer to replicated vertices when they are adjacent to
// an edge.
- new GraphImpl(newVerts, newEdges, replicatedVertexView)
+ new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
}
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
val newEdges = edges.mapEdgePartitions(_.groupEdges(merge))
- new GraphImpl(vertices, newEdges, replicatedVertexView)
+ new GraphImpl(vertices, newEdges, routingTable, replicatedVertexView)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -281,12 +305,13 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = new ReplicatedVertexView[VD2](
- changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
- new GraphImpl(newVerts, edges, newReplicatedVertexView)
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
- new GraphImpl(newVerts, edges)
+ new GraphImpl(newVerts, edges, routingTable)
}
}
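The new entries in statistics report how many copies of each vertex the routing table implies. Each element of a routing-table RDD corresponds to one vertex partition and holds, for every edge partition, the vertex ids that must be shipped there, so summing the inner array sizes counts shipped copies; dividing by the vertex count gives the average replication factor. A toy illustration of that arithmetic with plain collections standing in for the RDD (the values are made up):

    object ReplicationRatioSketch {
      type Vid = Long

      def main(args: Array[String]): Unit = {
        // One outer element per vertex partition; each inner array lists, per
        // edge partition, the vertex ids shipped to it. Vertex 1 is needed by
        // both edge partitions, so it contributes two copies.
        val bothAttrs: Seq[Array[Array[Vid]]] = Seq(
          Array(Array(1L, 2L), Array(1L)),   // vertex partition 0
          Array(Array(3L), Array(4L, 5L)))   // vertex partition 1

        val numVertices = 5.0
        // Same shape as numReplicatedVertices in GraphImpl.statistics, with Seq in place of an RDD.
        val numReplicated = bothAttrs.map(_.map(_.size).sum.toLong).sum.toDouble
        println(numReplicated / numVertices)  // 6 copies / 5 vertices = 1.2
      }
    }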
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index bef99810bd..175586b87e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -1,13 +1,11 @@
package org.apache.spark.graph.impl
-import org.apache.spark.Partitioner
-import scala.collection.mutable
-
import org.apache.spark.SparkContext._
-import org.apache.spark.graph._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
+import org.apache.spark.graph._
+
/**
* A view of the vertices after they are shipped to the join sites specified in
* `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
@@ -22,6 +20,7 @@ private[impl]
class ReplicatedVertexView[VD: ClassManifest](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
+ routingTable: RoutingTable,
prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
/**
@@ -50,9 +49,6 @@ class ReplicatedVertexView[VD: ClassManifest](
private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
- private val routingTables: mutable.Map[(Boolean, Boolean), RDD[Array[Array[Vid]]]] =
- new mutable.HashMap[(Boolean, Boolean), RDD[Array[Array[Vid]]]]
-
def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
(includeSrc, includeDst) match {
case (true, true) => bothAttrs
@@ -70,7 +66,7 @@ class ReplicatedVertexView[VD: ClassManifest](
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
// also shipped there.
- val shippedActives = getRoutingTable(true, true)
+ val shippedActives = routingTable.get(true, true)
.zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
.partitionBy(edges.partitioner.get)
// Update the view with shippedActives, setting activeness flags in the resulting
@@ -88,7 +84,7 @@ class ReplicatedVertexView[VD: ClassManifest](
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
- val shippedVerts = getRoutingTable(includeSrc, includeDst)
+ val shippedVerts = routingTable.get(includeSrc, includeDst)
.zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
@@ -126,19 +122,6 @@ class ReplicatedVertexView[VD: ClassManifest](
}.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
}
}
-
- /**
- * Returns an RDD with the locations of edge-partition join sites for each vertex attribute in
- * `vertices`; that is, the routing information for shipping vertex attributes to edge
- * partitions. The routing information is stored as a compressed bitmap for each vertex partition.
- */
- private def getRoutingTable(
- includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = {
- routingTables.getOrElseUpdate(
- (includeSrc, includeDst),
- ReplicatedVertexView.createRoutingTable(
- edges, updatedVerts.partitioner.get, includeSrc, includeDst))
- }
}
object ReplicatedVertexView {
@@ -188,44 +171,6 @@ object ReplicatedVertexView {
(pid, actives.trim().array)
}
}
-
- private def createRoutingTable(
- edges: EdgeRDD[_],
- vertexPartitioner: Partitioner,
- includeSrc: Boolean,
- includeDst: Boolean): RDD[Array[Array[Vid]]] = {
- // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
- val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
- val numEdges = edgePartition.size
- val vSet = new VertexSet
- if (includeSrc) { // Add src vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.srcIds(i))
- i += 1
- }
- }
- if (includeDst) { // Add dst vertices to the set.
- var i = 0
- while (i < numEdges) {
- vSet.add(edgePartition.dstIds(i))
- i += 1
- }
- }
- vSet.iterator.map { vid => (vid, pid) }
- }
-
- val numPartitions = vertexPartitioner.numPartitions
- vid2pid.partitionBy(vertexPartitioner).mapPartitions { iter =>
- val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
- for ((vid, pid) <- iter) {
- pid2vid(pid) += vid
- }
-
- Iterator(pid2vid.map(_.trim().array))
- }.cache().setName("VertexPlacement %s %s".format(includeSrc, includeDst))
- }
}
class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
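With the table factored out, ReplicatedVertexView simply looks up routingTable.get(includeSrc, includeDst) and zips it with the vertex partitions to build per-edge-partition shipping buffers. A hypothetical, Spark-free sketch of that step for a single vertex partition (buildBuffer and VertexAttributeBlock are simplified to plain tuples; the names here are illustrative only):

    object ShippingSketch {
      type Vid = Long
      type Pid = Int

      // pid2vid is the routing entry for one vertex partition: pid2vid(pid)
      // lists the vertex ids that edge partition `pid` needs. attrs holds the
      // attributes stored in that vertex partition.
      def shipVertexAttrs[VD](
          pid2vid: Array[Array[Vid]],
          attrs: Map[Vid, VD]): Iterator[(Pid, List[(Vid, VD)])] =
        pid2vid.iterator.zipWithIndex.map { case (vids, pid) =>
          (pid, vids.toList.map(vid => (vid, attrs(vid))))
        }

      def main(args: Array[String]): Unit = {
        val routingEntry: Array[Array[Vid]] = Array(Array(1L, 2L), Array(2L))
        val attrs = Map(1L -> "a", 2L -> "b")
        shipVertexAttrs(routingEntry, attrs).foreach(println)
        // (0,List((1,a), (2,b)))
        // (1,List((2,b)))
      }
    }

In the real code the resulting (pid, block) pairs are then shuffled with edges.partitioner.get so each block ends up co-partitioned with its edge partition.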
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
new file mode 100644
index 0000000000..b6cd048b33
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -0,0 +1,64 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graph._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.collection.PrimitiveVector
+
+/**
+ * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
+ * information for shipping vertex attributes to edge partitions. This is always cached because it
+ * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
+ * (possibly) once to ship the active-set information.
+ */
+class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
+
+ val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
+ val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
+ val dstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(false, true)
+ val noAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(false, false)
+
+ def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
+ (includeSrcAttr, includeDstAttr) match {
+ case (true, true) => bothAttrs
+ case (true, false) => srcAttrOnly
+ case (false, true) => dstAttrOnly
+ case (false, false) => noAttrs
+ }
+
+ private def createPid2Vid(
+ includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
+ // Determine which vertices each edge partition needs by creating a mapping from vid to pid.
+ val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
+ val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
+ val numEdges = edgePartition.size
+ val vSet = new VertexSet
+ if (includeSrcAttr) { // Add src vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.srcIds(i))
+ i += 1
+ }
+ }
+ if (includeDstAttr) { // Add dst vertices to the set.
+ var i = 0
+ while (i < numEdges) {
+ vSet.add(edgePartition.dstIds(i))
+ i += 1
+ }
+ }
+ vSet.iterator.map { vid => (vid, pid) }
+ }
+
+ val numPartitions = vertices.partitions.size
+ vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
+ val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
+ for ((vid, pid) <- iter) {
+ pid2vid(pid) += vid
+ }
+
+ Iterator(pid2vid.map(_.trim().array))
+ }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
+ }
+}
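createPid2Vid proceeds in two passes: each edge partition emits the (vid, pid) pairs it needs, those pairs are repartitioned by the vertex partitioner, and each vertex partition then inverts its pairs into one array of vertex ids per edge partition. A plain-Scala toy of that final inversion, with ArrayBuffer standing in for PrimitiveVector and an in-memory iterator in place of the shuffled RDD:

    import scala.collection.mutable.ArrayBuffer

    object Pid2VidSketch {
      type Vid = Long
      type Pid = Int

      // Bucket the (vid, pid) pairs that landed on one vertex partition into
      // one array of vertex ids per edge partition.
      def invert(numEdgePartitions: Int, vid2pid: Iterator[(Vid, Pid)]): Array[Array[Vid]] = {
        val pid2vid = Array.fill(numEdgePartitions)(new ArrayBuffer[Vid])
        for ((vid, pid) <- vid2pid) pid2vid(pid) += vid
        pid2vid.map(_.toArray)
      }

      def main(args: Array[String]): Unit = {
        // Vertices 1, 2, 3 are referenced by edge partition 0; vertex 2 also by partition 1.
        val pairs = Iterator((1L, 0), (2L, 0), (2L, 1), (3L, 0))
        invert(2, pairs).map(_.mkString("[", ",", "]")).foreach(println)
        // [1,2,3]
        // [2]
      }
    }

The resulting RDD is cached because, as the class comment notes, ReplicatedVertexView may consume it twice: once to ship vertex attributes and once to ship the active-set information.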