aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-14 05:22:46 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-14 15:28:24 -0800
commit0c3fc1c1b62be702e89a651ef1ef442a984f1bba (patch)
tree2ce08d6d67e88f04f4f136a370b244beda4edbb3 /graph/src
parent59f625b7454d882c06461222c3e305910fff9594 (diff)
downloadspark-0c3fc1c1b62be702e89a651ef1ef442a984f1bba.tar.gz
spark-0c3fc1c1b62be702e89a651ef1ef442a984f1bba.tar.bz2
spark-0c3fc1c1b62be702e89a651ef1ef442a984f1bba.zip
Avoid re-creating the view RDDs multiple times
Previously, successive operations that support incremental view maintenance would inadvertently recreate previous view RDDs by calling VTableReplicated.get(), which created the RDDs anew though they were already cached. This change memoizes the RDDs and separates the process of shipping active-set information to an existing view.
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala10
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala146
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala13
3 files changed, 74 insertions, 95 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d15423e611..e3d5e37c8d 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -45,7 +45,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val vdManifest = classManifest[VD]
val edManifest = classManifest[ED]
- edges.zipEdgePartitions(vTableReplicated.get(true, true, None)) { (ePart, vPartIter) =>
+ edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
}
@@ -54,7 +54,6 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
vertices.persist(newLevel)
edges.persist(newLevel)
- vertexPlacement.persist(newLevel)
this
}
@@ -151,7 +150,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdManifest = classManifest[VD]
val newETable =
- edges.zipEdgePartitions(vTableReplicated.get(true, true, None)) { (edgePartition, vTableReplicatedIter) =>
+ edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) =>
val (pid, vPart) = vTableReplicatedIter.next()
val et = new EdgeTriplet[VD, ED]
val newEdgePartition = edgePartition.map { e =>
@@ -212,7 +211,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// in the relevant position in an edge.
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
- val vs = vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSetOpt.map(_._1))
+ val vs = activeSetOpt match {
+ case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
+ }
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
index eb045b7f87..6124dfab83 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
@@ -41,106 +41,88 @@ class VTableReplicated[VD: ClassManifest](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache()
+ }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap")
}
- def get(includeSrc: Boolean, includeDst: Boolean, activesOpt: Option[VertexRDD[_]] = None)
- : RDD[(Pid, VertexPartition[VD])] = {
+ private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
+ private lazy val srcAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(true, false)
+ private lazy val dstAttrOnly: RDD[(Pid, VertexPartition[VD])] = create(false, true)
+ private lazy val noAttrs: RDD[(Pid, VertexPartition[VD])] = create(false, false)
+
+ def get(includeSrc: Boolean, includeDst: Boolean): RDD[(Pid, VertexPartition[VD])] = {
+ (includeSrc, includeDst) match {
+ case (true, true) => bothAttrs
+ case (true, false) => srcAttrOnly
+ case (false, true) => dstAttrOnly
+ case (false, false) => noAttrs
+ }
+ }
+
+ def get(
+ includeSrc: Boolean,
+ includeDst: Boolean,
+ actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
+
+ // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
+ // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
+ // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
+ // also shipped there.
+ val shippedActives = vertexPlacement.get(true, true)
+ .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
+ .partitionBy(edges.partitioner.get)// .cache().setName("VTableReplicated shippedActives")
+ // Update vTableReplicated with shippedActives, setting activeness flags in the resulting
+ // VertexPartitions
+ get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
+ val (pid, vPart) = viewIter.next()
+ val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator))
+ Iterator((pid, newPart))
+ }
+ }
+ private def create(includeSrc: Boolean, includeDst: Boolean)
+ : RDD[(Pid, VertexPartition[VD])] = {
val vdManifest = classManifest[VD]
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
val shippedVerts = vertexPlacement.get(includeSrc, includeDst)
.zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
- .partitionBy(edges.partitioner.get).cache()
+ .partitionBy(edges.partitioner.get)// .cache().setName("VTableReplicated shippedVerts")
// TODO: Consider using a specialized shuffler.
- // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
- // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
- // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
- // also shipped there.
- val shippedActivesOpt = activesOpt.map { actives =>
- vertexPlacement.get(true, true)
- .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
- .partitionBy(edges.partitioner.get).cache() // TODO(ankurdave): Why do we cache this?
- }
-
prevVTableReplicated match {
case Some(vTableReplicated) =>
val prevView: RDD[(Pid, VertexPartition[VD])] =
vTableReplicated.get(includeSrc, includeDst)
- // Update vTableReplicated with updatedVerts, setting staleness and activeness flags in the
- // resulting VertexPartitions
- shippedActivesOpt match {
- case Some(shippedActives) =>
- prevView.zipPartitions(shippedVerts, shippedActives) {
- (prevViewIter, shippedVertsIter, shippedActivesIter) =>
- val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart
- .innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
- .replaceActives(shippedActivesIter.flatMap(_._2.iterator))
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated delta actives %s %s".format(includeSrc, includeDst))
- case None =>
- prevView.zipPartitions(shippedVerts) {
- (prevViewIter, shippedVertsIter) =>
- val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart
- .innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
- }
+ // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting
+ // VertexPartitions
+ prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
+ val (pid, prevVPart) = prevViewIter.next()
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
+ Iterator((pid, newVPart))
+ }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
case None =>
- // Within each edge partition, place the vertex attributes received from
- // msgsByPartition into the correct locations specified in localVidMap
- shippedActivesOpt match {
- case Some(shippedActives) =>
- localVidMap.zipPartitions(shippedVerts, shippedActives) {
- (mapIter, shippedVertsIter, shippedActivesIter) =>
- val (pid, vidToIndex) = mapIter.next()
- assert(!mapIter.hasNext)
- // Populate the vertex array using the vidToIndex map
- val vertexArray = vdManifest.newArray(vidToIndex.capacity)
- for ((_, block) <- shippedVertsIter) {
- for (i <- 0 until block.vids.size) {
- val vid = block.vids(i)
- val attr = block.attrs(i)
- val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
- vertexArray(ind) = attr
- }
- }
- // Populate the activeSet with the received actives
- val activeSet = new VertexSet(vidToIndex.capacity)
- for (activeVid <- shippedActivesIter.flatMap(_._2.iterator)) {
- activeSet.add(activeVid)
- }
- val newVPart = new VertexPartition(
- vidToIndex, vertexArray, vidToIndex.getBitSet, Some(activeSet))(vdManifest)
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated active %s %s".format(includeSrc, includeDst))
-
- case None =>
- localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
- val (pid, vidToIndex) = mapIter.next()
- assert(!mapIter.hasNext)
- // Populate the vertex array using the vidToIndex map
- val vertexArray = vdManifest.newArray(vidToIndex.capacity)
- for ((_, block) <- shippedVertsIter) {
- for (i <- 0 until block.vids.size) {
- val vid = block.vids(i)
- val attr = block.attrs(i)
- val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
- vertexArray(ind) = attr
- }
- }
- val newVPart = new VertexPartition(
- vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
- }
+ // Within each edge partition, place the shipped vertex attributes into the correct
+ // locations specified in localVidMap
+ localVidMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) =>
+ val (pid, vidToIndex) = mapIter.next()
+ assert(!mapIter.hasNext)
+ // Populate the vertex array using the vidToIndex map
+ val vertexArray = vdManifest.newArray(vidToIndex.capacity)
+ for ((_, block) <- shippedVertsIter) {
+ for (i <- 0 until block.vids.size) {
+ val vid = block.vids(i)
+ val attr = block.attrs(i)
+ val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
+ vertexArray(ind) = attr
+ }
+ }
+ val newVPart = new VertexPartition(
+ vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
+ Iterator((pid, newVPart))
+ }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
}
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
index 24fdf0db45..44a0a05f74 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
@@ -7,8 +7,10 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.PrimitiveVector
/**
- * Stores the layout of replicated vertex attributes for GraphImpl. Tells each
- * partition of the vertex data where it should go.
+ * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is,
+ * the routing information for shipping vertex attributes to edge partitions. This is always cached
+ * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes
+ * and (possibly) once to ship the active-set information.
*/
class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
@@ -25,13 +27,6 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
case (false, false) => noAttrs
}
- def persist(newLevel: StorageLevel) {
- bothAttrs.persist(newLevel)
- srcAttrOnly.persist(newLevel)
- dstAttrOnly.persist(newLevel)
- noAttrs.persist(newLevel)
- }
-
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.