author     Reynold Xin <rxin@databricks.com>   2015-12-19 22:40:35 -0800
committer  Reynold Xin <rxin@databricks.com>   2015-12-19 22:40:35 -0800
commit     284e29a870bbb62f59988a5d88cd12f1b0b6f9d3 (patch)
tree       dc1f029020716e90950c9208b81b821c11dda164 /bagel
parent     0c4d6ad87389286280209b3f84a7fdc4d4be1441 (diff)
[SPARK-11808] Remove Bagel.
Author: Reynold Xin <rxin@databricks.com>

Closes #10395 from rxin/SPARK-11808.
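The deprecation notices in the sources removed below point users at GraphX. As a rough migration sketch, the following runs single-source shortest paths through GraphX's Pregel API, which subsumes Bagel's superstep loop; it is adapted from the GraphX programming guide, and the three-edge graph is invented for illustration.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    val sc = new SparkContext(
      new SparkConf().setAppName("sssp-sketch").setMaster("local[2]"))

    // A tiny example graph; vertices default to "unreached" (infinite distance).
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0), Edge(2L, 3L, 1.0), Edge(1L, 3L, 5.0)))
    val graph = Graph.fromEdges(edges, defaultValue = Double.PositiveInfinity)

    val sourceId: VertexId = 1L
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    // pregel(initialMsg)(vprog, sendMsg, mergeMsg) replaces Bagel's
    // compute function + Combiner superstep loop.
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),   // vertex program
      triplet =>                                        // send messages
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else Iterator.empty,
      (a, b) => math.min(a, b))                         // merge messages

    println(sssp.vertices.collect.mkString("\n"))

Where Bagel threads state through a user compute function and a Combiner, Pregel splits the same roles into a vertex program, a send function over edge triplets, and a message-merge function.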
Diffstat (limited to 'bagel')
-rw-r--r--  bagel/pom.xml                                                    |  64
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala          | 318
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/package-info.java    |  21
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/package.scala        |  23
4 files changed, 0 insertions, 426 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml
deleted file mode 100644
index d45224cc80..0000000000
--- a/bagel/pom.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.10</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_2.10</artifactId>
- <properties>
- <sbt.project.name>bagel</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project Bagel</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
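Since this deletes the spark-bagel artifact entirely, downstream builds that declared it must switch artifacts. A minimal sbt sketch, assuming the project moves to GraphX (the version string is a placeholder for whatever Spark release the build targets):

    // build.sbt: where a build previously declared spark-bagel,
    // declare spark-graphx instead.
    val sparkVersion = "2.0.0"  // placeholder; match the Spark release in use
    // before: "org.apache.spark" %% "spark-bagel" % sparkVersion
    libraryDependencies += "org.apache.spark" %% "spark-graphx" % sparkVersion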
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
deleted file mode 100644
index 8399033ac6..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.bagel
-
-import org.apache.spark._
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-object Bagel extends Logging {
- val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
-
- /**
- * Runs a Bagel program.
- * @param sc org.apache.spark.SparkContext to use for the program.
- * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
- * Key will be the vertex id.
- * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
- * this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
- * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
- * given vertex into one message before sending (which often involves network
- * I/O).
- * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
- * after each superstep and provides the result to each vertex in the next
- * superstep.
- * @param partitioner org.apache.spark.Partitioner partitions values by key
- * @param numPartitions number of partitions across which to split the graph.
- * Default is the default parallelism of the SparkContext
- * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
- * intermediate RDDs in each superstep. Defaults to caching in memory.
- * @param compute function that takes a Vertex, optional set of (possibly combined) messages to
- * the Vertex, optional Aggregator and the current superstep,
- * and returns a set of (Vertex, outgoing Messages) pairs
- * @tparam K key
- * @tparam V vertex type
- * @tparam M message type
- * @tparam C combiner
- * @tparam A aggregator
- * @return an RDD of (K, V) pairs representing the graph after completion of the program
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest,
- C: Manifest, A: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- aggregator: Option[Aggregator[V, A]],
- partitioner: Partitioner,
- numPartitions: Int,
- storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
- )(
- compute: (V, Option[C], Option[A], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val splits = if (numPartitions != 0) numPartitions else sc.defaultParallelism
-
- var superstep = 0
- var verts = vertices
- var msgs = messages
- var noActivity = false
- var lastRDD: RDD[(K, (V, Array[M]))] = null
- do {
- logInfo("Starting superstep " + superstep + ".")
- val startTime = System.currentTimeMillis
-
- val aggregated = agg(verts, aggregator)
- val combinedMsgs = msgs.combineByKeyWithClassTag(
- combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
- val grouped = combinedMsgs.groupWith(verts)
- val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
- val (processed, numMsgs, numActiveVerts) =
- comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
- if (lastRDD != null) {
- lastRDD.unpersist(false)
- }
- lastRDD = processed
-
- val timeTaken = System.currentTimeMillis - startTime
- logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
-
- verts = processed.mapValues { case (vert, msgs) => vert }
- msgs = processed.flatMap {
- case (id, (vert, msgs)) => msgs.map(m => (m.targetId, m))
- }
- superstep += 1
-
- noActivity = numMsgs == 0 && numActiveVerts == 0
- } while (!noActivity)
-
- verts
- }
-
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default
- * storage level */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- partitioner: Partitioner,
- numPartitions: Int
- )(
- compute: (V, Option[C], Int) => (V, Array[M])): RDD[(K, V)] = run(sc, vertices, messages,
- combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
-
- /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- partitioner: Partitioner,
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, C](compute))
- }
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
- * org.apache.spark.HashPartitioner and default storage level
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- numPartitions: Int
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions,
- DEFAULT_STORAGE_LEVEL)(compute)
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
- * default org.apache.spark.HashPartitioner
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- combiner: Combiner[M, C],
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[C], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val part = new HashPartitioner(numPartitions)
- run[K, V, M, C, Nothing](
- sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, C](compute))
- }
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * default org.apache.spark.HashPartitioner,
- * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- numPartitions: Int
- )(
- compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
-
- /**
- * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
- * the default org.apache.spark.HashPartitioner
- * and [[org.apache.spark.bagel.DefaultCombiner]]
- */
- def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
- sc: SparkContext,
- vertices: RDD[(K, V)],
- messages: RDD[(K, M)],
- numPartitions: Int,
- storageLevel: StorageLevel
- )(
- compute: (V, Option[Array[M]], Int) => (V, Array[M])
- ): RDD[(K, V)] = {
- val part = new HashPartitioner(numPartitions)
- run[K, V, M, Array[M], Nothing](
- sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
- addAggregatorArg[K, V, M, Array[M]](compute))
- }
-
- /**
- * Aggregates the given vertices using the given aggregator, if it
- * is specified.
- */
- private def agg[K, V <: Vertex, A: Manifest](
- verts: RDD[(K, V)],
- aggregator: Option[Aggregator[V, A]]
- ): Option[A] = aggregator match {
- case Some(a) =>
- Some(verts.map {
- case (id, vert) => a.createAggregator(vert)
- }.reduce(a.mergeAggregators(_, _)))
- case None => None
- }
-
- /**
- * Processes the given vertex-message RDD using the compute
- * function. Returns the processed RDD, the number of messages
- * created, and the number of active vertices.
- */
- private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
- sc: SparkContext,
- grouped: RDD[(K, (Iterable[C], Iterable[V]))],
- compute: (V, Option[C]) => (V, Array[M]),
- storageLevel: StorageLevel
- ): (RDD[(K, (V, Array[M]))], Int, Int) = {
- var numMsgs = sc.accumulator(0)
- var numActiveVerts = sc.accumulator(0)
- val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
- .flatMapValues {
- case (_, vs) if !vs.hasNext => None
- case (c, vs) => {
- val (newVert, newMsgs) =
- compute(vs.next,
- c.hasNext match {
- case true => Some(c.next)
- case false => None
- }
- )
-
- numMsgs += newMsgs.size
- if (newVert.active) {
- numActiveVerts += 1
- }
-
- Some((newVert, newMsgs))
- }
- }.persist(storageLevel)
-
- // Force evaluation of processed RDD for accurate performance measurements
- processed.foreach(x => {})
-
- (processed, numMsgs.value, numActiveVerts.value)
- }
-
- /**
- * Converts a compute function that doesn't take an aggregator to
- * one that does, so it can be passed to Bagel.run.
- */
- private def addAggregatorArg[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C](
- compute: (V, Option[C], Int) => (V, Array[M])
- ): (V, Option[C], Option[Nothing], Int) => (V, Array[M]) = {
- (vert: V, msgs: Option[C], aggregated: Option[Nothing], superstep: Int) =>
- compute(vert, msgs, superstep)
- }
-}
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Combiner[M, C] {
- def createCombiner(msg: M): C
- def mergeMsg(combiner: C, msg: M): C
- def mergeCombiners(a: C, b: C): C
-}
-
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Aggregator[V, A] {
- def createAggregator(vert: V): A
- def mergeAggregators(a: A, b: A): A
-}
-
-/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
- def createCombiner(msg: M): Array[M] =
- Array(msg)
- def mergeMsg(combiner: Array[M], msg: M): Array[M] =
- combiner :+ msg
- def mergeCombiners(a: Array[M], b: Array[M]): Array[M] =
- a ++ b
-}
-
-/**
- * Represents a Bagel vertex.
- *
- * Subclasses may store state along with each vertex and must
- * inherit from java.io.Serializable or scala.Serializable.
- */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Vertex {
- def active: Boolean
-}
-
-/**
- * Represents a Bagel message to a target vertex.
- *
- * Subclasses may contain a payload to deliver to the target vertex
- * and must inherit from java.io.Serializable or scala.Serializable.
- */
-@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
-trait Message[K] {
- def targetId: K
-}
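For context on the API deleted above, a program against it looked roughly like the sketch below. The PRVertex and PRMessage classes and the rank arithmetic are hypothetical, invented here for illustration; the Vertex and Message traits and the run overload are the ones removed in this hunk.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.bagel._

    // Hypothetical vertex and message types implementing the removed traits.
    class PRVertex(val rank: Double, val outEdges: Array[Long], val active: Boolean)
      extends Vertex with Serializable

    class PRMessage(val targetId: Long, val rankShare: Double)
      extends Message[Long] with Serializable

    // Matches the removed overload's compute signature:
    //   (V, Option[Array[M]], Int) => (V, Array[M])
    def compute(self: PRVertex, msgs: Option[Array[PRMessage]], superstep: Int)
        : (PRVertex, Array[PRMessage]) = {
      val newRank = msgs.map(_.map(_.rankShare).sum).getOrElse(self.rank)
      val halt = superstep >= 10
      val outbox =
        if (halt) Array.empty[PRMessage]
        else self.outEdges.map(dst => new PRMessage(dst, newRank / self.outEdges.length))
      (new PRVertex(newRank, self.outEdges, active = !halt), outbox)
    }

    val sc = new SparkContext(
      new SparkConf().setAppName("bagel-sketch").setMaster("local[2]"))
    val vertices = sc.parallelize(Seq(
      (1L, new PRVertex(1.0, Array(2L), active = true)),
      (2L, new PRVertex(1.0, Array(1L), active = true))))
    // Initial messages are typically empty, per the run() scaladoc above.
    val messages = sc.parallelize(Seq.empty[(Long, PRMessage)])
    val result = Bagel.run(sc, vertices, messages, numPartitions = 2)(compute)

The run loop halts once a superstep emits no messages and leaves no vertex active, which is why the sketch both empties the outbox and clears the active flag at the same superstep.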
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/package-info.java b/bagel/src/main/scala/org/apache/spark/bagel/package-info.java
deleted file mode 100644
index 81f26f2765..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Bagel: An implementation of Pregel in Spark. THIS IS DEPRECATED - use Spark's GraphX library.
- */
-package org.apache.spark.bagel;
\ No newline at end of file
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/package.scala b/bagel/src/main/scala/org/apache/spark/bagel/package.scala
deleted file mode 100644
index 2fb1934579..0000000000
--- a/bagel/src/main/scala/org/apache/spark/bagel/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Bagel: An implementation of Pregel in Spark. THIS IS DEPRECATED - use Spark's GraphX library.
- */
-package object bagel