diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 13:26:12 -0800 |
commit | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch) | |
tree | 3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /examples | |
parent | 56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff) | |
parent | 8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff) | |
download | spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2 spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip |
Merge branch 'streaming' into ScrapCode-streaming
Conflicts:
streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'examples')
10 files changed, 64 insertions, 145 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index 782c026d73..f43af670c6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -19,6 +19,11 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> </dependency> + <dependency> + <groupId>org.twitter4j</groupId> + <artifactId>twitter4j-stream</artifactId> + <version>3.0.3</version> + </dependency> <dependency> <groupId>org.scalatest</groupId> @@ -53,6 +58,12 @@ <classifier>hadoop1</classifier> </dependency> <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <version>${project.version}</version> + <classifier>hadoop1</classifier> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <scope>provided</scope> @@ -80,6 +91,12 @@ <classifier>hadoop2</classifier> </dependency> <dependency> + <groupId>org.spark-project</groupId> + <artifactId>spark-streaming</artifactId> + <version>${project.version}</version> + <classifier>hadoop2</classifier> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> <scope>provided</scope> diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java index cddce16e39..cddce16e39 100644 --- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java index 4299febfd6..4299febfd6 100644 --- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java index 43c3cd4dfa..43c3cd4dfa 100644 --- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala index f2ac2b3e06..9553162004 100644 --- a/examples/src/main/scala/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/spark/examples/LocalLR.scala @@ -5,7 +5,7 @@ import spark.util.Vector object LocalLR { val N = 10000 // Number of data points - val D = 10 // Numer of dimensions + val D = 10 // Number of dimensions val R = 0.7 // Scaling factor val ITERATIONS = 5 val rand = new Random(42) diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala index fb28e2c932..5e01885dbb 100644 --- a/examples/src/main/scala/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/spark/examples/SparkALS.scala @@ -7,6 +7,7 @@ import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ import spark._ +import scala.Option object SparkALS { // Parameters set through command line arguments @@ -42,7 +43,7 @@ object SparkALS { return sqrt(sumSqs / (M * U)) } - def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], + def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], R: DoubleMatrix2D) : DoubleMatrix1D = { val U = us.size @@ -68,50 +69,30 @@ object SparkALS { return solved2D.viewColumn(0) } - def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val M = ms.size - val F = ms(0).size - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) - // For each movie that the user rated - for (i <- 0 until M) { - val m = ms(i) - // Add m * m^t to XtX - blas.dger(1, m, m, XtX) - // Add m * rating to Xty - blas.daxpy(R.get(i, j), m, Xty) - } - // Add regularization coefs to diagonal terms - for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) - } - // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) - } - def main(args: Array[String]) { var host = "" var slices = 0 - args match { - case Array(m, u, f, iters, slices_, host_) => { - M = m.toInt - U = u.toInt - F = f.toInt - ITERATIONS = iters.toInt - slices = slices_.toInt - host = host_ + + (0 to 5).map(i => { + i match { + case a if a < args.length => Some(args(a)) + case _ => None + } + }).toArray match { + case Array(host_, m, u, f, iters, slices_) => { + host = host_ getOrElse "local" + M = (m getOrElse "100").toInt + U = (u getOrElse "500").toInt + F = (f getOrElse "10").toInt + ITERATIONS = (iters getOrElse "5").toInt + slices = (slices_ getOrElse "2").toInt } case _ => { - System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <master>") + System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]") System.exit(1) } } - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); + printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) val spark = new SparkContext(host, "SparkALS") val R = generateR() @@ -127,11 +108,11 @@ object SparkALS { for (iter <- 1 to ITERATIONS) { println("Iteration " + iter + ":") ms = spark.parallelize(0 until M, slices) - .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value)) + .map(i => update(i, msc.value(i), usc.value, Rc.value)) .toArray msc = spark.broadcast(ms) // Re-broadcast ms because it was updated us = spark.parallelize(0 until U, slices) - .map(i => updateUser(i, usc.value(i), msc.value, Rc.value)) + .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value))) .toArray usc = spark.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index fe55db6e2c..65d5da82fc 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._ object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>") + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } - val Array(master, hostname, port, group, topics, numThreads) = args + val Array(master, zkQuorum, group, topics, numThreads) = args val sc = new SparkContext(master, "KafkaWordCount") val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap) + val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() @@ -38,16 +38,16 @@ object KafkaWordCount { object KafkaWordCountProducer { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>") + if (args.length < 2) { + System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args + val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", hostname + ":" + port) + props.put("zk.connect", zkQuorum) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index 377bc0c98e..fdb3a4c73c 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -1,19 +1,19 @@ -package spark.streaming.examples.twitter +package spark.streaming.examples -import spark.streaming.StreamingContext._ import spark.streaming.{Seconds, StreamingContext} +import StreamingContext._ import spark.SparkContext._ -import spark.storage.StorageLevel /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter * stream. The stream is instantiated with credentials and optionally filters supplied by the * command line arguments. + * */ -object TwitterBasic { +object TwitterPopularTags { def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + + System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" + " [filter1] [filter2] ... [filter n]") System.exit(1) } @@ -21,10 +21,8 @@ object TwitterBasic { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) @@ -39,22 +37,17 @@ object TwitterBasic { // Print popular hashtags topCounts60.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) topCounts10.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) ssc.start() } - } diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index a191321d91..60f228b8ad 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -28,16 +28,15 @@ object PageViewStream { // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.networkTextStream(host, port) - .flatMap(_.split("\n")) - .map(PageView.fromString(_)) + .flatMap(_.split("\n")) + .map(PageView.fromString(_)) // Return a count of views per URL seen in each batch - val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey() + val pageCounts = pageViews.map(view => view.url).countByValue() // Return a sliding window of page views per URL in the last ten seconds - val slidingPageCounts = pageViews.map(view => ((view.url, 1))) - .window(Seconds(10), Seconds(2)) - .countByKey() + val slidingPageCounts = pageViews.map(view => view.url) + .countByValueAndWindow(Seconds(10), Seconds(2)) // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala deleted file mode 100644 index 2532f27d1a..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala +++ /dev/null @@ -1,71 +0,0 @@ -package spark.streaming.examples.twitter - -import spark._ -import spark.streaming._ -import dstream.{NetworkReceiver, NetworkInputDStream} -import storage.StorageLevel -import twitter4j._ -import twitter4j.auth.BasicAuthorization -import collection.JavaConversions._ - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied username and password to authenticate. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -*/ -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { - - override def getReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) - } -} - -class TwitterReceiver( - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { - - var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - blockGenerator += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) {} - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - logInfo("Twitter receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - twitterStream.shutdown() - logInfo("Twitter receiver stopped") - } -} |