diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-21 19:04:49 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-21 19:05:05 -0700 |
commit | 94cbe2329021296b660d88f3e8ef3734374020d2 (patch) | |
tree | 1259ce6fa390dfa2947be38bc80a3b0b0d300938 /external/twitter/src/test | |
parent | a34e6fda1d6fb8e769c21db70845f1a6dde968d8 (diff) | |
download | spark-94cbe2329021296b660d88f3e8ef3734374020d2.tar.gz spark-94cbe2329021296b660d88f3e8ef3734374020d2.tar.bz2 spark-94cbe2329021296b660d88f3e8ef3734374020d2.zip |
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current Network Receiver API makes it slightly complicated to right a new receiver as one needs to create an instance of BlockGenerator as shown in SocketReceiver
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51
Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyways) needs to be change if we are to ensure future stability.
Additionally, the functions like streamingContext.socketStream that create input streams, return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is still not yet implemented.
This PR is blocked on the graceful shutdown PR #247
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #300 from tdas/network-receiver-api and squashes the following commits:
ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Address some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
(cherry picked from commit 04c37b6f749dc2418cc28c89964cdc687dfcbd51)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Diffstat (limited to 'external/twitter/src/test')
-rw-r--r-- | external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala index 06ab0cdaf3..93741e0375 100644 --- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala +++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} import org.apache.spark.storage.StorageLevel import twitter4j.auth.{NullAuthorization, Authorization} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import twitter4j.Status class TwitterStreamSuite extends TestSuiteBase { @@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase { val authorization: Authorization = NullAuthorization.getInstance() // tests the API, does not actually test data receiving - val test1 = TwitterUtils.createStream(ssc, None) - val test2 = TwitterUtils.createStream(ssc, None, filters) - val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) - val test4 = TwitterUtils.createStream(ssc, Some(authorization)) - val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters) - val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters, - StorageLevel.MEMORY_AND_DISK_SER_2) + val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None) + val test2: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters) + val test3: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2) + val test4: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization)) + val test5: ReceiverInputDStream[Status] = + TwitterUtils.createStream(ssc, Some(authorization), filters) + val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream( + ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2) // Note that actually testing the data receiving is hard as authentication keys are // necessary for accessing Twitter live stream |