From a000b5c3b0438c17e9973df4832c320210c29c27 Mon Sep 17 00:00:00 2001
From: Sandeep
Date: Tue, 6 May 2014 17:27:52 -0700
Subject: SPARK-1637: Clean up examples for 1.0

- [x] Move all of them into subpackages of org.apache.spark.examples
      (right now some are in org.apache.spark.streaming.examples, for instance,
      and others are in org.apache.spark.examples.mllib)
- [x] Move Python examples into examples/src/main/python
- [x] Update docs to reflect these changes

Author: Sandeep

This patch had conflicts when merged, resolved by
Committer: Matei Zaharia

Closes #571 from techaddict/SPARK-1637 and squashes the following commits:

47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples
8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples
5f96121 [Sandeep] Move Python examples into examples/src/main/python
0a8dd77 [Sandeep] Move all Scala Examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
---
 .../org/apache/spark/examples/mllib/JavaALS.java   |   90 +++++++++++
 .../apache/spark/examples/mllib/JavaKMeans.java    |   84 ++++++++++
 .../org/apache/spark/examples/mllib/JavaLR.java    |   82 ++++++++++
 .../examples/streaming/JavaFlumeEventCount.java    |   76 +++++++++
 .../examples/streaming/JavaKafkaWordCount.java     |  112 +++++++++++++
 .../examples/streaming/JavaNetworkWordCount.java   |   88 ++++++++++
 .../spark/examples/streaming/JavaQueueStream.java  |   86 ++++++++++
 .../org/apache/spark/mllib/examples/JavaALS.java   |   90 -----------
 .../apache/spark/mllib/examples/JavaKMeans.java    |   84 ----------
 .../org/apache/spark/mllib/examples/JavaLR.java    |   82 ----------
 .../streaming/examples/JavaFlumeEventCount.java    |   75 ---------
 .../streaming/examples/JavaKafkaWordCount.java     |  111 -------------
 .../streaming/examples/JavaNetworkWordCount.java   |   87 ----------
 .../spark/streaming/examples/JavaQueueStream.java  |   85 ----------
 examples/src/main/python/als.py                    |   87 ++++++++++
 examples/src/main/python/kmeans.py                 |   73 +++++++++
 examples/src/main/python/logistic_regression.py    |   76 +++++++++
 examples/src/main/python/mllib/kmeans.py           |   44 +++++
 .../src/main/python/mllib/logistic_regression.py   |   50 ++++++
 examples/src/main/python/pagerank.py               |   70 ++++++++
 examples/src/main/python/pi.py                     |   37 +++++
 examples/src/main/python/sort.py                   |   36 +++++
 examples/src/main/python/transitive_closure.py     |   66 ++++++++
 examples/src/main/python/wordcount.py              |   35 ++++
 .../apache/spark/examples/sql/RDDRelation.scala    |   71 +++++++++
 .../spark/examples/sql/hive/HiveFromSpark.scala    |   64 ++++++++
 .../spark/examples/streaming/ActorWordCount.scala  |  177 +++++++++++++++++++++
 .../spark/examples/streaming/FlumeEventCount.scala |   65 ++++++++
 .../spark/examples/streaming/HdfsWordCount.scala   |   55 +++++++
 .../spark/examples/streaming/KafkaWordCount.scala  |  103 ++++++++++++
 .../spark/examples/streaming/MQTTWordCount.scala   |  109 +++++++++++++
 .../examples/streaming/NetworkWordCount.scala      |   61 +++++++
 .../spark/examples/streaming/QueueStream.scala     |   58 +++++++
 .../spark/examples/streaming/RawNetworkGrep.scala  |   62 ++++++++
 .../streaming/RecoverableNetworkWordCount.scala    |  122 ++++++++++++++
 .../streaming/StatefulNetworkWordCount.scala       |   73 +++++++++
 .../examples/streaming/StreamingExamples.scala     |   38 +++++
 .../examples/streaming/TwitterAlgebirdCMS.scala    |  119 ++++++++++++++
 .../examples/streaming/TwitterAlgebirdHLL.scala    |   96 +++++++++++
 .../examples/streaming/TwitterPopularTags.scala    |   74 +++++++++
 .../spark/examples/streaming/ZeroMQWordCount.scala |  101 ++++++++++++
 .../streaming/clickstream/PageViewGenerator.scala  |  109 +++++++++++++
 .../streaming/clickstream/PageViewStream.scala     |  107 +++++++++++++
 .../apache/spark/sql/examples/HiveFromSpark.scala  |   64 --------
 .../apache/spark/sql/examples/RDDRelation.scala    |   71 ---------
 .../spark/streaming/examples/ActorWordCount.scala  |  177 ---------------------
 .../spark/streaming/examples/FlumeEventCount.scala |   65 --------
 .../spark/streaming/examples/HdfsWordCount.scala   |   55 -------
 .../spark/streaming/examples/KafkaWordCount.scala  |  104 ------------
 .../spark/streaming/examples/MQTTWordCount.scala   |  109 -------------
 .../streaming/examples/NetworkWordCount.scala      |   61 -------
 .../spark/streaming/examples/QueueStream.scala     |   58 ------
 .../spark/streaming/examples/RawNetworkGrep.scala  |   66 --------
 .../examples/RecoverableNetworkWordCount.scala     |  122 --------------
 .../examples/StatefulNetworkWordCount.scala        |   73 ---------
 .../streaming/examples/StreamingExamples.scala     |   38 -----
 .../streaming/examples/TwitterAlgebirdCMS.scala    |  119 --------------
 .../streaming/examples/TwitterAlgebirdHLL.scala    |   96 -----------
 .../streaming/examples/TwitterPopularTags.scala    |   74 ---------
 .../spark/streaming/examples/ZeroMQWordCount.scala |  101 ------------
 .../examples/clickstream/PageViewGenerator.scala   |  109 -------------
 .../examples/clickstream/PageViewStream.scala      |  107 -------------
 62 files changed, 2856 insertions(+), 2283 deletions(-)
 create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
 create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
 delete mode 100644 examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
 delete mode 100644 examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
 delete mode 100644 examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
 delete mode 100644 examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
 delete mode 100644 examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
 delete mode 100644 examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
 delete mode 100644 examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
 create mode 100755 examples/src/main/python/als.py
 create mode 100755 examples/src/main/python/kmeans.py
 create mode 100755 examples/src/main/python/logistic_regression.py
 create mode 100755 examples/src/main/python/mllib/kmeans.py
 create mode 100755 examples/src/main/python/mllib/logistic_regression.py
 create mode 100755 examples/src/main/python/pagerank.py
 create mode 100755 examples/src/main/python/pi.py
 create mode 100755 examples/src/main/python/sort.py
 create mode 100755 examples/src/main/python/transitive_closure.py
 create mode 100755 examples/src/main/python/wordcount.py
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
 delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala

(limited to 'examples/src/main')

diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
new file mode 100644
index 0000000000..4533c4c5f2
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.recommendation.ALS;
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
+import org.apache.spark.mllib.recommendation.Rating;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+/**
+ * Example using MLlib ALS from Java.
+ */
+public final class JavaALS {
+
+  static class ParseRating implements Function<String, Rating> {
+    private static final Pattern COMMA = Pattern.compile(",");
+
+    @Override
+    public Rating call(String line) {
+      String[] tok = COMMA.split(line);
+      int x = Integer.parseInt(tok[0]);
+      int y = Integer.parseInt(tok[1]);
+      double rating = Double.parseDouble(tok[2]);
+      return new Rating(x, y, rating);
+    }
+  }
+
+  static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
+    @Override
+    public String call(Tuple2<Object, double[]> element) {
+      return element._1() + "," + Arrays.toString(element._2());
+    }
+  }
+
+  public static void main(String[] args) {
+
+    if (args.length != 5 && args.length != 6) {
+      System.err.println(
+        "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
+      System.exit(1);
+    }
+
+    int rank = Integer.parseInt(args[2]);
+    int iterations = Integer.parseInt(args[3]);
+    String outputDir = args[4];
+    int blocks = -1;
+    if (args.length == 6) {
+      blocks = Integer.parseInt(args[5]);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
+      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
+    JavaRDD<String> lines = sc.textFile(args[1]);
+
+    JavaRDD<Rating> ratings = lines.map(new ParseRating());
+
+    MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);
+
+    model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
+        outputDir + "/userFeatures");
+    model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
+        outputDir + "/productFeatures");
+    System.out.println("Final user/product features written to " + outputDir);
+
+    sc.stop();
+  }
+}
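The ratings file read by JavaALS above holds one "user,product,rating" triple per line, which is exactly what ParseRating parses. For readers new to the MLlib Java API, here is a minimal in-memory sketch of the same ALS flow; the class name and toy values are illustrative only (not part of this patch), and a local master is assumed:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.recommendation.ALS;
    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
    import org.apache.spark.mllib.recommendation.Rating;

    public class JavaALSSketch {  // hypothetical driver, for illustration only
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "JavaALSSketch");
        // Same shape of data that ParseRating produces from "user,product,rating" lines.
        JavaRDD<Rating> ratings = sc.parallelize(Arrays.asList(
            new Rating(1, 1, 5.0), new Rating(1, 2, 1.0), new Rating(2, 1, 4.0)));
        // rank 10, 5 iterations, lambda 0.01 -- the same regularization the example uses.
        MatrixFactorizationModel model = ALS.train(ratings.rdd(), 10, 5, 0.01);
        System.out.println(model.predict(2, 2));
        sc.stop();
      }
    }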
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
new file mode 100644
index 0000000000..0cfb8e69ed
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import java.util.regex.Pattern;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.clustering.KMeans;
+import org.apache.spark.mllib.clustering.KMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+
+/**
+ * Example using MLlib KMeans from Java.
+ */
+public final class JavaKMeans {
+
+  private static class ParsePoint implements Function<String, Vector> {
+    private static final Pattern SPACE = Pattern.compile(" ");
+
+    @Override
+    public Vector call(String line) {
+      String[] tok = SPACE.split(line);
+      double[] point = new double[tok.length];
+      for (int i = 0; i < tok.length; ++i) {
+        point[i] = Double.parseDouble(tok[i]);
+      }
+      return Vectors.dense(point);
+    }
+  }
+
+  public static void main(String[] args) {
+
+    if (args.length < 4) {
+      System.err.println(
+        "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
+      System.exit(1);
+    }
+
+    String inputFile = args[1];
+    int k = Integer.parseInt(args[2]);
+    int iterations = Integer.parseInt(args[3]);
+    int runs = 1;
+
+    if (args.length >= 5) {
+      runs = Integer.parseInt(args[4]);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
+      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
+    JavaRDD<String> lines = sc.textFile(inputFile);
+
+    JavaRDD<Vector> points = lines.map(new ParsePoint());
+
+    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
+
+    System.out.println("Cluster centers:");
+    for (Vector center : model.clusterCenters()) {
+      System.out.println(" " + center);
+    }
+    double cost = model.computeCost(points.rdd());
+    System.out.println("Cost: " + cost);
+
+    sc.stop();
+  }
+}
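JavaKMeans above parses one space-separated point per input line. A minimal self-contained sketch of the same clustering call, with hypothetical names and toy points (assumptions, not part of this patch):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.clustering.KMeans;
    import org.apache.spark.mllib.clustering.KMeansModel;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;

    public class JavaKMeansSketch {  // hypothetical driver, for illustration only
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "JavaKMeansSketch");
        // Two obvious clusters around (0,0) and (9,9).
        JavaRDD<Vector> points = sc.parallelize(Arrays.asList(
            Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
            Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)));
        // k = 2, 20 iterations, 1 run, k-means|| initialization -- as in the example above.
        KMeansModel model = KMeans.train(points.rdd(), 2, 20, 1, KMeans.K_MEANS_PARALLEL());
        for (Vector center : model.clusterCenters()) {
          System.out.println(center);
        }
        sc.stop();
      }
    }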
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
new file mode 100644
index 0000000000..f6e48b4987
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import java.util.regex.Pattern;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
+import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+/**
+ * Logistic regression based classification using MLlib.
+ */
+public final class JavaLR {
+
+  static class ParsePoint implements Function<String, LabeledPoint> {
+    private static final Pattern COMMA = Pattern.compile(",");
+    private static final Pattern SPACE = Pattern.compile(" ");
+
+    @Override
+    public LabeledPoint call(String line) {
+      String[] parts = COMMA.split(line);
+      double y = Double.parseDouble(parts[0]);
+      String[] tok = SPACE.split(parts[1]);
+      double[] x = new double[tok.length];
+      for (int i = 0; i < tok.length; ++i) {
+        x[i] = Double.parseDouble(tok[i]);
+      }
+      return new LabeledPoint(y, Vectors.dense(x));
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 4) {
+      System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
+      System.exit(1);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
+      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
+    JavaRDD<String> lines = sc.textFile(args[1]);
+    JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
+    double stepSize = Double.parseDouble(args[2]);
+    int iterations = Integer.parseInt(args[3]);
+
+    // Another way to configure LogisticRegression
+    //
+    // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
+    // lr.optimizer().setNumIterations(iterations)
+    //               .setStepSize(stepSize)
+    //               .setMiniBatchFraction(1.0);
+    // lr.setIntercept(true);
+    // LogisticRegressionModel model = lr.train(points.rdd());
+
+    LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
+        iterations, stepSize);
+
+    System.out.print("Final w: " + model.weights());
+
+    sc.stop();
+  }
+}
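JavaLR's input lines have the form "label,space-separated features" (for example "1.0,0.5 0.3"), as ParsePoint shows. A minimal sketch of the same training call with in-memory data; the class name and values are hypothetical, not part of this patch:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.classification.LogisticRegressionModel;
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.regression.LabeledPoint;

    public class JavaLRSketch {  // hypothetical driver, for illustration only
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "JavaLRSketch");
        // Equivalent of the input lines "0,0.1 0.2" and "1,2.0 1.9" after ParsePoint.
        JavaRDD<LabeledPoint> points = sc.parallelize(Arrays.asList(
            new LabeledPoint(0.0, Vectors.dense(0.1, 0.2)),
            new LabeledPoint(1.0, Vectors.dense(2.0, 1.9)))).cache();
        // 100 iterations with step size 1.0 -- the two knobs JavaLR takes on the command line.
        LogisticRegressionModel model =
            LogisticRegressionWithSGD.train(points.rdd(), 100, 1.0);
        System.out.println("w: " + model.weights());
        sc.stop();
      }
    }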
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
new file mode 100644
index 0000000000..a5ece68cef
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ * Produces a count of events received from Flume.
+ *
+ * This should be used in conjunction with an AvroSink in Flume. It will start
+ * an Avro server at the requested host:port address and listen for requests.
+ * Your Flume AvroSink should be pointed to this address.
+ *
+ * Usage: JavaFlumeEventCount <master> <host> <port>
+ *
+ *   <master> is a Spark master URL
+ *   <host> is the host the Flume receiver will be started on - a receiver
+ *          creates a server and listens for flume events.
+ *   <port> is the port the Flume receiver will listen on.
+ */
+public final class JavaFlumeEventCount {
+  private JavaFlumeEventCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    String master = args[0];
+    String host = args[1];
+    int port = Integer.parseInt(args[2]);
+
+    Duration batchInterval = new Duration(2000);
+
+    JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+        System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
+
+    flumeStream.count();
+
+    flumeStream.count().map(new Function<Long, String>() {
+      @Override
+      public String call(Long in) {
+        return "Received " + in + " flume events.";
+      }
+    }).print();
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..da51eb189a
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <zkQuorum> is a list of one or more zookeeper servers that make up the quorum
+ *   <group> is the name of the kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *    `./bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount local[2] zoo01,zoo02,
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private JavaKafkaWordCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 5) {
+      System.err.println("Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 2 second batch size
+    JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+        new Duration(2000), System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
+
+    int numThreads = Integer.parseInt(args[4]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[3].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairReceiverInputDStream<String, String> messages =
+        KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
new file mode 100644
index 0000000000..ac84991d87
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.regex.Pattern;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: JavaNetworkWordCount <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999`
+ */
+public final class JavaNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
+        new Duration(1000), System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+
+    // Create a JavaReceiverInputDStream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
new file mode 100644
index 0000000000..819311968f
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+public final class JavaQueueStream {
+  private JavaQueueStream() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: JavaQueueStream <master>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
+        System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
+
+    // Create the queue through which RDDs can be pushed to
+    // a QueueInputDStream
+    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
+
+    // Create and push some RDDs into the queue
+    List<Integer> list = Lists.newArrayList();
+    for (int i = 0; i < 1000; i++) {
+      list.add(i);
+    }
+
+    for (int i = 0; i < 30; i++) {
+      rddQueue.add(ssc.sparkContext().parallelize(list));
+    }
+
+    // Create the QueueInputDStream and use it do some processing
+    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
+    JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
+        new PairFunction<Integer, Integer, Integer>() {
+          @Override
+          public Tuple2<Integer, Integer> call(Integer i) {
+            return new Tuple2<Integer, Integer>(i % 10, 1);
+          }
+        });
+    JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
+      new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+    });
+
+    reducedStream.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}
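By default, queueStream consumes exactly one queued RDD per batch interval. A minimal sketch of a variant that instead unions everything still in the queue each batch; the class name and values are hypothetical, and the two-argument queueStream overload is an assumption about the Java streaming API rather than something this patch exercises:

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.Queue;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class JavaQueueStreamSketch {  // hypothetical, for illustration only
      public static void main(String[] args) throws Exception {
        JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "QueueStreamSketch",
            new Duration(1000));
        Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
        for (int i = 0; i < 5; i++) {
          rddQueue.add(ssc.sparkContext().parallelize(Arrays.asList(1, 2, 3)));
        }
        // false => each batch unions every RDD still queued, instead of
        // consuming one queued RDD per batch as the example above does.
        JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue, false);
        inputStream.count().print();
        ssc.start();
        ssc.awaitTermination();
      }
    }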
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
deleted file mode 100644
index c516199d61..0000000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.recommendation.ALS;
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
-import org.apache.spark.mllib.recommendation.Rating;
-
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
-
-/**
- * Example using MLlib ALS from Java.
- */
-public final class JavaALS {
-
-  static class ParseRating implements Function<String, Rating> {
-    private static final Pattern COMMA = Pattern.compile(",");
-
-    @Override
-    public Rating call(String line) {
-      String[] tok = COMMA.split(line);
-      int x = Integer.parseInt(tok[0]);
-      int y = Integer.parseInt(tok[1]);
-      double rating = Double.parseDouble(tok[2]);
-      return new Rating(x, y, rating);
-    }
-  }
-
-  static class FeaturesToString implements Function<Tuple2<Object, double[]>, String> {
-    @Override
-    public String call(Tuple2<Object, double[]> element) {
-      return element._1() + "," + Arrays.toString(element._2());
-    }
-  }
-
-  public static void main(String[] args) {
-
-    if (args.length != 5 && args.length != 6) {
-      System.err.println(
-        "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
-      System.exit(1);
-    }
-
-    int rank = Integer.parseInt(args[2]);
-    int iterations = Integer.parseInt(args[3]);
-    String outputDir = args[4];
-    int blocks = -1;
-    if (args.length == 6) {
-      blocks = Integer.parseInt(args[5]);
-    }
-
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
-
-    JavaRDD<Rating> ratings = lines.map(new ParseRating());
-
-    MatrixFactorizationModel model = ALS.train(ratings.rdd(), rank, iterations, 0.01, blocks);
-
-    model.userFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
-        outputDir + "/userFeatures");
-    model.productFeatures().toJavaRDD().map(new FeaturesToString()).saveAsTextFile(
-        outputDir + "/productFeatures");
-    System.out.println("Final user/product features written to " + outputDir);
-
-    sc.stop();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
deleted file mode 100644
index 7461609ab9..0000000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import java.util.regex.Pattern;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.clustering.KMeans;
-import org.apache.spark.mllib.clustering.KMeansModel;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-
-/**
- * Example using MLlib KMeans from Java.
- */
-public final class JavaKMeans {
-
-  private static class ParsePoint implements Function<String, Vector> {
-    private static final Pattern SPACE = Pattern.compile(" ");
-
-    @Override
-    public Vector call(String line) {
-      String[] tok = SPACE.split(line);
-      double[] point = new double[tok.length];
-      for (int i = 0; i < tok.length; ++i) {
-        point[i] = Double.parseDouble(tok[i]);
-      }
-      return Vectors.dense(point);
-    }
-  }
-
-  public static void main(String[] args) {
-
-    if (args.length < 4) {
-      System.err.println(
-        "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
-      System.exit(1);
-    }
-
-    String inputFile = args[1];
-    int k = Integer.parseInt(args[2]);
-    int iterations = Integer.parseInt(args[3]);
-    int runs = 1;
-
-    if (args.length >= 5) {
-      runs = Integer.parseInt(args[4]);
-    }
-
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
-    JavaRDD<String> lines = sc.textFile(inputFile);
-
-    JavaRDD<Vector> points = lines.map(new ParsePoint());
-
-    KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
-
-    System.out.println("Cluster centers:");
-    for (Vector center : model.clusterCenters()) {
-      System.out.println(" " + center);
-    }
-    double cost = model.computeCost(points.rdd());
-    System.out.println("Cost: " + cost);
-
-    sc.stop();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
deleted file mode 100644
index e3ab87cc72..0000000000
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.examples;
-
-import java.util.regex.Pattern;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
-import org.apache.spark.mllib.classification.LogisticRegressionModel;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.mllib.regression.LabeledPoint;
-
-/**
- * Logistic regression based classification using MLlib.
- */
-public final class JavaLR {
-
-  static class ParsePoint implements Function<String, LabeledPoint> {
-    private static final Pattern COMMA = Pattern.compile(",");
-    private static final Pattern SPACE = Pattern.compile(" ");
-
-    @Override
-    public LabeledPoint call(String line) {
-      String[] parts = COMMA.split(line);
-      double y = Double.parseDouble(parts[0]);
-      String[] tok = SPACE.split(parts[1]);
-      double[] x = new double[tok.length];
-      for (int i = 0; i < tok.length; ++i) {
-        x[i] = Double.parseDouble(tok[i]);
-      }
-      return new LabeledPoint(y, Vectors.dense(x));
-    }
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 4) {
-      System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
-      System.exit(1);
-    }
-
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
-    JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
-    double stepSize = Double.parseDouble(args[2]);
-    int iterations = Integer.parseInt(args[3]);
-
-    // Another way to configure LogisticRegression
-    //
-    // LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
-    // lr.optimizer().setNumIterations(iterations)
-    //               .setStepSize(stepSize)
-    //               .setMiniBatchFraction(1.0);
-    // lr.setIntercept(true);
-    // LogisticRegressionModel model = lr.train(points.rdd());
-
-    LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
-        iterations, stepSize);
-
-    System.out.print("Final w: " + model.weights());
-
-    sc.stop();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
deleted file mode 100644
index c59f7538f8..0000000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- * Produces a count of events received from Flume.
- *
- * This should be used in conjunction with an AvroSink in Flume. It will start
- * an Avro server at the requested host:port address and listen for requests.
- * Your Flume AvroSink should be pointed to this address.
- *
- * Usage: JavaFlumeEventCount <master> <host> <port>
- *
- *   <master> is a Spark master URL
- *   <host> is the host the Flume receiver will be started on - a receiver
- *          creates a server and listens for flume events.
- *   <port> is the port the Flume receiver will listen on.
- */
-public final class JavaFlumeEventCount {
-  private JavaFlumeEventCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length != 3) {
-      System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    String master = args[0];
-    String host = args[1];
-    int port = Integer.parseInt(args[2]);
-
-    Duration batchInterval = new Duration(2000);
-
-    JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
-        System.getenv("SPARK_HOME"),
-        JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
-
-    flumeStream.count();
-
-    flumeStream.count().map(new Function<Long, String>() {
-      @Override
-      public String call(Long in) {
-        return "Received " + in + " flume events.";
-      }
-    }).print();
-
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
deleted file mode 100644
index 8da9bcd05a..0000000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
- *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <zkQuorum> is a list of one or more zookeeper servers that make up the quorum
- *   <group> is the name of the kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- *    `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
- *    zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length < 5) {
-      System.err.println("Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context with a 2 second batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
-        new Duration(2000), System.getenv("SPARK_HOME"),
-        JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
-
-    int numThreads = Integer.parseInt(args[4]);
-    Map<String, Integer> topicMap = new HashMap<String, Integer>();
-    String[] topics = args[3].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-        KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
-
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
deleted file mode 100644
index 098c329ff6..0000000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import scala.Tuple2;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.regex.Pattern;
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: JavaNetworkWordCount <master> <hostname> <port>
- *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
- */
-public final class JavaNetworkWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  public static void main(String[] args) {
-    if (args.length < 3) {
-      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
-          "In local mode, <master> should be 'local[n]' with n > 1");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
-        new Duration(1000), System.getenv("SPARK_HOME"),
-        JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
-
-    // Create a JavaReceiverInputDStream on target ip:port and count the
-    // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
deleted file mode 100644
index 88ad341641..0000000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-import scala.Tuple2;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-public final class JavaQueueStream {
-  private JavaQueueStream() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length < 1) {
-      System.err.println("Usage: JavaQueueStream <master>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
-        System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
-
-    // Create the queue through which RDDs can be pushed to
-    // a QueueInputDStream
-    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
-
-    // Create and push some RDDs into the queue
-    List<Integer> list = Lists.newArrayList();
-    for (int i = 0; i < 1000; i++) {
-      list.add(i);
-    }
-
-    for (int i = 0; i < 30; i++) {
-      rddQueue.add(ssc.sparkContext().parallelize(list));
-    }
-
-    // Create the QueueInputDStream and use it do some processing
-    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
-    JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-            return new Tuple2<Integer, Integer>(i % 10, 1);
-          }
-        });
-    JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-    });
-
-    reducedStream.print();
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}
diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
new file mode 100755
index 0000000000..a77dfb2577
--- /dev/null
+++ b/examples/src/main/python/als.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# + +""" +This example requires numpy (http://www.numpy.org/) +""" +from os.path import realpath +import sys + +import numpy as np +from numpy.random import rand +from numpy import matrix +from pyspark import SparkContext + +LAMBDA = 0.01 # regularization +np.random.seed(42) + +def rmse(R, ms, us): + diff = R - ms * us.T + return np.sqrt(np.sum(np.power(diff, 2)) / M * U) + +def update(i, vec, mat, ratings): + uu = mat.shape[0] + ff = mat.shape[1] + XtX = matrix(np.zeros((ff, ff))) + Xty = np.zeros((ff, 1)) + + for j in range(uu): + v = mat[j, :] + XtX += v.T * v + Xty += v.T * ratings[i, j] + XtX += np.eye(ff, ff) * LAMBDA * uu + return np.linalg.solve(XtX, Xty) + +if __name__ == "__main__": + if len(sys.argv) < 2: + print >> sys.stderr, "Usage: als " + exit(-1) + sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)]) + M = int(sys.argv[2]) if len(sys.argv) > 2 else 100 + U = int(sys.argv[3]) if len(sys.argv) > 3 else 500 + F = int(sys.argv[4]) if len(sys.argv) > 4 else 10 + ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5 + slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2 + + print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \ + (M, U, F, ITERATIONS, slices) + + R = matrix(rand(M, F)) * matrix(rand(U, F).T) + ms = matrix(rand(M ,F)) + us = matrix(rand(U, F)) + + Rb = sc.broadcast(R) + msb = sc.broadcast(ms) + usb = sc.broadcast(us) + + for i in range(ITERATIONS): + ms = sc.parallelize(range(M), slices) \ + .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \ + .collect() + ms = matrix(np.array(ms)[:, :, 0]) # collect() returns a list, so array ends up being + # a 3-d array, we take the first 2 dims for the matrix + msb = sc.broadcast(ms) + + us = sc.parallelize(range(U), slices) \ + .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \ + .collect() + us = matrix(np.array(us)[:, :, 0]) + usb = sc.broadcast(us) + + error = rmse(R, ms, us) + print "Iteration %d:" % i + print "\nRMSE: %5.4f\n" % error diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py new file mode 100755 index 0000000000..e3596488fa --- /dev/null +++ b/examples/src/main/python/kmeans.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +The K-means algorithm written from scratch against PySpark. In practice, +one may prefer to use the KMeans algorithm in MLlib, as shown in +examples/src/main/python/mllib/kmeans.py. + +This example requires NumPy (http://www.numpy.org/). 
+""" + +import sys + +import numpy as np +from pyspark import SparkContext + + +def parseVector(line): + return np.array([float(x) for x in line.split(' ')]) + + +def closestPoint(p, centers): + bestIndex = 0 + closest = float("+inf") + for i in range(len(centers)): + tempDist = np.sum((p - centers[i]) ** 2) + if tempDist < closest: + closest = tempDist + bestIndex = i + return bestIndex + + +if __name__ == "__main__": + if len(sys.argv) < 5: + print >> sys.stderr, "Usage: kmeans " + exit(-1) + sc = SparkContext(sys.argv[1], "PythonKMeans") + lines = sc.textFile(sys.argv[2]) + data = lines.map(parseVector).cache() + K = int(sys.argv[3]) + convergeDist = float(sys.argv[4]) + + kPoints = data.takeSample(False, K, 1) + tempDist = 1.0 + + while tempDist > convergeDist: + closest = data.map( + lambda p : (closestPoint(p, kPoints), (p, 1))) + pointStats = closest.reduceByKey( + lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2)) + newPoints = pointStats.map( + lambda (x, (y, z)): (x, y / z)).collect() + + tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints) + + for (x, y) in newPoints: + kPoints[x] = y + + print "Final centers: " + str(kPoints) diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py new file mode 100755 index 0000000000..fe5373cf79 --- /dev/null +++ b/examples/src/main/python/logistic_regression.py @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +A logistic regression implementation that uses NumPy (http://www.numpy.org) +to act on batches of input data using efficient matrix operations. + +In practice, one may prefer to use the LogisticRegression algorithm in +MLlib, as shown in examples/src/main/python/mllib/logistic_regression.py. +""" + +from collections import namedtuple +from math import exp +from os.path import realpath +import sys + +import numpy as np +from pyspark import SparkContext + + +D = 10 # Number of dimensions + + +# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to +# make further computations faster. +# The data file contains lines of the form