author     Prashant Sharma <prashant.s@imaginea.com>  2014-11-11 21:36:48 -0800
committer  Patrick Wendell <pwendell@gmail.com>       2014-11-11 21:37:00 -0800
commit     12f56334bb308c19d1c6c017fe1ec10808bde12a (patch)
tree       1984f9c15cb0ef568f45fde2976455abaa0450a4 /examples/scala-2.10/src
parent     307b69d73c37b5a580a1079843b13aeac1f6f6f4 (diff)
Support cross building for Scala 2.11
Let's give this another go using a version of Hive that shades its JLine dependency.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #3159 from pwendell/scala-2.11-prashant and squashes the following commits:

e93aa3e [Patrick Wendell] Restoring -Phive-thriftserver profile and cleaning up build script.
f65d17d [Patrick Wendell] Fixing build issue due to merge conflict
a8c41eb [Patrick Wendell] Reverting dev/run-tests back to master state.
7a6eb18 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into scala-2.11-prashant
583aa07 [Prashant Sharma] REVERT ME: removed hive thriftserver
3680e58 [Prashant Sharma] Revert "REVERT ME: Temporarily removing some Cli tests."
935fb47 [Prashant Sharma] Revert "Fixed by disabling a few tests temporarily."
925e90f [Prashant Sharma] Fixed by disabling a few tests temporarily.
2fffed3 [Prashant Sharma] Exclude groovy from sbt build, and also provide a way for such instances in future.
8bd4e40 [Prashant Sharma] Switched to gmaven plus; it fixes random failures observed with its predecessor gmaven.
5272ce5 [Prashant Sharma] SPARK_SCALA_VERSION related bugs.
2121071 [Patrick Wendell] Migrating version detection to PySpark
b1ed44d [Patrick Wendell] REVERT ME: Temporarily removing some Cli tests.
1743a73 [Patrick Wendell] Removing decimal test that doesn't work with Scala 2.11
f5cad4e [Patrick Wendell] Add Scala 2.11 docs
210d7e1 [Patrick Wendell] Revert "Testing new Hive version with shaded jline"
48518ce [Patrick Wendell] Remove association of Hive and Thriftserver profiles.
e9d0a06 [Patrick Wendell] Revert "Enable thriftserver for Scala 2.10 only"
67ec364 [Patrick Wendell] Guard building of thriftserver around Scala 2.10 check
8502c23 [Patrick Wendell] Enable thriftserver for Scala 2.10 only
e22b104 [Patrick Wendell] Small fix in pom file
ec402ab [Patrick Wendell] Various fixes
0be5a9d [Patrick Wendell] Testing new Hive version with shaded jline
4eaec65 [Prashant Sharma] Changed scripts to ignore target.
5167bea [Prashant Sharma] small correction
a4fcac6 [Prashant Sharma] Run against scala 2.11 on jenkins.
80285f4 [Prashant Sharma] Maven equivalent of setting spark.executor.extraClasspath during tests.
034b369 [Prashant Sharma] Setting test jars on executor classpath during tests from sbt.
d4874cb [Prashant Sharma] Fixed Python Runner suite. null check should be first case in scala 2.11.
6f50f13 [Prashant Sharma] Fixed build after rebasing with master. We should use ${scala.binary.version} instead of just 2.10
e56ca9d [Prashant Sharma] Print an error if build for 2.10 and 2.11 is spotted.
937c0b8 [Prashant Sharma] SCALA_VERSION -> SPARK_SCALA_VERSION
cb059b0 [Prashant Sharma] Code review
0476e5e [Prashant Sharma] Scala 2.11 support with repl and all build changes.

(cherry picked from commit daaca14c16dc2c1abc98f15ab8c6f7c14761b627)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
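On the sbt side, cross building boils down to compiling and publishing the same sources against more than one Scala binary version. A minimal, generic sbt sketch of the idea (an illustration only, not Spark's actual build definition):

    scalaVersion := "2.10.4"
    crossScalaVersions := Seq("2.10.4", "2.11.2")

    // %% appends the Scala binary suffix (_2.10 or _2.11) to the artifact name,
    // the sbt analogue of using ${scala.binary.version} in a Maven pom.
    libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"

With this in place, prefixing a task with + (e.g. `sbt +publish`) runs it once per entry in crossScalaVersions.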
Diffstat (limited to 'examples/scala-2.10/src')
-rw-r--r--  examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java   113
-rw-r--r--  examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala      102
-rw-r--r--  examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala  114
-rw-r--r--  examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala   92
4 files changed, 421 insertions(+), 0 deletions(-)
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..16ae9a3319
--- /dev/null
+++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+/**
+ * Consumes messages from one or more topics in Kafka and performs a word count.
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ * <zkQuorum> is a list of one or more zookeeper servers that make up the quorum
+ * <group> is the name of the kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount \
+ * zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
+ public static void main(String[] args) {
+ if (args.length < 4) {
+ System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+ SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+ // Create the context with a 2 second batch size
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+ int numThreads = Integer.parseInt(args[3]);
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ String[] topics = args[2].split(",");
+ for (String topic: topics) {
+ topicMap.put(topic, numThreads);
+ }
+
+ JavaPairReceiverInputDStream<String, String> messages =
+ KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ jssc.start();
+ jssc.awaitTermination();
+ }
+}
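Stripped of the streaming setup, the DStream pipeline in JavaKafkaWordCount is a plain word count applied to each batch. A Spark-free Scala sketch of the same transformation on an in-memory collection (hypothetical input):

    val lines = Seq("apache spark", "spark streaming")
    val counts = lines
      .flatMap(_.split(" "))          // lines -> words, like the flatMap over SPACE.split
      .map(w => (w, 1))               // words -> (word, 1) pairs, like mapToPair
      .groupBy(_._1)                  // bring equal words together
      .mapValues(_.map(_._2).sum)     // sum the ones, like reduceByKey(i1 + i2)
    // counts == Map("apache" -> 1, "spark" -> 2, "streaming" -> 1)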
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000000..c9e1511278
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.Properties
+
+import kafka.producer._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and performs a word count.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ * <zkQuorum> is a list of one or more zookeeper servers that make up the quorum
+ * <group> is the name of the kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ * my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ val Array(zkQuorum, group, topics, numThreads) = args
+ val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+ ssc.checkpoint("checkpoint")
+
+ val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L))
+ .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+ wordCounts.print()
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+// Produces messages of random digits (0 to 9) at a rate set by the command-line arguments.
+object KafkaWordCountProducer {
+
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+ "<messagesPerSec> <wordsPerMessage>")
+ System.exit(1)
+ }
+
+ val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+ // Kafka producer connection properties
+ val props = new Properties()
+ props.put("metadata.broker.list", brokers)
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+
+ // Send some messages
+ while (true) {
+ val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+ val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+ .mkString(" ")
+
+ new KeyedMessage[String, String](topic, str)
+ }.toArray
+
+ producer.send(messages: _*)
+ Thread.sleep(100)
+ }
+ }
+
+}
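The reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) call in KafkaWordCount is the incremental form of windowed counting: because addition has an inverse, Spark can maintain a 10-minute window that slides every 2 seconds by adding the counts of the batch entering the window and subtracting those of the batch leaving it, rather than re-reducing the entire window. A sketch of the per-key arithmetic (illustrative numbers only):

    val previousWindow = 42L  // count for some word over the last full window
    val entering = 5L         // occurrences in the batch sliding into the window
    val leaving = 3L          // occurrences in the batch sliding out of it
    val newWindow = previousWindow + entering - leaving  // 44, with no full recount

This incremental form carries state from batch to batch, which is why the example must call ssc.checkpoint(...) before using it.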
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000000..683752ac96
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.twitter._
+
+// scalastyle:off
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ * the example operates on Long IDs. Once the implementation supports other input types (such as
+ * String), the same approach could be used, for example, to compute popular topics.
+ * <p>
+ * <p>
+ * <a href=
+ * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ * of any given element, etc.) that uses space sub-linear in the number of elements in the
+ * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ * count.
+ * <p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdCMS {
+ def main(args: Array[String]) {
+ StreamingExamples.setStreamingLogLevels()
+
+ // CMS parameters
+ val DELTA = 1E-3
+ val EPS = 0.01
+ val SEED = 1
+ val PERC = 0.001
+ // K highest frequency elements to take
+ val TOPK = 10
+
+ val filters = args
+ val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
+ val ssc = new StreamingContext(sparkConf, Seconds(10))
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
+
+ val users = stream.map(status => status.getUser.getId)
+
+ val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+ var globalCMS = cms.zero
+ val mm = new MapMonoid[Long, Int]()
+ var globalExact = Map[Long, Int]()
+
+ val approxTopUsers = users.mapPartitions(ids => {
+ ids.map(id => cms.create(id))
+ }).reduce(_ ++ _)
+
+ val exactTopUsers = users.map(id => (id, 1))
+ .reduceByKey((a, b) => a + b)
+
+ approxTopUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ val partialTopK = partial.heavyHitters.map(id =>
+ (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ globalCMS ++= partial
+ val globalTopK = globalCMS.heavyHitters.map(id =>
+ (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+ partialTopK.mkString("[", ",", "]")))
+ println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+ globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ exactTopUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partialMap = rdd.collect().toMap
+ val partialTopK = rdd.map(
+ {case (id, count) => (count, id)})
+ .sortByKey(ascending = false).take(TOPK)
+ globalExact = mm.plus(globalExact.toMap, partialMap)
+ val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+ println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+ println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+ }
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
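The reduce(_ ++ _) and globalCMS ++= partial steps above lean on CMS being a monoid: sketches built independently over different slices of the stream can be merged, and the merged sketch answers frequency queries over the union of their inputs. A minimal, Spark-free sketch of that property, assuming the same Algebird version these examples build against:

    import com.twitter.algebird.CountMinSketchMonoid

    // eps, delta, seed, heavy-hitters percentage, as in the example above
    val cms = new CountMinSketchMonoid(0.01, 1E-3, 1, 0.001)
    val batch1 = Seq(1L, 2L, 2L).map(cms.create).reduce(_ ++ _)
    val batch2 = Seq(2L, 3L).map(cms.create).reduce(_ ++ _)
    val merged = batch1 ++ batch2                // the same merge globalCMS ++= partial performs
    println(merged.frequency(2L).estimate)       // ~3: the estimate spans both batches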
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000000..62db5e663b
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+import org.apache.spark.SparkConf
+
+// scalastyle:off
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ * blog post</a> and this
+ * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ * blog post</a>
+ * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient data structure for
+ * estimating the cardinality of a data stream, i.e. the number of unique elements.
+ * <p>
+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ * reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdHLL {
+ def main(args: Array[String]) {
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Bit-size parameter for HyperLogLog; trades off accuracy against memory use.
+ val BIT_SIZE = 12
+ val filters = args
+ val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
+ val ssc = new StreamingContext(sparkConf, Seconds(5))
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
+
+ val users = stream.map(status => status.getUser.getId)
+
+ val hll = new HyperLogLogMonoid(BIT_SIZE)
+ var globalHll = hll.zero
+ var userSet: Set[Long] = Set()
+
+ val approxUsers = users.mapPartitions(ids => {
+ ids.map(id => hll(id))
+ }).reduce(_ + _)
+
+ val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+ approxUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ globalHll += partial
+ println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+ println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+ }
+ })
+
+ exactUsers.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ val partial = rdd.first()
+ userSet ++= partial
+ println("Exact distinct users this batch: %d".format(partial.size))
+ println("Exact distinct users overall: %d".format(userSet.size))
+ println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+ ) * 100))
+ }
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
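As with the CMS example, reduce(_ + _) and globalHll += partial work because HLL sketches merge losslessly: combining two sketches gives the same estimate as a single sketch built over all of the input. A minimal, Spark-free sketch, again assuming the Algebird version these examples build against (the HyperLogLog._ import supplies the implicit Long-to-bytes conversion that hll(id) relies on):

    import com.twitter.algebird.HyperLogLogMonoid
    import com.twitter.algebird.HyperLogLog._

    val hll = new HyperLogLogMonoid(12)          // 12 bits, roughly 1.6% standard error
    val batch1 = Seq(1L, 2L, 3L).map(hll(_)).reduce(_ + _)
    val batch2 = Seq(3L, 4L).map(hll(_)).reduce(_ + _)
    val merged = batch1 + batch2                 // the same merge globalHll += partial performs
    println(merged.estimatedSize)                // ~4.0: 3L is counted only once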