diff options
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala | 15 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala | 13 |
2 files changed, 21 insertions, 7 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 868ff81f67..529709c2f9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -22,12 +22,19 @@ import org.apache.spark.SparkContext object BroadcastTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]") + System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]") System.exit(1) } - val sc = new SparkContext(args(0), "Broadcast Test", + val bcName = if (args.length > 3) args(3) else "Http" + val blockSize = if (args.length > 4) args(4) else "4096" + + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.blockSize", blockSize) + + val sc = new SparkContext(args(0), "Broadcast Test 2", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 @@ -36,13 +43,15 @@ object BroadcastTest { arr1(i) = i } - for (i <- 0 until 2) { + for (i <- 0 until 3) { println("Iteration " + i) println("===========") + val startTime = System.nanoTime val barr1 = sc.broadcast(arr1) sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } + println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) } System.exit(0) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 884d6d6f34..de70c50473 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -17,17 +17,19 @@ package org.apache.spark.streaming.examples.clickstream -import java.net.{InetAddress,ServerSocket,Socket,SocketException} -import java.io.{InputStreamReader, BufferedReader, PrintWriter} +import java.net.ServerSocket +import java.io.PrintWriter import util.Random /** Represents a page view on a website with associated dimension data.*/ -class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) { +class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) + extends Serializable { override def toString() : String = { "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID) } } -object PageView { + +object PageView extends Serializable { def fromString(in : String) : PageView = { val parts = in.split("\t") new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt) @@ -39,6 +41,9 @@ object PageView { * This should be used in tandem with PageViewStream.scala. Example: * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10 * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * + * When running this, you may want to set the root logging level to ERROR in + * conf/log4j.properties to reduce the verbosity of the output. * */ object PageViewGenerator { val pages = Map("http://foo.com/" -> .7, |