From f7d3e309cb76ef208ab51f23c90c5e891fb333a3 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 8 Feb 2013 16:56:42 +0530
Subject: ZeroMQ stream as receiver

---
 .../scala/spark/streaming/StreamingContext.scala   | 22 +++++++++++++++
 .../spark/streaming/receivers/ZeroMQReceiver.scala | 33 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)
 create mode 100644 streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala

diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index a9684c5772..8c772aec6e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -2,12 +2,14 @@ package spark.streaming
 
 import akka.actor.Props
 import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
 
 import spark.streaming.dstream._
 
 import spark.{RDD, Logging, SparkEnv, SparkContext}
 import spark.streaming.receivers.ActorReceiver
 import spark.streaming.receivers.ReceiverSupervisorStrategy
+import spark.streaming.receivers.ZeroMQReceiver
 import spark.storage.StorageLevel
 import spark.util.MetadataCleaner
 import spark.streaming.receivers.ActorReceiver
@@ -174,6 +176,26 @@ class StreamingContext private (
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
+  /**
+   * ZeroMQ stream receiver
+   * @param publisherUrl URL of the remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame
+   *                       carries a sequence of bytes. This converter (which might be a deserializer of
+   *                       bytes) translates the sequence of byte sequences into an iterator of objects,
+   *                       where a sequence is a frame and a sub-sequence is its payload.
+   * @param storageLevel RDD storage level. Defaults to memory-only.
+   */
+  def zeroMQStream[T: ClassManifest](publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+
+    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+      "ZeroMQReceiver", storageLevel, supervisorStrategy)
+  }
+
   /**
    * Create an input stream that pulls messages from a Kafka broker.
    * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).

diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
new file mode 100644
index 0000000000..5533c3cf1e
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -0,0 +1,33 @@
+package spark.streaming.receivers
+
+import akka.actor.Actor
+import akka.zeromq._
+
+import spark.Logging
+
+/**
+ * A receiver that subscribes to a ZeroMQ stream.
+ */
+private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
+  subscribe: Subscribe,
+  bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+  extends Actor with Receiver with Logging {
+
+  override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+    Connect(publisherUrl), subscribe)
+
+  def receive: Receive = {
+
+    case Connecting ⇒ logInfo("connecting ...")
+
+    case m: ZMQMessage ⇒
+      logDebug("Received message for: " + m.firstFrameAsString)
+
+      // We ignore the first frame, as it carries the topic rather than payload
+      val bytes = m.frames.tail.map(_.payload)
+      pushBlock(bytesToObjects(bytes))
+
+    case Closed ⇒ logInfo("received closed")
+
+  }
+}
-- cgit v1.2.3

From 8d44480d840079cb444b5e19511e5027dedd7f77 Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Tue, 19 Feb 2013 19:42:14 +0530
Subject: example for demonstrating ZeroMQ stream

---
 .../spark/streaming/examples/ZeroMQWordCount.scala | 70 ++++++++++++++++++++++
 .../scala/spark/streaming/StreamingContext.scala   | 15 +++--
 2 files changed, 77 insertions(+), 8 deletions(-)
 create mode 100644 examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala

diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
new file mode 100644
index 0000000000..ab7b67ed4b
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -0,0 +1,70 @@
+package spark.streaming.examples
+
+import akka.actor.ActorSystem
+import akka.actor.actorRef2Scala
+import akka.zeromq._
+import spark.streaming.{ Seconds, StreamingContext }
+import spark.streaming.StreamingContext._
+import akka.zeromq.Subscribe
+
+/**
+ * A simple publisher for demonstration purposes; it repeatedly publishes
+ * a fixed set of messages, once every second.
+ */
+object SimpleZeroMQPublisher {
+
+  def main(args: Array[String]) = {
+    if (args.length < 2) {
+      System.err.println("Usage: SimpleZeroMQPublisher <url> <topic>")
+      System.exit(1)
+    }
+
+    val Seq(url, topic) = args.toSeq
+    val acs: ActorSystem = ActorSystem()
+
+    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
+    val messages: Array[String] = Array("words ", "may ", "count ")
+    while (true) {
+      Thread.sleep(1000)
+      pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+    }
+    acs.awaitTermination()
+  }
+}
+
+/**
+ * A sample wordcount with ZeroMQ stream
+ *
+ * Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <zeroMQurl> and <topic> describe where the ZeroMQ publisher is running.
+ *
+ * To run this example locally, you may run the publisher as
+ *    `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * and then run the example
+ *    `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ */
+object ZeroMQWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println(
+        "Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" +
+          "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+    val Seq(master, url, topic) = args.toSeq
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+
+    def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+
+    // For this stream, a ZeroMQ publisher should be running.
+    val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+  }
+
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8c772aec6e..f15e6bd23d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -163,7 +163,7 @@ class StreamingContext private (
    * @param props Props object defining creation of the actor
    * @param name Name of the actor
    * @param storageLevel RDD storage level. Defaults to memory-only.
-   * 
+   *
    * @note An important point to note:
    *       Since the actor may exist outside the Spark framework, it is the user's responsibility
    *       to ensure type safety, i.e. the parametrized type of data received and actorStream
@@ -181,9 +181,9 @@ class StreamingContext private (
    * @param publisherUrl URL of the remote ZeroMQ publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame
-   *                       carries a sequence of bytes. This converter (which might be a deserializer of
-   *                       bytes) translates the sequence of byte sequences into an iterator of objects,
-   *                       where a sequence is a frame and a sub-sequence is its payload.
+   *                       carries a sequence of bytes. This converter (which might be a deserializer of
+   *                       bytes) translates the sequence of byte sequences into an iterator of objects,
+   *                       where a sequence is a frame and a sub-sequence is its payload.
    * @param storageLevel RDD storage level. Defaults to memory-only.
    */
   def zeroMQStream[T: ClassManifest](publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
       supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-    
-    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), 
+
+    actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
       "ZeroMQReceiver", storageLevel, supervisorStrategy)
   }
-  
+
   /**
    * Create an input stream that pulls messages from a Kafka broker.
    * @param zkQuorum ZooKeeper quorum (hostname:port,hostname:port,..).
@@ -500,4 +500,3 @@ object StreamingContext {
     new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
   }
 }
-
-- cgit v1.2.3

From d8ee184d950ac91ad4f617fc0d7b2f02ed6b74d3 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Tue, 19 Feb 2013 17:42:57 +0200
Subject: Dependencies and refactoring for streaming HLL example, and using
 context.twitterStream method

---
 examples/pom.xml                                   |  6 ---
 .../streaming/examples/TwitterAlgebirdHLL.scala    | 62 +++++++++++++++++++++
 .../streaming/examples/twitter/StreamingHLL.scala  | 63 ----------------------
 streaming/pom.xml                                  | 10 ++++
 4 files changed, 72 insertions(+), 69 deletions(-)
 create mode 100644 examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
 delete mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala

diff --git a/examples/pom.xml b/examples/pom.xml
index 28da3dbde4..7d975875fa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -19,17 +19,11 @@
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.twitter4j</groupId>
-      <artifactId>twitter4j-stream</artifactId>
-      <version>3.0.3</version>
-    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>algebird-core_2.9.2</artifactId>
       <version>0.1.8</version>
     </dependency>
-
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.version}</artifactId>

diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000000..c2095f5b94
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -0,0 +1,62 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.storage.StorageLevel
+import com.twitter.algebird.HyperLogLog._
+import com.twitter.algebird.HyperLogLogMonoid
+import spark.streaming.dstream.TwitterInputDStream
+
+/**
+ * Example using the HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
+ * TwitterInputDStream to compute approximate distinct counts of user IDs.
+ */
+object TwitterAlgebirdHLL {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: TwitterAlgebirdHLL <master> <twitter_username> <twitter_password>" +
+        " [filter1] [filter2] ... [filter n]")
+      System.exit(1)
+    }
+
+    /** Bit size parameter for HyperLogLog */
+    val BIT_SIZE = 12
+    val Array(master, username, password) = args.slice(0, 3)
+    val filters = args.slice(3, args.length)
+
+    val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+    val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    var globalHll = new HyperLogLogMonoid(BIT_SIZE).zero
+    var userSet: Set[Long] = Set()
+
+    val approxUsers = users.mapPartitions(ids => {
+      val hll = new HyperLogLogMonoid(BIT_SIZE)
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    approxUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        globalHll += partial
+        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+      }
+    })
+
+    exactUsers.foreach(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        userSet ++= partial
+        println("Exact distinct users this batch: %d".format(partial.size))
+        println("Exact distinct users overall: %d".format(userSet.size))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1) * 100))
+      }
+    })
+
+    ssc.start()
+  }
+}

diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
deleted file mode 100644
index 023a0add80..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-package spark.streaming.examples.twitter
-
-import spark.streaming.{Seconds, StreamingContext}
-import spark.storage.StorageLevel
-import com.twitter.algebird.HyperLogLog._
-import com.twitter.algebird.HyperLogLogMonoid
-import spark.streaming.dstream.TwitterInputDStream
-
-/**
- * Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
- * TwitterInputDStream
- */
-object StreamingHLL {
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: TwitterStreamingHLL <master> <twitter_username> <twitter_password>" +
-        " [filter1] [filter2] ... [filter n]")
-      System.exit(1)
-    }
-
-    val Array(master, username, password) = args.slice(0, 3)
-    val filters = args.slice(3, args.length)
-
-    val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
-    val stream = new TwitterInputDStream(ssc, username, password, filters,
-      StorageLevel.MEMORY_ONLY_SER)
-    ssc.registerInputStream(stream)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val globalHll = new HyperLogLogMonoid(12)
-    var userSet: Set[Long] = Set()
-
-    val approxUsers = users.mapPartitions(ids => {
-      val hll = new HyperLogLogMonoid(12)
-      ids.map(id => hll(id))
-    }).reduce(_ + _)
-
-    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
-    var h = globalHll.zero
-    approxUsers.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        h += partial
-        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
-        println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
-      }
-    })
-
-    exactUsers.foreach(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        userSet ++= partial
-        println("Exact distinct users this batch: %d".format(partial.size))
-        println("Exact distinct users overall: %d".format(userSet.size))
-        println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
-      }
-    })
-
-    ssc.start()
-  }
-}

diff --git a/streaming/pom.xml b/streaming/pom.xml
index 6ee7e59df3..d78c39da0d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -47,6 +47,16 @@
       <artifactId>zkclient</artifactId>
       <version>0.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-stream</artifactId>
+      <version>3.0.3</version>
+    </dependency>
+    <dependency>
+      <groupId>org.twitter4j</groupId>
+      <artifactId>twitter4j-core</artifactId>
+      <version>3.0.3</version>
+    </dependency>
 
     <dependency>
       <groupId>org.scalatest</groupId>
-- cgit v1.2.3

From 4e5b09664cdf95effff61c042b6243107355b55c Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Wed, 20 Feb 2013 12:33:37 +0530
Subject: fixes corresponding to review feedback at pull request #479

---
 .../main/scala/spark/streaming/examples/ZeroMQWordCount.scala   | 9 ++++++---
 project/SparkBuild.scala                                        | 4 ++--
 streaming/src/main/scala/spark/streaming/StreamingContext.scala | 2 +-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
index ab7b67ed4b..5ed9b7cb76 100644
--- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -35,20 +35,23 @@ object SimpleZeroMQPublisher {
 /**
  * A sample wordcount with ZeroMQ stream
  *
- * Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>
+ * To work with ZeroMQ, some native libraries have to be installed.
+ * Install ZeroMQ (release 2.1) core libraries. [ZeroMQ Install guide](http://www.zeromq.org/intro:get-the-software)
+ *
+ * Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>
  * In local mode, <master> should be 'local[n]' with n > 1
 * <zeroMQurl> and <topic> describe where the ZeroMQ publisher is running.
 *
 * To run this example locally, you may run the publisher as
 *    `$ ./run spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
- * and then run the example
+ * and run the example as
 *    `$ ./run spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
 */
 object ZeroMQWordCount {
   def main(args: Array[String]) {
     if (args.length < 3) {
       System.err.println(
-        "Usage: WordCountZeroMQ <master> <zeroMQurl> <topic>" +
+        "Usage: ZeroMQWordCount <master> <zeroMQurl> <topic>" +
           "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5fe85a28c3..7f432b60db 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -134,7 +134,6 @@ object SparkBuild extends Build {
         "com.typesafe.akka" % "akka-actor" % "2.0.3",
         "com.typesafe.akka" % "akka-remote" % "2.0.3",
         "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
-        "com.typesafe.akka" % "akka-zeromq" % "2.0.3",
         "it.unimi.dsi" % "fastutil" % "6.4.4",
         "colt" % "colt" % "1.2.0",
         "cc.spray" % "spray-can" % "1.0-M2.1",
@@ -165,7 +164,8 @@ object SparkBuild extends Build {
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
       "com.github.sgroschupf" % "zkclient" % "0.1",
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3"
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+      "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
     )
   ) ++ assemblySettings ++ extraAssemblySettings

diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8c772aec6e..2ca7dcc218 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -177,7 +177,7 @@ class StreamingContext private (
   }
 
   /**
-   * ZeroMQ stream receiver
+   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
    * @param publisherUrl URL of the remote ZeroMQ publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and each frame
-- cgit v1.2.3
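
Editor's note: the sketch below is not part of the patches above. It illustrates how an application might call the zeroMQStream API these commits introduce, using a custom bytesToObjects converter instead of the word-count example's string converter. The publisher URL "tcp://127.0.0.1:1234", the topic name "metrics", the object name ZeroMQCsvConsumer, and the comma-separated payload format are all assumptions invented for the illustration; only the zeroMQStream signature itself comes from the patches.

    package spark.streaming.examples

    import akka.zeromq.Subscribe
    import spark.streaming.{Seconds, StreamingContext}

    object ZeroMQCsvConsumer {
      def main(args: Array[String]) {
        // Two-second batches on a local master; both are arbitrary choices here.
        val ssc = new StreamingContext("local[2]", "ZeroMQCsvConsumer", Seconds(2))

        // Assumed payload format: each frame's payload decodes to a string of
        // comma-separated fields. ZeroMQReceiver has already dropped the topic
        // frame, so every Seq[Byte] seen here is one message payload.
        def bytesToRecords(frames: Seq[Seq[Byte]]): Iterator[Array[String]] =
          frames.map(payload => new String(payload.toArray).split(",")).iterator

        // Hypothetical publisher URL and topic; storage level and supervisor
        // strategy fall back to the defaults declared in zeroMQStream.
        val records = ssc.zeroMQStream("tcp://127.0.0.1:1234", Subscribe("metrics"), bytesToRecords _)

        // Print the number of fields in each record of every batch.
        records.map(_.length).print()
        ssc.start()
      }
    }

Compared with ZeroMQWordCount above, only the converter function and the resulting element type change; the wiring through actorStream and ZeroMQReceiver stays the same.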