about | summary | refs | log | tree | commit | diff
path: root/graph/src
diff options
context:
space:
mode:
author Joey <joseph.e.gonzalez@gmail.com> 2014-01-04 12:54:41 -0800
committer Joey <joseph.e.gonzalez@gmail.com> 2014-01-04 12:54:41 -0800
commit 280ddf64bd597cd287e6f41b71fbc3c63a0d7e68 (patch)
tree 9169bf85e5ed648212f9c7fad18e27c843d4a693 /graph/src
parent dc9cb83e8661d075d8f3b47c27b2f423287e598d (diff)
parent cfab8f2062fad9ea400716afce28d200dd714c2b (diff)
download spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.tar.gz
spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.tar.bz2
spark-280ddf64bd597cd287e6f41b71fbc3c63a0d7e68.zip
Merge pull request #121 from ankurdave/more-simplify
Simplify GraphImpl internals further
Diffstat (limited to 'graph/src')
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala 146
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala) 55
-rw-r--r-- graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala) 18
3 files changed, 106 insertions, 113 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 9e44f49113..16d73820f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -16,35 +16,31 @@ import org.apache.spark.util.ClosureCleaner
* A Graph RDD that supports computation on graphs.
*
* Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vTable` contains vertex attributes, which are
- * vertex-partitioned. `eTable` contains edge attributes, which are
- * edge-partitioned. For operations on vertex neighborhoods, vertex attributes
- * are replicated to the edge partitions where they appear as sources or
- * destinations. `vertexPlacement` specifies where each vertex will be
- * replicated. `vTableReplicated` stores the replicated vertex attributes, which
- * are co-partitioned with the relevant edges.
- *
- * mask in vertices means filter
- * mask in vTableReplicated means active
+ * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
+ * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
+ * vertex attributes are replicated to the edge partitions where they appear as sources or
+ * destinations. `routingTable` stores the routing information for shipping vertex attributes to
+ * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
+ * using the routing table.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vertices: VertexRDD[VD],
@transient val edges: EdgeRDD[ED],
- @transient val vertexPlacement: VertexPlacement,
- @transient val vTableReplicated: VTableReplicated[VD])
+ @transient val routingTable: RoutingTable,
+ @transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] {
def this(
vertices: VertexRDD[VD],
edges: EdgeRDD[ED],
- vertexPlacement: VertexPlacement) = {
- this(vertices, edges, vertexPlacement, new VTableReplicated(vertices, edges, vertexPlacement))
+ routingTable: RoutingTable) = {
+ this(vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable))
}
def this(
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]) = {
- this(vertices, edges, new VertexPlacement(edges, vertices))
+ this(vertices, edges, new RoutingTable(edges, vertices))
}
/** Return a RDD that brings edges together with their source and destination vertices. */
@@ -52,7 +48,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val vdManifest = classManifest[VD]
val edManifest = classManifest[ED]
- edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) =>
+ edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
}
@@ -96,9 +92,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val numVertices = this.ops.numVertices
val numEdges = this.ops.numEdges
- val replicationRatioBoth = numReplicatedVertices(vertexPlacement.bothAttrs) / numVertices
- val replicationRatioSrcOnly = numReplicatedVertices(vertexPlacement.srcAttrOnly) / numVertices
- val replicationRatioDstOnly = numReplicatedVertices(vertexPlacement.dstAttrOnly) / numVertices
+ val replicationRatioBoth = numReplicatedVertices(routingTable.bothAttrs) / numVertices
+ val replicationRatioSrcOnly = numReplicatedVertices(routingTable.srcAttrOnly) / numVertices
+ val replicationRatioDstOnly = numReplicatedVertices(routingTable.dstAttrOnly) / numVertices
// One entry for each partition, indicate the total number of edges on that partition.
val loadArray = edges.partitionsRDD.map(_._2.size).collect().map(_.toDouble / numEdges)
val minLoad = loadArray.min
@@ -139,48 +135,48 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
}
}
- println("eTable ------------------------------------------")
+ println("edges ------------------------------------------")
traverseLineage(edges, " ")
- var visited = Map(edges.id -> "eTable")
- println("\n\nvTable ------------------------------------------")
+ var visited = Map(edges.id -> "edges")
+ println("\n\nvertices ------------------------------------------")
traverseLineage(vertices, " ", visited)
- visited += (vertices.id -> "vTable")
- println("\n\nvertexPlacement.bothAttrs -------------------------------")
- traverseLineage(vertexPlacement.bothAttrs, " ", visited)
- visited += (vertexPlacement.bothAttrs.id -> "vertexPlacement.bothAttrs")
+ visited += (vertices.id -> "vertices")
+ println("\n\nroutingTable.bothAttrs -------------------------------")
+ traverseLineage(routingTable.bothAttrs, " ", visited)
+ visited += (routingTable.bothAttrs.id -> "routingTable.bothAttrs")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), routingTable, replicatedVertexView)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, vertexPlacement,
- Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
- new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, vertexPlacement)
+ new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable)
}
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), routingTable, replicatedVertexView)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdManifest = classManifest[VD]
- val newETable =
- edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) =>
- val (pid, vPart) = vTableReplicatedIter.next()
+ val newEdgePartitions =
+ edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (edgePartition, vPartIter) =>
+ val (pid, vPart) = vPartIter.next()
val et = new EdgeTriplet[VD, ED]
val newEdgePartition = edgePartition.map { e =>
et.set(e)
@@ -190,7 +186,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
Iterator((pid, newEdgePartition))
}
- new GraphImpl(vertices, new EdgeRDD(newETable), vertexPlacement, vTableReplicated)
+ new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView)
}
override def subgraph(
@@ -210,23 +206,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- // Reuse the previous VTableReplicated unmodified. It will contain extra vertices, which is
- // fine.
- new GraphImpl(newVerts, newEdges, new VertexPlacement(newEdges, newVerts), vTableReplicated)
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
- new GraphImpl(newVerts, newEdges)
-
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
+ // removed will be ignored, since we only refer to replicated vertices when they are adjacent to
+ // an edge.
+ new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView)
}
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
- val newETable = edges.mapEdgePartitions(_.groupEdges(merge))
- new GraphImpl(vertices, newETable, vertexPlacement, vTableReplicated)
+ val newEdges = edges.mapEdgePartitions(_.groupEdges(merge))
+ new GraphImpl(vertices, newEdges, routingTable, replicatedVertexView)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -246,14 +245,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
val vs = activeSetOpt match {
- case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
- case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
+ case Some((activeSet, _)) =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ case None =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
}
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
- val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
- val (_, vPart) = vTableReplicatedIter.next()
+ val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vPartIter) =>
+ val (_, vPart) = vPartIter.next()
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
@@ -303,14 +304,14 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, vertexPlacement,
- Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, vertexPlacement, newVTableReplicated)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, routingTable,
+ Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
- new GraphImpl(newVerts, edges, vertexPlacement)
+ new GraphImpl(newVerts, edges, routingTable)
}
}
@@ -330,13 +331,13 @@ object GraphImpl {
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- fromEdgeRDD(createETable(edges), defaultVertexAttr)
+ fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
}
def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
- edges: RDD[(Pid, EdgePartition[ED])],
+ edgePartitions: RDD[(Pid, EdgePartition[ED])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr)
+ fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
def apply[VD: ClassManifest, ED: ClassManifest](
@@ -344,44 +345,39 @@ object GraphImpl {
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- val etable = createETable(edges).cache()
+ val edgeRDD = createEdgeRDD(edges).cache()
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVidsFromEdges(etable, partitioner)
+ val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
- val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr)
+ val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
- new GraphImpl(vtable, etable)
+ new GraphImpl(vertexRDD, edgeRDD)
}
/**
- * Create the edge table RDD, which is much more efficient for Java heap storage than the
- * normal edges data structure (RDD[(Vid, Vid, ED)]).
+ * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
+ * data structure (RDD[(Vid, Vid, ED)]).
*
- * The edge table contains multiple partitions, and each partition contains only one RDD
- * key-value pair: the key is the partition id, and the value is an EdgePartition object
- * containing all the edges in a partition.
+ * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
+ * pair: the key is the partition id, and the value is an EdgePartition object containing all the
+ * edges in a partition.
*/
- private def createETable[ED: ClassManifest](
+ private def createEdgeRDD[ED: ClassManifest](
edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
- val eTable = edges.mapPartitionsWithIndex { (pid, iter) =>
+ val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
- new EdgeRDD(eTable)
- }
-
- private def createETableFromEdgePartitions[ED: ClassManifest](
- edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = {
- new EdgeRDD(edges)
+ new EdgeRDD(edgePartitions)
}
private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
@@ -391,8 +387,8 @@ object GraphImpl {
// Get the set of all vids
val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
- val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr))
- new GraphImpl(vtable, edges)
+ val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index b9b2a4705b..175586b87e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -8,20 +8,20 @@ import org.apache.spark.graph._
/**
* A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated`
- * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise,
- * a fresh view is created.
+ * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
+ * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
+ * fresh view is created.
*
* The view is always cached (i.e., once it is created, it remains materialized). This avoids
* constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
* example.
*/
private[impl]
-class VTableReplicated[VD: ClassManifest](
+class ReplicatedVertexView[VD: ClassManifest](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
- vertexPlacement: VertexPlacement,
- prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
+ routingTable: RoutingTable,
+ prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
/**
* Within each edge partition, create a local map from vid to an index into the attribute
@@ -29,9 +29,9 @@ class VTableReplicated[VD: ClassManifest](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match {
- case Some(prev) =>
- prev.localVidMap
+ private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ case Some(prevView) =>
+ prevView.localVidMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -41,7 +41,7 @@ class VTableReplicated[VD: ClassManifest](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap")
}
private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
@@ -62,15 +62,14 @@ class VTableReplicated[VD: ClassManifest](
includeSrc: Boolean,
includeDst: Boolean,
actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
-
// Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
// also shipped there.
- val shippedActives = vertexPlacement.get(true, true)
- .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
+ val shippedActives = routingTable.get(true, true)
+ .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
.partitionBy(edges.partitioner.get)
- // Update vTableReplicated with shippedActives, setting activeness flags in the resulting
+ // Update the view with shippedActives, setting activeness flags in the resulting
// VertexPartitions
get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
val (pid, vPart) = viewIter.next()
@@ -85,23 +84,21 @@ class VTableReplicated[VD: ClassManifest](
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
- val shippedVerts = vertexPlacement.get(includeSrc, includeDst)
- .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
+ val shippedVerts = routingTable.get(includeSrc, includeDst)
+ .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
- prevVTableReplicated match {
- case Some(vTableReplicated) =>
- val prevView: RDD[(Pid, VertexPartition[VD])] =
- vTableReplicated.get(includeSrc, includeDst)
-
- // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting
+ prevViewOpt match {
+ case Some(prevView) =>
+ // Update prevView with shippedVerts, setting staleness flags in the resulting
// VertexPartitions
- prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
- val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
+ prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
+ (prevViewIter, shippedVertsIter) =>
+ val (pid, prevVPart) = prevViewIter.next()
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
+ Iterator((pid, newVPart))
+ }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
@@ -122,12 +119,12 @@ class VTableReplicated[VD: ClassManifest](
val newVPart = new VertexPartition(
vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
+ }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
}
}
}
-object VTableReplicated {
+object ReplicatedVertexView {
protected def buildBuffer[VD: ClassManifest](
pid2vidIter: Iterator[Array[Array[Vid]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
index 44a0a05f74..b6cd048b33 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPlacement.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/RoutingTable.scala
@@ -7,12 +7,12 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.collection.PrimitiveVector
/**
- * Stores the locations of edge-partition join sites for each vertex attribute in `vTable`; that is,
- * the routing information for shipping vertex attributes to edge partitions. This is always cached
- * because it may be used multiple times in VTableReplicated -- once to ship the vertex attributes
- * and (possibly) once to ship the active-set information.
+ * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing
+ * information for shipping vertex attributes to edge partitions. This is always cached because it
+ * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and
+ * (possibly) once to ship the active-set information.
*/
-class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
+class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
val bothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(true, true)
val srcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(true, false)
@@ -30,7 +30,7 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
private def createPid2Vid(
includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] = {
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
- val vid2pid: RDD[(Vid, Pid)] = eTable.partitionsRDD.mapPartitions { iter =>
+ val vid2pid: RDD[(Vid, Pid)] = edges.partitionsRDD.mapPartitions { iter =>
val (pid: Pid, edgePartition: EdgePartition[_]) = iter.next()
val numEdges = edgePartition.size
val vSet = new VertexSet
@@ -51,14 +51,14 @@ class VertexPlacement(eTable: EdgeRDD[_], vTable: VertexRDD[_]) {
vSet.iterator.map { vid => (vid, pid) }
}
- val numPartitions = vTable.partitions.size
- vid2pid.partitionBy(vTable.partitioner.get).mapPartitions { iter =>
+ val numPartitions = vertices.partitions.size
+ vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[Vid])
for ((vid, pid) <- iter) {
pid2vid(pid) += vid
}
Iterator(pid2vid.map(_.trim().array))
- }.cache().setName("VertexPlacement %s %s".format(includeSrcAttr, includeDstAttr))
+ }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr))
}
}