-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala              | 21
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala                  | 67
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala                    | 34
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala              | 12
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala                | 49
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala           | 55
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala |  6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala            | 82
8 files changed, 229 insertions(+), 97 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 363de93e06..2d8ff1194a 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -149,6 +149,27 @@ object StorageLevel {
/**
* :: DeveloperApi ::
+ * Return the StorageLevel object with the specified name.
+ */
+ @DeveloperApi
+ def fromString(s: String): StorageLevel = s match {
+ case "NONE" => NONE
+ case "DISK_ONLY" => DISK_ONLY
+ case "DISK_ONLY_2" => DISK_ONLY_2
+ case "MEMORY_ONLY" => MEMORY_ONLY
+ case "MEMORY_ONLY_2" => MEMORY_ONLY_2
+ case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
+ case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
+ case "MEMORY_AND_DISK" => MEMORY_AND_DISK
+ case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
+ case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
+ case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
+ case "OFF_HEAP" => OFF_HEAP
+ case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+ }
+
+ /**
+ * :: DeveloperApi ::
* Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
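
For context, the new helper lets callers turn a level name from configuration or the command line into a StorageLevel. A minimal sketch of its use (the property key below is illustrative, not part of this patch):

    import org.apache.spark.storage.StorageLevel

    // Resolve a user-supplied level name; unknown names raise
    // IllegalArgumentException via the match above.
    val levelName = sys.props.getOrElse("example.storageLevel", "MEMORY_ONLY")
    val level: StorageLevel = StorageLevel.fromString(levelName)
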
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index a8fc095072..899a3cbd62 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
- val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
+ /**
+ * Persists the edge partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}
+ /** Persists the edge partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
- new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+ this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
- new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}
+
+ /** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
+ private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+ partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+ new EdgeRDD(partitionsRDD, this.targetStorageLevel)
+ }
+
+ /**
+ * Changes the target storage level while preserving all other properties of the
+ * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
+ *
+ * This does not actually trigger a cache; to do this, call
+ * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
+ */
+ private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+ new EdgeRDD(this.partitionsRDD, targetStorageLevel)
+ }
+
+}
+
+object EdgeRDD {
+ /**
+ * Creates an EdgeRDD from a set of edges.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+ */
+ def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+ val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+ val builder = new EdgePartitionBuilder[ED, VD]
+ iter.foreach { e =>
+ builder.add(e.srcId, e.dstId, e.attr)
+ }
+ Iterator((pid, builder.toEdgePartition))
+ }
+ EdgeRDD.fromEdgePartitions(edgePartitions)
+ }
+
+ /**
+ * Creates an EdgeRDD from already-constructed edge partitions.
+ *
+ * @tparam ED the edge attribute type
+ * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+ */
+ def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+ edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+ new EdgeRDD(edgePartitions)
+ }
}
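
A short usage sketch for the new public constructor (the example edges are made up; cache() persists the partitions at the default MEMORY_ONLY target level):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Edge, EdgeRDD}

    // Build columnar edge partitions from a plain RDD[Edge] and pin them
    // at the EdgeRDD's target storage level (MEMORY_ONLY unless changed).
    def buildEdges(sc: SparkContext): EdgeRDD[Double, Int] = {
      val raw = sc.parallelize(Seq(Edge(1L, 2L, 0.5), Edge(2L, 3L, 1.5)))
      EdgeRDD.fromEdges[Double, Int](raw).cache()
    }

Note that withPartitionsRDD and withTargetStorageLevel are private[graphx]; external callers select storage levels through the Graph construction APIs changed below.
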
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4fda..c4f9d6514c 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
@transient val triplets: RDD[EdgeTriplet[VD, ED]]
/**
- * Caches the vertices and edges associated with this graph at the specified storage level.
+ * Caches the vertices and edges associated with this graph at the specified storage level,
+ * ignoring any target storage levels previously set.
*
* @param newLevel the level at which to cache the graph.
*
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
/**
- * Caches the vertices and edges associated with this graph. This is used to
- * pin a graph in memory enabling multiple queries to reuse the same
- * construction process.
+ * Caches the vertices and edges associated with this graph at the previously-specified target
+ * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
+ * multiple queries to reuse the same construction process.
*/
def cache(): Graph[VD, ED]
@@ -358,9 +359,12 @@ object Graph {
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src, dst) form
+ * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
@@ -368,10 +372,12 @@ object Graph {
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
- uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+ uniqueEdges: Option[PartitionStrategy] = None,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue)
+ val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
@@ -383,14 +389,18 @@ object Graph {
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultValue: VD): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue)
+ defaultValue: VD,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}
/**
@@ -405,12 +415,16 @@ object Graph {
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
+ * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+ * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr)
+ defaultVertexAttr: VD = null.asInstanceOf[VD],
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
/**
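
A sketch of the extended Graph.fromEdgeTuples signature in use (the edge data is illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.Graph
    import org.apache.spark.storage.StorageLevel

    // Request disk-backed storage up front; later calls to cache()
    // honor these target levels.
    def buildGraph(sc: SparkContext): Graph[Int, Int] = {
      val rawEdges = sc.parallelize(Seq((1L, 2L), (2L, 3L), (3L, 1L)))
      Graph.fromEdgeTuples(rawEdges, defaultValue = 1,
        edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
        vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
    }
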
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 389490c139..2e814e34f9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@
package org.apache.spark.graphx
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for the edge RDD
+ * @param edgeStorageLevel the desired storage level for the edge partitions
+ * @param vertexStorageLevel the desired storage level for the vertex partitions
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1)
+ minEdgePartitions: Int = 1,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
- }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+ }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
- GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile
}
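
A hedged example of the new edgeListFile parameters (the HDFS path is hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Graph, GraphLoader}
    import org.apache.spark.storage.StorageLevel

    // Keep serialized edge and vertex partitions in memory, spilling to
    // disk under pressure instead of recomputing from the text file.
    def loadGraph(sc: SparkContext): Graph[Int, Int] =
      GraphLoader.edgeListFile(sc, "hdfs://namenode/data/edges.txt",
        minEdgePartitions = 8,
        edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
        vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)
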
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8b910fbc5a..f1b6df9a30 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
class VertexRDD[@specialized VD: ClassTag](
- val partitionsRDD: RDD[ShippableVertexPartition[VD]])
+ val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD will be based on a different index and can no longer be quickly joined with this
* RDD.
*/
- def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+ def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
override val partitioner = partitionsRDD.partitioner
@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
}
setName("VertexRDD")
+ /**
+ * Persists the vertex partitions at the specified storage level, ignoring any existing target
+ * storage level.
+ */
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
this
}
+ /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
+ override def cache(): this.type = {
+ partitionsRDD.persist(targetStorageLevel)
+ this
+ }
+
/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
@@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.leftJoin(otherPart)(f))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
leftZipJoin(other)(f)
case _ =>
- new VertexRDD[VD3](
+ this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.innerJoin(otherPart)(f))
}
- new VertexRDD(newPartitionsRDD)
+ this.withPartitionsRDD(newPartitionsRDD)
}
/**
@@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
innerZipJoin(other)(f)
case _ =>
- new VertexRDD(
+ this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
- new VertexRDD[VD2](parts)
+ this.withPartitionsRDD[VD2](parts)
}
/**
@@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
- new VertexRDD(vertexPartitions)
+ this.withPartitionsRDD(vertexPartitions)
+ }
+
+ /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
+ private[graphx] def withPartitionsRDD[VD2: ClassTag](
+ partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+ new VertexRDD(partitionsRDD, this.targetStorageLevel)
+ }
+
+ /**
+ * Changes the target storage level while preserving all other properties of the
+ * VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
+ *
+ * This does not actually trigger a cache; to do this, call
+ * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
+ */
+ private[graphx] def withTargetStorageLevel(
+ targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+ new VertexRDD(this.partitionsRDD, targetStorageLevel)
}
/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
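
The persist/cache split introduced here is worth spelling out; a minimal sketch of the intended semantics, based on the doc comments above:

    import org.apache.spark.graphx.VertexRDD
    import org.apache.spark.storage.StorageLevel

    // persist(newLevel) takes effect at the explicit level, ignoring
    // targetStorageLevel; cache() instead defers to targetStorageLevel.
    def pin[VD](verts: VertexRDD[VD]): VertexRDD[VD] = {
      verts.persist(StorageLevel.DISK_ONLY) // explicit override
      // verts.cache()                      // would use verts.targetStorageLevel
      verts
    }
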
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1649b244d2..59d9a8808e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
this
}
- override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+ override def cache(): Graph[VD, ED] = {
+ vertices.cache()
+ replicatedVertexView.edges.cache()
+ this
+ }
override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
@@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- val numPartitions = replicatedVertexView.edges.partitions.size
+ val numPartitions = edges.partitions.size
val edTag = classTag[ED]
val vdTag = classTag[VD]
- val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
+ val newEdges = edges.withPartitionsRDD(edges.map { e =>
val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
@@ -256,24 +260,33 @@ object GraphImpl {
/** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
/** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. */
def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel,
+ vertexStorageLevel)
}
/** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
+ .withTargetStorageLevel(edgeStorageLevel).cache()
val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
+ .withTargetStorageLevel(vertexStorageLevel).cache()
GraphImpl(vertexRDD, edgeRDD)
}
@@ -309,23 +322,13 @@ object GraphImpl {
*/
private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
edges: EdgeRDD[ED, VD],
- defaultVertexAttr: VD): GraphImpl[VD, ED] = {
- edges.cache()
- val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
- fromExistingRDDs(vertices, edges)
- }
-
- /** Create an EdgeRDD from a set of edges. */
- private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
- edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
- val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
- val builder = new EdgePartitionBuilder[ED, VD]
- iter.foreach { e =>
- builder.add(e.srcId, e.dstId, e.attr)
- }
- Iterator((pid, builder.toEdgePartition))
- }
- new EdgeRDD(edgePartitions)
+ defaultVertexAttr: VD,
+ edgeStorageLevel: StorageLevel,
+ vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+ val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
+ val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
+ .withTargetStorageLevel(vertexStorageLevel)
+ fromExistingRDDs(vertices, edgesCached)
}
} // end of object GraphImpl
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 3a0bba1b93..86b366eb92 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
.setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
includeSrc, includeDst, shipSrc, shipDst))
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
@@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
.setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
(ePartIter, shippedActivesIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
@@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
hasSrcId, hasDstId))
.partitionBy(edges.partitioner.get)
- val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+ val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
(ePartIter, shippedVertsIter) => ePartIter.map {
case (pid, edgePartition) =>
(pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 069e042ed9..c1513a0045 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -17,7 +17,9 @@
package org.apache.spark.graphx.lib
+import scala.collection.mutable
import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
@@ -28,18 +30,20 @@ object Analytics extends Logging {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
- System.err.println("Usage: Analytics <taskType> <file> [other options]")
+ System.err.println(
+ "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]")
System.exit(1)
}
val taskType = args(0)
val fname = args(1)
- val options = args.drop(2).map { arg =>
+ val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
+ val options = mutable.Map(optionsList: _*)
def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
@@ -57,20 +61,24 @@ object Analytics extends Logging {
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
.set("spark.locality.wait", "100000")
+ val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+ println("Set the number of edge partitions using --numEPart.")
+ sys.exit(1)
+ }
+ val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+ .map(pickPartitioner(_))
+ val edgeStorageLevel = options.remove("edgeStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+ val vertexStorageLevel = options.remove("vertexStorageLevel")
+ .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
taskType match {
case "pagerank" =>
- var tol: Float = 0.001F
- var outFname = ""
- var numEPart = 4
- var partitionStrategy: Option[PartitionStrategy] = None
- var numIterOpt: Option[Int] = None
-
- options.foreach{
- case ("tol", v) => tol = v.toFloat
- case ("output", v) => outFname = v
- case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
- case ("numIter", v) => numIterOpt = Some(v.toInt)
+ val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+ val outFname = options.remove("output").getOrElse("")
+ val numIterOpt = options.remove("numIter").map(_.toInt)
+
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
@@ -81,7 +89,9 @@ object Analytics extends Logging {
val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
println("GRAPHX: Number of vertices " + graph.vertices.count)
@@ -102,32 +112,19 @@ object Analytics extends Logging {
sc.stop()
case "cc" =>
- var numIter = Int.MaxValue
- var numVPart = 4
- var numEPart = 4
- var isDynamic = false
- var partitionStrategy: Option[PartitionStrategy] = None
-
- options.foreach{
- case ("numIter", v) => numIter = v.toInt
- case ("dynamic", v) => isDynamic = v.toBoolean
- case ("numEPart", v) => numEPart = v.toInt
- case ("numVPart", v) => numVPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
- if (!isDynamic && numIter == Int.MaxValue) {
- println("Set number of iterations!")
- sys.exit(1)
- }
println("======================================")
println("| Connected Components |")
println("======================================")
val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
- minEdgePartitions = numEPart).cache()
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel).cache()
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
val cc = ConnectedComponents.run(graph)
@@ -135,24 +132,25 @@ object Analytics extends Logging {
sc.stop()
case "triangles" =>
- var numEPart = 4
- // TriangleCount requires the graph to be partitioned
- var partitionStrategy: PartitionStrategy = RandomVertexCut
-
- options.foreach{
- case ("numEPart", v) => numEPart = v.toInt
- case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+ options.foreach {
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
+
println("======================================")
println("| Triangle Count |")
println("======================================")
+
val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
- val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
- minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+ val graph = GraphLoader.edgeListFile(sc, fname,
+ canonicalOrientation = true,
+ minEdgePartitions = numEPart,
+ edgeStorageLevel = edgeStorageLevel,
+ vertexStorageLevel = vertexStorageLevel)
+ // TriangleCount requires the graph to be partitioned
+ .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
val triangles = TriangleCount.run(graph)
println("Triangles: " + triangles.vertices.map {
- case (vid,data) => data.toLong
+ case (vid, data) => data.toLong
}.reduce(_ + _) / 3)
sc.stop()
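
For illustration, the refactored driver could be exercised as below; the input path is hypothetical, and in practice these arguments come from spark-submit:

    import org.apache.spark.graphx.lib.Analytics

    // --numEPart is now required; the storage-level flags are parsed with
    // the new StorageLevel.fromString and default to MEMORY_ONLY if omitted.
    Analytics.main(Array(
      "pagerank", "hdfs://namenode/data/edges.txt",
      "--numEPart=8",
      "--tol=0.001",
      "--edgeStorageLevel=MEMORY_AND_DISK",
      "--vertexStorageLevel=MEMORY_AND_DISK"))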