diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-03 12:55:05 -0700 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-08 21:19:14 -0800 |
commit | 0ad75cdfb0093a0b525c598c5af4b9745581a6d7 (patch) | |
tree | 6d96fea54bffad62bee717caf6aed6fa4a3e6da2 /graph/src | |
parent | ac536345f86e467ac83cb9c0dccbb34150335e26 (diff) | |
download | spark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.tar.gz spark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.tar.bz2 spark-0ad75cdfb0093a0b525c598c5af4b9745581a6d7.zip |
Manifest -> Tag in variable names
Diffstat (limited to 'graph/src')
3 files changed, 15 insertions, 15 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 1c21967c9c..6f1d790325 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -76,10 +76,10 @@ class EdgeRDD[@specialized ED: ClassTag]( def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = { - val ed2Manifest = classTag[ED2] - val ed3Manifest = classTag[ED3] + val ed2Tag = classTag[ED2] + val ed3Tag = classTag[ED3] zipEdgePartitions(other) { (pid, thisEPart, otherEPart) => - thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest) + thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 1dfd9cf316..826c1074a8 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -47,11 +47,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( /** Return a RDD that brings edges together with their source and destination vertices. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { - val vdManifest = classTag[VD] - val edManifest = classTag[ED] + val vdTag = classTag[VD] + val edTag = classTag[ED] edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (pid, ePart, vPartIter) => val (_, vPart) = vPartIter.next() - new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest) + new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag) } } @@ -65,7 +65,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { val numPartitions = edges.partitions.size - val edManifest = classTag[ED] + val edTag = classTag[ED] val newEdges = new EdgeRDD(edges.map { e => val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) @@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } .partitionBy(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED]()(edManifest) + val builder = new EdgePartitionBuilder[ED]()(edTag) iter.foreach { message => val data = message.data builder.add(data._1, data._2, data._3) @@ -180,7 +180,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit // manifest from GraphImpl (which would require serializing GraphImpl). - val vdManifest = classTag[VD] + val vdTag = classTag[VD] val newEdgePartitions = edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePid, edgePartition, vTableReplicatedIter) => @@ -210,11 +210,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) // Filter the edges - val edManifest = classTag[ED] + val edTag = classTag[ED] val newEdges = new EdgeRDD[ED](triplets.filter { et => vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et) }.mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED]()(edManifest) + val builder = new EdgePartitionBuilder[ED]()(edTag) iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) } val edgePartition = builder.toEdgePartition Iterator((pid, edgePartition)) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index 2124144df7..033971c1af 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -82,12 +82,12 @@ class ReplicatedVertexView[VD: ClassTag]( private def create(includeSrc: Boolean, includeDst: Boolean) : RDD[(Pid, VertexPartition[VD])] = { - val vdManifest = classTag[VD] + val vdTag = classTag[VD] // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD val shippedVerts = routingTable.get(includeSrc, includeDst) - .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdManifest)) + .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag)) .partitionBy(edges.partitioner.get) // TODO: Consider using a specialized shuffler. @@ -109,7 +109,7 @@ class ReplicatedVertexView[VD: ClassTag]( val (pid, vidToIndex) = mapIter.next() assert(!mapIter.hasNext) // Populate the vertex array using the vidToIndex map - val vertexArray = vdManifest.newArray(vidToIndex.capacity) + val vertexArray = vdTag.newArray(vidToIndex.capacity) for ((_, block) <- shippedVertsIter) { for (i <- 0 until block.vids.size) { val vid = block.vids(i) @@ -119,7 +119,7 @@ class ReplicatedVertexView[VD: ClassTag]( } } val newVPart = new VertexPartition( - vidToIndex, vertexArray, vidToIndex.getBitSet)(vdManifest) + vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag) Iterator((pid, newVPart)) }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) } |