From 94cbe2329021296b660d88f3e8ef3734374020d2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 21 Apr 2014 19:04:49 -0700 Subject: [SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP] The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51 Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability. Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented. This PR is blocked on the graceful shutdown PR #247 Author: Tathagata Das Closes #300 from tdas/network-receiver-api and squashes the following commits: ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff. 838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers. a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues. 91bfa72 [Tathagata Das] Fixed bugs. 8533094 [Tathagata Das] Scala style fixes. 028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver. 43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java. 2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread. 9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api 3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable. a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability. (cherry picked from commit 04c37b6f749dc2418cc28c89964cdc687dfcbd51) Signed-off-by: Patrick Wendell --- .../streaming/twitter/TwitterInputDStream.scala | 23 +++++++++++----------- .../spark/streaming/twitter/TwitterUtils.scala | 20 ++++++++++--------- .../streaming/twitter/TwitterStreamSuite.scala | 20 ++++++++++++------- 3 files changed, 36 insertions(+), 27 deletions(-) (limited to 'external/twitter/src') diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala index 843a4a7a9a..7bca140711 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala @@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.Logging +import org.apache.spark.streaming.receiver.Receiver /* A stream of Twitter statuses, potentially filtered by one or more keywords. * @@ -41,7 +43,7 @@ class TwitterInputDStream( twitterAuth: Option[Authorization], filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { + ) extends ReceiverInputDStream[Status](ssc_) { private def createOAuthAuthorization(): Authorization = { new OAuthAuthorization(new ConfigurationBuilder().build()) @@ -49,7 +51,7 @@ class TwitterInputDStream( private val authorization = twitterAuth.getOrElse(createOAuthAuthorization()) - override def getReceiver(): NetworkReceiver[Status] = { + override def getReceiver(): Receiver[Status] = { new TwitterReceiver(authorization, filters, storageLevel) } } @@ -59,27 +61,27 @@ class TwitterReceiver( twitterAuth: Authorization, filters: Seq[String], storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { + ) extends Receiver[Status](storageLevel) with Logging { var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - protected override def onStart() { - blockGenerator.start() + def onStart() { twitterStream = new TwitterStreamFactory().getInstance(twitterAuth) twitterStream.addListener(new StatusListener { def onStatus(status: Status) = { - blockGenerator += status + store(status) } // Unimplemented def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} def onTrackLimitationNotice(i: Int) {} def onScrubGeo(l: Long, l1: Long) {} def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) { stopOnError(e) } + def onException(e: Exception) { + restart("Error receiving tweets", e) + } }) - val query: FilterQuery = new FilterQuery + val query = new FilterQuery if (filters.size > 0) { query.track(filters.toArray) twitterStream.filter(query) @@ -89,8 +91,7 @@ class TwitterReceiver( logInfo("Twitter receiver started") } - protected override def onStop() { - blockGenerator.stop() + def onStop() { twitterStream.shutdown() logInfo("Twitter receiver stopped") } diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index e8433b7e9f..c6a9a2b737 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -21,8 +21,8 @@ import twitter4j.Status import twitter4j.auth.Authorization import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object TwitterUtils { /** @@ -40,7 +40,7 @@ object TwitterUtils { twitterAuth: Option[Authorization], filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { + ): ReceiverInputDStream[Status] = { new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } @@ -52,7 +52,7 @@ object TwitterUtils { * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object */ - def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None) } @@ -65,7 +65,8 @@ object TwitterUtils { * @param jssc JavaStreamingContext object * @param filters Set of filter strings to get only those tweets that match them */ - def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext, filters: Array[String] + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters) } @@ -82,7 +83,7 @@ object TwitterUtils { jssc: JavaStreamingContext, filters: Array[String], storageLevel: StorageLevel - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, None, filters, storageLevel) } @@ -92,7 +93,8 @@ object TwitterUtils { * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization */ - def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = { + def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth)) } @@ -107,7 +109,7 @@ object TwitterUtils { jssc: JavaStreamingContext, twitterAuth: Authorization, filters: Array[String] - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters) } @@ -123,7 +125,7 @@ object TwitterUtils { twitterAuth: Authorization, filters: Array[String], storageLevel: StorageLevel - ): JavaDStream[Status] = { + ): JavaReceiverInputDStream[Status] = { createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel) } } diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index 06ab0cdaf3..93741e0375 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel import twitter4j.auth.{NullAuthorization, Authorization} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import twitter4j.Status class TwitterStreamSuite extends TestSuiteBase { @@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase { val authorization: Authorization = NullAuthorization.getInstance() // tests the API, does not actually test data receiving - val test1 = TwitterUtils.createStream(ssc, None) - val test2 = TwitterUtils.createStream(ssc, None, filters) - val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) - val test4 = TwitterUtils.createStream(ssc, Some(authorization)) - val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) - val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, - StorageLevel.MEMORY_AND_DISK_SER_2) + val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None) + val test2: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters) + val test3: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization)) + val test5: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream( + ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) // Note that actually testing the data receiving is hard as authentication keys are // necessary for accessing Twitter live stream -- cgit v1.2.3