author     Matei Zaharia <matei@eecs.berkeley.edu>    2013-08-23 23:30:17 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2013-08-29 21:19:04 -0700
commit     53cd50c0699efc8733518658100c62426b425de2 (patch)
tree       334e1924a46f7faafe680f46d910ce3e6ac5edc6 /bagel
parent     abdbacf2521ec40ee03ecc8e1aae8823013f24f1 (diff)
Change build and run instructions to use assemblies
This commit makes Spark invocation saner by using an assembly JAR to locate all of Spark's dependencies instead of adding every JAR under lib_managed. It also packages the examples into an assembly and uses that as SPARK_EXAMPLES_JAR. Finally, it replaces the old "run" script with two better-named scripts: "run-examples" for the examples, and "spark-class" for Spark internal classes (e.g. the REPL, master, etc.). This also reduces the confusion caused by people trying to use "run" to launch their own classes; it was never meant for that, but now anyone who looks at it can at least adapt run-examples to do a decent job for them. As part of this change, Bagel's examples are properly moved from the bagel package to the examples package.
Diffstat (limited to 'bagel')
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala                | 123
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala             | 101
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala   | 223
3 files changed, 0 insertions(+), 447 deletions(-)
diff --git a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala b/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
deleted file mode 100644
index de65e27fe0..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/PageRankUtils.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.bagel.examples
-
-import spark._
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
-
-import com.esotericsoftware.kryo._
-
-class PageRankUtils extends Serializable {
- def computeWithCombiner(numVertices: Long, epsilon: Double)(
- self: PRVertex, messageSum: Option[Double], superstep: Int
- ): (PRVertex, Array[PRMessage]) = {
- val newValue = messageSum match {
- case Some(msgSum) if msgSum != 0 =>
- 0.15 / numVertices + 0.85 * msgSum
- case _ => self.value
- }
-
- val terminate = superstep >= 10
-
- val outbox: Array[PRMessage] =
- if (!terminate)
- self.outEdges.map(targetId =>
- new PRMessage(targetId, newValue / self.outEdges.size))
- else
- Array[PRMessage]()
-
- (new PRVertex(newValue, self.outEdges, !terminate), outbox)
- }
-
- def computeNoCombiner(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[Array[PRMessage]], superstep: Int): (PRVertex, Array[PRMessage]) =
- computeWithCombiner(numVertices, epsilon)(self, messages match {
- case Some(msgs) => Some(msgs.map(_.value).sum)
- case None => None
- }, superstep)
-}
-
-class PRCombiner extends Combiner[PRMessage, Double] with Serializable {
- def createCombiner(msg: PRMessage): Double =
- msg.value
- def mergeMsg(combiner: Double, msg: PRMessage): Double =
- combiner + msg.value
- def mergeCombiners(a: Double, b: Double): Double =
- a + b
-}
-
-class PRVertex() extends Vertex with Serializable {
- var value: Double = _
- var outEdges: Array[String] = _
- var active: Boolean = _
-
- def this(value: Double, outEdges: Array[String], active: Boolean = true) {
- this()
- this.value = value
- this.outEdges = outEdges
- this.active = active
- }
-
- override def toString(): String = {
- "PRVertex(value=%f, outEdges.length=%d, active=%s)".format(value, outEdges.length, active.toString)
- }
-}
-
-class PRMessage() extends Message[String] with Serializable {
- var targetId: String = _
- var value: Double = _
-
- def this(targetId: String, value: Double) {
- this()
- this.targetId = targetId
- this.value = value
- }
-}
-
-class PRKryoRegistrator extends KryoRegistrator {
- def registerClasses(kryo: Kryo) {
- kryo.register(classOf[PRVertex])
- kryo.register(classOf[PRMessage])
- }
-}
-
-class CustomPartitioner(partitions: Int) extends Partitioner {
- def numPartitions = partitions
-
- def getPartition(key: Any): Int = {
- val hash = key match {
- case k: Long => (k & 0x00000000FFFFFFFFL).toInt
- case _ => key.hashCode
- }
-
- val mod = key.hashCode % partitions
- if (mod < 0) mod + partitions else mod
- }
-
- override def equals(other: Any): Boolean = other match {
- case c: CustomPartitioner =>
- c.numPartitions == numPartitions
- case _ => false
- }
-}
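
Editorial note: in the deleted CustomPartitioner above, getPartition computes a masked hash for Long keys but then takes the modulo of key.hashCode, so the Long branch never affects the result. Below is a minimal sketch of what was presumably intended, written against the same pre-Apache spark.Partitioner API; the class name MaskedHashPartitioner is hypothetical and this is not the code that shipped.

import spark.Partitioner

// Hypothetical fix: take the modulo of the masked hash instead of recomputing
// key.hashCode, so the Long-masking branch actually matters.
class MaskedHashPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    val hash = key match {
      case k: Long => (k & 0x00000000FFFFFFFFL).toInt  // keep only the low 32 bits
      case _       => key.hashCode
    }
    val mod = hash % partitions
    if (mod < 0) mod + partitions else mod             // Scala's % can be negative
  }

  override def equals(other: Any): Boolean = other match {
    case p: MaskedHashPartitioner => p.numPartitions == numPartitions
    case _                        => false
  }
}
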
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
deleted file mode 100644
index a0c5ac9c18..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.bagel.examples
-
-import spark._
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.xml.{XML,NodeSeq}
-
-/**
- * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
- * files from there, which contains one line per wiki article in a tab-separated format
- * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
- */
-object WikipediaPageRank {
- def main(args: Array[String]) {
- if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
- System.exit(-1)
- }
-
- System.setProperty("spark.serializer", "spark.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numPartitions = args(2).toInt
- val host = args(3)
- val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRank")
-
- // Parse the Wikipedia page data into a graph
- val input = sc.textFile(inputFile)
-
- println("Counting vertices...")
- val numVertices = input.count()
- println("Done counting vertices.")
-
- println("Parsing input file...")
- var vertices = input.map(line => {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val links =
- if (body == "\\N")
- NodeSeq.Empty
- else
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
- NodeSeq.Empty
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- val id = new String(title)
- (id, new PRVertex(1.0 / numVertices, outEdges))
- })
- if (usePartitioner)
- vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache
- else
- vertices = vertices.cache
- println("Done parsing input file.")
-
- // Do the computation
- val epsilon = 0.01 / numVertices
- val messages = sc.parallelize(Array[(String, PRMessage)]())
- val utils = new PageRankUtils
- val result =
- Bagel.run(
- sc, vertices, messages, combiner = new PRCombiner(),
- numPartitions = numPartitions)(
- utils.computeWithCombiner(numVertices, epsilon))
-
- // Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
- val top =
- (result
- .filter { case (id, vertex) => vertex.value >= threshold }
- .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) }
- .collect.mkString)
- println(top)
- }
-}
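
Editorial note: the WEX parsing in the example above (and in the standalone version below) reduces to: split the tab-separated line, treat field 1 as the title and field 3 as an XML body, and collect the <target> children of each <link> element. The following self-contained sketch uses only scala.xml and mirrors that logic; the sample input line and the object name are invented for illustration.

import scala.xml.{NodeSeq, XML}

object WexParseSketch {
  // Parse one WEX "articles" line into (title, outgoing link targets),
  // mirroring the parsing done in the deleted examples.
  def parseArticle(line: String): (String, Array[String]) = {
    val fields = line.split("\t")
    val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
    val links: NodeSeq =
      if (body == "\\N")                       // WEX writes \N for a missing body
        NodeSeq.Empty
      else
        try {
          XML.loadString(body) \\ "link" \ "target"
        } catch {
          case _: org.xml.sax.SAXParseException => NodeSeq.Empty
        }
    (title, links.map(_.text).toArray)
  }

  def main(args: Array[String]) {
    // Invented one-line input in the WEX articles layout: id \t title \t date \t body.
    val sample = "1\tSpark\t2013-08-23\t<article><link><target>Mesos</target></link></article>"
    val (title, outEdges) = parseArticle(sample)
    println(title + " -> " + outEdges.mkString(", "))
  }
}
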
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
deleted file mode 100644
index 3c54a85f42..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.bagel.examples
-
-import spark._
-import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import spark.SparkContext._
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-import scala.xml.{XML,NodeSeq}
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
-import java.nio.ByteBuffer
-
-object WikipediaPageRankStandalone {
- def main(args: Array[String]) {
- if (args.length < 5) {
- System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
- System.exit(-1)
- }
-
- System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer")
-
- val inputFile = args(0)
- val threshold = args(1).toDouble
- val numIterations = args(2).toInt
- val host = args(3)
- val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRankStandalone")
-
- val input = sc.textFile(inputFile)
- val partitioner = new HashPartitioner(sc.defaultParallelism)
- val links =
- if (usePartitioner)
- input.map(parseArticle _).partitionBy(partitioner).cache()
- else
- input.map(parseArticle _).cache()
- val n = links.count()
- val defaultRank = 1.0 / n
- val a = 0.15
-
- // Do the computation
- val startTime = System.currentTimeMillis
- val ranks =
- pageRank(links, numIterations, defaultRank, a, n, partitioner, usePartitioner, sc.defaultParallelism)
-
- // Print the result
- System.err.println("Articles with PageRank >= "+threshold+":")
- val top =
- (ranks
- .filter { case (id, rank) => rank >= threshold }
- .map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect().mkString)
- println(top)
-
- val time = (System.currentTimeMillis - startTime) / 1000.0
- println("Completed %d iterations in %f seconds: %f seconds per iteration"
- .format(numIterations, time, time / numIterations))
- System.exit(0)
- }
-
- def parseArticle(line: String): (String, Array[String]) = {
- val fields = line.split("\t")
- val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
- val id = new String(title)
- val links =
- if (body == "\\N")
- NodeSeq.Empty
- else
- try {
- XML.loadString(body) \\ "link" \ "target"
- } catch {
- case e: org.xml.sax.SAXParseException =>
- System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
- NodeSeq.Empty
- }
- val outEdges = links.map(link => new String(link.text)).toArray
- (id, outEdges)
- }
-
- def pageRank(
- links: RDD[(String, Array[String])],
- numIterations: Int,
- defaultRank: Double,
- a: Double,
- n: Long,
- partitioner: Partitioner,
- usePartitioner: Boolean,
- numPartitions: Int
- ): RDD[(String, Double)] = {
- var ranks = links.mapValues { edges => defaultRank }
- for (i <- 1 to numIterations) {
- val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapper, rankWrapper)) =>
- if (linksWrapper.length > 0) {
- if (rankWrapper.length > 0) {
- linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
- } else {
- linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
- }
- } else {
- Array[(String, Double)]()
- }
- }
- ranks = (contribs.combineByKey((x: Double) => x,
- (x: Double, y: Double) => x + y,
- (x: Double, y: Double) => x + y,
- partitioner)
- .mapValues(sum => a/n + (1-a)*sum))
- }
- ranks
- }
-}
-
-class WPRSerializer extends spark.serializer.Serializer {
- def newInstance(): SerializerInstance = new WPRSerializerInstance()
-}
-
-class WPRSerializerInstance extends SerializerInstance {
- def serialize[T](t: T): ByteBuffer = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T](bytes: ByteBuffer): T = {
- throw new UnsupportedOperationException()
- }
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
- throw new UnsupportedOperationException()
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new WPRSerializationStream(s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new WPRDeserializationStream(s)
- }
-}
-
-class WPRSerializationStream(os: OutputStream) extends SerializationStream {
- val dos = new DataOutputStream(os)
-
- def writeObject[T](t: T): SerializationStream = t match {
- case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
- case links: Array[String] => {
- dos.writeInt(0) // links
- dos.writeUTF(id)
- dos.writeInt(links.length)
- for (link <- links) {
- dos.writeUTF(link)
- }
- this
- }
- case rank: Double => {
- dos.writeInt(1) // rank
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
- case (id: String, rank: Double) => {
- dos.writeInt(2) // rank without wrapper
- dos.writeUTF(id)
- dos.writeDouble(rank)
- this
- }
- }
-
- def flush() { dos.flush() }
- def close() { dos.close() }
-}
-
-class WPRDeserializationStream(is: InputStream) extends DeserializationStream {
- val dis = new DataInputStream(is)
-
- def readObject[T](): T = {
- val typeId = dis.readInt()
- typeId match {
- case 0 => {
- val id = dis.readUTF()
- val numLinks = dis.readInt()
- val links = new Array[String](numLinks)
- for (i <- 0 until numLinks) {
- val link = dis.readUTF()
- links(i) = link
- }
- (id, ArrayBuffer(links)).asInstanceOf[T]
- }
- case 1 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, ArrayBuffer(rank)).asInstanceOf[T]
- }
- case 2 => {
- val id = dis.readUTF()
- val rank = dis.readDouble()
- (id, rank).asInstanceOf[T]
- }
- }
- }
-
- def close() { dis.close() }
-}
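
Editorial note: both deleted examples apply the same dampened update each superstep or iteration: a vertex's new rank is a/n plus (1 - a) times the sum of incoming contributions, where every page splits its current rank evenly across its out-links (a = 0.15). The sketch below is a minimal, Spark-free version of that loop over an in-memory graph; the toy graph and the object name are made up for illustration.

object PageRankSketch {
  // One dampened sweep per iteration: rank(v) = a/n + (1 - a) * sum of incoming
  // contributions, where each page sends rank/outDegree to every out-link.
  def pageRank(
      links: Map[String, Seq[String]],
      numIterations: Int,
      a: Double = 0.15): Map[String, Double] = {
    val n = links.size
    var ranks: Map[String, Double] = links.map { case (id, _) => id -> (1.0 / n) }
    for (_ <- 1 to numIterations) {
      // Each page contributes an equal share of its rank to every out-link.
      val contribs = links.toSeq.flatMap { case (id, outEdges) =>
        outEdges.map(dest => (dest, ranks(id) / outEdges.size))
      }
      val sums = contribs.groupBy(_._1).map { case (id, cs) => id -> cs.map(_._2).sum }
      ranks = links.map { case (id, _) => id -> (a / n + (1 - a) * sums.getOrElse(id, 0.0)) }
    }
    ranks
  }

  def main(args: Array[String]) {
    // Toy three-page graph, purely for illustration.
    val links = Map("A" -> Seq("B", "C"), "B" -> Seq("C"), "C" -> Seq("A"))
    pageRank(links, numIterations = 10).foreach { case (id, rank) =>
      println("%s\t%f".format(id, rank))
    }
  }
}
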