diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
commit | 4106ae9fbf8a582697deba2198b3b966dec00bfe (patch) | |
tree | 7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /bagel | |
parent | e0dd24dc858777904335218f3001a24bffe73b27 (diff) | |
parent | 5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff) | |
download | spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.gz spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.bz2 spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.zip |
Merged with master
Diffstat (limited to 'bagel')
-rw-r--r-- | bagel/pom.xml | 128 | ||||
-rw-r--r-- | bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala (renamed from bagel/src/main/scala/spark/bagel/Bagel.scala) | 52 | ||||
-rw-r--r-- | bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala | 106 | ||||
-rw-r--r-- | bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala | 84 | ||||
-rw-r--r-- | bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala | 206 | ||||
-rw-r--r-- | bagel/src/test/resources/log4j.properties | 19 | ||||
-rw-r--r-- | bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala (renamed from bagel/src/test/scala/bagel/BagelSuite.scala) | 27 |
7 files changed, 98 insertions, 524 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml index b83a0ef6c0..51173c32b2 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -1,25 +1,46 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> - <groupId>org.spark-project</groupId> + <groupId>org.apache.spark</groupId> <artifactId>spark-parent</artifactId> <version>0.8.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> - <groupId>org.spark-project</groupId> + <groupId>org.apache.spark</groupId> <artifactId>spark-bagel</artifactId> <packaging>jar</packaging> <name>Spark Project Bagel</name> - <url>http://spark-project.org/</url> + <url>http://spark.incubator.apache.org/</url> <dependencies> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.version}</artifactId> @@ -41,103 +62,4 @@ </plugin> </plugins> </build> - - <profiles> - <profile> - <id>hadoop1</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop1</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop1</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - <profile> - <id>hadoop2-yarn</id> - <dependencies> - <dependency> - <groupId>org.spark-project</groupId> - <artifactId>spark-core</artifactId> - <version>${project.version}</version> - <classifier>hadoop2-yarn</classifier> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <classifier>hadoop2-yarn</classifier> - </configuration> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> diff --git a/bagel/src/main/scala/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 5ecdd7d004..44e26bbb9e 100644 --- a/bagel/src/main/scala/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -1,29 +1,45 @@ -package spark.bagel +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import spark._ -import spark.SparkContext._ +package org.apache.spark.bagel -import scala.collection.mutable.ArrayBuffer -import storage.StorageLevel +import org.apache.spark._ +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel object Bagel extends Logging { val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK /** * Runs a Bagel program. - * @param sc [[spark.SparkContext]] to use for the program. + * @param sc [[org.apache.spark.SparkContext]] to use for the program. * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be * the vertex id. * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an * empty array, i.e. sc.parallelize(Array[K, Message]()). - * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one + * @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one * message before sending (which often involves network I/O). - * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep, + * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep, * and provides the result to each vertex in the next superstep. - * @param partitioner [[spark.Partitioner]] partitions values by key + * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key * @param numPartitions number of partitions across which to split the graph. * Default is the default parallelism of the SparkContext - * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep. + * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep. * Defaults to caching in memory. * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex, * optional Aggregator and the current superstep, @@ -81,7 +97,7 @@ object Bagel extends Logging { verts } - /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */ + /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -93,7 +109,7 @@ object Bagel extends Logging { compute: (V, Option[C], Int) => (V, Array[M]) ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) - /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */ + /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -111,7 +127,7 @@ object Bagel extends Logging { } /** - * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]] + * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]] * and default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( @@ -124,7 +140,7 @@ object Bagel extends Logging { compute: (V, Option[C], Int) => (V, Array[M]) ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) - /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/ + /** Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the default [[org.apache.spark.HashPartitioner]]*/ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, vertices: RDD[(K, V)], @@ -142,8 +158,8 @@ object Bagel extends Logging { } /** - * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]], - * [[spark.bagel.DefaultCombiner]] and the default storage level + * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default [[org.apache.spark.HashPartitioner]], + * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( sc: SparkContext, @@ -155,8 +171,8 @@ object Bagel extends Logging { ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute) /** - * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]] - * and [[spark.bagel.DefaultCombiner]] + * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], the default [[org.apache.spark.HashPartitioner]] + * and [[org.apache.spark.bagel.DefaultCombiner]] */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( sc: SparkContext, diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala deleted file mode 100644 index b97d786ed4..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala +++ /dev/null @@ -1,106 +0,0 @@ -package spark.bagel.examples - -import spark._ -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ - -class PageRankUtils extends Serializable { - def computeWithCombiner(numVertices: Long, epsilon: Double)( - self: PRVertex, messageSum: Option[Double], superstep: Int - ): (PRVertex, Array[PRMessage]) = { - val newValue = messageSum match { - case Some(msgSum) if msgSum != 0 => - 0.15 / numVertices + 0.85 * msgSum - case _ => self.value - } - - val terminate = superstep >= 10 - - val outbox: Array[PRMessage] = - if (!terminate) - self.outEdges.map(targetId => - new PRMessage(targetId, newValue / self.outEdges.size)) - else - Array[PRMessage]() - - (new PRVertex(newValue, self.outEdges, !terminate), outbox) - } - - def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) = - computeWithCombiner(numVertices, epsilon)(self, messages match { - case Some(msgs) => Some(msgs.map(_.value).sum) - case None => None - }, superstep) -} - -class PRCombiner extends Combiner[PRMessage, Double] with Serializable { - def createCombiner(msg: PRMessage): Double = - msg.value - def mergeMsg(combiner: Double, msg: PRMessage): Double = - combiner + msg.value - def mergeCombiners(a: Double, b: Double): Double = - a + b -} - -class PRVertex() extends Vertex with Serializable { - var value: Double = _ - var outEdges: Array[String] = _ - var active: Boolean = _ - - def this(value: Double, outEdges: Array[String], active: Boolean = true) { - this() - this.value = value - this.outEdges = outEdges - this.active = active - } - - override def toString(): String = { - "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString) - } -} - -class PRMessage() extends Message[String] with Serializable { - var targetId: String = _ - var value: Double = _ - - def this(targetId: String, value: Double) { - this() - this.targetId = targetId - this.value = value - } -} - -class PRKryoRegistrator extends KryoRegistrator { - def registerClasses(kryo: Kryo) { - kryo.register(classOf[PRVertex]) - kryo.register(classOf[PRMessage]) - } -} - -class CustomPartitioner(partitions: Int) extends Partitioner { - def numPartitions = partitions - - def getPartition(key: Any): Int = { - val hash = key match { - case k: Long => (k & 0x00000000FFFFFFFFL).toInt - case _ => key.hashCode - } - - val mod = key.hashCode % partitions - if (mod < 0) mod + partitions else mod - } - - override def equals(other: Any): Boolean = other match { - case c: CustomPartitioner => - c.numPartitions == numPartitions - case _ => false - } -} diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala deleted file mode 100644 index bc32663e0f..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala +++ /dev/null @@ -1,84 +0,0 @@ -package spark.bagel.examples - -import spark._ -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.xml.{XML,NodeSeq} - -/** - * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles" - * files from there, which contains one line per wiki article in a tab-separated format - * (http://wiki.freebase.com/wiki/WEX/Documentation#articles). - */ -object WikipediaPageRank { - def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>") - System.exit(-1) - } - - System.setProperty("spark.serializer", "spark.KryoSerializer") - System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) - - val inputFile = args(0) - val threshold = args(1).toDouble - val numPartitions = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean - val sc = new SparkContext(host, "WikipediaPageRank") - - // Parse the Wikipedia page data into a graph - val input = sc.textFile(inputFile) - - println("Counting vertices...") - val numVertices = input.count() - println("Done counting vertices.") - - println("Parsing input file...") - var vertices = input.map(line => { - val fields = line.split("\t") - val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) - val links = - if (body == "\\N") - NodeSeq.Empty - else - try { - XML.loadString(body) \\ "link" \ "target" - } catch { - case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) - NodeSeq.Empty - } - val outEdges = links.map(link => new String(link.text)).toArray - val id = new String(title) - (id, new PRVertex(1.0 / numVertices, outEdges)) - }) - if (usePartitioner) - vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache - else - vertices = vertices.cache - println("Done parsing input file.") - - // Do the computation - val epsilon = 0.01 / numVertices - val messages = sc.parallelize(Array[(String, PRMessage)]()) - val utils = new PageRankUtils - val result = - Bagel.run( - sc, vertices, messages, combiner = new PRCombiner(), - numPartitions = numPartitions)( - utils.computeWithCombiner(numVertices, epsilon)) - - // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") - val top = - (result - .filter { case (id, vertex) => vertex.value >= threshold } - .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) } - .collect.mkString) - println(top) - } -} diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala deleted file mode 100644 index 9d9d80d809..0000000000 --- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala +++ /dev/null @@ -1,206 +0,0 @@ -package spark.bagel.examples - -import spark._ -import serializer.{DeserializationStream, SerializationStream, SerializerInstance} -import spark.SparkContext._ - -import spark.bagel._ -import spark.bagel.Bagel._ - -import scala.xml.{XML,NodeSeq} - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} -import java.nio.ByteBuffer - -object WikipediaPageRankStandalone { - def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>") - System.exit(-1) - } - - System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer") - - val inputFile = args(0) - val threshold = args(1).toDouble - val numIterations = args(2).toInt - val host = args(3) - val usePartitioner = args(4).toBoolean - val sc = new SparkContext(host, "WikipediaPageRankStandalone") - - val input = sc.textFile(inputFile) - val partitioner = new HashPartitioner(sc.defaultParallelism) - val links = - if (usePartitioner) - input.map(parseArticle _).partitionBy(partitioner).cache() - else - input.map(parseArticle _).cache() - val n = links.count() - val defaultRank = 1.0 / n - val a = 0.15 - - // Do the computation - val startTime = System.currentTimeMillis - val ranks = - pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism) - - // Print the result - System.err.println("Articles with PageRank >= "+threshold+":") - val top = - (ranks - .filter { case (id, rank) => rank >= threshold } - .map { case (id, rank) => "%s\t%s\n".format(id, rank) } - .collect().mkString) - println(top) - - val time = (System.currentTimeMillis - startTime) / 1000.0 - println("Completed %d iterations in %f seconds: %f seconds per iteration" - .format(numIterations, time, time / numIterations)) - System.exit(0) - } - - def parseArticle(line: String): (String, Array[String]) = { - val fields = line.split("\t") - val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) - val id = new String(title) - val links = - if (body == "\\N") - NodeSeq.Empty - else - try { - XML.loadString(body) \\ "link" \ "target" - } catch { - case e: org.xml.sax.SAXParseException => - System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) - NodeSeq.Empty - } - val outEdges = links.map(link => new String(link.text)).toArray - (id, outEdges) - } - - def pageRank( - links: RDD[(String, Array[String])], - numIterations: Int, - defaultRank: Double, - a: Double, - n: Long, - partitioner: Partitioner, - usePartitioner: Boolean, - numPartitions: Int - ): RDD[(String, Double)] = { - var ranks = links.mapValues { edges => defaultRank } - for (i <- 1 to numIterations) { - val contribs = links.groupWith(ranks).flatMap { - case (id, (linksWrapper, rankWrapper)) => - if (linksWrapper.length > 0) { - if (rankWrapper.length > 0) { - linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size)) - } else { - linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size)) - } - } else { - Array[(String, Double)]() - } - } - ranks = (contribs.combineByKey((x: Double) => x, - (x: Double, y: Double) => x + y, - (x: Double, y: Double) => x + y, - partitioner) - .mapValues(sum => a/n + (1-a)*sum)) - } - ranks - } -} - -class WPRSerializer extends spark.serializer.Serializer { - def newInstance(): SerializerInstance = new WPRSerializerInstance() -} - -class WPRSerializerInstance extends SerializerInstance { - def serialize[T](t: T): ByteBuffer = { - throw new UnsupportedOperationException() - } - - def deserialize[T](bytes: ByteBuffer): T = { - throw new UnsupportedOperationException() - } - - def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { - throw new UnsupportedOperationException() - } - - def serializeStream(s: OutputStream): SerializationStream = { - new WPRSerializationStream(s) - } - - def deserializeStream(s: InputStream): DeserializationStream = { - new WPRDeserializationStream(s) - } -} - -class WPRSerializationStream(os: OutputStream) extends SerializationStream { - val dos = new DataOutputStream(os) - - def writeObject[T](t: T): SerializationStream = t match { - case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match { - case links: Array[String] => { - dos.writeInt(0) // links - dos.writeUTF(id) - dos.writeInt(links.length) - for (link <- links) { - dos.writeUTF(link) - } - this - } - case rank: Double => { - dos.writeInt(1) // rank - dos.writeUTF(id) - dos.writeDouble(rank) - this - } - } - case (id: String, rank: Double) => { - dos.writeInt(2) // rank without wrapper - dos.writeUTF(id) - dos.writeDouble(rank) - this - } - } - - def flush() { dos.flush() } - def close() { dos.close() } -} - -class WPRDeserializationStream(is: InputStream) extends DeserializationStream { - val dis = new DataInputStream(is) - - def readObject[T](): T = { - val typeId = dis.readInt() - typeId match { - case 0 => { - val id = dis.readUTF() - val numLinks = dis.readInt() - val links = new Array[String](numLinks) - for (i <- 0 until numLinks) { - val link = dis.readUTF() - links(i) = link - } - (id, ArrayBuffer(links)).asInstanceOf[T] - } - case 1 => { - val id = dis.readUTF() - val rank = dis.readDouble() - (id, ArrayBuffer(rank)).asInstanceOf[T] - } - case 2 => { - val id = dis.readUTF() - val rank = dis.readDouble() - (id, rank).asInstanceOf[T] - } - } - } - - def close() { dis.close() } -} diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties index 83d05cab2f..5cdcf35b23 100644 --- a/bagel/src/test/resources/log4j.properties +++ b/bagel/src/test/resources/log4j.properties @@ -1,4 +1,21 @@ -# Set everything to be logged to the file bagel/target/unit-tests.log +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file bagel/target/unit-tests.log log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala index a09c978068..7b954a4775 100644 --- a/bagel/src/test/scala/bagel/BagelSuite.scala +++ b/bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala @@ -1,13 +1,28 @@ -package spark.bagel +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import org.scalatest.{FunSuite, Assertions, BeforeAndAfter} +package org.apache.spark.bagel + +import org.scalatest.{BeforeAndAfter, FunSuite, Assertions} import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import scala.collection.mutable.ArrayBuffer - -import spark._ -import storage.StorageLevel +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable class TestMessage(val targetId: String) extends Message[String] with Serializable |