aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-03 12:55:05 -0700
committerAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:14 -0800
commit0ad75cdfb0093a0b525c598c5af4b9745581a6d7 (patch)
tree6d96fea54bffad62bee717caf6aed6fa4a3e6da2 /graph/src
parentac536345f86e467ac83cb9c0dccbb34150335e26 (diff)
downloadspark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.tar.gz
spark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.tar.bz2
spark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.zip
Manifest -> Tag in variable names
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala6
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala16
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala8
3 files changed, 15 insertions, 15 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 1c21967c9c..6f1d790325 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -76,10 +76,10 @@ class EdgeRDD[@specialized ED: ClassTag](
def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
- val ed2Manifest = classTag[ED2]
- val ed3Manifest = classTag[ED3]
+ val ed2Tag = classTag[ED2]
+ val ed3Tag = classTag[ED3]
zipEdgePartitions(other) { (pid, thisEPart, otherEPart) =>
- thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest)
+ thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)
}
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 1dfd9cf316..826c1074a8 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -47,11 +47,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
- val vdManifest = classTag[VD]
- val edManifest = classTag[ED]
+ val vdTag = classTag[VD]
+ val edTag = classTag[ED]
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (pid, ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
- new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
+ new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag)
}
}
@@ -65,7 +65,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
val numPartitions = edges.partitions.size
- val edManifest = classTag[ED]
+ val edTag = classTag[ED]
val newEdges = new EdgeRDD(edges.map { e =>
val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
@@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitionsWithIndex( { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ val builder = new EdgePartitionBuilder[ED]()(edTag)
iter.foreach { message =>
val data = message.data
builder.add(data._1, data._2, data._3)
@@ -180,7 +180,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
- val vdManifest = classTag[VD]
+ val vdTag = classTag[VD]
val newEdgePartitions =
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) {
(ePid, edgePartition, vTableReplicatedIter) =>
@@ -210,11 +210,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
// Filter the edges
- val edManifest = classTag[ED]
+ val edTag = classTag[ED]
val newEdges = new EdgeRDD[ED](triplets.filter { et =>
vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
}.mapPartitionsWithIndex( { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]()(edManifest)
+ val builder = new EdgePartitionBuilder[ED]()(edTag)
iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) }
val edgePartition = builder.toEdgePartition
Iterator((pid, edgePartition))
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 2124144df7..033971c1af 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -82,12 +82,12 @@ class ReplicatedVertexView[VD: ClassTag](
private def create(includeSrc: Boolean, includeDst: Boolean)
: RDD[(Pid, VertexPartition[VD])] = {
- val vdManifest = classTag[VD]
+ val vdTag = classTag[VD]
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
val shippedVerts = routingTable.get(includeSrc, includeDst)
- .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest))
+ .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag))
.partitionBy(edges.partitioner.get)
// TODO: Consider using a specialized shuffler.
@@ -109,7 +109,7 @@ class ReplicatedVertexView[VD: ClassTag](
val (pid, vidToIndex) = mapIter.next()
assert(!mapIter.hasNext)
// Populate the vertex array using the vidToIndex map
- val vertexArray = vdManifest.newArray(vidToIndex.capacity)
+ val vertexArray = vdTag.newArray(vidToIndex.capacity)
for ((_, block) <- shippedVertsIter) {
for (i <- 0 until block.vids.size) {
val vid = block.vids(i)
@@ -119,7 +119,7 @@ class ReplicatedVertexView[VD: ClassTag](
}
}
val newVPart = new VertexPartition(
- vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest)
+ vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag)
Iterator((pid, newVPart))
}.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst))
}