aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-06-03 14:54:26 -0700
committerReynold Xin <rxin@apache.org>2014-06-03 14:54:26 -0700
commitb1feb60209174433262de2a26d39616ba00edcc8 (patch)
treef684a33a32976b1451de09b4e5641acf1cffdb9e /graphx
parent894ecde04faa7e2054a40825a58b2e9cdaa93c70 (diff)
downloadspark-b1feb60209174433262de2a26d39616ba00edcc8.tar.gz
spark-b1feb60209174433262de2a26d39616ba00edcc8.tar.bz2
spark-b1feb60209174433262de2a26d39616ba00edcc8.zip
[SPARK-1991] Support custom storage levels for vertices and edges
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed. The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels. In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods. I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed. Author: Ankur Dave <ankurdave@gmail.com> Closes #946 from ankurdave/SPARK-1991 and squashes the following commits: ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks" 34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks 6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala67
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala34
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala12
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala49
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala55
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala82
7 files changed, 208 insertions, 97 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index a8fc095072..899a3cbd62 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
- val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+ /**
+ * Persists the edge partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}
+ /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
- new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+ this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
- new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
+
+ /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+ private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+ partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+ new EdgeRDD(partitionsRDD, this.targetStorageLevel)
+ }
+
+ /**
+ * Changes the target storage level while preserving all other properties of the
+ * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
+ *
+ * This does not actually trigger a cache; to do this, call
+ * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
+ */
+ private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+ new EdgeRDD(this.partitionsRDD, targetStorageLevel)
+ }
+
+}
+
+object EdgeRDD {
+ /**
+ * Creates an EdgeRDD from a set of edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+ */
+ def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+ val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED, VD]
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }
+ EdgeRDD.fromEdgePartitions(edgePartitions)
+ }
+
+ /**
+ * Creates an EdgeRDD from already-constructed edge partitions.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+ */
+ def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+ edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+ new EdgeRDD(edgePartitions)
+ }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4fda..c4f9d6514c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
@transient val triplets: RDD[EdgeTriplet[VD, ED]]
/**
- * Caches the vertices and edges associated with this graph at the specified storage level.
+ * Caches the vertices and edges associated with this graph at the specified storage level,
+ * ignoring any target storage levels previously set.
*
* @param newLevel the level at which to cache the graph.
*
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
/**
- * Caches the vertices and edges associated with this graph. This is used to
- * pin a graph in memory enabling multiple queries to reuse the same
- * construction process.
+ * Caches the vertices and edges associated with this graph at the previously-specified target
+ * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
+ * multiple queries to reuse the same construction process.
*/
def cache(): Graph[VD, ED]
@@ -358,9 +359,12 @@ object Graph {
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src, dst) form
+ * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
@@ -368,10 +372,12 @@ object Graph {
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
- uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+ uniqueEdges: Option[PartitionStrategy] = None,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue)
+ val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
@@ -383,14 +389,18 @@ object Graph {
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultValue: VD): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue)
+ defaultValue: VD,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}
/**
@@ -405,12 +415,16 @@ object Graph {
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr)
+ defaultVertexAttr: VD = null.asInstanceOf[VD],
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
/**
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 389490c139..2e814e34f9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@
package org.apache.spark.graphx
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for the edge RDD
+ * @param edgeStorageLevel the desired storage level for the edge partitions. To set the vertex
+ * storage level, call [[org.apache.spark.graphx.Graph#persistVertices]].
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1)
+ minEdgePartitions: Int = 1,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
- }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+ }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
- GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8b910fbc5a..f1b6df9a30 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
class VertexRDD[@specialized VD: ClassTag](
- val partitionsRDD: RDD[ShippableVertexPartition[VD]])
+ val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD will be based on a different index and can no longer be quickly joined with this
* RDD.
*/
- def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+ def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
override val partitioner = partitionsRDD.partitioner
@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
}
setName("VertexRDD")
+ /**
+ * Persists the vertex partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
this
}
+ /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
@@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.leftJoin(otherPart)(f))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
leftZipJoin(other)(f)
case _ =>
- new VertexRDD[VD3](
+ this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.innerJoin(otherPart)(f))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
innerZipJoin(other)(f)
case _ =>
- new VertexRDD(
+ this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
- new VertexRDD[VD2](parts)
+ this.withPartitionsRDD[VD2](parts)
}
/**
@@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
- new VertexRDD(vertexPartitions)
+ this.withPartitionsRDD(vertexPartitions)
+ }
+
+ /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+ private[graphx] def withPartitionsRDD[VD2: ClassTag](
+ partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+ new VertexRDD(partitionsRDD, this.targetStorageLevel)
+ }
+
+ /**
+ * Changes the target storage level while preserving all other properties of the
+ * VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
+ *
+ * This does not actually trigger a cache; to do this, call
+ * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
+ */
+ private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+ new VertexRDD(this.partitionsRDD, targetStorageLevel)
}
/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1649b244d2..59d9a8808e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
this
}
- override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def cache(): Graph[VD, ED] = {
+ vertices.cache()
+ replicatedVertexView.edges.cache()
+ this
+ }
override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
@@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- val numPartitions = replicatedVertexView.edges.partitions.size
+ val numPartitions = edges.partitions.size
val edTag = classTag[ED]
val vdTag = classTag[VD]
- val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
+ val newEdges = edges.withPartitionsRDD(edges.map { e =>
val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
@@ -256,24 +260,33 @@ object GraphImpl {
/** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
/** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel,
+ vertexStorageLevel)
}
/** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
+ .withTargetStorageLevel(edgeStorageLevel).cache()
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
+ .withTargetStorageLevel(vertexStorageLevel).cache()
GraphImpl(vertexRDD, edgeRDD)
}
@@ -309,23 +322,13 @@ object GraphImpl {
*/
private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
edges: EdgeRDD[ED, VD],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- edges.cache()
- val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
- fromExistingRDDs(vertices, edges)
- }
-
- /** Create an EdgeRDD from a set of edges. */
- private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
- edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
- val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED, VD]
- iter.foreach { e =>
- builder.add(e.srcId, e.dstId, e.attr)
- }
- Iterator((pid, builder.toEdgePartition))
- }
- new EdgeRDD(edgePartitions)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
+ val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
+ .withTargetStorageLevel(vertexStorageLevel)
+ fromExistingRDDs(vertices, edgesCached)
}
} // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 3a0bba1b93..86b366eb92 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
.setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
includeSrc, includeDst, shipSrc, shipDst))
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
@@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
.setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
(ePartIter, shippedActivesIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
@@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
hasSrcId, hasDstId))
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 069e042ed9..c1513a0045 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -17,7 +17,9 @@
package org.apache.spark.graphx.lib
+import scala.collection.mutable
import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
@@ -28,18 +30,20 @@ object Analytics extends Logging {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
- System.err.println("Usage: Analytics <taskType> <file> [other options]")
+ System.err.println(
+ "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
System.exit(1)
}
val taskType = args(0)
val fname = args(1)
- val options = args.drop(2).map { arg =>
+ val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
+ val options = mutable.Map(optionsList: _*)
def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
@@ -57,20 +61,24 @@ object Analytics extends Logging {
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.locality.wait", "100000")
+ val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+ println("Set the number of edge partitions using --numEPart.")
+ sys.exit(1)
+ }
+ val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+ .map(pickPartitioner(_))
+ val edgeStorageLevel = options.remove("edgeStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+ val vertexStorageLevel = options.remove("vertexStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
taskType match {
case "pagerank" =>
- var tol: Float = 0.001F
- var outFname = ""
- var numEPart = 4
- var partitionStrategy: Option[PartitionStrategy] = None
- var numIterOpt: Option[Int] = None
-
- options.foreach{
- case ("tol", v) => tol = v.toFloat
- case ("output", v) => outFname = v
- case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
- case ("numIter", v) => numIterOpt = Some(v.toInt)
+ val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+ val outFname = options.remove("output").getOrElse("")
+ val numIterOpt = options.remove("numIter").map(_.toInt)
+
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -81,7 +89,9 @@ object Analytics extends Logging {
val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
println("GRAPHX: Number of vertices " + graph.vertices.count)
@@ -102,32 +112,19 @@ object Analytics extends Logging {
sc.stop()
case "cc" =>
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: Option[PartitionStrategy] = None
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- if (!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
println("======================================")
println("| Connected Components |")
println("======================================")
val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
val cc = ConnectedComponents.run(graph)
@@ -135,24 +132,25 @@ object Analytics extends Logging {
sc.stop()
case "triangles" =>
- var numEPart = 4
- // TriangleCount requires the graph to be partitioned
- var partitionStrategy: PartitionStrategy = RandomVertexCut
-
- options.foreach{
- case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
+
println("======================================")
println("| Triangle Count |")
println("======================================")
+
val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
- val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val graph = GraphLoader.edgeListFile(sc, fname,
+ canonicalOrientation = true,
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
+ // TriangleCount requires the graph to be partitioned
+ .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
- case (vid,data) => data.toLong
+ case (vid, data) => data.toLong
}.reduce(_ + _) / 3)
sc.stop()