From ac536345f86e467ac83cb9c0dccbb34150335e26 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Fri, 3 Jan 2014 12:50:56 -0700 Subject: ClassManifest -> ClassTag --- .../scala/org/apache/spark/graph/Analytics.scala | 2 +- .../scala/org/apache/spark/graph/EdgeRDD.scala | 16 +++++---- .../scala/org/apache/spark/graph/EdgeTriplet.scala | 4 +-- .../main/scala/org/apache/spark/graph/Graph.scala | 28 ++++++++------- .../scala/org/apache/spark/graph/GraphLab.scala | 4 ++- .../scala/org/apache/spark/graph/GraphLoader.scala | 4 ++- .../scala/org/apache/spark/graph/GraphOps.scala | 10 +++--- .../main/scala/org/apache/spark/graph/Pregel.scala | 4 ++- .../scala/org/apache/spark/graph/VertexRDD.scala | 28 ++++++++------- .../spark/graph/algorithms/TriangleCount.scala | 4 ++- .../apache/spark/graph/impl/EdgePartition.scala | 10 +++--- .../spark/graph/impl/EdgePartitionBuilder.scala | 3 +- .../spark/graph/impl/EdgeTripletIterator.scala | 4 ++- .../org/apache/spark/graph/impl/GraphImpl.scala | 40 ++++++++++++---------- .../spark/graph/impl/MessageToPartition.scala | 24 +++++++------ .../spark/graph/impl/ReplicatedVertexView.scala | 10 +++--- .../apache/spark/graph/impl/VertexPartition.scala | 22 ++++++------ .../apache/spark/graph/util/GraphGenerators.scala | 20 +++-------- .../spark/graph/impl/EdgePartitionSuite.scala | 3 +- 19 files changed, 129 insertions(+), 111 deletions(-) (limited to 'graph/src') diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 2012dadb2f..14b9be73f1 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -335,7 +335,7 @@ object Analytics extends Logging { // /** // * // */ - // def alternatingLeastSquares[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, Double], + // def alternatingLeastSquares[VD: ClassTag, ED: ClassTag](graph: Graph[VD, Double], // latentK: Int, lambda: Double, numIter: Int) = { // val vertices = graph.vertices.mapPartitions( _.map { // case (vid, _) => (vid, Array.fill(latentK){ scala.util.Random.nextDouble() } ) diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala index 3dda5c7c60..1c21967c9c 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala @@ -1,12 +1,14 @@ package org.apache.spark.graph +import scala.reflect.{classTag, ClassTag} + import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.graph.impl.EdgePartition import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -class EdgeRDD[@specialized ED: ClassManifest]( +class EdgeRDD[@specialized ED: ClassTag]( val partitionsRDD: RDD[(Pid, EdgePartition[ED])]) extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { @@ -42,7 +44,7 @@ class EdgeRDD[@specialized ED: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ override def cache(): EdgeRDD[ED] = persist() - def mapEdgePartitions[ED2: ClassManifest](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2]) + def mapEdgePartitions[ED2: ClassTag](f: (Pid, EdgePartition[ED]) => EdgePartition[ED2]) : EdgeRDD[ED2] = { // iter => iter.map { case (pid, ep) => (pid, f(ep)) } new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => @@ -51,7 +53,7 @@ class EdgeRDD[@specialized ED: ClassManifest]( }, preservesPartitioning = true)) } - def zipEdgePartitions[T: ClassManifest, U: ClassManifest] + def zipEdgePartitions[T: ClassTag, U: ClassTag] (other: RDD[T]) (f: (Pid, EdgePartition[ED], Iterator[T]) => Iterator[U]): RDD[U] = { partitionsRDD.zipPartitions(other, preservesPartitioning = true) { (ePartIter, otherIter) => @@ -60,7 +62,7 @@ class EdgeRDD[@specialized ED: ClassManifest]( } } - def zipEdgePartitions[ED2: ClassManifest, ED3: ClassManifest] + def zipEdgePartitions[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) (f: (Pid, EdgePartition[ED], EdgePartition[ED2]) => EdgePartition[ED3]): EdgeRDD[ED3] = { new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, preservesPartitioning = true) { @@ -71,11 +73,11 @@ class EdgeRDD[@specialized ED: ClassManifest]( }) } - def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgeRDD[ED2]) (f: (Vid, Vid, ED, ED2) => ED3): EdgeRDD[ED3] = { - val ed2Manifest = classManifest[ED2] - val ed3Manifest = classManifest[ED3] + val ed2Manifest = classTag[ED2] + val ed3Manifest = classTag[ED3] zipEdgePartitions(other) { (pid, thisEPart, otherEPart) => thisEPart.innerJoin(otherEPart)(f)(ed2Manifest, ed3Manifest) } diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala index 76768489ee..5a384a5f84 100644 --- a/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala +++ b/graph/src/main/scala/org/apache/spark/graph/EdgeTriplet.scala @@ -14,8 +14,8 @@ import org.apache.spark.graph.impl.VertexPartition * that is not a trait. */ class EdgeTriplet[VD, ED] extends Edge[ED] { -// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassManifest, -// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest] extends Edge[ED] { +// class EdgeTriplet[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) VD: ClassTag, +// @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag] extends Edge[ED] { /** diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index b725b2a155..9dd26f7679 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph +import scala.reflect.ClassTag + import org.apache.spark.graph.impl._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -23,7 +25,7 @@ import org.apache.spark.storage.StorageLevel * @tparam VD the vertex attribute type * @tparam ED the edge attribute type */ -abstract class Graph[VD: ClassManifest, ED: ClassManifest] { +abstract class Graph[VD: ClassTag, ED: ClassTag] { /** * Get the vertices and their data. @@ -123,7 +125,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * }}} * */ - def mapVertices[VD2: ClassManifest](map: (Vid, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2: ClassTag](map: (Vid, VD) => VD2): Graph[VD2, ED] /** * Construct a new graph where the value of each edge is @@ -143,7 +145,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * attributes. * */ - def mapEdges[ED2: ClassManifest](map: Edge[ED] => ED2): Graph[VD, ED2] = { + def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = { mapEdges((pid, iter) => iter.map(map)) } @@ -167,7 +169,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * @tparam ED2 the new edge data type * */ - def mapEdges[ED2: ClassManifest]( + def mapEdges[ED2: ClassTag]( map: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] @@ -195,7 +197,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * }}} * */ - def mapTriplets[ED2: ClassManifest](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { + def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map)) } @@ -219,7 +221,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * @tparam ED2 the new edge data type * */ - def mapTriplets[ED2: ClassManifest]( + def mapTriplets[ED2: ClassTag]( map: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] @@ -261,7 +263,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * @return a graph with vertices and edges that exists in both the current graph and other, * with vertex and edge data from the current graph. */ - def mask[VD2: ClassManifest, ED2: ClassManifest](other: Graph[VD2, ED2]): Graph[VD, ED] + def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED] /** * This function merges multiple edges between two vertices into a single Edge. For correct @@ -313,7 +315,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * predicate or implement PageRank. * */ - def mapReduceTriplets[A: ClassManifest]( + def mapReduceTriplets[A: ClassTag]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) @@ -348,7 +350,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] { * }}} * */ - def outerJoinVertices[U: ClassManifest, VD2: ClassManifest](table: RDD[(Vid, U)]) + def outerJoinVertices[U: ClassTag, VD2: ClassTag](table: RDD[(Vid, U)]) (mapFunc: (Vid, VD, Option[U]) => VD2) : Graph[VD2, ED] @@ -376,7 +378,7 @@ object Graph { * @return a graph with edge attributes containing either the count of duplicate edges or 1 * (if `uniqueEdges=None`) and vertex attributes containing the total degree of each vertex. */ - def fromEdgeTuples[VD: ClassManifest]( + def fromEdgeTuples[VD: ClassTag]( rawEdges: RDD[(Vid, Vid)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = { @@ -397,7 +399,7 @@ object Graph { * @return a graph with edge attributes described by `edges` and vertices * given by all vertices in `edges` with value `defaultValue` */ - def fromEdges[VD: ClassManifest, ED: ClassManifest]( + def fromEdges[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED] = { GraphImpl(edges, defaultValue) @@ -418,7 +420,7 @@ object Graph { * @param partitionStrategy the partition strategy to use when * partitioning the edges. */ - def apply[VD: ClassManifest, ED: ClassManifest]( + def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { @@ -432,5 +434,5 @@ object Graph { * convenience operations are defined in the GraphOps class which may be shared across multiple * graph implementations. */ - implicit def graphToGraphOps[VD: ClassManifest, ED: ClassManifest](g: Graph[VD, ED]) = g.ops + implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops } // end of Graph object diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala index 5d2f0f4bda..c1ce5cd9cc 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLab.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph +import scala.reflect.ClassTag + import org.apache.spark.Logging import scala.collection.JavaConversions._ import org.apache.spark.rdd.RDD @@ -36,7 +38,7 @@ object GraphLab extends Logging { * @tparam A The type accumulated during the gather phase * @return the resulting graph after the algorithm converges */ - def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest] + def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], numIter: Int, gatherDirection: EdgeDirection = EdgeDirection.In, scatterDirection: EdgeDirection = EdgeDirection.Out) diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala index a69bfde532..7daac4fcc5 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala @@ -1,6 +1,8 @@ package org.apache.spark.graph import java.util.{Arrays => JArrays} +import scala.reflect.ClassTag + import org.apache.spark.graph.impl.EdgePartitionBuilder import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graph.impl.{EdgePartition, GraphImpl} @@ -22,7 +24,7 @@ object GraphLoader extends Logging { * the Edge RDD * */ - def textFile[ED: ClassManifest]( + def textFile[ED: ClassTag]( sc: SparkContext, path: String, edgeParser: Array[String] => ED, diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala index 091c778275..11c6120beb 100644 --- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala +++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph +import scala.reflect.ClassTag + import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.apache.spark.SparkException @@ -15,7 +17,7 @@ import org.apache.spark.SparkException * @tparam ED the edge attribute type * */ -class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { +class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { /** * Compute the number of edges in the graph. @@ -109,7 +111,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { * @todo Should this return a graph with the new vertex values? * */ - def aggregateNeighbors[A: ClassManifest]( + def aggregateNeighbors[A: ClassTag]( mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A], reduceFunc: (A, A) => A, dir: EdgeDirection) @@ -226,7 +228,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { * }}} * */ - def joinVertices[U: ClassManifest](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) + def joinVertices[U: ClassTag](table: RDD[(Vid, U)])(mapFunc: (Vid, VD, U) => VD) : Graph[VD, ED] = { val uf = (id: Vid, data: VD, o: Option[U]) => { o match { @@ -262,7 +264,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) { * }}} * */ - def filter[VD2: ClassManifest, ED2: ClassManifest]( + def filter[VD2: ClassTag, ED2: ClassTag]( preprocess: Graph[VD, ED] => Graph[VD2, ED2], epred: (EdgeTriplet[VD2, ED2]) => Boolean = (x: EdgeTriplet[VD2, ED2]) => true, vpred: (Vid, VD2) => Boolean = (v:Vid, d:VD2) => true): Graph[VD, ED] = { diff --git a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala index 285e857b69..4664091b57 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Pregel.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Pregel.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph +import scala.reflect.ClassTag + /** * This object implements a Pregel-like bulk-synchronous @@ -84,7 +86,7 @@ object Pregel { * @return the resulting graph at the end of the computation * */ - def apply[VD: ClassManifest, ED: ClassManifest, A: ClassManifest] + def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)( vprog: (Vid, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(Vid,A)], diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala index c274e342c7..8e5e319928 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.graph +import scala.reflect.ClassTag + import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ @@ -50,7 +52,7 @@ import org.apache.spark.graph.impl.VertexPartition * }}} * */ -class VertexRDD[@specialized VD: ClassManifest]( +class VertexRDD[@specialized VD: ClassTag]( val partitionsRDD: RDD[VertexPartition[VD]]) extends RDD[(Vid, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { @@ -111,7 +113,7 @@ class VertexRDD[@specialized VD: ClassManifest]( /** * Return a new VertexRDD by applying a function to each VertexPartition of this RDD. */ - def mapVertexPartitions[VD2: ClassManifest](f: VertexPartition[VD] => VertexPartition[VD2]) + def mapVertexPartitions[VD2: ClassTag](f: VertexPartition[VD] => VertexPartition[VD2]) : VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) new VertexRDD(newPartitionsRDD) @@ -121,7 +123,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * Return a new VertexRDD by applying a function to corresponding * VertexPartitions of this VertexRDD and another one. */ - def zipVertexPartitions[VD2: ClassManifest, VD3: ClassManifest] + def zipVertexPartitions[VD2: ClassTag, VD3: ClassTag] (other: VertexRDD[VD2]) (f: (VertexPartition[VD], VertexPartition[VD2]) => VertexPartition[VD3]): VertexRDD[VD3] = { val newPartitionsRDD = partitionsRDD.zipPartitions( @@ -160,7 +162,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * each of the entries in the original VertexRDD. The resulting * VertexRDD retains the same index. */ - def mapValues[VD2: ClassManifest](f: VD => VD2): VertexRDD[VD2] = + def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map((vid, attr) => f(attr))) /** @@ -174,7 +176,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * each of the entries in the original VertexRDD. The resulting * VertexRDD retains the same index. */ - def mapValues[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexRDD[VD2] = + def mapValues[VD2: ClassTag](f: (Vid, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) /** @@ -205,7 +207,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * other VertexSet. * */ - def leftZipJoin[VD2: ClassManifest, VD3: ClassManifest] + def leftZipJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexRDD[VD2])(f: (Vid, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { this.zipVertexPartitions(other) { (thisPart, otherPart) => thisPart.leftJoin(otherPart)(f) @@ -231,7 +233,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * @return a VertexRDD containing all the vertices in this * VertexRDD with the attribute emitted by f. */ - def leftJoin[VD2: ClassManifest, VD3: ClassManifest] + def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: RDD[(Vid, VD2)]) (f: (Vid, VD, Option[VD2]) => VD3) : VertexRDD[VD3] = @@ -257,7 +259,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * Same effect as leftJoin(other) { (vid, a, bOpt) => bOpt.getOrElse(a) }, but `this` and `other` * must have the same index. */ - def innerZipJoin[U: ClassManifest, VD2: ClassManifest](other: VertexRDD[U]) + def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U]) (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { this.zipVertexPartitions(other) { (thisPart, otherPart) => thisPart.innerJoin(otherPart)(f) @@ -268,7 +270,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * Replace vertices with corresponding vertices in `other`, and drop vertices without a * corresponding vertex in `other`. */ - def innerJoin[U: ClassManifest, VD2: ClassManifest](other: RDD[(Vid, U)]) + def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(Vid, U)]) (f: (Vid, VD, U) => VD2): VertexRDD[VD2] = { // Test if the other vertex is a VertexRDD to choose the optimal join strategy. // If the other set is a VertexRDD then we use the much more efficient innerZipJoin @@ -291,7 +293,7 @@ class VertexRDD[@specialized VD: ClassManifest]( * Aggregate messages with the same ids using `reduceFunc`, returning a VertexRDD that is * co-indexed with this one. */ - def aggregateUsingIndex[VD2: ClassManifest]( + def aggregateUsingIndex[VD2: ClassTag]( messages: RDD[(Vid, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) @@ -318,7 +320,7 @@ object VertexRDD { * * @param rdd the collection of vertex-attribute pairs */ - def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = { + def apply[VD: ClassTag](rdd: RDD[(Vid, VD)]): VertexRDD[VD] = { val partitioned: RDD[(Vid, VD)] = rdd.partitioner match { case Some(p) => rdd case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) @@ -338,7 +340,7 @@ object VertexRDD { * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassManifest](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = + def apply[VD: ClassTag](rdd: RDD[(Vid, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { val partitioned: RDD[(Vid, VD)] = rdd.partitioner match { case Some(p) => rdd @@ -350,7 +352,7 @@ object VertexRDD { new VertexRDD(vertexPartitions) } - def apply[VD: ClassManifest](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD) + def apply[VD: ClassTag](vids: RDD[Vid], rdd: RDD[(Vid, VD)], defaultVal: VD) : VertexRDD[VD] = { VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala index b1cd3c47d0..a6384320ba 100644 --- a/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala +++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/TriangleCount.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.algorithms +import scala.reflect.ClassTag + import org.apache.spark.graph._ @@ -21,7 +23,7 @@ object TriangleCount { * * @return */ - def run[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD,ED]): Graph[Int, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = { // Remove redundant edges val g = graph.groupEdges((a, b) => a).cache diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala index 4fcf08efce..7367269f67 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartition.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.ClassTag + import org.apache.spark.graph._ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap @@ -13,7 +15,7 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap * @param index a clustered index on source vertex id * @tparam ED the edge attribute type. */ -class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassManifest]( +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( val srcIds: Array[Vid], val dstIds: Array[Vid], val data: Array[ED], @@ -41,7 +43,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return a new edge partition with the result of the function `f` * applied to each edge */ - def map[ED2: ClassManifest](f: Edge[ED] => ED2): EdgePartition[ED2] = { + def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = { val newData = new Array[ED2](data.size) val edge = new Edge[ED]() val size = data.size @@ -69,7 +71,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return a new edge partition with the result of the function `f` * applied to each edge */ - def map[ED2: ClassManifest](iter: Iterator[ED2]): EdgePartition[ED2] = { + def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { val newData = new Array[ED2](data.size) var i = 0 while (iter.hasNext) { @@ -132,7 +134,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * If there are multiple edges with the same src and dst in `other`, `f` will only be invoked * once. */ - def innerJoin[ED2: ClassManifest, ED3: ClassManifest] + def innerJoin[ED2: ClassTag, ED3: ClassTag] (other: EdgePartition[ED2]) (f: (Vid, Vid, ED, ED2) => ED3): EdgePartition[ED3] = { val builder = new EdgePartitionBuilder[ED3] diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala index 3876273369..ae3f3a6d03 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgePartitionBuilder.scala @@ -1,5 +1,6 @@ package org.apache.spark.graph.impl +import scala.reflect.ClassTag import scala.util.Sorting import org.apache.spark.graph._ @@ -7,7 +8,7 @@ import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVecto //private[graph] -class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassManifest](size: Int = 64) { +class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) { var edges = new PrimitiveVector[Edge[ED]](size) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala index c9e1e08153..4d5eb240a9 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/EdgeTripletIterator.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.ClassTag + import org.apache.spark.graph._ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap @@ -10,7 +12,7 @@ import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap * debug / profile. */ private[impl] -class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest]( +class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( val vidToIndex: VertexIdToIndexMap, val vertexArray: Array[VD], val edgePartition: EdgePartition[ED]) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 79c11c780a..1dfd9cf316 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.{classTag, ClassTag} + import org.apache.spark.util.collection.PrimitiveVector import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.SparkContext._ @@ -23,7 +25,7 @@ import org.apache.spark.util.ClosureCleaner * edge partitions. `replicatedVertexView` stores a view of the replicated vertex attributes created * using the routing table. */ -class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( +class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( @transient val vertices: VertexRDD[VD], @transient val edges: EdgeRDD[ED], @transient val routingTable: RoutingTable, @@ -45,8 +47,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( /** Return a RDD that brings edges together with their source and destination vertices. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { - val vdManifest = classManifest[VD] - val edManifest = classManifest[ED] + val vdManifest = classTag[VD] + val edManifest = classTag[ED] edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (pid, ePart, vPartIter) => val (_, vPart) = vPartIter.next() new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdManifest, edManifest) @@ -63,7 +65,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { val numPartitions = edges.partitions.size - val edManifest = classManifest[ED] + val edManifest = classTag[ED] val newEdges = new EdgeRDD(edges.map { e => val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) @@ -153,8 +155,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) } - override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = { - if (classManifest[VD] equals classManifest[VD2]) { + override def mapVertices[VD2: ClassTag](f: (Vid, VD) => VD2): Graph[VD2, ED] = { + if (classTag[VD] equals classTag[VD2]) { // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) @@ -168,17 +170,17 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( } } - override def mapEdges[ED2: ClassManifest]( + override def mapEdges[ED2: ClassTag]( f: (Pid, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = { val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator))) new GraphImpl(vertices, newETable , routingTable, replicatedVertexView) } - override def mapTriplets[ED2: ClassManifest]( + override def mapTriplets[ED2: ClassTag]( f: (Pid, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = { // Use an explicit manifest in PrimitiveKeyOpenHashMap init so we don't pull in the implicit // manifest from GraphImpl (which would require serializing GraphImpl). - val vdManifest = classManifest[VD] + val vdManifest = classTag[VD] val newEdgePartitions = edges.zipEdgePartitions(replicatedVertexView.get(true, true)) { (ePid, edgePartition, vTableReplicatedIter) => @@ -208,7 +210,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) // Filter the edges - val edManifest = classManifest[ED] + val edManifest = classTag[ED] val newEdges = new EdgeRDD[ED](triplets.filter { et => vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et) }.mapPartitionsWithIndex( { (pid, iter) => @@ -224,7 +226,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView) } // end of subgraph - override def mask[VD2: ClassManifest, ED2: ClassManifest] ( + override def mask[VD2: ClassTag, ED2: ClassTag] ( other: Graph[VD2, ED2]): Graph[VD, ED] = { val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } @@ -244,7 +246,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( // Lower level transformation methods ////////////////////////////////////////////////////////////////////////////////////////////////// - override def mapReduceTriplets[A: ClassManifest]( + override def mapReduceTriplets[A: ClassTag]( mapFunc: EdgeTriplet[VD, ED] => Iterator[(Vid, A)], reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = { @@ -311,9 +313,9 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( vertices.aggregateUsingIndex(preAgg, reduceFunc) } // end of mapReduceTriplets - override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest] + override def outerJoinVertices[U: ClassTag, VD2: ClassTag] (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2): Graph[VD2, ED] = { - if (classManifest[VD] equals classManifest[VD2]) { + if (classTag[VD] equals classTag[VD2]) { // updateF preserves type, so we can use incremental replication val newVerts = vertices.leftJoin(updates)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) @@ -340,20 +342,20 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( object GraphImpl { - def apply[VD: ClassManifest, ED: ClassManifest]( + def apply[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = { fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) } - def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest]( + def fromEdgePartitions[VD: ClassTag, ED: ClassTag]( edgePartitions: RDD[(Pid, EdgePartition[ED])], defaultVertexAttr: VD): GraphImpl[VD, ED] = { fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) } - def apply[VD: ClassManifest, ED: ClassManifest]( + def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD): GraphImpl[VD, ED] = @@ -381,7 +383,7 @@ object GraphImpl { * pair: the key is the partition id, and the value is an EdgePartition object containing all the * edges in a partition. */ - private def createEdgeRDD[ED: ClassManifest]( + private def createEdgeRDD[ED: ClassTag]( edges: RDD[Edge[ED]]): EdgeRDD[ED] = { val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[ED] @@ -393,7 +395,7 @@ object GraphImpl { new EdgeRDD(edgePartitions) } - private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest]( + private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( edges: EdgeRDD[ED], defaultVertexAttr: VD): GraphImpl[VD, ED] = { edges.cache() diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala index c2e452cc72..66fe796d2e 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/MessageToPartition.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.{classTag, ClassTag} + import org.apache.spark.Partitioner import org.apache.spark.graph.{Pid, Vid} import org.apache.spark.rdd.{ShuffledRDD, RDD} @@ -37,16 +39,16 @@ class MessageToPartition[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef } -class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcastMsg[T]]) { +class VertexBroadcastMsgRDDFunctions[T: ClassTag](self: RDD[VertexBroadcastMsg[T]]) { def partitionBy(partitioner: Partitioner): RDD[VertexBroadcastMsg[T]] = { val rdd = new ShuffledRDD[Pid, (Vid, T), VertexBroadcastMsg[T]](self, partitioner) // Set a custom serializer if the data is of int or double type. - if (classManifest[T] == ClassManifest.Int) { + if (classTag[T] == ClassTag.Int) { rdd.setSerializer(classOf[IntVertexBroadcastMsgSerializer].getName) - } else if (classManifest[T] == ClassManifest.Long) { + } else if (classTag[T] == ClassTag.Long) { rdd.setSerializer(classOf[LongVertexBroadcastMsgSerializer].getName) - } else if (classManifest[T] == ClassManifest.Double) { + } else if (classTag[T] == ClassTag.Double) { rdd.setSerializer(classOf[DoubleVertexBroadcastMsgSerializer].getName) } rdd @@ -54,7 +56,7 @@ class VertexBroadcastMsgRDDFunctions[T: ClassManifest](self: RDD[VertexBroadcast } -class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) { +class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) { /** * Return a copy of the RDD partitioned using the specified partitioner. @@ -67,23 +69,23 @@ class MsgRDDFunctions[T: ClassManifest](self: RDD[MessageToPartition[T]]) { object MsgRDDFunctions { - implicit def rdd2PartitionRDDFunctions[T: ClassManifest](rdd: RDD[MessageToPartition[T]]) = { + implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = { new MsgRDDFunctions(rdd) } - implicit def rdd2vertexMessageRDDFunctions[T: ClassManifest](rdd: RDD[VertexBroadcastMsg[T]]) = { + implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = { new VertexBroadcastMsgRDDFunctions(rdd) } - def partitionForAggregation[T: ClassManifest](msgs: RDD[(Vid, T)], partitioner: Partitioner) = { + def partitionForAggregation[T: ClassTag](msgs: RDD[(Vid, T)], partitioner: Partitioner) = { val rdd = new ShuffledRDD[Vid, T, (Vid, T)](msgs, partitioner) // Set a custom serializer if the data is of int or double type. - if (classManifest[T] == ClassManifest.Int) { + if (classTag[T] == ClassTag.Int) { rdd.setSerializer(classOf[IntAggMsgSerializer].getName) - } else if (classManifest[T] == ClassManifest.Long) { + } else if (classTag[T] == ClassTag.Long) { rdd.setSerializer(classOf[LongAggMsgSerializer].getName) - } else if (classManifest[T] == ClassManifest.Double) { + } else if (classTag[T] == ClassTag.Double) { rdd.setSerializer(classOf[DoubleAggMsgSerializer].getName) } rdd diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala index 175586b87e..2124144df7 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/ReplicatedVertexView.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.{classTag, ClassTag} + import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} @@ -17,7 +19,7 @@ import org.apache.spark.graph._ * example. */ private[impl] -class ReplicatedVertexView[VD: ClassManifest]( +class ReplicatedVertexView[VD: ClassTag]( updatedVerts: VertexRDD[VD], edges: EdgeRDD[_], routingTable: RoutingTable, @@ -80,7 +82,7 @@ class ReplicatedVertexView[VD: ClassManifest]( private def create(includeSrc: Boolean, includeDst: Boolean) : RDD[(Pid, VertexPartition[VD])] = { - val vdManifest = classManifest[VD] + val vdManifest = classTag[VD] // Ship vertex attributes to edge partitions according to vertexPlacement val verts = updatedVerts.partitionsRDD @@ -125,7 +127,7 @@ class ReplicatedVertexView[VD: ClassManifest]( } object ReplicatedVertexView { - protected def buildBuffer[VD: ClassManifest]( + protected def buildBuffer[VD: ClassTag]( pid2vidIter: Iterator[Array[Array[Vid]]], vertexPartIter: Iterator[VertexPartition[VD]]) = { val pid2vid: Array[Array[Vid]] = pid2vidIter.next() @@ -173,6 +175,6 @@ object ReplicatedVertexView { } } -class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD]) { +class VertexAttributeBlock[VD: ClassTag](val vids: Array[Vid], val attrs: Array[VD]) { def iterator: Iterator[(Vid, VD)] = (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala index 9b2d66999c..7048a40f42 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/VertexPartition.scala @@ -1,5 +1,7 @@ package org.apache.spark.graph.impl +import scala.reflect.ClassTag + import org.apache.spark.util.collection.{BitSet, PrimitiveKeyOpenHashMap} import org.apache.spark.Logging @@ -8,7 +10,7 @@ import org.apache.spark.graph._ private[graph] object VertexPartition { - def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = { + def apply[VD: ClassTag](iter: Iterator[(Vid, VD)]): VertexPartition[VD] = { val map = new PrimitiveKeyOpenHashMap[Vid, VD] iter.foreach { case (k, v) => map(k) = v @@ -16,7 +18,7 @@ private[graph] object VertexPartition { new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) } - def apply[VD: ClassManifest](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD) + def apply[VD: ClassTag](iter: Iterator[(Vid, VD)], mergeFunc: (VD, VD) => VD) : VertexPartition[VD] = { val map = new PrimitiveKeyOpenHashMap[Vid, VD] @@ -29,7 +31,7 @@ private[graph] object VertexPartition { private[graph] -class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( +class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( val index: VertexIdToIndexMap, val values: Array[VD], val mask: BitSet, @@ -70,7 +72,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( * each of the entries in the original VertexRDD. The resulting * VertexPartition retains the same index. */ - def map[VD2: ClassManifest](f: (Vid, VD) => VD2): VertexPartition[VD2] = { + def map[VD2: ClassTag](f: (Vid, VD) => VD2): VertexPartition[VD2] = { // Construct a view of the map transformation val newValues = new Array[VD2](capacity) var i = mask.nextSetBit(0) @@ -126,7 +128,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( } /** Left outer join another VertexPartition. */ - def leftJoin[VD2: ClassManifest, VD3: ClassManifest] + def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexPartition[VD2]) (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { if (index != other.index) { @@ -146,14 +148,14 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( } /** Left outer join another iterator of messages. */ - def leftJoin[VD2: ClassManifest, VD3: ClassManifest] + def leftJoin[VD2: ClassTag, VD3: ClassTag] (other: Iterator[(Vid, VD2)]) (f: (Vid, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { leftJoin(createUsingIndex(other))(f) } /** Inner join another VertexPartition. */ - def innerJoin[U: ClassManifest, VD2: ClassManifest](other: VertexPartition[U]) + def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { if (index != other.index) { logWarning("Joining two VertexPartitions with different indexes is slow.") @@ -173,7 +175,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( /** * Inner join an iterator of messages. */ - def innerJoin[U: ClassManifest, VD2: ClassManifest] + def innerJoin[U: ClassTag, VD2: ClassTag] (iter: Iterator[Product2[Vid, U]]) (f: (Vid, VD, U) => VD2): VertexPartition[VD2] = { innerJoin(createUsingIndex(iter))(f) @@ -182,7 +184,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( /** * Similar effect as aggregateUsingIndex((a, b) => a) */ - def createUsingIndex[VD2: ClassManifest](iter: Iterator[Product2[Vid, VD2]]) + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[Vid, VD2]]) : VertexPartition[VD2] = { val newMask = new BitSet(capacity) val newValues = new Array[VD2](capacity) @@ -214,7 +216,7 @@ class VertexPartition[@specialized(Long, Int, Double) VD: ClassManifest]( new VertexPartition(index, newValues, newMask) } - def aggregateUsingIndex[VD2: ClassManifest]( + def aggregateUsingIndex[VD2: ClassTag]( iter: Iterator[Product2[Vid, VD2]], reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { val newMask = new BitSet(capacity) diff --git a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala index a1e285816b..d61f358bb0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala +++ b/graph/src/main/scala/org/apache/spark/graph/util/GraphGenerators.scala @@ -1,10 +1,9 @@ package org.apache.spark.graph.util -import util._ -import math._ import scala.annotation.tailrec -//import scala.collection.mutable - +import scala.math._ +import scala.reflect.ClassTag +import scala.util._ import org.apache.spark._ import org.apache.spark.serializer._ @@ -155,7 +154,7 @@ object GraphGenerators { } - def outDegreeFromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): Graph[Int, ED] = { + def outDegreeFromEdges[ED: ClassTag](edges: RDD[Edge[ED]]): Graph[Int, ED] = { val vertices = edges.flatMap { edge => List((edge.srcId, 1)) } .reduceByKey(_ + _) @@ -281,14 +280,3 @@ object GraphGenerators { } // end of Graph Generators - - - - - - - - - - - diff --git a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala index a52a5653e2..f951fd7a82 100644 --- a/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/impl/EdgePartitionSuite.scala @@ -1,5 +1,6 @@ package org.apache.spark.graph.impl +import scala.reflect.ClassTag import scala.util.Random import org.scalatest.FunSuite @@ -59,7 +60,7 @@ class EdgePartitionSuite extends FunSuite { } test("innerJoin") { - def makeEdgePartition[A: ClassManifest](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { + def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { val builder = new EdgePartitionBuilder[A] for ((src, dst, attr) <- xs) { builder.add(src: Vid, dst: Vid, attr) } builder.toEdgePartition -- cgit v1.2.3