aboutsummaryrefslogtreecommitdiff
path: root/external/twitter/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-21 19:04:49 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-21 19:04:49 -0700
commit04c37b6f749dc2418cc28c89964cdc687dfcbd51 (patch)
treeba434fee57cba6fe201e83ad9c049fded5e09bc0 /external/twitter/src
parent5a5b3346c79abb659260284fed0ace51942f3193 (diff)
downloadspark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.gz
spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.tar.bz2
spark-04c37b6f749dc2418cc28c89964cdc687dfcbd51.zip
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51 Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability. Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented. This PR is blocked on the graceful shutdown PR #247 Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #300 from tdas/network-receiver-api and squashes the following commits: ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff. 838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers. a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues. 91bfa72 [Tathagata Das] Fixed bugs. 8533094 [Tathagata Das] Scala style fixes. 028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver. 43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java. 2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread. 9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable. a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
Diffstat (limited to 'external/twitter/src')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala23
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala20
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala20
3 files changed, 36 insertions, 27 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7a9a..7bca140711 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -41,7 +43,7 @@ class TwitterInputDStream(
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](ssc_) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
- override def getReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
}
@@ -59,27 +61,27 @@ class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
+ ) extends Receiver[Status](storageLevel) with Logging {
var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
- protected override def onStart() {
- blockGenerator.start()
+ def onStart() {
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
- blockGenerator += status
+ store(status)
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
})
- val query: FilterQuery = new FilterQuery
+ val query = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
logInfo("Twitter receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
twitterStream.shutdown()
logInfo("Twitter receiver stopped")
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7e9f..c6a9a2b737 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
+ ): ReceiverInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
- def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
}
@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}
@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}
@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}
@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}
@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cdaf3..93741e0375 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
class TwitterStreamSuite extends TestSuiteBase {
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = TwitterUtils.createStream(ssc, None)
- val test2 = TwitterUtils.createStream(ssc, None, filters)
- val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = TwitterUtils.createStream(ssc, Some(authorization))
- val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
- StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+ val test2: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters)
+ val test3: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization))
+ val test5: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+ ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream