aboutsummaryrefslogtreecommitdiff
path: root/external/twitter/src
diff options
context:
space:
mode:
Diffstat (limited to 'external/twitter/src')
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala23
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala20
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala20
3 files changed, 36 insertions, 27 deletions
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 843a4a7a9a..7bca140711 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -25,6 +25,8 @@ import twitter4j.auth.OAuthAuthorization
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -41,7 +43,7 @@ class TwitterInputDStream(
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
+ ) extends ReceiverInputDStream[Status](ssc_) {
private def createOAuthAuthorization(): Authorization = {
new OAuthAuthorization(new ConfigurationBuilder().build())
@@ -49,7 +51,7 @@ class TwitterInputDStream(
private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
- override def getReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): Receiver[Status] = {
new TwitterReceiver(authorization, filters, storageLevel)
}
}
@@ -59,27 +61,27 @@ class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
+ ) extends Receiver[Status](storageLevel) with Logging {
var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
- protected override def onStart() {
- blockGenerator.start()
+ def onStart() {
twitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
twitterStream.addListener(new StatusListener {
def onStatus(status: Status) = {
- blockGenerator += status
+ store(status)
}
// Unimplemented
def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
def onTrackLimitationNotice(i: Int) {}
def onScrubGeo(l: Long, l1: Long) {}
def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) { stopOnError(e) }
+ def onException(e: Exception) {
+ restart("Error receiving tweets", e)
+ }
})
- val query: FilterQuery = new FilterQuery
+ val query = new FilterQuery
if (filters.size > 0) {
query.track(filters.toArray)
twitterStream.filter(query)
@@ -89,8 +91,7 @@ class TwitterReceiver(
logInfo("Twitter receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
twitterStream.shutdown()
logInfo("Twitter receiver stopped")
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index e8433b7e9f..c6a9a2b737 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -21,8 +21,8 @@ import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object TwitterUtils {
/**
@@ -40,7 +40,7 @@ object TwitterUtils {
twitterAuth: Option[Authorization],
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
+ ): ReceiverInputDStream[Status] = {
new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
@@ -52,7 +52,7 @@ object TwitterUtils {
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
- def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None)
}
@@ -65,7 +65,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
- def createStream(jssc: JavaStreamingContext, filters: Array[String]): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, filters: Array[String]
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters)
}
@@ -82,7 +83,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, None, filters, storageLevel)
}
@@ -92,7 +93,8 @@ object TwitterUtils {
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization): JavaDStream[Status] = {
+ def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth))
}
@@ -107,7 +109,7 @@ object TwitterUtils {
jssc: JavaStreamingContext,
twitterAuth: Authorization,
filters: Array[String]
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters)
}
@@ -123,7 +125,7 @@ object TwitterUtils {
twitterAuth: Authorization,
filters: Array[String],
storageLevel: StorageLevel
- ): JavaDStream[Status] = {
+ ): JavaReceiverInputDStream[Status] = {
createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
}
}
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
index 06ab0cdaf3..93741e0375 100644
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.streaming.twitter
import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.storage.StorageLevel
import twitter4j.auth.{NullAuthorization, Authorization}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import twitter4j.Status
class TwitterStreamSuite extends TestSuiteBase {
@@ -29,13 +31,17 @@ class TwitterStreamSuite extends TestSuiteBase {
val authorization: Authorization = NullAuthorization.getInstance()
// tests the API, does not actually test data receiving
- val test1 = TwitterUtils.createStream(ssc, None)
- val test2 = TwitterUtils.createStream(ssc, None, filters)
- val test3 = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4 = TwitterUtils.createStream(ssc, Some(authorization))
- val test5 = TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6 = TwitterUtils.createStream(ssc, Some(authorization), filters,
- StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
+ val test2: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters)
+ val test3: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test4: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization))
+ val test5: ReceiverInputDStream[Status] =
+ TwitterUtils.createStream(ssc, Some(authorization), filters)
+ val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
+ ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
// Note that actually testing the data receiving is hard as authentication keys are
// necessary for accessing Twitter live stream