aboutsummaryrefslogtreecommitdiff
path: root/bagel/src/main/scala/org
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-08-31 19:27:07 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-09-01 14:13:13 -0700
commit46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef (patch)
tree4a46971b36680bc5ef51be81ada8eb47670f6b22 /bagel/src/main/scala/org
parenta30fac16ca0525f2001b127e5f9518c9680844c9 (diff)
downloadspark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.tar.gz
spark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.tar.bz2
spark-46eecd110a4017ea0c86cbb1010d0ccd6a5eb2ef.zip
Initial work to rename package to org.apache.spark
Diffstat (limited to 'bagel/src/main/scala/org')
-rw-r--r--bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala293
1 file changed, 293 insertions, 0 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
new file mode 100644
index 0000000000..fec8737fcd
--- /dev/null
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.bagel
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+
+import org.apache.spark.storage.StorageLevel
+
+object Bagel extends Logging {
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
+
+  /**
+   * Runs a Bagel program.
+   * @param sc [[org.apache.spark.SparkContext]] to use for the program.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+   * the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+   * empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+   * message before sending (which often involves network I/O).
+   * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+   * and provides the result to each vertex in the next superstep.
+   * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
+   * @param numPartitions number of partitions across which to split the graph.
+   * Default is the default parallelism of the SparkContext
+   * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+   * Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+   * optional Aggregator and the current superstep,
+   * and returns a set of (Vertex, outgoing Messages) pairs
+   * @tparam K key
+   * @tparam V vertex type
+   * @tparam M message type
+   * @tparam C combiner
+   * @tparam A aggregator
+   * @return an RDD of (K, V) pairs representing the graph after completion of the program
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
+          C: Manifest, A: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    aggregator: Option[Aggregator[V, A]],
+    partitioner: Partitioner,
+    numPartitions: Int,
+    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
+  )(
+    compute: (V, Option[C], Option[A], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
+    var superstep = 0
+    var verts = vertices
+    var msgs = messages
+    var noActivity = false
+    do {
+      logInfo("Starting superstep "+superstep+".")
+      val startTime = System.currentTimeMillis
+
+      // Compute the optional aggregate over all vertices; this forces an eager
+      // reduce before the superstep so the result can be handed to compute.
+      val aggregated = agg(verts, aggregator)
+      // Combine all messages bound for the same vertex, co-partitioned with
+      // the vertices so grouping does not require an extra shuffle.
+      val combinedMsgs = msgs.combineByKey(
+        combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
+      val grouped = combinedMsgs.groupWith(verts)
+      val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
+      val (processed, numMsgs, numActiveVerts) =
+        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+
+      val timeTaken = System.currentTimeMillis - startTime
+      logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
+
+      verts = processed.mapValues { case (vert, msgs) => vert }
+      msgs = processed.flatMap {
+        case (id, (vert, msgs)) => msgs.map(m => (m.targetId, m))
+      }
+      superstep += 1
+
+      // The program halts when a superstep emits no messages and no vertex is active.
+      noActivity = numMsgs == 0 && numActiveVerts == 0
+    } while (!noActivity)
+
+    verts
+  }
+
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    partitioner: Partitioner,
+    numPartitions: Int
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
+    // Forward the caller's partitioner. Previously it was dropped here, so overload
+    // resolution silently fell through to the HashPartitioner variant and the
+    // user-supplied partitioner was ignored.
+    run(sc, vertices, messages, combiner, partitioner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+  }
+
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    partitioner: Partitioner,
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
+    run[K, V, M, C, Nothing](
+      sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
+      addAggregatorArg[K, V, M, C](compute))
+  }
+
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
+   * and default storage level
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    numPartitions: Int
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
+    val part = new HashPartitioner(numPartitions)
+    run[K, V, M, C, Nothing](
+      sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
+      addAggregatorArg[K, V, M, C](compute))
+  }
+
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+   * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    numPartitions: Int
+  )(
+    compute: (V, Option[Array[M]], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /**
+   * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+   * and [[org.apache.spark.bagel.DefaultCombiner]]
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[Array[M]], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
+    val part = new HashPartitioner(numPartitions)
+    run[K, V, M, Array[M], Nothing](
+      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
+      addAggregatorArg[K, V, M, Array[M]](compute))
+  }
+
+  /**
+   * Aggregates the given vertices using the given aggregator, if it
+   * is specified.
+   */
+  private def agg[K, V <: Vertex, A: Manifest](
+    verts: RDD[(K, V)],
+    aggregator: Option[Aggregator[V, A]]
+  ): Option[A] = aggregator match {
+    case Some(a) =>
+      Some(verts.map {
+        case (id, vert) => a.createAggregator(vert)
+      }.reduce(a.mergeAggregators(_, _)))
+    case None => None
+  }
+
+  /**
+   * Processes the given vertex-message RDD using the compute
+   * function. Returns the processed RDD, the number of messages
+   * created, and the number of active vertices.
+   */
+  private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
+    sc: SparkContext,
+    grouped: RDD[(K, (Seq[C], Seq[V]))],
+    compute: (V, Option[C]) => (V, Array[M]),
+    storageLevel: StorageLevel
+  ): (RDD[(K, (V, Array[M]))], Int, Int) = {
+    // Accumulators are only written from within tasks; never reassigned here.
+    val numMsgs = sc.accumulator(0)
+    val numActiveVerts = sc.accumulator(0)
+    val processed = grouped.flatMapValues {
+      // A key with messages but no vertex: drop the messages.
+      case (_, vs) if vs.isEmpty => None
+      case (c, vs) =>
+        val (newVert, newMsgs) =
+          compute(vs(0), c match {
+            case Seq(comb) => Some(comb)
+            case Seq() => None
+          })
+
+        numMsgs += newMsgs.size
+        if (newVert.active)
+          numActiveVerts += 1
+
+        Some((newVert, newMsgs))
+    }.persist(storageLevel)
+
+    // Force evaluation of processed RDD for accurate performance measurements
+    // (and so the accumulator values below are final).
+    processed.foreach(x => {})
+
+    (processed, numMsgs.value, numActiveVerts.value)
+  }
+
+  /**
+   * Converts a compute function that doesn't take an aggregator to
+   * one that does, so it can be passed to Bagel.run.
+   */
+  private def addAggregatorArg[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C](
+    compute: (V, Option[C], Int) => (V, Array[M])
+  ): (V, Option[C], Option[Nothing], Int) => (V, Array[M]) = {
+    (vert: V, msgs: Option[C], aggregated: Option[Nothing], superstep: Int) =>
+      compute(vert, msgs, superstep)
+  }
+}
+
+/**
+ * Combines the individual messages bound for one vertex into a single combined
+ * value of type C before they are delivered (see the combineByKey call in
+ * Bagel.run, which reduces data sent over the network).
+ */
+trait Combiner[M, C] {
+  // Wraps a single message into a fresh combined value.
+  def createCombiner(msg: M): C
+  // Folds one more message into an existing combined value.
+  def mergeMsg(combiner: C, msg: M): C
+  // Merges two combined values into one.
+  def mergeCombiners(a: C, b: C): C
+}
+
+/**
+ * Reduces a value of type A across all vertices after each superstep; the
+ * result is provided to every vertex's compute function in the next superstep
+ * (see Bagel.agg and Bagel.run).
+ */
+trait Aggregator[V, A] {
+  // Produces an initial aggregate value from a single vertex.
+  def createAggregator(vert: V): A
+  // Combines two partial aggregates into one.
+  def mergeAggregators(a: A, b: A): A
+}
+
+/**
+ * Default combiner that performs no aggregation: every incoming message is
+ * simply appended to an array, so compute sees all raw messages.
+ */
+class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
+  // A lone message becomes a one-element array.
+  def createCombiner(msg: M): Array[M] = Array(msg)
+  // Append the new message to the accumulated array.
+  def mergeMsg(combiner: Array[M], msg: M): Array[M] = combiner :+ msg
+  // Concatenate the two partial arrays.
+  def mergeCombiners(a: Array[M], b: Array[M]): Array[M] = a ++ b
+}
+
+/**
+ * Represents a Bagel vertex.
+ *
+ * Subclasses may store state along with each vertex and must
+ * inherit from java.io.Serializable or scala.Serializable.
+ */
+trait Vertex {
+  // Whether this vertex still participates in the computation. Bagel.run
+  // terminates once no vertex is active and no messages are in flight.
+  def active: Boolean
+}
+
+/**
+ * Represents a Bagel message to a target vertex.
+ *
+ * Subclasses may contain a payload to deliver to the target vertex
+ * and must inherit from java.io.Serializable or scala.Serializable.
+ */
+trait Message[K] {
+  // Key of the vertex this message is routed to in the next superstep
+  // (Bagel.run keys each outgoing message by its targetId).
+  def targetId: K
+}