From 1e981d8b260423bfed6d4e67bd94ba4c07a4ff71 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Sat, 9 Mar 2013 12:40:48 +0200 Subject: Added choice of persitance level to Bagel. Also added documentation. --- bagel/src/main/scala/spark/bagel/Bagel.scala | 91 +++++++++++++++++++++++++--- 1 file changed, 83 insertions(+), 8 deletions(-) (limited to 'bagel/src') diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/spark/bagel/Bagel.scala index 094e57dacb..013388cbdf 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/spark/bagel/Bagel.scala @@ -4,8 +4,38 @@ import spark._ import spark.SparkContext._ import scala.collection.mutable.ArrayBuffer +import storage.StorageLevel object Bagel extends Logging { + + val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY + + /** + * Runs a Bagel program. + * @param sc [[spark.SparkContext]] to use for the program. + * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be + * the vertex id. + * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an + * empty array, i.e. sc.parallelize(Array[K, Message]()). + * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one + * message before sending (which often involves network I/O). + * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep, + * and provides the result to each vertex in the next superstep. + * @param partitioner [[spark.Partitioner]] partitions values by key + * @param numPartitions number of partitions across which to split the graph. + * Default is the default parallelism of the SparkContext + * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep. + * Defaults to caching in memory. + * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex, + * optional Aggregator and the current superstep, + * and returns a set of (Vertex, outgoing Messages) pairs + * @tparam K key + * @tparam V vertex type + * @tparam M message type + * @tparam C combiner + * @tparam A aggregator + * @return a set of (K, V) pairs representing the graph after completion of the program + */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest]( sc: SparkContext, @@ -14,7 +44,8 @@ object Bagel extends Logging { combiner: Combiner[M, C], aggregator: Option[Aggregator[V, A]], partitioner: Partitioner, - numPartitions: Int + numPartitions: Int, + storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL )( compute: (V, Option[C], Option[A], Int) => (V, Array[M]) ): RDD[(K, V)] = { @@ -33,7 +64,7 @@ object Bagel extends Logging { combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner) val grouped = combinedMsgs.groupWith(verts) val (processed, numMsgs, numActiveVerts) = - comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep)) + comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel) val timeTaken = System.currentTimeMillis - startTime logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000)) @@ -50,6 +81,7 @@ object Bagel extends Logging { verts } + /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -59,12 +91,27 @@ object Bagel extends Logging { numPartitions: Int )( compute: (V, Option[C], Int) => (V, Array[M]) + ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) + + /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */ + def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( + sc: SparkContext, + vertices: RDD[(K, V)], + messages: RDD[(K, M)], + combiner: Combiner[M, C], + partitioner: Partitioner, + numPartitions: Int, + storageLevel: StorageLevel + )( + compute: (V, Option[C], Int) => (V, Array[M]) ): RDD[(K, V)] = { run[K, V, M, C, Nothing]( - sc, vertices, messages, combiner, None, partitioner, numPartitions)( + sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)( addAggregatorArg[K, V, M, C](compute)) } + /** Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]] + * and default storage level*/ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -73,13 +120,27 @@ object Bagel extends Logging { numPartitions: Int )( compute: (V, Option[C], Int) => (V, Array[M]) + ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) + + /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/ + def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( + sc: SparkContext, + vertices: RDD[(K, V)], + messages: RDD[(K, M)], + combiner: Combiner[M, C], + numPartitions: Int, + storageLevel: StorageLevel + )( + compute: (V, Option[C], Int) => (V, Array[M]) ): RDD[(K, V)] = { val part = new HashPartitioner(numPartitions) run[K, V, M, C, Nothing]( - sc, vertices, messages, combiner, None, part, numPartitions)( + sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)( addAggregatorArg[K, V, M, C](compute)) } + /** Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]], + * [[spark.bagel.DefaultCombiner]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -87,10 +148,22 @@ object Bagel extends Logging { numPartitions: Int )( compute: (V, Option[Array[M]], Int) => (V, Array[M]) - ): RDD[(K, V)] = { + ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) + + /** Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]] + * and [[spark.bagel.DefaultCombiner]]*/ + def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( + sc: SparkContext, + vertices: RDD[(K, V)], + messages: RDD[(K, M)], + numPartitions: Int, + storageLevel: StorageLevel + )( + compute: (V, Option[Array[M]], Int) => (V, Array[M]) + ): RDD[(K, V)] = { val part = new HashPartitioner(numPartitions) run[K, V, M, Array[M], Nothing]( - sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)( + sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)( addAggregatorArg[K, V, M, Array[M]](compute)) } @@ -117,7 +190,8 @@ object Bagel extends Logging { private def comp[K: Manifest, V <: Vertex, M <: Message[K], C]( sc: SparkContext, grouped: RDD[(K, (Seq[C], Seq[V]))], - compute: (V, Option[C]) => (V, Array[M]) + compute: (V, Option[C]) => (V, Array[M]), + storageLevel: StorageLevel ): (RDD[(K, (V, Array[M]))], Int, Int) = { var numMsgs = sc.accumulator(0) var numActiveVerts = sc.accumulator(0) @@ -135,7 +209,7 @@ object Bagel extends Logging { numActiveVerts += 1 Some((newVert, newMsgs)) - }.cache + }.persist(storageLevel) // Force evaluation of processed RDD for accurate performance measurements processed.foreach(x => {}) @@ -166,6 +240,7 @@ trait Aggregator[V, A] { def mergeAggregators(a: A, b: A): A } +/** Default combiner that simply appends messages together (i.e. performs no aggregation) */ class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable { def createCombiner(msg: M): Array[M] = Array(msg) -- cgit v1.2.3