aboutsummaryrefslogtreecommitdiff
path: root/external/twitter
diff options
context:
space:
mode:
Diffstat (limited to 'external/twitter')
-rw-r--r--external/twitter/pom.xml70
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala115
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala132
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java21
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala23
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java44
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java44
-rw-r--r--external/twitter/src/test/resources/log4j.properties28
-rw-r--r--external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala59
9 files changed, 0 insertions, 536 deletions
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
deleted file mode 100644
index 5d4053afcb..0000000000
--- a/external/twitter/pom.xml
+++ /dev/null
@@ -1,70 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-twitter_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-twitter</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External Twitter</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.twitter4j</groupId>
- <artifactId>twitter4j-stream</artifactId>
- <version>4.0.4</version>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
deleted file mode 100644
index bdd57fdde3..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j._
-import twitter4j.auth.Authorization
-import twitter4j.auth.OAuthAuthorization
-import twitter4j.conf.ConfigurationBuilder
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied Twitter4J authentication credentials.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*
-* If no Authorization object is provided, initializes OAuth authorization using the system
-* properties twitter4j.oauth.consumerKey, .consumerSecret, .accessToken and .accessTokenSecret.
-*/
-private[streaming]
-class TwitterInputDStream(
- _ssc: StreamingContext,
- twitterAuth: Option[Authorization],
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[Status](_ssc) {
-
- private def createOAuthAuthorization(): Authorization = {
- new OAuthAuthorization(new ConfigurationBuilder().build())
- }
-
- private val authorization = twitterAuth.getOrElse(createOAuthAuthorization())
-
- override def getReceiver(): Receiver[Status] = {
- new TwitterReceiver(authorization, filters, storageLevel)
- }
-}
-
-private[streaming]
-class TwitterReceiver(
- twitterAuth: Authorization,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends Receiver[Status](storageLevel) with Logging {
-
- @volatile private var twitterStream: TwitterStream = _
- @volatile private var stopped = false
-
- def onStart() {
- try {
- val newTwitterStream = new TwitterStreamFactory().getInstance(twitterAuth)
- newTwitterStream.addListener(new StatusListener {
- def onStatus(status: Status): Unit = {
- store(status)
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {
- if (!stopped) {
- restart("Error receiving tweets", e)
- }
- }
- })
-
- val query = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.mkString(","))
- newTwitterStream.filter(query)
- } else {
- newTwitterStream.sample()
- }
- setTwitterStream(newTwitterStream)
- logInfo("Twitter receiver started")
- stopped = false
- } catch {
- case e: Exception => restart("Error starting Twitter stream", e)
- }
- }
-
- def onStop() {
- stopped = true
- setTwitterStream(null)
- logInfo("Twitter receiver stopped")
- }
-
- private def setTwitterStream(newTwitterStream: TwitterStream) = synchronized {
- if (twitterStream != null) {
- twitterStream.shutdown()
- }
- twitterStream = newTwitterStream
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
deleted file mode 100644
index 9cb0106ab1..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import twitter4j.Status
-import twitter4j.auth.Authorization
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object TwitterUtils {
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param ssc StreamingContext object
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- ssc: StreamingContext,
- twitterAuth: Option[Authorization],
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[Status] = {
- new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- */
- def createStream(jssc: JavaStreamingContext): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def createStream(jssc: JavaStreamingContext, filters: Array[String]
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None, filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter using Twitter4J's default
- * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
- * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
- * twitter4j.oauth.accessTokenSecret.
- * @param jssc JavaStreamingContext object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, None, filters, storageLevel)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization
- */
- def createStream(jssc: JavaStreamingContext, twitterAuth: Authorization
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth))
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization
- * @param filters Set of filter strings to get only those tweets that match them
- */
- def createStream(
- jssc: JavaStreamingContext,
- twitterAuth: Authorization,
- filters: Array[String]
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth), filters)
- }
-
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param jssc JavaStreamingContext object
- * @param twitterAuth Twitter4J Authorization object
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def createStream(
- jssc: JavaStreamingContext,
- twitterAuth: Authorization,
- filters: Array[String],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[Status] = {
- createStream(jssc.ssc, Some(twitterAuth), filters, storageLevel)
- }
-}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java
deleted file mode 100644
index 258c0950a0..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Twitter feed receiver for spark streaming.
- */
-package org.apache.spark.streaming.twitter; \ No newline at end of file
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
deleted file mode 100644
index 580e37fa8f..0000000000
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Twitter feed receiver for spark streaming.
- */
-package object twitter
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
deleted file mode 100644
index 26ec8af455..0000000000
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter;
-
-import org.junit.Test;
-import twitter4j.Status;
-import twitter4j.auth.Authorization;
-import twitter4j.auth.NullAuthorization;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
-
-public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testTwitterStream() {
- String[] filters = { "filter1", "filter2" };
- Authorization auth = NullAuthorization.getInstance();
-
- // tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = TwitterUtils.createStream(ssc);
- JavaDStream<Status> test2 = TwitterUtils.createStream(ssc, filters);
- JavaDStream<Status> test3 = TwitterUtils.createStream(
- ssc, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = TwitterUtils.createStream(ssc, auth);
- JavaDStream<Status> test5 = TwitterUtils.createStream(ssc, auth, filters);
- JavaDStream<Status> test6 = TwitterUtils.createStream(ssc,
- auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- }
-}
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
deleted file mode 100644
index 9a3569789d..0000000000
--- a/external/twitter/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the filetarget/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala b/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
deleted file mode 100644
index 7e5fc0cbb9..0000000000
--- a/external/twitter/src/test/scala/org/apache/spark/streaming/twitter/TwitterStreamSuite.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.twitter
-
-import org.scalatest.BeforeAndAfter
-import twitter4j.Status
-import twitter4j.auth.{Authorization, NullAuthorization}
-
-import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class TwitterStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val batchDuration = Seconds(1)
-
- private val master: String = "local[2]"
-
- private val framework: String = this.getClass.getSimpleName
-
- test("twitter input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val filters = Seq("filter1", "filter2")
- val authorization: Authorization = NullAuthorization.getInstance()
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[Status] = TwitterUtils.createStream(ssc, None)
- val test2: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, None, filters)
- val test3: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test4: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, Some(authorization))
- val test5: ReceiverInputDStream[Status] =
- TwitterUtils.createStream(ssc, Some(authorization), filters)
- val test6: ReceiverInputDStream[Status] = TwitterUtils.createStream(
- ssc, Some(authorization), filters, StorageLevel.MEMORY_AND_DISK_SER_2)
-
- // Note that actually testing the data receiving is hard as authentication keys are
- // necessary for accessing Twitter live stream
- ssc.stop()
- }
-}