diff options
Diffstat (limited to 'examples')
9 files changed, 14 insertions, 17 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 83900a18df..0a2b3def18 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.api.java.flume.FlumeFunctions; +import org.apache.spark.streaming.flume.FlumeUtils; import org.apache.spark.streaming.flume.SparkFlumeEvent; /** @@ -53,8 +53,7 @@ public class JavaFlumeEventCount { JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class)); - FlumeFunctions flumeFunc = new FlumeFunctions(ssc); - JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port); + JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 51de4054cc..3bd7a3a90e 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.api.java.kafka.KafkaFunctions; +import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** @@ -54,7 +54,7 @@ public class JavaKafkaWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount", + JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", new Duration(2000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class)); @@ -65,8 +65,7 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - KafkaFunctions kafkaFunc = new KafkaFunctions(ssc); - JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap); + JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override @@ -96,6 +95,6 @@ public class JavaKafkaWordCount { }); wordCounts.print(); - ssc.start(); + jssc.start(); } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 149640e0d1..ae3709b3d9 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -52,7 +52,7 @@ object FlumeEventCount { System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) // Create a flume stream - val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) + val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 633712e816..022c8c5cb9 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -53,7 +53,7 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)) .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index f65c3f8b91..325290b66f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -97,7 +97,7 @@ object MQTTWordCount { val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) + val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index a60570f884..3ccdc908e2 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -35,7 +35,7 @@ import org.apache.spark.streaming.twitter._ * <p> * <p> * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc), * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold @@ -63,7 +63,7 @@ object TwitterAlgebirdCMS { val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 1382fa4d1d..c7e83e76b0 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -50,7 +50,7 @@ object TwitterAlgebirdHLL { val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 84842b3d65..e2b0418d55 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -40,7 +40,7 @@ object TwitterPopularTags { val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters) + val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 789c5f2d08..5a7673756e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -85,11 +85,10 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator //For this stream, a zeroMQ publisher should be running. - val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) + val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() } - } |