aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
commit6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch)
tree3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /examples
parent56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff)
parent8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff)
downloadspark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip
Merge branch 'streaming' into ScrapCode-streaming
Conflicts: streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'examples')
-rw-r--r--examples/pom.xml17
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java)0
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java)0
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaQueueStream.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java)0
-rw-r--r--examples/src/main/scala/spark/examples/LocalLR.scala2
-rw-r--r--examples/src/main/scala/spark/examples/SparkALS.scala59
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala16
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala)33
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala11
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala71
10 files changed, 64 insertions, 145 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 782c026d73..f43af670c6 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -19,6 +19,11 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.twitter4j</groupId>
+ <artifactId>twitter4j-stream</artifactId>
+ <version>3.0.3</version>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
@@ -53,6 +58,12 @@
<classifier>hadoop1</classifier>
</dependency>
<dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
@@ -80,6 +91,12 @@
<classifier>hadoop2</classifier>
</dependency>
<dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-streaming</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
index cddce16e39..cddce16e39 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 4299febfd6..4299febfd6 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
index 43c3cd4dfa..43c3cd4dfa 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index f2ac2b3e06..9553162004 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -5,7 +5,7 @@ import spark.util.Vector
object LocalLR {
val N = 10000 // Number of data points
- val D = 10 // Numer of dimensions
+ val D = 10 // Number of dimensions
val R = 0.7 // Scaling factor
val ITERATIONS = 5
val rand = new Random(42)
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index fb28e2c932..5e01885dbb 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -7,6 +7,7 @@ import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
import spark._
+import scala.Option
object SparkALS {
// Parameters set through command line arguments
@@ -42,7 +43,7 @@ object SparkALS {
return sqrt(sumSqs / (M * U))
}
- def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
R: DoubleMatrix2D) : DoubleMatrix1D =
{
val U = us.size
@@ -68,50 +69,30 @@ object SparkALS {
return solved2D.viewColumn(0)
}
- def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
- R: DoubleMatrix2D) : DoubleMatrix1D =
- {
- val M = ms.size
- val F = ms(0).size
- val XtX = factory2D.make(F, F)
- val Xty = factory1D.make(F)
- // For each movie that the user rated
- for (i <- 0 until M) {
- val m = ms(i)
- // Add m * m^t to XtX
- blas.dger(1, m, m, XtX)
- // Add m * rating to Xty
- blas.daxpy(R.get(i, j), m, Xty)
- }
- // Add regularization coefs to diagonal terms
- for (d <- 0 until F) {
- XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
- }
- // Solve it with Cholesky
- val ch = new CholeskyDecomposition(XtX)
- val Xty2D = factory2D.make(Xty.toArray, F)
- val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
- }
-
def main(args: Array[String]) {
var host = ""
var slices = 0
- args match {
- case Array(m, u, f, iters, slices_, host_) => {
- M = m.toInt
- U = u.toInt
- F = f.toInt
- ITERATIONS = iters.toInt
- slices = slices_.toInt
- host = host_
+
+ (0 to 5).map(i => {
+ i match {
+ case a if a < args.length => Some(args(a))
+ case _ => None
+ }
+ }).toArray match {
+ case Array(host_, m, u, f, iters, slices_) => {
+ host = host_ getOrElse "local"
+ M = (m getOrElse "100").toInt
+ U = (u getOrElse "500").toInt
+ F = (f getOrElse "10").toInt
+ ITERATIONS = (iters getOrElse "5").toInt
+ slices = (slices_ getOrElse "2").toInt
}
case _ => {
- System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <master>")
+ System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]")
System.exit(1)
}
}
- printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val spark = new SparkContext(host, "SparkALS")
val R = generateR()
@@ -127,11 +108,11 @@ object SparkALS {
for (iter <- 1 to ITERATIONS) {
println("Iteration " + iter + ":")
ms = spark.parallelize(0 until M, slices)
- .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
+ .map(i => update(i, msc.value(i), usc.value, Rc.value))
.toArray
msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
us = spark.parallelize(0 until U, slices)
- .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
+ .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value)))
.toArray
usc = spark.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c..65d5da82fc 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._
object KafkaWordCount {
def main(args: Array[String]) {
- if (args.length < 6) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
- val Array(master, hostname, port, group, topics, numThreads) = args
+ val Array(master, zkQuorum, group, topics, numThreads) = args
val sc = new SparkContext(master, "KafkaWordCount")
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+ val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
@@ -38,16 +38,16 @@ object KafkaWordCount {
object KafkaWordCountProducer {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+ if (args.length < 2) {
+ System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
- val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+ val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
- props.put("zk.connect", hostname + ":" + port)
+ props.put("zk.connect", zkQuorum)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 377bc0c98e..fdb3a4c73c 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -1,19 +1,19 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
-import spark.streaming.StreamingContext._
import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
import spark.SparkContext._
-import spark.storage.StorageLevel
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
* stream. The stream is instantiated with credentials and optionally filters supplied by the
* command line arguments.
+ *
*/
-object TwitterBasic {
+object TwitterPopularTags {
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
@@ -21,10 +21,8 @@ object TwitterBasic {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters,
- StorageLevel.MEMORY_ONLY_SER)
- ssc.registerInputStream(stream)
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
@@ -39,22 +37,17 @@ object TwitterBasic {
// Print popular hashtags
topCounts60.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
topCounts10.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
}
-
}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d91..60f228b8ad 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -28,16 +28,15 @@ object PageViewStream {
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
// Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+ val pageCounts = pageViews.map(view => view.url).countByValue()
// Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
+ val slidingPageCounts = pageViews.map(view => view.url)
+ .countByValueAndWindow(Seconds(10), Seconds(2))
// Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
deleted file mode 100644
index 2532f27d1a..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-package spark.streaming.examples.twitter
-
-import spark._
-import spark.streaming._
-import dstream.{NetworkReceiver, NetworkInputDStream}
-import storage.StorageLevel
-import twitter4j._
-import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied username and password to authenticate.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*/
-class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
- username: String,
- password: String,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
-
- override def getReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(username, password, filters, storageLevel)
- }
-}
-
-class TwitterReceiver(
- username: String,
- password: String,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
-
- var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
-
- protected override def onStart() {
- blockGenerator.start()
- twitterStream = new TwitterStreamFactory()
- .getInstance(new BasicAuthorization(username, password))
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- blockGenerator += status
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {}
- })
-
- val query: FilterQuery = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
- }
- logInfo("Twitter receiver started")
- }
-
- protected override def onStop() {
- blockGenerator.stop()
- twitterStream.shutdown()
- logInfo("Twitter receiver stopped")
- }
-}