author     Ankur Dave <ankurdave@gmail.com>    2014-01-03 12:50:56 -0700
committer  Ankur Dave <ankurdave@gmail.com>    2014-01-08 21:19:14 -0800
commit     ac536345f86e467ac83cb9c0dccbb34150335e26 (patch)
tree       b0e51b2a9629fd39bd31020051b33eba90cc93e2 /graph/src
parent     78d6b13ac88d4f0e52cf430d0bc3c1eb5369e4dc (diff)
ClassManifest -> ClassTag
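
Scala 2.10 deprecates ClassManifest in favor of scala.reflect.ClassTag, so every context bound and manifest lookup under graph/src is rewritten one-for-one. A minimal sketch of the pattern, abridged from the Graph and GraphImpl changes below:

    // Before: deprecated context bounds and manifest lookups
    abstract class Graph[VD: ClassManifest, ED: ClassManifest]
    val edManifest = classManifest[ED]

    // After: import the ClassTag equivalents and substitute them directly
    import scala.reflect.{classTag, ClassTag}
    abstract class Graph[VD: ClassTag, ED: ClassTag]
    val edManifest = classTag[ED]  // existing val names are left unchanged

Nothing else about each signature changes; the method bodies compile as before because ClassTag supports the same uses.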
Diffstat (limited to 'graph/src')
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Analytics.scala                  2
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala                   16
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala                4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Graph.scala                     28
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLab.scala                   4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala                4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/GraphOps.scala                  10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/Pregel.scala                     4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala                 28
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala   4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala        10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala  3
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala   4
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala            40
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala   24
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala 10
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala      22
-rw-r--r--  graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala      20
-rw-r--r--  graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala    3
19 files changed, 129 insertions(+), 111 deletions(-)
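
Why these bounds exist at all: the code allocates arrays of a generic element type (new Array[ED2](data.size) in EdgePartition.map) and compares the runtime element type to pick specialized serializers (MessageToPartition.scala); both uses require a ClassTag (previously a ClassManifest). A self-contained sketch of the two uses, under hypothetical names that do not appear in the patch:

    import scala.reflect.{classTag, ClassTag}

    object ClassTagSketch {
      // Generic array allocation needs a ClassTag, as in EdgePartition.map.
      def fill[T: ClassTag](n: Int, x: T): Array[T] = Array.fill(n)(x)

      // Comparing classTag[T] against primitive tags dispatches on the element
      // type, as MsgRDDFunctions does when choosing a serializer.
      def pickSerializerPath[T: ClassTag]: String =
        if (classTag[T] == ClassTag.Int) "Int-specialized"
        else if (classTag[T] == ClassTag.Long) "Long-specialized"
        else if (classTag[T] == ClassTag.Double) "Double-specialized"
        else "generic"

      def main(args: Array[String]): Unit = {
        println(fill(3, 0.5).mkString(", "))   // 0.5, 0.5, 0.5
        println(pickSerializerPath[Int])       // Int-specialized
      }
    }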
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 2012dadb2f..14b9be73f1 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -335,7 +335,7 @@ object Analytics extends Logging {
// /**
// *
// */
- // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double],
+ // def alternatingLeastSquares[VD: ClassTag, ED: ClassTag](graph: Graph[VD, Double],
// latentK: Int, lambda: Double, numIter: Int) = {
// val vertices = graph.vertices.mapPartitions( _.map {
// case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } )
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 3dda5c7c60..1c21967c9c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -1,12 +1,14 @@
package org.apache.spark.graph
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-class EdgeRDD[@specialized ED: ClassManifest](
+class EdgeRDD[@specialized ED: ClassTag](
val partitionsRDD: RDD[(Pid, EdgePartition[ED])])
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
@@ -42,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
override def cache(): EdgeRDD[ED] = persist()
- def mapEdgePartitions[ED2: ClassManifest](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
+ def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2])
: EdgeRDD[ED2] = {
// iter => iter.map { case (pid, ep) => (pid, f(ep)) }
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
@@ -51,7 +53,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
}, preservesPartitioning = true))
}
- def zipEdgePartitions[T: ClassManifest, U: ClassManifest]
+ def zipEdgePartitions[T: ClassTag, U: ClassTag]
(other: RDD[T])
(f: (Pid, EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = {
partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) =>
@@ -60,7 +62,7 @@ class EdgeRDD[@specialized ED: ClassManifest](
}
}
- def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest]
+ def zipEdgePartitions[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (Pid, EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = {
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) {
@@ -71,11 +73,11 @@ class EdgeRDD[@specialized ED: ClassManifest](
})
}
- def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
+ def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgeRDD[ED2])
(f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = {
- val ed2Manifest = classManifest[ED2]
- val ed3Manifest = classManifest[ED3]
+ val ed2Manifest = classTag[ED2]
+ val ed3Manifest = classTag[ED3]
zipEdgePartitions(other) { (pid, thisEPart, otherEPart) =>
thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest)
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
index 76768489ee..5a384a5f84 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala
@@ -14,8 +14,8 @@ import org.apache.spark.graph.impl.VertexPartition
* that is not a trait.
*/
class EdgeTriplet[VD, ED] extends Edge[ED] {
-// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest,
-// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] {
+// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag,
+// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag] extends Edge[ED] {
/**
diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index b725b2a155..9dd26f7679 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph
+import scala.reflect.ClassTag
+
import org.apache.spark.graph.impl._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -23,7 +25,7 @@ import org.apache.spark.storage.StorageLevel
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
*/
-abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
+abstract class Graph[VD: ClassTag, ED: ClassTag] {
/**
* Get the vertices and their data.
@@ -123,7 +125,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
*
*/
- def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED]
+ def mapVertices[VD2: ClassTag](map: (Vid, VD) => VD2): Graph[VD2, ED]
/**
* Construct a new graph where the value of each edge is
@@ -143,7 +145,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* attributes.
*
*/
- def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2] = {
+ def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
mapEdges((pid, iter) => iter.map(map))
}
@@ -167,7 +169,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @tparam ED2 the new edge data type
*
*/
- def mapEdges[ED2: ClassManifest](
+ def mapEdges[ED2: ClassTag](
map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]):
Graph[VD, ED2]
@@ -195,7 +197,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
*
*/
- def mapTriplets[ED2: ClassManifest](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+ def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map))
}
@@ -219,7 +221,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @tparam ED2 the new edge data type
*
*/
- def mapTriplets[ED2: ClassManifest](
+ def mapTriplets[ED2: ClassTag](
map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]):
Graph[VD, ED2]
@@ -261,7 +263,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* @return a graph with vertices and edges that exists in both the current graph and other,
* with vertex and edge data from the current graph.
*/
- def mask[VD2: ClassManifest, ED2: ClassManifest](other: Graph[VD2, ED2]): Graph[VD, ED]
+ def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
/**
* This function merges multiple edges between two vertices into a single Edge. For correct
@@ -313,7 +315,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* predicate or implement PageRank.
*
*/
- def mapReduceTriplets[A: ClassManifest](
+ def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
@@ -348,7 +350,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
* }}}
*
*/
- def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)])
+ def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(Vid, U)])
(mapFunc: (Vid, VD, Option[U]) => VD2)
: Graph[VD2, ED]
@@ -376,7 +378,7 @@ object Graph {
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex.
*/
- def fromEdgeTuples[VD: ClassManifest](
+ def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(Vid, Vid)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = {
@@ -397,7 +399,7 @@ object Graph {
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
- def fromEdges[VD: ClassManifest, ED: ClassManifest](
+ def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
GraphImpl(edges, defaultValue)
@@ -418,7 +420,7 @@ object Graph {
* @param partitionStrategy the partition strategy to use when
* partitioning the edges.
*/
- def apply[VD: ClassManifest, ED: ClassManifest](
+ def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
@@ -432,5 +434,5 @@ object Graph {
* convenience operations are defined in the GraphOps class which may be shared across multiple
* graph implementations.
*/
- implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops
+ implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
} // end of Graph object
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
index 5d2f0f4bda..c1ce5cd9cc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph
+import scala.reflect.ClassTag
+
import org.apache.spark.Logging
import scala.collection.JavaConversions._
import org.apache.spark.rdd.RDD
@@ -36,7 +38,7 @@ object GraphLab extends Logging {
* @tparam A The type accumulated during the gather phase
* @return the resulting graph after the algorithm converges
*/
- def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], numIter: Int,
gatherDirection: EdgeDirection = EdgeDirection.In,
scatterDirection: EdgeDirection = EdgeDirection.Out)
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index a69bfde532..7daac4fcc5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,6 +1,8 @@
package org.apache.spark.graph
import java.util.{Arrays => JArrays}
+import scala.reflect.ClassTag
+
import org.apache.spark.graph.impl.EdgePartitionBuilder
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
@@ -22,7 +24,7 @@ object GraphLoader extends Logging {
* the Edge RDD
*
*/
- def textFile[ED: ClassManifest](
+ def textFile[ED: ClassTag](
sc: SparkContext,
path: String,
edgeParser: Array[String] => ED,
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 091c778275..11c6120beb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph
+import scala.reflect.ClassTag
+
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.SparkException
@@ -15,7 +17,7 @@ import org.apache.spark.SparkException
* @tparam ED the edge attribute type
*
*/
-class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
+class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
/**
* Compute the number of edges in the graph.
@@ -109,7 +111,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* @todo Should this return a graph with the new vertex values?
*
*/
- def aggregateNeighbors[A: ClassManifest](
+ def aggregateNeighbors[A: ClassTag](
mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
reduceFunc: (A, A) => A,
dir: EdgeDirection)
@@ -226,7 +228,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* }}}
*
*/
- def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
+ def joinVertices[U: ClassTag](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD)
: Graph[VD, ED] = {
val uf = (id: Vid, data: VD, o: Option[U]) => {
o match {
@@ -262,7 +264,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
* }}}
*
*/
- def filter[VD2: ClassManifest, ED2: ClassManifest](
+ def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = {
diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
index 285e857b69..4664091b57 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph
+import scala.reflect.ClassTag
+
/**
* This object implements a Pregel-like bulk-synchronous
@@ -84,7 +86,7 @@ object Pregel {
* @return the resulting graph at the end of the computation
*
*/
- def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest]
+ def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
vprog: (Vid, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)],
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
index c274e342c7..8e5e319928 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.graph
+import scala.reflect.ClassTag
+
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
@@ -50,7 +52,7 @@ import org.apache.spark.graph.impl.VertexPartition
* }}}
*
*/
-class VertexRDD[@specialized VD: ClassManifest](
+class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[VertexPartition[VD]])
extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
@@ -111,7 +113,7 @@ class VertexRDD[@specialized VD: ClassManifest](
/**
* Return a new VertexRDD by applying a function to each VertexPartition of this RDD.
*/
- def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2])
+ def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
@@ -121,7 +123,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Return a new VertexRDD by applying a function to corresponding
* VertexPartitions of this VertexRDD and another one.
*/
- def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest]
+ def zipVertexPartitions[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])
(f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = {
val newPartitionsRDD = partitionsRDD.zipPartitions(
@@ -160,7 +162,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassManifest](f: VD => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map((vid, attr) => f(attr)))
/**
@@ -174,7 +176,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* each of the entries in the original VertexRDD. The resulting
* VertexRDD retains the same index.
*/
- def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] =
+ def mapValues[VD2: ClassTag](f: (Vid, VD) => VD2): VertexRDD[VD2] =
this.mapVertexPartitions(_.map(f))
/**
@@ -205,7 +207,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* other VertexSet.
*
*/
- def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest]
+ def leftZipJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.leftJoin(otherPart)(f)
@@ -231,7 +233,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* @return a VertexRDD containing all the vertices in this
* VertexRDD with the attribute emitted by f.
*/
- def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: RDD[(Vid, VD2)])
(f: (Vid, VD, Option[VD2]) => VD3)
: VertexRDD[VD3] =
@@ -257,7 +259,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other`
* must have the same index.
*/
- def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U])
+ def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
this.zipVertexPartitions(other) { (thisPart, otherPart) =>
thisPart.innerJoin(otherPart)(f)
@@ -268,7 +270,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Replace vertices with corresponding vertices in `other`, and drop vertices without a
* corresponding vertex in `other`.
*/
- def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)])
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(Vid, U)])
(f: (Vid, VD, U) => VD2): VertexRDD[VD2] = {
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
@@ -291,7 +293,7 @@ class VertexRDD[@specialized VD: ClassManifest](
* Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is
* co-indexed with this one.
*/
- def aggregateUsingIndex[VD2: ClassManifest](
+ def aggregateUsingIndex[VD2: ClassTag](
messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] =
{
val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get)
@@ -318,7 +320,7 @@ object VertexRDD {
*
* @param rdd the collection of vertex-attribute pairs
*/
- def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = {
+ def apply[VD: ClassTag](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = {
val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
case Some(p) => rdd
case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size))
@@ -338,7 +340,7 @@ object VertexRDD {
* @param rdd the collection of vertex-attribute pairs
* @param mergeFunc the associative, commutative merge function.
*/
- def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
+ def apply[VD: ClassTag](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] =
{
val partitioned: RDD[(Vid, VD)] = rdd.partitioner match {
case Some(p) => rdd
@@ -350,7 +352,7 @@ object VertexRDD {
new VertexRDD(vertexPartitions)
}
- def apply[VD: ClassManifest](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
+ def apply[VD: ClassTag](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD)
: VertexRDD[VD] =
{
VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) =>
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
index b1cd3c47d0..a6384320ba 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.algorithms
+import scala.reflect.ClassTag
+
import org.apache.spark.graph._
@@ -21,7 +23,7 @@ object TriangleCount {
*
* @return
*/
- def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = {
+ def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
// Remove redundant edges
val g = graph.groupEdges((a, b) => a).cache
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
index 4fcf08efce..7367269f67 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.ClassTag
+
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
@@ -13,7 +15,7 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
* @param index a clustered index on source vertex id
* @tparam ED the edge attribute type.
*/
-class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest](
+class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
val srcIds: Array[Vid],
val dstIds: Array[Vid],
val data: Array[ED],
@@ -41,7 +43,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @return a new edge partition with the result of the function `f`
* applied to each edge
*/
- def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = {
+ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size)
val edge = new Edge[ED]()
val size = data.size
@@ -69,7 +71,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* @return a new edge partition with the result of the function `f`
* applied to each edge
*/
- def map[ED2: ClassManifest](iter: Iterator[ED2]): EdgePartition[ED2] = {
+ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = {
val newData = new Array[ED2](data.size)
var i = 0
while (iter.hasNext) {
@@ -132,7 +134,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double)
* If there are multiple edges with the same src and dst in `other`, `f` will only be invoked
* once.
*/
- def innerJoin[ED2: ClassManifest, ED3: ClassManifest]
+ def innerJoin[ED2: ClassTag, ED3: ClassTag]
(other: EdgePartition[ED2])
(f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = {
val builder = new EdgePartitionBuilder[ED3]
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
index 3876273369..ae3f3a6d03 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala
@@ -1,5 +1,6 @@
package org.apache.spark.graph.impl
+import scala.reflect.ClassTag
import scala.util.Sorting
import org.apache.spark.graph._
@@ -7,7 +8,7 @@ import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVecto
//private[graph]
-class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest](size: Int = 64) {
+class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) {
var edges = new PrimitiveVector[Edge[ED]](size)
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
index c9e1e08153..4d5eb240a9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.ClassTag
+
import org.apache.spark.graph._
import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
@@ -10,7 +12,7 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap
* debug / profile.
*/
private[impl]
-class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
+class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
val vidToIndex: VertexIdToIndexMap,
val vertexArray: Array[VD],
val edgePartition: EdgePartition[ED])
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 79c11c780a..1dfd9cf316 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.spark.util.collection.PrimitiveVector
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.SparkContext._
@@ -23,7 +25,7 @@ import org.apache.spark.util.ClosureCleaner
* edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created
* using the routing table.
*/
-class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
+class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
@transient val vertices: VertexRDD[VD],
@transient val edges: EdgeRDD[ED],
@transient val routingTable: RoutingTable,
@@ -45,8 +47,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
/** Return a RDD that brings edges together with their source and destination vertices. */
@transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
- val vdManifest = classManifest[VD]
- val edManifest = classManifest[ED]
+ val vdManifest = classTag[VD]
+ val edManifest = classTag[ED]
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (pid, ePart, vPartIter) =>
val (_, vPart) = vPartIter.next()
new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest)
@@ -63,7 +65,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
val numPartitions = edges.partitions.size
- val edManifest = classManifest[ED]
+ val edManifest = classTag[ED]
val newEdges = new EdgeRDD(edges.map { e =>
val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
@@ -153,8 +155,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
}
- override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
- if (classManifest[VD] equals classManifest[VD2]) {
+ override def mapVertices[VD2: ClassTag](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
+ if (classTag[VD] equals classTag[VD2]) {
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f))
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
@@ -168,17 +170,17 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
}
}
- override def mapEdges[ED2: ClassManifest](
+ override def mapEdges[ED2: ClassTag](
f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
new GraphImpl(vertices, newETable , routingTable, replicatedVertexView)
}
- override def mapTriplets[ED2: ClassManifest](
+ override def mapTriplets[ED2: ClassTag](
f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
// Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit
// manifest from GraphImpl (which would require serializing GraphImpl).
- val vdManifest = classManifest[VD]
+ val vdManifest = classTag[VD]
val newEdgePartitions =
edges.zipEdgePartitions(replicatedVertexView.get(true, true)) {
(ePid, edgePartition, vTableReplicatedIter) =>
@@ -208,7 +210,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
// Filter the edges
- val edManifest = classManifest[ED]
+ val edManifest = classTag[ED]
val newEdges = new EdgeRDD[ED](triplets.filter { et =>
vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)
}.mapPartitionsWithIndex( { (pid, iter) =>
@@ -224,7 +226,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView)
} // end of subgraph
- override def mask[VD2: ClassManifest, ED2: ClassManifest] (
+ override def mask[VD2: ClassTag, ED2: ClassTag] (
other: Graph[VD2, ED2]): Graph[VD, ED] = {
val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v }
@@ -244,7 +246,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
// Lower level transformation methods
//////////////////////////////////////////////////////////////////////////////////////////////////
- override def mapReduceTriplets[A: ClassManifest](
+ override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
@@ -311,9 +313,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
vertices.aggregateUsingIndex(preAgg, reduceFunc)
} // end of mapReduceTriplets
- override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
+ override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = {
- if (classManifest[VD] equals classManifest[VD2]) {
+ if (classTag[VD] equals classTag[VD2]) {
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(updates)(updateF)
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
@@ -340,20 +342,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
object GraphImpl {
- def apply[VD: ClassManifest, ED: ClassManifest](
+ def apply[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
{
fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
}
- def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+ def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
edgePartitions: RDD[(Pid, EdgePartition[ED])],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
}
- def apply[VD: ClassManifest, ED: ClassManifest](
+ def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(Vid, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD): GraphImpl[VD, ED] =
@@ -381,7 +383,7 @@ object GraphImpl {
* pair: the key is the partition id, and the value is an EdgePartition object containing all the
* edges in a partition.
*/
- private def createEdgeRDD[ED: ClassManifest](
+ private def createEdgeRDD[ED: ClassTag](
edges: RDD[Edge[ED]]): EdgeRDD[ED] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED]
@@ -393,7 +395,7 @@ object GraphImpl {
new EdgeRDD(edgePartitions)
}
- private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
edges: EdgeRDD[ED],
defaultVertexAttr: VD): GraphImpl[VD, ED] = {
edges.cache()
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
index c2e452cc72..66fe796d2e 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.spark.Partitioner
import org.apache.spark.graph.{Pid, Vid}
import org.apache.spark.rdd.{ShuffledRDD, RDD}
@@ -37,16 +39,16 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef
}
-class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcastMsg[T]]) {
+class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) {
def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = {
val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner)
// Set a custom serializer if the data is of int or double type.
- if (classManifest[T] == ClassManifest.Int) {
+ if (classTag[T] == ClassTag.Int) {
rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Long) {
+ } else if (classTag[T] == ClassTag.Long) {
rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Double) {
+ } else if (classTag[T] == ClassTag.Double) {
rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName)
}
rdd
@@ -54,7 +56,7 @@ class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcast
}
-class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
+class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) {
/**
* Return a copy of the RDD partitioned using the specified partitioner.
@@ -67,23 +69,23 @@ class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) {
object MsgRDDFunctions {
- implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = {
+ implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = {
new MsgRDDFunctions(rdd)
}
- implicit def rdd2vertexMessageRDDFunctions[T: ClassManifest](rdd: RDD[VertexBroadcastMsg[T]]) = {
+ implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = {
new VertexBroadcastMsgRDDFunctions(rdd)
}
- def partitionForAggregation[T: ClassManifest](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
+ def partitionForAggregation[T: ClassTag](msgs: RDD[(Vid, T)], partitioner: Partitioner) = {
val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner)
// Set a custom serializer if the data is of int or double type.
- if (classManifest[T] == ClassManifest.Int) {
+ if (classTag[T] == ClassTag.Int) {
rdd.setSerializer(classOf[IntAggMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Long) {
+ } else if (classTag[T] == ClassTag.Long) {
rdd.setSerializer(classOf[LongAggMsgSerializer].getName)
- } else if (classManifest[T] == ClassManifest.Double) {
+ } else if (classTag[T] == ClassTag.Double) {
rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName)
}
rdd
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
index 175586b87e..2124144df7 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.{classTag, ClassTag}
+
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet}
@@ -17,7 +19,7 @@ import org.apache.spark.graph._
* example.
*/
private[impl]
-class ReplicatedVertexView[VD: ClassManifest](
+class ReplicatedVertexView[VD: ClassTag](
updatedVerts: VertexRDD[VD],
edges: EdgeRDD[_],
routingTable: RoutingTable,
@@ -80,7 +82,7 @@ class ReplicatedVertexView[VD: ClassManifest](
private def create(includeSrc: Boolean, includeDst: Boolean)
: RDD[(Pid, VertexPartition[VD])] = {
- val vdManifest = classManifest[VD]
+ val vdManifest = classTag[VD]
// Ship vertex attributes to edge partitions according to vertexPlacement
val verts = updatedVerts.partitionsRDD
@@ -125,7 +127,7 @@ class ReplicatedVertexView[VD: ClassManifest](
}
object ReplicatedVertexView {
- protected def buildBuffer[VD: ClassManifest](
+ protected def buildBuffer[VD: ClassTag](
pid2vidIter: Iterator[Array[Array[Vid]]],
vertexPartIter: Iterator[VertexPartition[VD]]) = {
val pid2vid: Array[Array[Vid]] = pid2vidIter.next()
@@ -173,6 +175,6 @@ object ReplicatedVertexView {
}
}
-class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) {
+class VertexAttributeBlock[VD: ClassTag](val vids: Array[Vid], val attrs: Array[VD]) {
def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
index 9b2d66999c..7048a40f42 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala
@@ -1,5 +1,7 @@
package org.apache.spark.graph.impl
+import scala.reflect.ClassTag
+
import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap}
import org.apache.spark.Logging
@@ -8,7 +10,7 @@ import org.apache.spark.graph._
private[graph] object VertexPartition {
- def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = {
+ def apply[VD: ClassTag](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = {
val map = new PrimitiveKeyOpenHashMap[Vid, VD]
iter.foreach { case (k, v) =>
map(k) = v
@@ -16,7 +18,7 @@ private[graph] object VertexPartition {
new VertexPartition(map.keySet, map._values, map.keySet.getBitSet)
}
- def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
+ def apply[VD: ClassTag](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD)
: VertexPartition[VD] =
{
val map = new PrimitiveKeyOpenHashMap[Vid, VD]
@@ -29,7 +31,7 @@ private[graph] object VertexPartition {
private[graph]
-class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
+class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet,
@@ -70,7 +72,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
- def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
+ def map[VD2: ClassTag](f: (Vid, VD) => VD2): VertexPartition[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](capacity)
var i = mask.nextSetBit(0)
@@ -126,7 +128,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
}
/** Left outer join another VertexPartition. */
- def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: VertexPartition[VD2])
(f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
if (index != other.index) {
@@ -146,14 +148,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
}
/** Left outer join another iterator of messages. */
- def leftJoin[VD2: ClassManifest, VD3: ClassManifest]
+ def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: Iterator[(Vid, VD2)])
(f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = {
leftJoin(createUsingIndex(other))(f)
}
/** Inner join another VertexPartition. */
- def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U])
+ def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U])
(f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
if (index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
@@ -173,7 +175,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
/**
* Inner join an iterator of messages.
*/
- def innerJoin[U: ClassManifest, VD2: ClassManifest]
+ def innerJoin[U: ClassTag, VD2: ClassTag]
(iter: Iterator[Product2[Vid, U]])
(f: (Vid, VD, U) => VD2): VertexPartition[VD2] = {
innerJoin(createUsingIndex(iter))(f)
@@ -182,7 +184,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
/**
* Similar effect as aggregateUsingIndex((a, b) => a)
*/
- def createUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]])
+ def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[Vid, VD2]])
: VertexPartition[VD2] = {
val newMask = new BitSet(capacity)
val newValues = new Array[VD2](capacity)
@@ -214,7 +216,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest](
new VertexPartition(index, newValues, newMask)
}
- def aggregateUsingIndex[VD2: ClassManifest](
+ def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] =
{
val newMask = new BitSet(capacity)
diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
index a1e285816b..d61f358bb0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala
@@ -1,10 +1,9 @@
package org.apache.spark.graph.util
-import util._
-import math._
import scala.annotation.tailrec
-//import scala.collection.mutable
-
+import scala.math._
+import scala.reflect.ClassTag
+import scala.util._
import org.apache.spark._
import org.apache.spark.serializer._
@@ -155,7 +154,7 @@ object GraphGenerators {
}
- def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
+ def outDegreeFromEdges[ED: ClassTag](edges: RDD[Edge[ED]]): Graph[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.srcId, 1)) }
.reduceByKey(_ + _)
@@ -281,14 +280,3 @@ object GraphGenerators {
} // end of Graph Generators
-
-
-
-
-
-
-
-
-
-
-
diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
index a52a5653e2..f951fd7a82 100644
--- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala
@@ -1,5 +1,6 @@
package org.apache.spark.graph.impl
+import scala.reflect.ClassTag
import scala.util.Random
import org.scalatest.FunSuite
@@ -59,7 +60,7 @@ class EdgePartitionSuite extends FunSuite {
}
test("innerJoin") {
- def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
+ def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = {
val builder = new EdgePartitionBuilder[A]
for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) }
builder.toEdgePartition