aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-14 17:49:43 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-14 17:49:43 -0800
commitdef8126d7788a8bd991ac6f9f9403de701a39dc5 (patch)
treeaedd6ef98806069248149d0fdfd08e95dbc11e85
parent2eacf22401f75b956036fb0c32eb38baa16b224e (diff)
downloadspark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.gz
spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.bz2
spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.zip
Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags.
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala)33
-rw-r--r--project/SparkBuild.scala8
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala52
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala)5
4 files changed, 53 insertions, 45 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 377bc0c98e..fdb3a4c73c 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -1,19 +1,19 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
-import spark.streaming.StreamingContext._
import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
import spark.SparkContext._
-import spark.storage.StorageLevel
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
* stream. The stream is instantiated with credentials and optionally filters supplied by the
* command line arguments.
+ *
*/
-object TwitterBasic {
+object TwitterPopularTags {
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
@@ -21,10 +21,8 @@ object TwitterBasic {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters,
- StorageLevel.MEMORY_ONLY_SER)
- ssc.registerInputStream(stream)
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
@@ -39,22 +37,17 @@ object TwitterBasic {
// Print popular hashtags
topCounts60.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
topCounts10.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
}
-
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index af8b5ba017..c6d3cc8b15 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,10 +154,7 @@ object SparkBuild extends Build {
)
def examplesSettings = sharedSettings ++ Seq(
- name := "spark-examples",
- libraryDependencies ++= Seq(
- "org.twitter4j" % "twitter4j-stream" % "3.0.3"
- )
+ name := "spark-examples"
)
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
@@ -166,7 +163,8 @@ object SparkBuild extends Build {
name := "spark-streaming",
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
- "com.github.sgroschupf" % "zkclient" % "0.1"
+ "com.github.sgroschupf" % "zkclient" % "0.1",
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3"
)
) ++ assemblySettings ++ extraAssemblySettings
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8cfbec51d2..9be9d884be 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -17,6 +17,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import java.util.UUID
+import twitter4j.Status
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -30,14 +31,14 @@ class StreamingContext private (
) extends Logging {
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a StreamingContext using an existing SparkContext.
* @param sparkContext Existing SparkContext
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = this(sparkContext, null, batchDuration)
/**
- * Creates a StreamingContext by providing the details necessary for creating a new SparkContext.
+ * Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param frameworkName A name for your job, to display on the cluster web UI
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -46,7 +47,7 @@ class StreamingContext private (
this(StreamingContext.createNewSparkContext(master, frameworkName), null, batchDuration)
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Re-create a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
@@ -101,12 +102,12 @@ class StreamingContext private (
protected[streaming] var scheduler: Scheduler = null
/**
- * Returns the associated Spark context
+ * Return the associated Spark context
*/
def sparkContext = sc
/**
- * Sets each DStreams in this context to remember RDDs it generated in the last given duration.
+ * Set each DStreams in this context to remember RDDs it generated in the last given duration.
* DStreams remember RDDs only for a limited duration of time and releases them for garbage
* collection. This method allows the developer to specify how to long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
@@ -117,7 +118,7 @@ class StreamingContext private (
}
/**
- * Sets the context to periodically checkpoint the DStream operations for master
+ * Set the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
@@ -200,7 +201,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream from a Flume source.
+ * Create a input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
@@ -236,7 +237,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
@@ -255,7 +256,7 @@ class StreamingContext private (
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
@@ -274,9 +275,8 @@ class StreamingContext private (
inputStream
}
-
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
@@ -286,7 +286,25 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create a input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Seq[String],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -300,7 +318,7 @@ class StreamingContext private (
}
/**
- * Creates an input stream from a queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
@@ -325,7 +343,7 @@ class StreamingContext private (
}
/**
- * Registers an input stream that will be started (InputDStream.start() called) to get the
+ * Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
def registerInputStream(inputStream: InputDStream[_]) {
@@ -333,7 +351,7 @@ class StreamingContext private (
}
/**
- * Registers an output stream that will be computed every interval
+ * Register an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
@@ -351,7 +369,7 @@ class StreamingContext private (
}
/**
- * Starts the execution of the streams.
+ * Start the execution of the streams.
*/
def start() {
if (checkpointDir != null && checkpointDuration == null && graph != null) {
@@ -379,7 +397,7 @@ class StreamingContext private (
}
/**
- * Stops the execution of the streams.
+ * Stop the execution of the streams.
*/
def stop() {
try {
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index 99ed4cdc1c..d733254ddb 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -1,4 +1,4 @@
-package spark.streaming.examples.twitter
+package spark.streaming.dstream
import spark._
import spark.streaming._
@@ -6,7 +6,6 @@ import dstream.{NetworkReceiver, NetworkInputDStream}
import storage.StorageLevel
import twitter4j._
import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -50,7 +49,7 @@ class TwitterReceiver(
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {}
+ def onException(e: Exception) { stopOnError(e) }
})
val query: FilterQuery = new FilterQuery