aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-26 18:02:49 -0800
commit6e43039614ed1ec55a134fb82fb3e8d4e80996ef (patch)
tree14b2325558807a1a0f82a79707c2d2d975f008b7
parente240bad03b9f9e19cb84b0914b729c8d109d4815 (diff)
downloadspark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.gz
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.tar.bz2
spark-6e43039614ed1ec55a134fb82fb3e8d4e80996ef.zip
Refactored streaming project to separate out the twitter functionality.
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala1
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala27
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala)12
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala7
-rw-r--r--project/SparkBuild.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala8
9 files changed, 64 insertions, 14 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 35b6329ab3..a00b3bde6e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -23,6 +23,8 @@ import com.twitter.algebird._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 8bfde2a829..82156060a8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.streaming.dstream.TwitterInputDStream
+import org.apache.spark.streaming.twitter._
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 27aa6b14bf..c4ded5e071 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
new file mode 100644
index 0000000000..fe66e28ce6
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/StreamingContextWithTwitter.scala
@@ -0,0 +1,27 @@
+package org.apache.spark.streaming.twitter
+
+import twitter4j.Status
+import twitter4j.auth.Authorization
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+
+
+class StreamingContextWithTwitter(ssc: StreamingContext) {
+ /**
+ * Create a input stream that returns tweets received from Twitter.
+ * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
+ * authorization; this uses the system properties twitter4j.oauth.consumerKey,
+ * .consumerSecret, .accessToken and .accessTokenSecret.
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ twitterAuth: Option[Authorization] = None,
+ filters: Seq[String] = Nil,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+ ): DStream[Status] = {
+ val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
+ ssc.registerInputStream(inputStream)
+ inputStream
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 387e15b0e6..97e48ebeca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.dstream
+package org.apache.spark.streaming.twitter
-import org.apache.spark._
-import org.apache.spark.streaming._
-import storage.StorageLevel
+import java.util.prefs.Preferences
import twitter4j._
import twitter4j.auth.Authorization
-import java.util.prefs.Preferences
import twitter4j.conf.ConfigurationBuilder
import twitter4j.conf.PropertyConfiguration
import twitter4j.auth.OAuthAuthorization
import twitter4j.auth.AccessToken
+import org.apache.spark._
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.storage.StorageLevel
/* A stream of Twitter statuses, potentially filtered by one or more keywords.
*
@@ -97,3 +98,4 @@ class TwitterReceiver(
logInfo("Twitter receiver stopped")
}
}
+
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
new file mode 100644
index 0000000000..89c202a730
--- /dev/null
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/package.scala
@@ -0,0 +1,7 @@
+package org.apache.spark.streaming
+
+package object twitter {
+ implicit def enrichMyStreamingContext(ssc: StreamingContext): StreamingContextWithTwitter = {
+ new StreamingContextWithTwitter(ssc)
+ }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 7bcbd90bd3..f9ff781f38 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -50,7 +50,7 @@ object SparkBuild extends Build {
.dependsOn(core, bagel, mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, bagel, streaming)
+ .dependsOn(core, mllib, bagel, streaming, externalTwitter)
lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
@@ -60,6 +60,8 @@ object SparkBuild extends Build {
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
+ lazy val externalTwitter = Project("streaming-twitter", file("external/twitter"), settings = twitterSettings) dependsOn(streaming)
+
lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
.dependsOn(core, bagel, mllib, repl, streaming) dependsOn(maybeYarn: _*)
@@ -313,7 +315,7 @@ object SparkBuild extends Build {
excludeAll(excludeNetty),
"org.eclipse.paho" % "mqtt-client" % "0.4.0",
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+ // "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
)
)
@@ -354,4 +356,11 @@ object SparkBuild extends Build {
case _ => MergeStrategy.first
}
)
+
+ def twitterSettings() = streamingSettings ++ Seq(
+ name := "spark-twitter",
+ libraryDependencies ++= Seq(
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty)
+ )
+ )
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41da028a3c..25b9b70b2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -45,8 +45,8 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
-import twitter4j.Status
-import twitter4j.auth.Authorization
+//import twitter4j.Status
+//import twitter4j.auth.Authorization
import org.apache.spark.streaming.scheduler._
import akka.util.ByteString
@@ -414,6 +414,7 @@ class StreamingContext private (
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
+ /*
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
@@ -431,6 +432,7 @@ class StreamingContext private (
registerInputStream(inputStream)
inputStream
}
+ */
/**
* Create an input stream from a queue of RDDs. In each batch,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 78d318cf27..b32cfbb677 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -25,13 +25,13 @@ import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import twitter4j.Status
+//import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import akka.util.ByteString
-import twitter4j.auth.Authorization
+//import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -338,7 +338,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
-
+ /*
/**
* Create a input stream that returns tweets received from Twitter.
* @param twitterAuth Twitter4J Authorization object
@@ -409,7 +409,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def twitterStream(): JavaDStream[Status] = {
ssc.twitterStream()
}
-
+ */
/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor