path: root/bagel/src
author     Nick Pentreath <nick.pentreath@gmail.com>  2013-03-09 12:40:48 +0200
committer  Nick Pentreath <nick.pentreath@gmail.com>  2013-03-09 12:40:48 +0200
commit     1e981d8b260423bfed6d4e67bd94ba4c07a4ff71 (patch)
tree       d3a62ff3904a4f7c61dbeee31ba0c26e189e8f13 /bagel/src
parent     9f0dc829cbaa9aa5011fb917010d13ea5e0a19d7 (diff)
download   spark-1e981d8b260423bfed6d4e67bd94ba4c07a4ff71.tar.gz
           spark-1e981d8b260423bfed6d4e67bd94ba4c07a4ff71.tar.bz2
           spark-1e981d8b260423bfed6d4e67bd94ba4c07a4ff71.zip
Added choice of persistence level to Bagel. Also added documentation.
Diffstat (limited to 'bagel/src')
-rw-r--r--  bagel/src/main/scala/spark/bagel/Bagel.scala | 91
1 file changed, 83 insertions(+), 8 deletions(-)
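
For context, a minimal sketch of how a caller picks the new storage level (the TestVertex and TestMessage classes and the sample data are hypothetical stand-ins; any Vertex and Message[K] implementations work the same way):

    import spark._
    import spark.bagel._
    import spark.storage.StorageLevel

    // Hypothetical vertex and message types, for illustration only.
    class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
    class TestMessage(val targetId: String) extends Message[String] with Serializable

    val sc = new SparkContext("local", "bagel-example")
    val verts = sc.parallelize(Array("a", "b", "c")).map(id => (id, new TestVertex(true, 0)))
    val msgs = sc.parallelize(Array[(String, TestMessage)]())

    // Intermediate per-superstep RDDs were previously always cached in
    // memory; the new trailing argument lets them spill to disk instead.
    val result = Bagel.run(sc, verts, msgs, 2, StorageLevel.DISK_ONLY) {
      (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
        (new TestVertex(superstep < 3, self.age + 1), Array[TestMessage]())
    }

Leaving the final argument off falls back to DEFAULT_STORAGE_LEVEL (MEMORY_ONLY), so existing callers keep the old behaviour.
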
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala
index 094e57dacb..013388cbdf 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/spark/bagel/Bagel.scala
@@ -4,8 +4,38 @@ import spark._
import spark.SparkContext._
import scala.collection.mutable.ArrayBuffer
+import storage.StorageLevel
object Bagel extends Logging {
+
+ val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+
+ /**
+ * Runs a Bagel program.
+ * @param sc [[spark.SparkContext]] to use for the program.
+ * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+ * the vertex id.
+ * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+ * empty RDD, e.g. sc.parallelize(Array[(K, M)]()).
+ * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+ * message before sending (which often involves network I/O).
+ * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+ * and provides the result to each vertex in the next superstep.
+ * @param partitioner [[spark.Partitioner]] partitions values by key
+ * @param numPartitions number of partitions across which to split the graph.
+ * Default is the default parallelism of the SparkContext
+ * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+ * Defaults to caching in memory.
+ * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+ * optional Aggregator and the current superstep,
+ * and returns a set of (Vertex, outgoing Messages) pairs
+ * @tparam K key type
+ * @tparam V vertex type
+ * @tparam M message type
+ * @tparam C combiner type
+ * @tparam A aggregator type
+ * @return a set of (K, V) pairs representing the graph after completion of the program
+ */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
C: Manifest, A: Manifest](
sc: SparkContext,
@@ -14,7 +44,8 @@ object Bagel extends Logging {
combiner: Combiner[M, C],
aggregator: Option[Aggregator[V, A]],
partitioner: Partitioner,
- numPartitions: Int
+ numPartitions: Int,
+ storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)] = {
@@ -33,7 +64,7 @@ object Bagel extends Logging {
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val (processed, numMsgs, numActiveVerts) =
- comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
+ comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
@@ -50,6 +81,7 @@ object Bagel extends Logging {
verts
}
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -59,12 +91,27 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, partitioner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ combiner: Combiner[M, C],
+ partitioner: Partitioner,
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, partitioner, numPartitions)(
+ sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+ * and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -73,13 +120,27 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
+ ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]] */
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ combiner: Combiner[M, C],
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, part, numPartitions)(
+ sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, C](compute))
}
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
+ * [[spark.bagel.DefaultCombiner]] and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -87,10 +148,22 @@ object Bagel extends Logging {
numPartitions: Int
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+ /** Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+ * and [[spark.bagel.DefaultCombiner]] */
+ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+ sc: SparkContext,
+ vertices: RDD[(K, V)],
+ messages: RDD[(K, M)],
+ numPartitions: Int,
+ storageLevel: StorageLevel
+ )(
+ compute: (V, Option[Array[M]], Int) => (V, Array[M])
+ ): RDD[(K, V)] = {
val part = new HashPartitioner(numPartitions)
run[K, V, M, Array[M], Nothing](
- sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
+ sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
addAggregatorArg[K, V, M, Array[M]](compute))
}
@@ -117,7 +190,8 @@ object Bagel extends Logging {
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
- compute: (V, Option[C]) => (V, Array[M])
+ compute: (V, Option[C]) => (V, Array[M]),
+ storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
@@ -135,7 +209,7 @@ object Bagel extends Logging {
numActiveVerts += 1
Some((newVert, newMsgs))
- }.cache
+ }.persist(storageLevel)
// Force evaluation of processed RDD for accurate performance measurements
processed.foreach(x => {})
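
The `.cache` call this hunk replaces is shorthand for `persist(StorageLevel.MEMORY_ONLY)`, which is why defaulting the new parameter to MEMORY_ONLY preserves the old behaviour exactly. A minimal sketch of the choice this opens up (note that a storage level can only be assigned once per RDD):

    import spark.storage.StorageLevel

    // Old hard-coded behaviour: keep deserialized objects in memory.
    processed.persist(StorageLevel.MEMORY_ONLY)   // same effect as .cache

    // What a caller can now request instead, e.g. for graphs whose
    // intermediate state does not fit in memory:
    //   processed.persist(StorageLevel.DISK_ONLY)
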
@@ -166,6 +240,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
+/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M] =
Array(msg)
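
The excerpt above shows only createCombiner; assuming the standard mergeMsg and mergeCombiners methods of the [[spark.bagel.Combiner]] trait, a brief sketch of how DefaultCombiner behaves end to end:

    // DefaultCombiner performs no real aggregation; it only accumulates
    // the messages destined for a vertex into an Array.
    val c = new DefaultCombiner[String]()
    val one = c.createCombiner("a")               // Array("a")
    val two = c.mergeMsg(one, "b")                // Array("a", "b")
    val all = c.mergeCombiners(two, Array("c"))   // Array("a", "b", "c")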