aboutsummaryrefslogtreecommitdiff
path: root/bagel
diff options
context:
space:
mode:
Diffstat (limited to 'bagel')
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala5
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala16
-rw-r--r--bagel/src/test/resources/log4j.properties10
-rw-r--r--bagel/src/test/scala/bagel/BagelSuite.scala2
4 files changed, 23 insertions, 10 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
index f37ee01fd2..03843019c0 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRank.scala
@@ -8,6 +8,11 @@ import spark.bagel.Bagel._
import scala.xml.{XML,NodeSeq}
+/**
+ * Run PageRank on XML Wikipedia dumps from http://wiki.freebase.com/wiki/WEX. Uses the "articles"
+ * files from there, which contains one line per wiki article in a tab-separated format
+ * (http://wiki.freebase.com/wiki/WEX/Documentation#articles).
+ */
object WikipediaPageRank {
def main(args: Array[String]) {
if (args.length < 5) {
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index ed8ace3a57..06cc8c748b 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -1,6 +1,7 @@
package spark.bagel.examples
import spark._
+import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import spark.SparkContext._
import spark.bagel._
@@ -33,10 +34,10 @@ object WikipediaPageRankStandalone {
val partitioner = new HashPartitioner(sc.defaultParallelism)
val links =
if (usePartitioner)
- input.map(parseArticle _).partitionBy(partitioner).cache
+ input.map(parseArticle _).partitionBy(partitioner).cache()
else
- input.map(parseArticle _).cache
- val n = links.count
+ input.map(parseArticle _).cache()
+ val n = links.count()
val defaultRank = 1.0 / n
val a = 0.15
@@ -51,7 +52,7 @@ object WikipediaPageRankStandalone {
(ranks
.filter { case (id, rank) => rank >= threshold }
.map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect.mkString)
+ .collect().mkString)
println(top)
val time = (System.currentTimeMillis - startTime) / 1000.0
@@ -113,7 +114,7 @@ object WikipediaPageRankStandalone {
}
}
-class WPRSerializer extends spark.Serializer {
+class WPRSerializer extends spark.serializer.Serializer {
def newInstance(): SerializerInstance = new WPRSerializerInstance()
}
@@ -142,7 +143,7 @@ class WPRSerializerInstance extends SerializerInstance {
class WPRSerializationStream(os: OutputStream) extends SerializationStream {
val dos = new DataOutputStream(os)
- def writeObject[T](t: T): Unit = t match {
+ def writeObject[T](t: T): SerializationStream = t match {
case (id: String, wrapper: ArrayBuffer[_]) => wrapper(0) match {
case links: Array[String] => {
dos.writeInt(0) // links
@@ -151,17 +152,20 @@ class WPRSerializationStream(os: OutputStream) extends SerializationStream {
for (link <- links) {
dos.writeUTF(link)
}
+ this
}
case rank: Double => {
dos.writeInt(1) // rank
dos.writeUTF(id)
dos.writeDouble(rank)
+ this
}
}
case (id: String, rank: Double) => {
dos.writeInt(2) // rank without wrapper
dos.writeUTF(id)
dos.writeDouble(rank)
+ this
}
}
diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties
index 02fe16866e..4c99e450bc 100644
--- a/bagel/src/test/resources/log4j.properties
+++ b/bagel/src/test/resources/log4j.properties
@@ -1,8 +1,10 @@
# Set everything to be logged to the console
-log4j.rootCategory=WARN, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=spark-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
diff --git a/bagel/src/test/scala/bagel/BagelSuite.scala b/bagel/src/test/scala/bagel/BagelSuite.scala
index 3da7152a09..ca59f46843 100644
--- a/bagel/src/test/scala/bagel/BagelSuite.scala
+++ b/bagel/src/test/scala/bagel/BagelSuite.scala
@@ -22,6 +22,8 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter {
sc.stop()
sc = null
}
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
}
test("halting by voting") {