aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-12-06 22:32:47 -0800
committerAnkur Dave <ankurdave@gmail.com>2013-12-14 15:01:51 -0800
commit1e98840128f3cffbe6566b384e742b3a52cdaa9f (patch)
tree712827ec49d6adada0f2c57a1a1dd70808df54a1 /graph/src
parent9bf192b01c02d71d21261438cd02427b910042eb (diff)
downloadspark-1e98840128f3cffbe6566b384e742b3a52cdaa9f.tar.gz
spark-1e98840128f3cffbe6566b384e742b3a52cdaa9f.tar.bz2
spark-1e98840128f3cffbe6566b384e742b3a52cdaa9f.zip
Load edges in columnar format
In GraphLoader.edgeListFile, load edges directly into EdgePartitions, avoiding repartitioning.
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala11
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala55
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala88
3 files changed, 101 insertions, 53 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
index 24844262bc..a34113b1eb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/EdgeRDD.scala
@@ -1,6 +1,7 @@
package org.apache.spark.graph
+import org.apache.spark.Partitioner
import org.apache.spark.{TaskContext, Partition, OneToOneDependency}
import org.apache.spark.graph.impl.EdgePartition
import org.apache.spark.rdd.RDD
@@ -13,10 +14,16 @@ class EdgeRDD[@specialized ED: ClassManifest](
partitionsRDD.setName("EdgeRDD")
- override val partitioner = partitionsRDD.partitioner
-
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
+ /**
+ * If partitionsRDD already has a partitioner, use it. Otherwise assume that the Pids in
+ * partitionsRDD correspond to the actual partitions and create a new partitioner that allows
+ * co-partitioning with partitionsRDD.
+ */
+ override val partitioner =
+ partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+
override def compute(split: Partition, context: TaskContext): Iterator[Edge[ED]] = {
val edgePartition = partitionsRDD.compute(split, context).next()._2
edgePartition.iterator
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 29d14452de..b00c7c4afe 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -1,9 +1,12 @@
package org.apache.spark.graph
-import org.apache.spark.SparkContext
+import java.util.{Arrays => JArrays}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.graph.impl.{EdgePartition, GraphImpl}
+import org.apache.spark.util.collection.PrimitiveVector
-object GraphLoader {
+object GraphLoader extends Logging {
/**
* Load an edge list from file initializing the Graph
@@ -77,24 +80,42 @@ object GraphLoader {
minEdgePartitions: Int = 1,
partitionStrategy: PartitionStrategy = RandomVertexCut):
Graph[Int, Int] = {
+ val startTime = System.currentTimeMillis
+
// Parse the edge data table
- val edges = sc.textFile(path, minEdgePartitions).mapPartitions( iter =>
- iter.filter(line => !line.isEmpty && line(0) != '#').map { line =>
- val lineArray = line.split("\\s+")
- if(lineArray.length < 2) {
- println("Invalid line: " + line)
- assert(false)
+ val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (index, iter) =>
+ val srcIds = new PrimitiveVector[Long]
+ val dstIds = new PrimitiveVector[Long]
+ iter.foreach { line =>
+ if (!line.isEmpty && line(0) != '#') {
+ val lineArray = line.split("\\s+")
+ if (lineArray.length < 2) {
+ logWarning("Invalid line: " + line)
+ }
+ val srcId = lineArray(0).toLong
+ val dstId = lineArray(1).toLong
+ if (canonicalOrientation && dstId > srcId) {
+ srcIds += dstId
+ dstIds += srcId
+ } else {
+ srcIds += srcId
+ dstIds += dstId
+ }
}
- val source = lineArray(0).trim.toLong
- val target = lineArray(1).trim.toLong
- if (canonicalOrientation && target > source) {
- Edge(target, source, 1)
- } else {
- Edge(source, target, 1)
- }
- })
+ }
+ val srcIdArray = srcIds.trim().array
+ val dstIdArray = dstIds.trim().array
+ val data = new Array[Int](srcIdArray.length)
+ JArrays.fill(data, 1)
+
+ Iterator((index, new EdgePartition[Int](srcIdArray, dstIdArray, data)))
+ }.cache()
+ edges.count()
+
+ logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
+
val defaultVertexAttr = 1
- Graph.fromEdges(edges, defaultVertexAttr, partitionStrategy)
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr, partitionStrategy)
} // end of edgeListFile
}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index 771c460345..1e17fd5a67 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -253,25 +253,11 @@ object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
edges: RDD[Edge[ED]],
- defaultValue: VD,
+ defaultVertexAttr: VD,
partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] =
{
val etable = createETable(edges, partitionStrategy).cache()
-
- // Get the set of all vids
- val vids = etable.flatMap { e =>
- Iterator((e.srcId, 0), (e.dstId, 0))
- }
-
- // Shuffle the vids and create the VertexRDD.
- // TODO: Consider doing map side distinct before shuffle.
- val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
- vids, new HashPartitioner(edges.partitions.size))
- shuffled.setSerializer(classOf[VidMsgSerializer].getName)
- val vtable = VertexRDD(shuffled.mapValues(x => defaultValue))
-
- val vertexPlacement = new VertexPlacement(etable, vtable)
- new GraphImpl(vtable, etable, vertexPlacement)
+ fromEdgeRDD(etable, defaultVertexAttr)
}
def apply[VD: ClassManifest, ED: ClassManifest](
@@ -303,6 +289,14 @@ object GraphImpl {
new GraphImpl(vtable, etable, vertexPlacement)
}
+ def fromEdgePartitions[VD: ClassManifest, ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])],
+ defaultVertexAttr: VD,
+ partitionStrategy: PartitionStrategy): GraphImpl[VD, ED] = {
+ val etable = createETableFromEdgePartitions(edges, partitionStrategy)
+ fromEdgeRDD(etable, defaultVertexAttr)
+ }
+
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -313,29 +307,55 @@ object GraphImpl {
*/
protected def createETable[ED: ClassManifest](
edges: RDD[Edge[ED]],
- partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
- // Get the number of partitions
- val numPartitions = edges.partitions.size
+ partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
+ // Get the number of partitions
+ val numPartitions = edges.partitions.size
- val eTable = edges.map { e =>
- val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
+ val eTable = edges.map { e =>
+ val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
- // Should we be using 3-tuple or an optimized class
- new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
- }
- .partitionBy(new HashPartitioner(numPartitions))
- .mapPartitionsWithIndex( { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED]
- iter.foreach { message =>
- val data = message.data
- builder.add(data._1, data._2, data._3)
- }
- val edgePartition = builder.toEdgePartition
- Iterator((pid, edgePartition))
- }, preservesPartitioning = true).cache()
+ // Should we be using 3-tuple or an optimized class
+ new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+ }
+ .partitionBy(new HashPartitioner(numPartitions))
+ .mapPartitionsWithIndex( { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED]
+ iter.foreach { message =>
+ val data = message.data
+ builder.add(data._1, data._2, data._3)
+ }
+ val edgePartition = builder.toEdgePartition
+ Iterator((pid, edgePartition))
+ }, preservesPartitioning = true).cache()
new EdgeRDD(eTable)
}
+ protected def createETableFromEdgePartitions[ED: ClassManifest](
+ edges: RDD[(Pid, EdgePartition[ED])],
+ partitionStrategy: PartitionStrategy): EdgeRDD[ED] = {
+ // TODO(ankurdave): provide option to repartition edges using partitionStrategy
+ new EdgeRDD(edges)
+ }
+
+ private def fromEdgeRDD[VD: ClassManifest, ED: ClassManifest](
+ edges: EdgeRDD[ED],
+ defaultVertexAttr: VD): GraphImpl[VD, ED] = {
+ // Get the set of all vids
+ val vids = edges.flatMap { e =>
+ Iterator((e.srcId, 0), (e.dstId, 0))
+ }
+
+ // Shuffle the vids and create the VertexRDD.
+ // TODO: Consider doing map side distinct before shuffle.
+ val shuffled = new ShuffledRDD[Vid, Int, (Vid, Int)](
+ vids, new HashPartitioner(edges.partitions.size))
+ shuffled.setSerializer(classOf[VidMsgSerializer].getName)
+ val vtable = VertexRDD(shuffled.mapValues(x => defaultVertexAttr))
+
+ val vertexPlacement = new VertexPlacement(edges, vtable)
+ new GraphImpl(vtable, edges, vertexPlacement)
+ }
+
private def accessesVertexAttr[VD, ED](closure: AnyRef, attrName: String): Boolean = {
try {
BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)