author    Reynold Xin <rxin@cs.berkeley.edu>  2013-04-05 14:21:24 +0800
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-04-05 14:21:24 +0800
commit    9764e579b879ff6a483019418a5377fb07fc7cc5 (patch)
tree      4bf75859e9f7871d926e2a64eb48cb5609b2f1ef
parent    71348563515a3f97119c2723f20ae034eed57ea4 (diff)
download  spark-9764e579b879ff6a483019418a5377fb07fc7cc5.tar.gz
          spark-9764e579b879ff6a483019418a5377fb07fc7cc5.tar.bz2
          spark-9764e579b879ff6a483019418a5377fb07fc7cc5.zip
Minor cleanup: delete dead commented-out code, derive a Graph's initial partition counts from its input RDDs instead of the hard-coded DEFAULT_NUM_*_PARTITIONS constants, register the erased Vertex/Edge classes in GraphKryoRegistrator, and move MutableTuple2 into the package object.
-rw-r--r--  graph/src/main/scala/spark/graph/Analytics.scala             27
-rw-r--r--  graph/src/main/scala/spark/graph/Graph.scala                 31
-rw-r--r--  graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala  18
-rw-r--r--  graph/src/main/scala/spark/graph/package.scala                7
4 files changed, 23 insertions, 60 deletions
diff --git a/graph/src/main/scala/spark/graph/Analytics.scala b/graph/src/main/scala/spark/graph/Analytics.scala
index 5b0e5221ba..5c16ba8175 100644
--- a/graph/src/main/scala/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/spark/graph/Analytics.scala
@@ -2,25 +2,9 @@ package spark.graph
import spark._
import spark.SparkContext._
-// import com.esotericsoftware.kryo._
// import breeze.linalg._
-
-
-// class AnalyticsKryoRegistrator extends KryoRegistrator {
-// def registerClasses(kryo: Kryo) {
-// println("registering kryo")
-// kryo.register(classOf[(Int,Float,Float)])
-// Graph.kryoRegister[(Int,Float,Float), Float](kryo)
-// Graph.kryoRegister[(Int,Float), Float](kryo)
-// Graph.kryoRegister[Int, Float](kryo)
-// Graph.kryoRegister[Float, Float](kryo)
-// kryo.setReferences(false);
-// }
-// }
-
-
object Analytics {
@@ -111,14 +95,6 @@ object Analytics {
gatherDirection = EdgeDirection.In)
}
-
-
-
-
-
-
-
-
// /**
// * Compute the connected component membership of each vertex
// * and return an RDD with the vertex value containing the
@@ -250,9 +226,6 @@ object Analytics {
// //System.setProperty("spark.shuffle.compress", "false")
// System.setProperty("spark.kryo.registrator", "spark.graphlab.AnalyticsKryoRegistrator")
-
-
-
taskType match {
case "pagerank" => {
diff --git a/graph/src/main/scala/spark/graph/Graph.scala b/graph/src/main/scala/spark/graph/Graph.scala
index 1b1929cc26..7b1111ae77 100644
--- a/graph/src/main/scala/spark/graph/Graph.scala
+++ b/graph/src/main/scala/spark/graph/Graph.scala
@@ -43,12 +43,6 @@ class EdgeWithVertices[@specialized(Char, Int, Boolean, Byte, Long, Float, Doubl
}
-private[graph]
-case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
- @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
- var _1: U, var _2: V)
-
-
/**
* A Graph RDD that supports computation on graphs.
*/
@@ -61,26 +55,23 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
_rawETable: RDD[(Pid, EdgePartition[ED])]) {
def this(vertices: RDD[Vertex[VD]], edges: RDD[Edge[ED]]) = {
- this(
- Graph.DEFAULT_NUM_VERTEX_PARTITIONS, Graph.DEFAULT_NUM_EDGE_PARTITIONS,
- vertices, edges,
- null, null)
+ this(vertices.partitions.size, edges.partitions.size, vertices, edges, null, null)
}
def withPartitioner(numVertexPartitions: Int, numEdgePartitions: Int): Graph[VD, ED] = {
if (_cached) {
- val newgraph = new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable)
- newgraph.cache()
+ (new Graph(numVertexPartitions, numEdgePartitions, null, null, _rawVTable, _rawETable))
+ .cache()
} else {
new Graph(numVertexPartitions, numEdgePartitions, _rawVertices, _rawEdges, null, null)
}
}
- def withVertexPartitioner(numVertexPartitions: Int = Graph.DEFAULT_NUM_VERTEX_PARTITIONS) = {
+ def withVertexPartitioner(numVertexPartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
- def withEdgePartitioner(numEdgePartitions: Int = Graph.DEFAULT_NUM_EDGE_PARTITIONS) = {
+ def withEdgePartitioner(numEdgePartitions: Int) = {
withPartitioner(numVertexPartitions, numEdgePartitions)
}
@@ -139,11 +130,11 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
edgeDirection)
}
- def mapVertices[VD2: ClassManifest](f: (Vertex[VD]) => Vertex[VD2]): Graph[VD2, ED] = {
+ def mapVertices[VD2: ClassManifest](f: Vertex[VD] => Vertex[VD2]): Graph[VD2, ED] = {
newGraph(vertices.map(f), edges)
}
- def mapEdges[ED2: ClassManifest](f: (Edge[ED]) => Edge[ED2]): Graph[VD, ED2] = {
+ def mapEdges[ED2: ClassManifest](f: Edge[ED] => Edge[ED2]): Graph[VD, ED2] = {
newGraph(vertices, edges.map(f))
}
@@ -237,7 +228,7 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
}
}
}
- vmap.int2ObjectEntrySet().fastIterator().filter{!_.getValue()._2.isEmpty}.map{ entry =>
+ vmap.int2ObjectEntrySet().fastIterator().filter(!_.getValue()._2.isEmpty).map{ entry =>
(entry.getIntKey(), entry.getValue()._2)
}
}
@@ -316,14 +307,10 @@ class Graph[VD: ClassManifest, ED: ClassManifest] protected (
object Graph {
- val DEFAULT_NUM_VERTEX_PARTITIONS = 5
- val DEFAULT_NUM_EDGE_PARTITIONS = 5
-
/**
* Load an edge list from file initializing the Graph RDD
*/
- def textFile[ED: ClassManifest](sc: SparkContext,
- fname: String, edgeParser: Array[String] => ED ) = {
+ def textFile[ED: ClassManifest](sc: SparkContext, fname: String, edgeParser: Array[String] => ED) = {
// Parse the edge data table
val edges = sc.textFile(fname).map { line =>
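
With DEFAULT_NUM_VERTEX_PARTITIONS and DEFAULT_NUM_EDGE_PARTITIONS gone, a new Graph simply inherits the partitioning of the RDDs it is built from, and any repartitioning is an explicit call. A short sketch of the resulting API, assuming the Vertex(id, data) and Edge(src, dst, data) case-class shapes used elsewhere in this package (the data values and master string are placeholders):

    import spark.SparkContext
    import spark.graph._

    val sc = new SparkContext("local[4]", "graph-example")
    val vertices = sc.parallelize(Seq(Vertex(1, 1.0), Vertex(2, 1.0)))
    val edges    = sc.parallelize(Seq(Edge(1, 2, 1.0)))

    // Starts with vertices.partitions.size / edges.partitions.size
    // partitions; the no-longer-defaulted methods make changes explicit.
    val graph = new Graph(vertices, edges)
      .withVertexPartitioner(8)
      .withEdgePartitioner(16)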
diff --git a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala
index e72e500fb8..e1cb77f114 100644
--- a/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/spark/graph/GraphKryoRegistrator.scala
@@ -8,21 +8,17 @@ import spark.KryoRegistrator
class GraphKryoRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
- kryo.register(classOf[(Int, Float, Float)])
- registerClass[(Int, Float, Float), Float](kryo)
- registerClass[(Int, Float), Float](kryo)
- registerClass[Int, Float](kryo)
- registerClass[Float, Float](kryo)
+ //kryo.register(classOf[(Int, Float, Float)])
+ registerClass[Int, Int, Int](kryo)
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}
- private def registerClass[VD: Manifest, ED: Manifest](kryo: Kryo) {
- //kryo.register(classManifest[VD].erasure)
- // kryo.register(classManifest[ED].erasure)
- kryo.register(classOf[(Vid, Vid, ED)])
- kryo.register(classOf[(Vid, ED)])
- //kryo.register(classOf[EdgeBlockRecord[ED]])
+ private def registerClass[VD: Manifest, ED: Manifest, VD2: Manifest](kryo: Kryo) {
+ kryo.register(classOf[Vertex[VD]])
+ kryo.register(classOf[Edge[ED]])
+ kryo.register(classOf[MutableTuple2[VD, VD2]])
+ kryo.register(classOf[(Vid, VD2)])
}
}
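
The rewritten registerClass registers the erased runtime classes of Vertex, Edge, MutableTuple2, and the (Vid, VD2) tuple, which is why the single registerClass[Int, Int, Int] call can replace the four per-type calls that were deleted: erasure collapses every instantiation of a generic class onto the same runtime class. A small standalone sketch of that point, using only calls shown in this diff:

    import com.esotericsoftware.kryo.Kryo
    import spark.graph._

    // Erasure: all Vertex[VD] share one runtime class, so a single
    // registration covers every vertex data type.
    assert(classOf[Vertex[Float]] == classOf[Vertex[Int]])

    val kryo = new Kryo()
    new GraphKryoRegistrator().registerClasses(kryo)  // registers the erased classes once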
diff --git a/graph/src/main/scala/spark/graph/package.scala b/graph/src/main/scala/spark/graph/package.scala
index e7ec3f6e86..cf1b23ca5d 100644
--- a/graph/src/main/scala/spark/graph/package.scala
+++ b/graph/src/main/scala/spark/graph/package.scala
@@ -11,4 +11,11 @@ package object graph {
* Return the default null-like value for a data type T.
*/
def nullValue[T] = null.asInstanceOf[T]
+
+
+ private[graph]
+ case class MutableTuple2[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) U,
+ @specialized(Char, Int, Boolean, Byte, Long, Float, Double) V](
+ var _1: U, var _2: V)
+
}
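
MutableTuple2 moves here from Graph.scala unchanged: a package-private pair whose fields are vars, so hot aggregation loops can update one accumulator in place instead of allocating a fresh Tuple2 per element, with @specialized keeping primitive fields unboxed. A sketch of the intended use — the object and its mean helper are hypothetical, and must live inside spark.graph because the class is private[graph]:

    package spark.graph

    object MutableTuple2Sketch {
      // Running (sum, count) accumulator updated in place.
      def mean(xs: Seq[Double]): Double = {
        val acc = MutableTuple2(0.0, 0)
        for (x <- xs) { acc._1 += x; acc._2 += 1 }
        acc._1 / acc._2
      }
    }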