author     Prashant Sharma <prashant.s@imaginea.com>  2013-09-06 17:53:01 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-09-06 17:53:01 +0530
commit     4106ae9fbf8a582697deba2198b3b966dec00bfe (patch)
tree       7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /bagel
parent     e0dd24dc858777904335218f3001a24bffe73b27 (diff)
parent     5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff)
Merged with master
Diffstat (limited to 'bagel')
-rw-r--r--  bagel/pom.xml                                                                  128
-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala (renamed from bagel/src/main/scala/spark/bagel/Bagel.scala)  52
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala                 106
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala              84
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala   206
-rw-r--r--  bagel/src/test/resources/log4j.properties                                      19
-rw-r--r--  bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala (renamed from bagel/src/test/scala/bagel/BagelSuite.scala)  27
7 files changed, 98 insertions, 524 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml
index b83a0ef6c0..51173c32b2 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -1,25 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.spark-project</groupId>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <groupId>org.spark-project</groupId>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-bagel</artifactId>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
- <url>http://spark-project.org/</url>
+ <url>http://spark.incubator.apache.org/</url>
<dependencies>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@@ -41,103 +62,4 @@
</plugin>
</plugins>
</build>
-
- <profiles>
- <profile>
- <id>hadoop1</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop1</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop1</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop2</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-core</artifactId>
- <version>${project.version}</version>
- <classifier>hadoop2-yarn</classifier>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2-yarn</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
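
The groupId and artifactId above give the module's new Maven coordinates. A minimal sketch of how a downstream sbt build might reference the renamed artifact (coordinates taken directly from this pom; published artifact names at release time may differ):

    // Hypothetical downstream dependency on the renamed module.
    libraryDependencies += "org.apache.spark" % "spark-bagel" % "0.8.0-SNAPSHOT"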
diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 5ecdd7d004..44e26bbb9e 100644
--- a/bagel/src/main/scala/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -1,29 +1,45 @@
-package spark.bagel
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import spark._
-import spark.SparkContext._
+package org.apache.spark.bagel
-import scala.collection.mutable.ArrayBuffer
-import storage.StorageLevel
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
object Bagel extends Logging {
val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
/**
* Runs a Bagel program.
- * @param sc [[spark.SparkContext]] to use for the program.
+ * @param sc [[org.apache.spark.SparkContext]] to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
* the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
* empty array, i.e. sc.parallelize(Array[K, Message]()).
- * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+ * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
* message before sending (which often involves network I/O).
- * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+ * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
* and provides the result to each vertex in the next superstep.
- * @param partitioner [[spark.Partitioner]] partitions values by key
+ * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
- * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+ * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
* Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
* optional Aggregator and the current superstep,
@@ -81,7 +97,7 @@ object Bagel extends Logging {
verts
}
- /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -93,7 +109,7 @@ object Bagel extends Logging {
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
- /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -111,7 +127,7 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]]
* and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
@@ -124,7 +140,7 @@ object Bagel extends Logging {
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
- /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/
+ /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
@@ -142,8 +158,8 @@ object Bagel extends Logging {
}
/**
- * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
- * [[spark.bagel.DefaultCombiner]] and the default storage level
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]],
+ * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
@@ -155,8 +171,8 @@ object Bagel extends Logging {
): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
/**
- * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
- * and [[spark.bagel.DefaultCombiner]]
+ * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]]
+ * and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
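
The scaladoc above describes the relocated Bagel.run entry points. A minimal usage sketch against the new org.apache.spark.bagel package, following the same pattern as BagelSuite (the CountVertex/CountMessage types and the data are illustrative only, not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.bagel._

    object BagelRunSketch {
      // Illustrative types mirroring the Vertex/Message shape Bagel expects.
      class CountVertex(val value: Int, val active: Boolean) extends Vertex with Serializable
      class CountMessage(val targetId: String) extends Message[String] with Serializable

      def main(args: Array[String]) {
        val sc = new SparkContext("local", "BagelRunSketch")
        val vertices = sc.parallelize(Seq(("a", new CountVertex(0, true)), ("b", new CountVertex(0, true))))
        val messages = sc.parallelize(Array[(String, CountMessage)]())

        // Overload with no Aggregator, the default HashPartitioner and DefaultCombiner,
        // and the default storage level; messages arrive as an uncombined array.
        val result = Bagel.run(sc, vertices, messages, 2) {
          (self: CountVertex, msgs: Option[Array[CountMessage]], superstep: Int) =>
            // Each vertex bumps its value and deactivates after superstep 2.
            (new CountVertex(self.value + 1, superstep < 2), Array[CountMessage]())
        }
        result.collect().foreach { case (id, v) => println(id + " -> " + v.value) }
        sc.stop()
      }
    }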
diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
deleted file mode 100644
index b97d786ed4..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-package spark.bagel.examples
-
-import spark._
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
-
-import com.esotericsoftware.kryo._
-
-class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double)(
- self: PRVertex, messageSum: Option[Double], superstep: Int
- ): (PRVertex, Array[PRMessage]) = {
- val newValue = messageSum match {
- case Some(msgSum) if msgSum != 0 =>
- 0.15 / numVertices + 0.85 * msgSum
- case _ => self.value
- }
-
- val terminate = superstep >= 10
-
- val outbox: Array[PRMessage] =
- if (!terminate)
- self.outEdges.map(targetId =>
- new PRMessage(targetId, newValue / self.outEdges.size))
- else
- Array[PRMessage]()
-
- (new PRVertex(newValue, self.outEdges, !terminate), outbox)
- }
-
- def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
- computeWithCombiner(numVertices, epsilon)(self, messages match {
- case Some(msgs) => Some(msgs.map(_.value).sum)
- case None => None
- }, superstep)
-}
-
-class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
- def createCombiner(msg: PRMessage): Double =
- msg.value
- def mergeMsg(combiner: Double, msg: PRMessage): Double =
- combiner + msg.value
- def mergeCombiners(a: Double, b: Double): Double =
- a + b
-}
-
-class PRVertex() extends Vertex with Serializable {
- var value: Double = _
- var outEdges: Array[String] = _
- var active: Boolean = _
-
- def this(value: Double, outEdges: Array[String], active: Boolean = true) {
- this()
- this.value = value
- this.outEdges = outEdges
- this.active = active
- }
-
- override def toString(): String = {
- "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
- }
-}
-
-class PRMessage() extends Message[String] with Serializable {
- var targetId: String = _
- var value: Double = _
-
- def this(targetId: String, value: Double) {
- this()
- this.targetId = targetId
- this.value = value
- }
-}
-
-class PRKryoRegistrator extends KryoRegistrator {
- def registerClasses(kryo: Kryo) {
- kryo.register(classOf[PRVertex])
- kryo.register(classOf[PRMessage])
- }
-}
-
-class CustomPartitioner(partitions: Int) extends Partitioner {
- def numPartitions = partitions
-
- def getPartition(key: Any): Int = {
- val hash = key match {
- case k: Long => (k & 0x00000000FFFFFFFFL).toInt
- case _ => key.hashCode
- }
-
- val mod = key.hashCode % partitions
- if (mod < 0) mod + partitions else mod
- }
-
- override def equals(other: Any): Boolean = other match {
- case c: CustomPartitioner =>
- c.numPartitions == numPartitions
- case _ => false
- }
-}
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
deleted file mode 100644
index bc32663e0f..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package spark.bagel.examples
-
-import spark._
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.xml.{XML,NodeSeq}
-
-/**
- * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
- * files from there, which contains one line per wiki article in a tab-separated format
- * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
- */
-object WikipediaPageRank {
- def main(args: Array[String]) {
- if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
- System.exit(-1)
- }
-
- System.setProperty("spark.serializer", "spark.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numPartitions = args(2).toInt
- val host = args(3)
- val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRank")
-
- // Parse the Wikipedia page data into a graph
- val input = sc.textFile(inputFile)
-
- println("Counting vertices...")
- val numVertices = input.count()
- println("Done counting vertices.")
-
- println("Parsing input file...")
- var vertices = input.map(line => {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val links =
- if (body == "\\N")
- NodeSeq.Empty
- else
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
- NodeSeq.Empty
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- val id = new String(title)
- (id, new PRVertex(1.0 / numVertices, outEdges))
- })
- if (usePartitioner)
- vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
- else
- vertices = vertices.cache
- println("Done parsing input file.")
-
- // Do the computation
- val epsilon = 0.01 / numVertices
- val messages = sc.parallelize(Array[(String, PRMessage)]())
- val utils = new PageRankUtils
- val result =
- Bagel.run(
- sc, vertices, messages, combiner = new PRCombiner(),
- numPartitions = numPartitions)(
- utils.computeWithCombiner(numVertices, epsilon))
-
- // Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
- val top =
- (result
- .filter { case (id, vertex) => vertex.value >= threshold }
- .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
- .collect.mkString)
- println(top)
- }
-}
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
deleted file mode 100644
index 9d9d80d809..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-package spark.bagel.examples
-
-import spark._
-import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.xml.{XML,NodeSeq}
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
-import java.nio.ByteBuffer
-
-object WikipediaPageRankStandalone {
- def main(args: Array[String]) {
- if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
- System.exit(-1)
- }
-
- System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer")
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numIterations = args(2).toInt
- val host = args(3)
- val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRankStandalone")
-
- val input = sc.textFile(inputFile)
- val partitioner = new HashPartitioner(sc.defaultParallelism)
- val links =
- if (usePartitioner)
- input.map(parseArticle _).partitionBy(partitioner).cache()
- else
- input.map(parseArticle _).cache()
- val n = links.count()
- val defaultRank = 1.0 / n
- val a = 0.15
-
- // Do the computation
- val startTime = System.currentTimeMillis
- val ranks =
- pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
-
- // Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
- val top =
- (ranks
- .filter { case (id, rank) => rank >= threshold }
- .map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect().mkString)
- println(top)
-
- val time = (System.currentTimeMillis - startTime) / 1000.0
- println("Completed %d iterations in %f seconds: %f seconds per iteration"
- .format(numIterations, time, time / numIterations))
- System.exit(0)
- }
-
- def parseArticle(line: String): (String, Array[String]) = {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val id = new String(title)
- val links =
- if (body == "\\N")
- NodeSeq.Empty
- else
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
- NodeSeq.Empty
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- (id, outEdges)
- }
-
- def pageRank(
- links: RDD[(String, Array[String])],
- numIterations: Int,
- defaultRank: Double,
- a: Double,
- n: Long,
- partitioner: Partitioner,
- usePartitioner: Boolean,
- numPartitions: Int
- ): RDD[(String, Double)] = {
- var ranks = links.mapValues { edges => defaultRank }
- for (i <- 1 to numIterations) {
- val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapper, rankWrapper)) =>
- if (linksWrapper.length > 0) {
- if (rankWrapper.length > 0) {
- linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
- } else {
- linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
- }
- } else {
- Array[(String, Double)]()
- }
- }
- ranks = (contribs.combineByKey((x: Double) => x,
- (x: Double, y: Double) => x + y,
- (x: Double, y: Double) => x + y,
- partitioner)
- .mapValues(sum => a/n + (1-a)*sum))
- }
- ranks
- }
-}
-
-class WPRSerializer extends spark.serializer.Serializer {
- def newInstance(): SerializerInstance = new WPRSerializerInstance()
-}
-
-class WPRSerializerInstance extends SerializerInstance {
- def serialize[T](t: T): ByteBuffer = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T](bytes: ByteBuffer): T = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
- throw new UnsupportedOperationException()
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new WPRSerializationStream(s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new WPRDeserializationStream(s)
- }
-}
-
-class WPRSerializationStream(os: OutputStream) extends SerializationStream {
- val dos = new DataOutputStream(os)
-
- def writeObject[T](t: T): SerializationStream = t match {
- case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
- case links: Array[String] => {
- dos.writeInt(0) // links
- dos.writeUTF(id)
- dos.writeInt(links.length)
- for (link <- links) {
- dos.writeUTF(link)
- }
- this
- }
- case rank: Double => {
- dos.writeInt(1) // rank
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
- case (id: String, rank: Double) => {
- dos.writeInt(2) // rank without wrapper
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
-
- def flush() { dos.flush() }
- def close() { dos.close() }
-}
-
-class WPRDeserializationStream(is: InputStream) extends DeserializationStream {
- val dis = new DataInputStream(is)
-
- def readObject[T](): T = {
- val typeId = dis.readInt()
- typeId match {
- case 0 => {
- val id = dis.readUTF()
- val numLinks = dis.readInt()
- val links = new Array[String](numLinks)
- for (i <- 0 until numLinks) {
- val link = dis.readUTF()
- links(i) = link
- }
- (id, ArrayBuffer(links)).asInstanceOf[T]
- }
- case 1 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, ArrayBuffer(rank)).asInstanceOf[T]
- }
- case 2 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, rank).asInstanceOf[T]
- }
- }
- }
-
- def close() { dis.close() }
-}
diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties
index 83d05cab2f..5cdcf35b23 100644
--- a/bagel/src/test/resources/log4j.properties
+++ b/bagel/src/test/resources/log4j.properties
@@ -1,4 +1,21 @@
-# Set everything to be logged to the file bagel/target/unit-tests.log
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file bagel/target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
index a09c978068..7b954a4775 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -1,13 +1,28 @@
-package spark.bagel
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
+package org.apache.spark.bagel
+
+import org.scalatest.{BeforeAndAfter, FunSuite, Assertions}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import scala.collection.mutable.ArrayBuffer
-
-import spark._
-import storage.StorageLevel
+import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable