path: root/examples/src/main
author     Prashant Sharma <prashant.s@imaginea.com>  2014-11-11 21:36:48 -0800
committer  Patrick Wendell <pwendell@gmail.com>       2014-11-11 21:37:00 -0800
commit     12f56334bb308c19d1c6c017fe1ec10808bde12a (patch)
tree       1984f9c15cb0ef568f45fde2976455abaa0450a4 /examples/src/main
parent     307b69d73c37b5a580a1079843b13aeac1f6f6f4 (diff)
download   spark-12f56334bb308c19d1c6c017fe1ec10808bde12a.tar.gz
           spark-12f56334bb308c19d1c6c017fe1ec10808bde12a.tar.bz2
           spark-12f56334bb308c19d1c6c017fe1ec10808bde12a.zip
Support cross building for Scala 2.11
Let's give this another go using a version of Hive that shades its JLine dependency.

Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #3159 from pwendell/scala-2.11-prashant and squashes the following commits:

e93aa3e [Patrick Wendell] Restoring -Phive-thriftserver profile and cleaning up build script.
f65d17d [Patrick Wendell] Fixing build issue due to merge conflict
a8c41eb [Patrick Wendell] Reverting dev/run-tests back to master state.
7a6eb18 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into scala-2.11-prashant
583aa07 [Prashant Sharma] REVERT ME: removed hive thirftserver
3680e58 [Prashant Sharma] Revert "REVERT ME: Temporarily removing some Cli tests."
935fb47 [Prashant Sharma] Revert "Fixed by disabling a few tests temporarily."
925e90f [Prashant Sharma] Fixed by disabling a few tests temporarily.
2fffed3 [Prashant Sharma] Exclude groovy from sbt build, and also provide a way for such instances in future.
8bd4e40 [Prashant Sharma] Switched to gmaven plus, it fixes random failures observer with its predecessor gmaven.
5272ce5 [Prashant Sharma] SPARK_SCALA_VERSION related bugs.
2121071 [Patrick Wendell] Migrating version detection to PySpark
b1ed44d [Patrick Wendell] REVERT ME: Temporarily removing some Cli tests.
1743a73 [Patrick Wendell] Removing decimal test that doesn't work with Scala 2.11
f5cad4e [Patrick Wendell] Add Scala 2.11 docs
210d7e1 [Patrick Wendell] Revert "Testing new Hive version with shaded jline"
48518ce [Patrick Wendell] Remove association of Hive and Thriftserver profiles.
e9d0a06 [Patrick Wendell] Revert "Enable thritfserver for Scala 2.10 only"
67ec364 [Patrick Wendell] Guard building of thriftserver around Scala 2.10 check
8502c23 [Patrick Wendell] Enable thritfserver for Scala 2.10 only
e22b104 [Patrick Wendell] Small fix in pom file
ec402ab [Patrick Wendell] Various fixes
0be5a9d [Patrick Wendell] Testing new Hive version with shaded jline
4eaec65 [Prashant Sharma] Changed scripts to ignore target.
5167bea [Prashant Sharma] small correction
a4fcac6 [Prashant Sharma] Run against scala 2.11 on jenkins.
80285f4 [Prashant Sharma] MAven equivalent of setting spark.executor.extraClasspath during tests.
034b369 [Prashant Sharma] Setting test jars on executor classpath during tests from sbt.
d4874cb [Prashant Sharma] Fixed Python Runner suite. null check should be first case in scala 2.11.
6f50f13 [Prashant Sharma] Fixed build after rebasing with master. We should use ${scala.binary.version} instead of just 2.10
e56ca9d [Prashant Sharma] Print an error if build for 2.10 and 2.11 is spotted.
937c0b8 [Prashant Sharma] SCALA_VERSION -> SPARK_SCALA_VERSION
cb059b0 [Prashant Sharma] Code review
0476e5e [Prashant Sharma] Scala 2.11 support with repl and all build changes.

(cherry picked from commit daaca14c16dc2c1abc98f15ab8c6f7c14761b627)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
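Editorial note: at the sbt level, cross building of the kind this patch enables generally comes down to declaring more than one Scala version and letting the build switch between them. The fragment below is a hypothetical, generic build.sbt sketch, not taken from this patch; the version numbers are placeholders.

    // Hypothetical build.sbt sketch, not part of this patch: generic sbt cross-building.
    // The version strings are placeholders, not the values Spark itself uses.
    crossScalaVersions := Seq("2.10.4", "2.11.2")    // Scala versions to build against
    scalaVersion := crossScalaVersions.value.head    // default version for plain tasks
    // Prefixing a task with "+" in the sbt shell (e.g. "+package") runs it once per
    // declared Scala version; dependencies added with %% resolve per binary version.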
Diffstat (limited to 'examples/src/main')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java    | 113
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala      | 102
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala  | 114
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala  |  92
4 files changed, 0 insertions, 421 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
deleted file mode 100644
index 16ae9a3319..0000000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- *
- * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- * <zkQuorum> is a list of one or more ZooKeeper servers that make up the quorum
- * <group> is the name of the Kafka consumer group
- * <topics> is a comma-separated list of one or more Kafka topics to consume from
- * <numThreads> is the number of threads the Kafka consumer should use
- *
- * To run this example:
- * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
- * zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
- private static final Pattern SPACE = Pattern.compile(" ");
-
- private JavaKafkaWordCount() {
- }
-
- public static void main(String[] args) {
- if (args.length < 4) {
- System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
- SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
- // Create the context with a 2 second batch interval
- JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
- int numThreads = Integer.parseInt(args[3]);
- Map<String, Integer> topicMap = new HashMap<String, Integer>();
- String[] topics = args[2].split(",");
- for (String topic: topics) {
- topicMap.put(topic, numThreads);
- }
-
- JavaPairReceiverInputDStream<String, String> messages =
- KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
-
- JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- });
-
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
- }
- });
-
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
-
- wordCounts.print();
- jssc.start();
- jssc.awaitTermination();
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
deleted file mode 100644
index c9e1511278..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import java.util.Properties
-
-import kafka.producer._
-
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.kafka._
-import org.apache.spark.SparkConf
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- * <zkQuorum> is a list of one or more ZooKeeper servers that make up the quorum
- * <group> is the name of the Kafka consumer group
- * <topics> is a comma-separated list of one or more Kafka topics to consume from
- * <numThreads> is the number of threads the Kafka consumer should use
- *
- * Example:
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
- * my-consumer-group topic1,topic2 1`
- */
-object KafkaWordCount {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- val Array(zkQuorum, group, topics, numThreads) = args
- val sparkConf = new SparkConf().setAppName("KafkaWordCount")
- val ssc = new StreamingContext(sparkConf, Seconds(2))
- ssc.checkpoint("checkpoint")
-
- val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1L))
- .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
- wordCounts.print()
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
-
-// Produces messages of space-separated random numbers for KafkaWordCount to consume.
-object KafkaWordCountProducer {
-
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
- "<messagesPerSec> <wordsPerMessage>")
- System.exit(1)
- }
-
- val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
-
- // Kafka producer (broker) connection properties
- val props = new Properties()
- props.put("metadata.broker.list", brokers)
- props.put("serializer.class", "kafka.serializer.StringEncoder")
-
- val config = new ProducerConfig(props)
- val producer = new Producer[String, String](config)
-
- // Send some messages
- while(true) {
- val messages = (1 to messagesPerSec.toInt).map { messageNum =>
- val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
- .mkString(" ")
-
- new KeyedMessage[String, String](topic, str)
- }.toArray
-
- producer.send(messages: _*)
- Thread.sleep(100)
- }
- }
-
-}
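Editorial note on the windowed count removed above: it uses the inverse-function form of reduceByKeyAndWindow, which updates a running 10 minute window incrementally on every 2 second slide instead of recomputing the whole window, and that is why the example calls ssc.checkpoint. A minimal sketch of the same pattern, assuming a StreamingContext `ssc` and a DStream[String] `lines` already exist:

    // Sketch only: assumes `ssc: StreamingContext` and `lines: DStream[String]` exist.
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations

    ssc.checkpoint("checkpoint")            // required by the inverse-reduce form below
    val wordCounts = lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      // `_ + _` adds counts entering the 10 minute window; `_ - _` subtracts counts
      // leaving it, so each 2 second slide updates the window incrementally.
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()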
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 683752ac96..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- * the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- * the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- * <a href=
- * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- * of any given element, etc), that uses space sub-linear in the number of elements in the
- * stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- * count.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
- def main(args: Array[String]) {
- StreamingExamples.setStreamingLogLevels()
-
- // CMS parameters
- val DELTA = 1E-3
- val EPS = 0.01
- val SEED = 1
- val PERC = 0.001
- // K highest frequency elements to take
- val TOPK = 10
-
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
- val ssc = new StreamingContext(sparkConf, Seconds(10))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
- val users = stream.map(status => status.getUser.getId)
-
- val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
- var globalCMS = cms.zero
- val mm = new MapMonoid[Long, Int]()
- var globalExact = Map[Long, Int]()
-
- val approxTopUsers = users.mapPartitions(ids => {
- ids.map(id => cms.create(id))
- }).reduce(_ ++ _)
-
- val exactTopUsers = users.map(id => (id, 1))
- .reduceByKey((a, b) => a + b)
-
- approxTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- val partialTopK = partial.heavyHitters.map(id =>
- (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- globalCMS ++= partial
- val globalTopK = globalCMS.heavyHitters.map(id =>
- (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
- partialTopK.mkString("[", ",", "]")))
- println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
- globalTopK.mkString("[", ",", "]")))
- }
- })
-
- exactTopUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partialMap = rdd.collect().toMap
- val partialTopK = rdd.map(
- {case (id, count) => (count, id)})
- .sortByKey(ascending = false).take(TOPK)
- globalExact = mm.plus(globalExact.toMap, partialMap)
- val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
- println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
- println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
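Editorial note on the Count-Min Sketch usage removed above: because CountMinSketchMonoid is a monoid, per-record sketches can be merged with ++ and heavy hitters read back from the merged result. A small sketch using only the Algebird calls that appear in the deleted code; the parameter values and sample IDs are illustrative, not data from the example.

    // Sketch only: mirrors the CMS calls used in the removed example.
    import com.twitter.algebird._

    val cms = new CountMinSketchMonoid(0.01, 1E-3, 1, 0.001)   // eps, delta, seed, heavy-hitter %
    val userIds = Seq(1L, 2L, 2L, 3L, 2L)                      // illustrative stand-in for one batch
    // Each ID becomes a one-element sketch; `++` is the monoid merge.
    val sketch = userIds.map(id => cms.create(id)).reduce(_ ++ _)
    val topK = sketch.heavyHitters.toSeq
      .map(id => (id, sketch.frequency(id).estimate))
      .sortBy(_._2).reverse.take(10)
    println(topK.mkString("[", ",", "]"))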
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 62db5e663b..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLogMonoid
-import com.twitter.algebird.HyperLogLog._
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * blog post</a> and this
- * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- * blog post</a>
- * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- * estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- * reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
- def main(args: Array[String]) {
-
- StreamingExamples.setStreamingLogLevels()
-
- /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
- val BIT_SIZE = 12
- val filters = args
- val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
- val users = stream.map(status => status.getUser.getId)
-
- val hll = new HyperLogLogMonoid(BIT_SIZE)
- var globalHll = hll.zero
- var userSet: Set[Long] = Set()
-
- val approxUsers = users.mapPartitions(ids => {
- ids.map(id => hll(id))
- }).reduce(_ + _)
-
- val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
- approxUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- globalHll += partial
- println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
- println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
- }
- })
-
- exactUsers.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val partial = rdd.first()
- userSet ++= partial
- println("Exact distinct users this batch: %d".format(partial.size))
- println("Exact distinct users overall: %d".format(userSet.size))
- println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
- ) * 100))
- }
- })
-
- ssc.start()
- ssc.awaitTermination()
- }
-}
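Editorial note on the HyperLogLog usage removed above: HyperLogLogMonoid turns each ID into a small HLL instance and `+` merges instances, so per-batch and global cardinality estimates both fall out of the same reduce. A small sketch using only the Algebird calls that appear in the deleted code; the bit size matches the example, the IDs are illustrative.

    // Sketch only: mirrors the HLL calls used in the removed example.
    import com.twitter.algebird.HyperLogLogMonoid
    import com.twitter.algebird.HyperLogLog._    // implicit conversions used by hll(id)

    val hll = new HyperLogLogMonoid(12)          // 12-bit precision, as in the example
    val userIds = Seq(1L, 2L, 2L, 3L)            // illustrative stand-in for one batch
    // hll(id) builds a single-element HLL; `+` is the monoid merge.
    val approx = userIds.map(id => hll(id)).reduce(_ + _)
    println("Approx distinct users: %d".format(approx.estimatedSize.toInt))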