-rw-r--r--  assembly/pom.xml                                                                5
-rw-r--r--  bagel/pom.xml                                                                  64
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala                       318
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/package-info.java                  21
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/package.scala                      23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                   2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala         2
-rwxr-xr-x  dev/audit-release/audit_release.py                                              2
-rwxr-xr-x  docs/_layouts/global.html                                                       1
-rw-r--r--  docs/bagel-programming-guide.md                                                159
-rw-r--r--  examples/pom.xml                                                                 6
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java    2
-rw-r--r--  pom.xml                                                                          3
-rw-r--r--  project/SparkBuild.scala                                                         8
-rw-r--r--  repl/pom.xml                                                                     6
15 files changed, 9 insertions(+), 613 deletions(-)
diff --git a/assembly/pom.xml b/assembly/pom.xml
index c3ab92f993..6c79f91897 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -46,11 +46,6 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/bagel/pom.xml b/bagel/pom.xml
deleted file mode 100644
index d45224cc80..0000000000
--- a/bagel/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.10</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_2.10</artifactId>
- <properties>
- <sbt.project.name>bagel</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project Bagel</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
deleted file mode 100644
index 8399033ac6..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.bagel
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-object Bagel extends Logging {
- val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
-
- /**
- * Runs a Bagel program.
- * @param sc org.apache.spark.SparkContext to use for the program.
- * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
- * Key will be the vertex id.
- * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
- * this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
- * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
- * given vertex into one message before sending (which often involves network
- * I/O).
- * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
- * after each superstep and provides the result to each vertex in the next
- * superstep.
- * @param partitioner org.apache.spark.Partitioner partitions values by key
- * @param numPartitions number of partitions across which to split the graph.
- * Default is the default parallelism of the SparkContext
- * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
- * intermediate RDDs in each superstep. Defaults to caching in memory.
- * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
- * the Vertex, optional Aggregator and the current superstep,
- * and returns a set of (Vertex, outgoing Messages) pairs
- * @tparam K key
- * @tparam V vertex type
- * @tparam M message type
- * @tparam C combiner
- * @tparam A aggregator
- * @return an RDD of (K, V) pairs representing the graph after completion of the program
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
- C: Manifest, A: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- aggregator: Option[Aggregator[V, A]],
- partitioner: Partitioner,
- numPartitions: Int,
- storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
- )(
- compute: (V, Option[C], Option[A], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val splits = if (numPartitions != 0) numPartitions else sc.defaultParallelism
-
- var superstep = 0
- var verts = vertices
- var msgs = messages
- var noActivity = false
- var lastRDD: RDD[(K, (V, Array[M]))] = null
- do {
- logInfo("Starting superstep " + superstep + ".")
- val startTime = System.currentTimeMillis
-
- val aggregated = agg(verts, aggregator)
- val combinedMsgs = msgs.combineByKeyWithClassTag(
- combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
- val grouped = combinedMsgs.groupWith(verts)
- val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
- val (processed, numMsgs, numActiveVerts) =
- comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
- if (lastRDD != null) {
- lastRDD.unpersist(false)
- }
- lastRDD = processed
-
- val timeTaken = System.currentTimeMillis - startTime
- logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
-
- verts = processed.mapValues { case (vert, msgs) => vert }
- msgs = processed.flatMap {
- case (id, (vert, msgs)) => msgs.map(m => (m.targetId, m))
- }
- superstep += 1
-
- noActivity = numMsgs == 0 && numActiveVerts == 0
- } while (!noActivity)
-
- verts
- }
-
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
- * storage level */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- partitioner: Partitioner,
- numPartitions: Int
- )(
- compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
- combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
-
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- partitioner: Partitioner,
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, C](compute))
- }
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
- * org.apache.spark.HashPartitioner and default storage level
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- numPartitions: Int
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
- DEFAULT_STORAGE_LEVEL)(compute)
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
- * default org.apache.spark.HashPartitioner
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val part = new HashPartitioner(numPartitions)
- run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, C](compute))
- }
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * default org.apache.spark.HashPartitioner,
- * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- numPartitions: Int
- )(
- compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * the default org.apache.spark.HashPartitioner
- * and [[org.apache.spark.bagel.DefaultCombiner]]
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val part = new HashPartitioner(numPartitions)
- run[K, V, M, Array[M], Nothing](
- sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, Array[M]](compute))
- }
-
- /**
- * Aggregates the given vertices using the given aggregator, if it
- * is specified.
- */
- private def agg[K, V <: Vertex, A: Manifest](
- verts: RDD[(K, V)],
- aggregator: Option[Aggregator[V, A]]
- ): Option[A] = aggregator match {
- case Some(a) =>
- Some(verts.map {
- case (id, vert) => a.createAggregator(vert)
- }.reduce(a.mergeAggregators(_, _)))
- case None => None
- }
-
- /**
- * Processes the given vertex-message RDD using the compute
- * function. Returns the processed RDD, the number of messages
- * created, and the number of active vertices.
- */
- private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
- sc: SparkContext,
- grouped: RDD[(K, (Iterable[C], Iterable[V]))],
- compute: (V, Option[C]) => (V, Array[M]),
- storageLevel: StorageLevel
- ): (RDD[(K, (V, Array[M]))], Int, Int) = {
- var numMsgs = sc.accumulator(0)
- var numActiveVerts = sc.accumulator(0)
- val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
- .flatMapValues {
- case (_, vs) if !vs.hasNext => None
- case (c, vs) => {
- val (newVert, newMsgs) =
- compute(vs.next,
- c.hasNext match {
- case true => Some(c.next)
- case false => None
- }
- )
-
- numMsgs += newMsgs.size
- if (newVert.active) {
- numActiveVerts += 1
- }
-
- Some((newVert, newMsgs))
- }
- }.persist(storageLevel)
-
- // Force evaluation of processed RDD for accurate performance measurements
- processed.foreach(x => {})
-
- (processed, numMsgs.value, numActiveVerts.value)
- }
-
- /**
- * Converts a compute function that doesn't take an aggregator to
- * one that does, so it can be passed to Bagel.run.
- */
- private def addAggregatorArg[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C](
- compute: (V, Option[C], Int) => (V, Array[M])
- ): (V, Option[C], Option[Nothing], Int) => (V, Array[M]) = {
- (vert: V, msgs: Option[C], aggregated: Option[Nothing], superstep: Int) =>
- compute(vert, msgs, superstep)
- }
-}
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Combiner[M, C] {
- def createCombiner(msg: M): C
- def mergeMsg(combiner: C, msg: M): C
- def mergeCombiners(a: C, b: C): C
-}
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Aggregator[V, A] {
- def createAggregator(vert: V): A
- def mergeAggregators(a: A, b: A): A
-}
-
-/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
- def createCombiner(msg: M): Array[M] =
- Array(msg)
- def mergeMsg(combiner: Array[M], msg: M): Array[M] =
- combiner :+ msg
- def mergeCombiners(a: Array[M], b: Array[M]): Array[M] =
- a ++ b
-}
-
-/**
- * Represents a Bagel vertex.
- *
- * Subclasses may store state along with each vertex and must
- * inherit from java.io.Serializable or scala.Serializable.
- */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Vertex {
- def active: Boolean
-}
-
-/**
- * Represents a Bagel message to a target vertex.
- *
- * Subclasses may contain a payload to deliver to the target vertex
- * and must inherit from java.io.Serializable or scala.Serializable.
- */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Message[K] {
- def targetId: K
-}
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/package-info.java b/bagel/src/main/scala/org/apache/spark/bagel/package-info.java
deleted file mode 100644
index 81f26f2765..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Bagel: An implementation of Pregel in Spark. THIS IS DEPRECATED - use Spark's GraphX library.
- */
-package org.apache.spark.bagel;
\ No newline at end of file
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/package.scala b/bagel/src/main/scala/org/apache/spark/bagel/package.scala
deleted file mode 100644
index 2fb1934579..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Bagel: An implementation of Pregel in Spark. THIS IS DEPRECATED - use Spark's GraphX library.
- */
-package object bagel
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 52d3ab34c1..669b6b614e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -965,7 +965,7 @@ private[spark] object SparkSubmitUtils {
// We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
// other spark-streaming utility components. Underscore is there to differentiate between
// spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
- val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+ val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
components.foreach { comp =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
index 63c346c1b8..4b5039b668 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala
@@ -171,7 +171,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll {
}
test("neglects Spark and Spark's dependencies") {
- val components = Seq("bagel_", "catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
+ val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", "repl_",
"sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
val coordinates =
diff --git a/dev/audit-release/audit_release.py b/dev/audit-release/audit_release.py
index 27d1dd784c..972be30da1 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -115,7 +115,7 @@ original_dir = os.getcwd()
# maven that links against them. This will catch issues with messed up
# dependencies within those projects.
modules = [
- "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", "spark-repl",
+ "spark-core", "spark-mllib", "spark-streaming", "spark-repl",
"spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
"spark-streaming-mqtt", "spark-streaming-twitter", "spark-streaming-zeromq",
"spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 3089474c13..62d75eff71 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -75,7 +75,6 @@
<li><a href="sql-programming-guide.html">DataFrames, Datasets and SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
- <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
<li><a href="sparkr.html">SparkR (R on Spark)</a></li>
</ul>
</li>
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
deleted file mode 100644
index 347ca4a7af..0000000000
--- a/docs/bagel-programming-guide.md
+++ /dev/null
@@ -1,159 +0,0 @@
----
-layout: global
-displayTitle: Bagel Programming Guide
-title: Bagel
----
-
-**Bagel is deprecated, and superseded by [GraphX](graphx-programming-guide.html).**
-
-Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. Bagel currently supports basic graph computation, combiners, and aggregators.
-
-In the Pregel programming model, jobs run as a sequence of iterations called _supersteps_. In each superstep, each vertex in the graph runs a user-specified function that can update state associated with the vertex and send messages to other vertices for use in the *next* iteration.
-
-This guide shows the programming model and features of Bagel by walking through an example implementation of PageRank on Bagel.
-
-# Linking with Bagel
-
-To use Bagel in your program, add the following SBT or Maven dependency:
-
- groupId = org.apache.spark
- artifactId = spark-bagel_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION}}
-
-# Programming Model
-
-Bagel operates on a graph represented as a [distributed dataset](programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
-
-For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
-
-We first extend the default `Vertex` class to store a `Double`
-representing the current PageRank of the vertex, and similarly extend
-the `Message` and `Edge` classes. Note that these need to be marked `@serializable` to allow Spark to transfer them across machines. We also import the Bagel types and implicit conversions.
-
-{% highlight scala %}
-import org.apache.spark.bagel._
-import org.apache.spark.bagel.Bagel._
-
-@serializable class PREdge(val targetId: String) extends Edge
-
-@serializable class PRVertex(
- val id: String, val rank: Double, val outEdges: Seq[Edge],
- val active: Boolean) extends Vertex
-
-@serializable class PRMessage(
- val targetId: String, val rankShare: Double) extends Message
-{% endhighlight %}
-
-Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
-
-{% highlight scala %}
-val input = sc.textFile("data/mllib/pagerank_data.txt")
-
-val numVerts = input.count()
-
-val verts = input.map(line => {
- val fields = line.split('\t')
- val (id, linksStr) = (fields(0), fields(1))
- val links = linksStr.split(',').map(new PREdge(_))
- (id, new PRVertex(id, 1.0 / numVerts, links, true))
-}).cache
-{% endhighlight %}
-
-We run the Bagel job, passing in `verts`, an empty distributed dataset of messages, and a custom compute function that runs PageRank for 10 iterations.
-
-{% highlight scala %}
-val emptyMsgs = sc.parallelize(List[(String, PRMessage)]())
-
-def compute(self: PRVertex, msgs: Option[Seq[PRMessage]], superstep: Int)
-: (PRVertex, Iterable[PRMessage]) = {
- val msgSum = msgs.getOrElse(List()).map(_.rankShare).sum
- val newRank =
- if (msgSum != 0)
- 0.15 / numVerts + 0.85 * msgSum
- else
- self.rank
- val halt = superstep >= 10
- val msgsOut =
- if (!halt)
- self.outEdges.map(edge =>
- new PRMessage(edge.targetId, newRank / self.outEdges.size))
- else
- List()
- (new PRVertex(self.id, newRank, self.outEdges, !halt), msgsOut)
-}
-{% endhighlight %}
-
-val result = Bagel.run(sc, verts, emptyMsgs)()(compute)
-
-Finally, we print the results.
-
-{% highlight scala %}
-println(result.map(v => "%s\t%s\n".format(v.id, v.rank)).collect.mkString)
-{% endhighlight %}
-
-## Combiners
-
-Sending a message to another vertex generally involves expensive communication over the network. For certain algorithms, it's possible to reduce the amount of communication using _combiners_. For example, if the compute function receives integer messages and only uses their sum, it's possible for Bagel to combine multiple messages to the same vertex by summing them.
-
-For combiner support, Bagel can optionally take a set of combiner functions that convert messages to their combined form.
-
-_Example: PageRank with combiners_
-
-## Aggregators
-
-Aggregators perform a reduce across all vertices after each superstep, and provide the result to each vertex in the next superstep.
-
-For aggregator support, Bagel can optionally take an aggregator function that reduces across each vertex.
-
-_Example_
-
-## Operations
-
-Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details.
-
-### Actions
-
-{% highlight scala %}
-/*** Full form ***/
-
-Bagel.run(sc, vertices, messages, combiner, aggregator, partitioner, numSplits)(compute)
-// where compute takes (vertex: V, combinedMessages: Option[C], aggregated: Option[A], superstep: Int)
-// and returns (newVertex: V, outMessages: Array[M])
-
-/*** Abbreviated forms ***/
-
-Bagel.run(sc, vertices, messages, combiner, partitioner, numSplits)(compute)
-// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
-// and returns (newVertex: V, outMessages: Array[M])
-
-Bagel.run(sc, vertices, messages, combiner, numSplits)(compute)
-// where compute takes (vertex: V, combinedMessages: Option[C], superstep: Int)
-// and returns (newVertex: V, outMessages: Array[M])
-
-Bagel.run(sc, vertices, messages, numSplits)(compute)
-// where compute takes (vertex: V, messages: Option[Array[M]], superstep: Int)
-// and returns (newVertex: V, outMessages: Array[M])
-{% endhighlight %}
-
-### Types
-
-{% highlight scala %}
-trait Combiner[M, C] {
- def createCombiner(msg: M): C
- def mergeMsg(combiner: C, msg: M): C
- def mergeCombiners(a: C, b: C): C
-}
-
-trait Aggregator[V, A] {
- def createAggregator(vert: V): A
- def mergeAggregators(a: A, b: A): A
-}
-
-trait Vertex {
- def active: Boolean
-}
-
-trait Message[K] {
- def targetId: K
-}
-{% endhighlight %}
diff --git a/examples/pom.xml b/examples/pom.xml
index d27a5096a6..1a0d5e5854 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -55,12 +55,6 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 55fe156cf6..68af14397b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -146,7 +146,7 @@ abstract class AbstractCommandBuilder {
boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
if (prependClasses || isTesting) {
String scala = getScalaVersion();
- List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx",
+ List<String> projects = Arrays.asList("core", "repl", "mllib", "graphx",
"streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
"yarn", "launcher", "network/common", "network/shuffle", "network/yarn");
if (prependClasses) {
diff --git a/pom.xml b/pom.xml
index 1f570727dc..32918d6a74 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,6 @@
<modules>
<module>tags</module>
<module>core</module>
- <module>bagel</module> <!-- Deprecated -->
<module>graphx</module>
<module>mllib</module>
<module>tools</module>
@@ -194,7 +193,7 @@
declared in the projects that build assemblies.
For other projects the scope should remain as "compile", otherwise they are not available
- during compilation if the dependency is transivite (e.g. "bagel/" depending on "core/" and
+ during compilation if the dependency is transivite (e.g. "graphx/" depending on "core/" and
needing Hadoop classes in the classpath to compile).
-->
<flume.deps.scope>compile</flume.deps.scope>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b1dcaedcba..c3d53f835f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -34,10 +34,10 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
- val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
+ val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) =
- Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
+ Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
"streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _))
@@ -352,7 +352,7 @@ object OldDeps {
scalaVersion := "2.10.5",
libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
"spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
- "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
+ "spark-streaming", "spark-mllib", "spark-graphx",
"spark-core").map(versionArtifact(_).get intransitive())
)
}
@@ -556,7 +556,7 @@ object Unidoc {
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn, testTags),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags),
// Skip actual catalyst, but include the subproject.
// Catalyst is not public API and contains quasiquotes which break scaladoc.
diff --git a/repl/pom.xml b/repl/pom.xml
index 67f9866509..efc3dd452e 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -52,12 +52,6 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>