aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-20 15:58:40 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-20 15:58:40 -0800
commit17311359c44a99541e16489a8842f3013919e832 (patch)
tree6927b5e2842349ee9680315e03aedb09cfcecb9f /graph/src
parent32f957f3317bd62768b415da0c0cd9114f59782c (diff)
downloadspark-17311359c44a99541e16489a8842f3013919e832.tar.gz
spark-17311359c44a99541e16489a8842f3013919e832.tar.bz2
spark-17311359c44a99541e16489a8842f3013919e832.zip
Rename VTableReplicated -> ReplicatedVertexView
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala114
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala (renamed from graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala)51
2 files changed, 77 insertions, 88 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index ebeebd4c65..e7c4b5db82 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -16,27 +16,22 @@ import org.apache.spark.util.ClosureCleaner
* A Graph RDD that supports computation on graphs.
*
* Graphs are represented using two classes of data: vertex-partitioned and
- * edge-partitioned. `vTable` contains vertex attributes, which are
- * vertex-partitioned. `eTable` contains edge attributes, which are
- * edge-partitioned. For operations on vertex neighborhoods, vertex attributes
- * are replicated to the edge partitions where they appear as sources or
- * destinations. `vertexPlacement` specifies where each vertex will be
- * replicated. `vTableReplicated` stores the replicated vertex attributes, which
- * are co-partitioned with the relevant edges.
- *
- * mask in vertices means filter
- * mask in vTableReplicated means active
+ * edge-partitioned. `vertices` contains vertex attributes, which are vertex-partitioned. `edges`
+ * contains edge attributes, which are edge-partitioned. For operations on vertex neighborhoods,
+ * vertex attributes are replicated to the edge partitions where they appear as sources or
+ * destinations. `replicatedVertexView` stores a view of the replicated vertex attributes, which are
+ * co-partitioned with the relevant edges.
*/
class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vertices: VertexRDD[VD],
@transient val edges: EdgeRDD[ED],
- @transient val vTableReplicated: VTableReplicated[VD])
+ @transient val replicatedVertexView: ReplicatedVertexView[VD])
extends Graph[VD, ED] {
def this(
vertices: VertexRDD[VD],
edges: EdgeRDD[ED]) = {
- this(vertices, edges, new VTableReplicated(vertices, edges))
+ this(vertices, edges, new ReplicatedVertexView(vertices, edges))
}
/** Return a RDD that brings edges together with their source and destination vertices. */
@@ -44,7 +39,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val vdManifest = classManifest[VD]
val edManifest = classManifest[ED]
- edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (ePart, vPartIter) =>
+ edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
}
@@ -120,28 +115,28 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
deps.foreach(d => traverseLineage(d.rdd, indent + " | ", visited))
}
}
- println("eTable ------------------------------------------")
+ println("edges ------------------------------------------")
traverseLineage(edges, " ")
- var visited = Map(edges.id -> "eTable")
- println("\n\nvTable ------------------------------------------")
+ var visited = Map(edges.id -> "edges")
+ println("\n\nvertices ------------------------------------------")
traverseLineage(vertices, " ", visited)
- visited += (vertices.id -> "vTable")
+ visited += (vertices.id -> "vertices")
println("\n\ntriplets ----------------------------------------")
traverseLineage(triplets, " ", visited)
println(visited)
} // end of printLineage
override def reverse: Graph[VD, ED] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.reverse), replicatedVertexView)
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
if (classManifest[VD] equals classManifest[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, newVTableReplicated)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
new GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges)
@@ -149,15 +144,15 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] =
- new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), vTableReplicated)
+ new GraphImpl(vertices, edges.mapEdgePartitions(_.map(f)), replicatedVertexView)
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
val vdManifest = classManifest[VD]
- val newETable =
- edges.zipEdgePartitions(vTableReplicated.get(true, true)) { (edgePartition, vTableReplicatedIter) =>
- val (pid, vPart) = vTableReplicatedIter.next()
+ val newEdgePartitions =
+ edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (edgePartition, vPartIter) =>
+ val (pid, vPart) = vPartIter.next()
val et = new EdgeTriplet[VD, ED]
val newEdgePartition = edgePartition.map { e =>
et.set(e)
@@ -167,7 +162,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
Iterator((pid, newEdgePartition))
}
- new GraphImpl(vertices, new EdgeRDD(newETable), vTableReplicated)
+ new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), replicatedVertexView)
}
override def subgraph(
@@ -187,26 +182,26 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
Iterator((pid, edgePartition))
}, preservesPartitioning = true)).cache()
- // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
// removed will be ignored, since we only refer to replicated vertices when they are adjacent to
// an edge.
- new GraphImpl(newVerts, newEdges, vTableReplicated)
+ new GraphImpl(newVerts, newEdges, replicatedVertexView)
} // end of subgraph
override def mask[VD2: ClassManifest, ED2: ClassManifest] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
- // Reuse the previous VTableReplicated unmodified. The replicated vertices that have been
+ // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been
// removed will be ignored, since we only refer to replicated vertices when they are adjacent to
// an edge.
- new GraphImpl(newVerts, newEdges, vTableReplicated)
+ new GraphImpl(newVerts, newEdges, replicatedVertexView)
}
override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
ClosureCleaner.clean(merge)
- val newETable = edges.mapEdgePartitions(_.groupEdges(merge))
- new GraphImpl(vertices, newETable, vTableReplicated)
+ val newEdges = edges.mapEdgePartitions(_.groupEdges(merge))
+ new GraphImpl(vertices, newEdges, replicatedVertexView)
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -226,14 +221,16 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val mapUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
val mapUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
val vs = activeSetOpt match {
- case Some((activeSet, _)) => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
- case None => vTableReplicated.get(mapUsesSrcAttr, mapUsesDstAttr)
+ case Some((activeSet, _)) =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet)
+ case None =>
+ replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr)
}
val activeDirectionOpt = activeSetOpt.map(_._2)
// Map and combine.
- val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vTableReplicatedIter) =>
- val (_, vPart) = vTableReplicatedIter.next()
+ val preAgg = edges.zipEdgePartitions(vs) { (edgePartition, vPartIter) =>
+ val (_, vPart) = vPartIter.next()
// Choose scan method
val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
@@ -283,9 +280,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
- val newVTableReplicated = new VTableReplicated[VD2](
- changedVerts, edges, Some(vTableReplicated.asInstanceOf[VTableReplicated[VD2]]))
- new GraphImpl(newVerts, edges, newVTableReplicated)
+ val newReplicatedVertexView = new ReplicatedVertexView[VD2](
+ changedVerts, edges, Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]]))
+ new GraphImpl(newVerts, edges, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(updates)(updateF)
@@ -309,13 +306,13 @@ object GraphImpl {
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- fromEdgeRDD(createETable(edges), defaultVertexAttr)
+ fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
}
def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
- edges: RDD[(Pid, EdgePartition[ED])],
+ edgePartitions: RDD[(Pid, EdgePartition[ED])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(createETableFromEdgePartitions(edges), defaultVertexAttr)
+ fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
def apply[VD: ClassManifest, ED: ClassManifest](
@@ -323,44 +320,39 @@ object GraphImpl {
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
- val etable = createETable(edges).cache()
+ val edgeRDD = createEdgeRDD(edges).cache()
// Get the set of all vids
val partitioner = Partitioner.defaultPartitioner(vertices)
val vPartitioned = vertices.partitionBy(partitioner)
- val vidsFromEdges = collectVidsFromEdges(etable, partitioner)
+ val vidsFromEdges = collectVidsFromEdges(edgeRDD, partitioner)
val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) =>
vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1)
}
- val vtable = VertexRDD(vids, vPartitioned, defaultVertexAttr)
+ val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr)
- new GraphImpl(vtable, etable)
+ new GraphImpl(vertexRDD, edgeRDD)
}
/**
- * Create the edge table RDD, which is much more efficient for Java heap storage than the
- * normal edges data structure (RDD[(Vid, Vid, ED)]).
+ * Create the edge RDD, which is much more efficient for Java heap storage than the normal edges
+ * data structure (RDD[(Vid, Vid, ED)]).
*
- * The edge table contains multiple partitions, and each partition contains only one RDD
- * key-value pair: the key is the partition id, and the value is an EdgePartition object
- * containing all the edges in a partition.
+ * The edge RDD contains multiple partitions, and each partition contains only one RDD key-value
+ * pair: the key is the partition id, and the value is an EdgePartition object containing all the
+ * edges in a partition.
*/
- private def createETable[ED: ClassManifest](
+ private def createEdgeRDD[ED: ClassManifest](
edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
- val eTable = edges.mapPartitionsWithIndex { (pid, iter) =>
+ val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
- new EdgeRDD(eTable)
- }
-
- private def createETableFromEdgePartitions[ED: ClassManifest](
- edges: RDD[(Pid, EdgePartition[ED])]): EdgeRDD[ED] = {
- new EdgeRDD(edges)
+ new EdgeRDD(edgePartitions)
}
private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
@@ -370,8 +362,8 @@ object GraphImpl {
// Get the set of all vids
val vids = collectVidsFromEdges(edges, new HashPartitioner(edges.partitions.size))
// Create the VertexRDD.
- val vtable = VertexRDD(vids.mapValues(x => defaultVertexAttr))
- new GraphImpl(vtable, edges)
+ val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr))
+ new GraphImpl(vertices, edges)
}
/** Collects all vids mentioned in edges and partitions them by partitioner. */
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index fc708da3d9..bef99810bd 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicated.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -10,19 +10,19 @@ import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
/**
* A view of the vertices after they are shipped to the join sites specified in
- * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevVTableReplicated`
- * is specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise,
- * a fresh view is created.
+ * `vertexPlacement`. The resulting view is co-partitioned with `edges`. If `prevViewOpt` is
+ * specified, `updatedVerts` are treated as incremental updates to the previous view. Otherwise, a
+ * fresh view is created.
*
* The view is always cached (i.e., once it is created, it remains materialized). This avoids
* constructing it twice if the user calls graph.triplets followed by graph.mapReduceTriplets, for
* example.
*/
private[impl]
-class VTableReplicated[VD: ClassManifest](
+class ReplicatedVertexView[VD: ClassManifest](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
- prevVTableReplicated: Option[VTableReplicated[VD]] = None) {
+ prevViewOpt: Option[ReplicatedVertexView[VD]] = None) {
/**
* Within each edge partition, create a local map from vid to an index into the attribute
@@ -30,9 +30,9 @@ class VTableReplicated[VD: ClassManifest](
* vids from both the source and destination of edges. It must always include both source and
* destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this.
*/
- private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevVTableReplicated match {
- case Some(prev) =>
- prev.localVidMap
+ private val localVidMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match {
+ case Some(prevView) =>
+ prevView.localVidMap
case None =>
edges.partitionsRDD.mapPartitions(_.map {
case (pid, epart) =>
@@ -42,7 +42,7 @@ class VTableReplicated[VD: ClassManifest](
vidToIndex.add(e.dstId)
}
(pid, vidToIndex)
- }, preservesPartitioning = true).cache().setName("VTableReplicated localVidMap")
+ }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVidMap")
}
private lazy val bothAttrs: RDD[(Pid, VertexPartition[VD])] = create(true, true)
@@ -66,15 +66,14 @@ class VTableReplicated[VD: ClassManifest](
includeSrc: Boolean,
includeDst: Boolean,
actives: VertexRDD[_]): RDD[(Pid, VertexPartition[VD])] = {
-
// Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and
// includeDst. These flags govern attribute shipping, but the activeness of a vertex must be
// shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is
// also shipped there.
val shippedActives = getRoutingTable(true, true)
- .zipPartitions(actives.partitionsRDD)(VTableReplicated.buildActiveBuffer(_, _))
+ .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _))
.partitionBy(edges.partitioner.get)
- // Update vTableReplicated with shippedActives, setting activeness flags in the resulting
+ // Update the view with shippedActives, setting activeness flags in the resulting
// VertexPartitions
get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) =>
val (pid, vPart) = viewIter.next()
@@ -90,22 +89,20 @@ class VTableReplicated[VD: ClassManifest](
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
val shippedVerts = getRoutingTable(includeSrc, includeDst)
- .zipPartitions(verts)(VTableReplicated.buildBuffer(_, _)(vdManifest))
+ .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
- prevVTableReplicated match {
- case Some(vTableReplicated) =>
- val prevView: RDD[(Pid, VertexPartition[VD])] =
- vTableReplicated.get(includeSrc, includeDst)
-
- // Update vTableReplicated with shippedVerts, setting staleness flags in the resulting
+ prevViewOpt match {
+ case Some(prevView) =>
+ // Update prevView with shippedVerts, setting staleness flags in the resulting
// VertexPartitions
- prevView.zipPartitions(shippedVerts) { (prevViewIter, shippedVertsIter) =>
- val (pid, prevVPart) = prevViewIter.next()
- val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
- Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated delta %s %s".format(includeSrc, includeDst))
+ prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) {
+ (prevViewIter, shippedVertsIter) =>
+ val (pid, prevVPart) = prevViewIter.next()
+ val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator))
+ Iterator((pid, newVPart))
+ }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst))
case None =>
// Within each edge partition, place the shipped vertex attributes into the correct
@@ -126,7 +123,7 @@ class VTableReplicated[VD: ClassManifest](
val newVPart = new VertexPartition(
vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
Iterator((pid, newVPart))
- }.cache().setName("VTableReplicated %s %s".format(includeSrc, includeDst))
+ }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
}
}
@@ -139,12 +136,12 @@ class VTableReplicated[VD: ClassManifest](
includeSrc: Boolean, includeDst: Boolean): RDD[Array[Array[Vid]]] = {
routingTables.getOrElseUpdate(
(includeSrc, includeDst),
- VTableReplicated.createRoutingTable(
+ ReplicatedVertexView.createRoutingTable(
edges, updatedVerts.partitioner.get, includeSrc, includeDst))
}
}
-object VTableReplicated {
+object ReplicatedVertexView {
protected def buildBuffer[VD: ClassManifest](
pid2vidIter: Iterator[Array[Array[Vid]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {