aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:08 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-01-08 21:19:08 -0800
commit91227566bc9d8aabaec3f2a37a09a17afa20989c (patch)
tree80d19aac29217005b3f1cb08ca95fa08bbb9d946 /examples
parent7210257ba3038d5e22d4b60fe9c3113dc45c3dff (diff)
parent04d83fc37f9eef89c20331c85291a0a169f75e6d (diff)
downloadspark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.gz
spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.bz2
spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.zip
Merge remote-tracking branch 'spark-upstream/master' into HEAD
Conflicts: README.md core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala pom.xml project/SparkBuild.scala repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
Diffstat (limited to 'examples')
-rw-r--r--examples/pom.xml67
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java31
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaKMeans.java25
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java24
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaPageRank.java16
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java12
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaTC.java18
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaWordCount.java14
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java21
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java18
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java21
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java18
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java28
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java18
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java13
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LogQuery.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkALS.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkPi.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkTC.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala10
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala16
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala5
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala18
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala6
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala5
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala23
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala6
48 files changed, 295 insertions, 213 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index aee371fbc7..cb4f7ee33b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -26,7 +26,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-examples_2.9.3</artifactId>
+ <artifactId>spark-examples_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Examples</name>
<url>http://spark.incubator.apache.org/</url>
@@ -49,29 +49,54 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.9.3</artifactId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.9.3</artifactId>
+ <artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel_2.9.3</artifactId>
+ <artifactId>spark-bagel_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
@@ -87,43 +112,28 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.9.2</artifactId>
- <version>0.8.0-beta1</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
- <artifactId>algebird-core_2.9.2</artifactId>
+ <artifactId>algebird-core_${scala.binary.version}</artifactId>
<version>0.1.11</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.9.3</artifactId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
- <version>1.2.5</version>
+ <version>1.2.6</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
@@ -166,15 +176,15 @@
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
@@ -203,6 +213,9 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>log4j.properties</resource>
+ </transformer>
</transformers>
</configuration>
</execution>
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index be0d38589c..d552c47b22 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -24,19 +24,19 @@ import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.StringTokenizer;
import java.util.Random;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification.
*/
-public class JavaHdfsLR {
+public final class JavaHdfsLR {
- static int D = 10; // Number of dimensions
- static Random rand = new Random(42);
+ private static final int D = 10; // Number of dimensions
+ private static final Random rand = new Random(42);
static class DataPoint implements Serializable {
- public DataPoint(double[] x, double y) {
+ DataPoint(double[] x, double y) {
this.x = x;
this.y = y;
}
@@ -46,20 +46,22 @@ public class JavaHdfsLR {
}
static class ParsePoint extends Function<String, DataPoint> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public DataPoint call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- double y = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
- int i = 0;
- while (i < D) {
- x[i] = Double.parseDouble(tok.nextToken());
- i += 1;
+ for (int i = 0; i < D; i++) {
+ x[i] = Double.parseDouble(tok[i + 1]);
}
return new DataPoint(x, y);
}
}
static class VectorSum extends Function2<double[], double[], double[]> {
+ @Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
@@ -70,12 +72,13 @@ public class JavaHdfsLR {
}
static class ComputeGradient extends Function<DataPoint, double[]> {
- double[] weights;
+ private final double[] weights;
- public ComputeGradient(double[] weights) {
+ ComputeGradient(double[] weights) {
this.weights = weights;
}
+ @Override
public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
@@ -106,7 +109,7 @@ public class JavaHdfsLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 5a6afe7eae..0dc879275a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -27,19 +27,24 @@ import org.apache.spark.util.Vector;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* K-means clustering using Java API.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
+
+ private static final Pattern SPACE = Pattern.compile(" ");
/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
- String[] splits = line.split(" ");
+ String[] splits = SPACE.split(line);
double[] data = new double[splits.length];
int i = 0;
- for (String s : splits)
- data[i] = Double.parseDouble(splits[i++]);
+ for (String s : splits) {
+ data[i] = Double.parseDouble(s);
+ i++;
+ }
return new Vector(data);
}
@@ -74,7 +79,7 @@ public class JavaKMeans {
System.exit(1);
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
String path = args[1];
int K = Integer.parseInt(args[2]);
double convergeDist = Double.parseDouble(args[3]);
@@ -82,7 +87,7 @@ public class JavaKMeans {
JavaRDD<Vector> data = sc.textFile(path).map(
new Function<String, Vector>() {
@Override
- public Vector call(String line) throws Exception {
+ public Vector call(String line) {
return parseVector(line);
}
}
@@ -96,7 +101,7 @@ public class JavaKMeans {
JavaPairRDD<Integer, Vector> closest = data.map(
new PairFunction<Vector, Integer, Vector>() {
@Override
- public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
+ public Tuple2<Integer, Vector> call(Vector vector) {
return new Tuple2<Integer, Vector>(
closestPoint(vector, centroids), vector);
}
@@ -107,7 +112,8 @@ public class JavaKMeans {
JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
new Function<List<Vector>, Vector>() {
- public Vector call(List<Vector> ps) throws Exception {
+ @Override
+ public Vector call(List<Vector> ps) {
return average(ps);
}
}).collectAsMap();
@@ -122,8 +128,9 @@ public class JavaKMeans {
} while (tempDist > convergeDist);
System.out.println("Final centers:");
- for (Vector c : centroids)
+ for (Vector c : centroids) {
System.out.println(c);
+ }
System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 152f029213..9eb1cadd71 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -35,9 +35,9 @@ import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
*/
-public class JavaLogQuery {
+public final class JavaLogQuery {
- public static List<String> exampleApacheLogs = Lists.newArrayList(
+ public static final List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -51,14 +51,14 @@ public class JavaLogQuery {
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");
- public static Pattern apacheLogRegex = Pattern.compile(
+ public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
- private int count;
- private int numBytes;
+ private final int count;
+ private final int numBytes;
public Stats(int count, int numBytes) {
this.count = count;
@@ -92,38 +92,38 @@ public class JavaLogQuery {
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
- }
- else
+ } else {
return new Stats(1, 0);
+ }
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
- public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
+ public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
- public Stats call(Stats stats, Stats stats2) throws Exception {
+ public Stats call(Stats stats, Stats stats2) {
return stats.merge(stats2);
}
});
List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
- for (Tuple2 t : output) {
+ for (Tuple2<?,?> t : output) {
System.out.println(t._1 + "\t" + t._2);
}
System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index c5603a639b..a84245b0c7 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -21,7 +21,6 @@ import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -29,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
+import java.util.regex.Pattern;
/**
* Computes the PageRank of URLs from an input file. Input file should
@@ -39,7 +39,9 @@ import java.util.ArrayList;
* ...
* where URL and their neighbors are separated by space(s).
*/
-public class JavaPageRank {
+public final class JavaPageRank {
+ private static final Pattern SPACES = Pattern.compile("\\s+");
+
private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
@@ -54,7 +56,7 @@ public class JavaPageRank {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
// Loads in input file. It should be in format of:
// URL neighbor URL
@@ -67,7 +69,7 @@ public class JavaPageRank {
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
- String[] parts = s.split("\\s+");
+ String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
@@ -75,7 +77,7 @@ public class JavaPageRank {
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
- public Double call(List<String> rs) throws Exception {
+ public Double call(List<String> rs) {
return 1.0;
}
});
@@ -98,7 +100,7 @@ public class JavaPageRank {
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
@Override
- public Double call(Double sum) throws Exception {
+ public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});
@@ -106,7 +108,7 @@ public class JavaPageRank {
// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
- for (Tuple2 tuple : output) {
+ for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 4a2380caf5..3ec4a58d48 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -26,8 +26,7 @@ import java.util.ArrayList;
import java.util.List;
/** Computes an approximation to pi */
-public class JavaSparkPi {
-
+public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
@@ -36,26 +35,27 @@ public class JavaSparkPi {
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
- for (int i = 0; i < n; i++)
+ for (int i = 0; i < n; i++) {
l.add(i);
+ }
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
- public Integer call(Integer integer) throws Exception {
+ public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
+ public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 17f21f6b77..2ceb0fd94b 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -31,11 +31,11 @@ import java.util.Set;
/**
* Transitive closure on a graph, implemented in Java.
*/
-public class JavaTC {
+public final class JavaTC {
- static int numEdges = 200;
- static int numVertices = 100;
- static Random rand = new Random(42);
+ private static final int numEdges = 200;
+ private static final int numVertices = 100;
+ private static final Random rand = new Random(42);
static List<Tuple2<Integer, Integer>> generateGraph() {
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
@@ -43,15 +43,18 @@ public class JavaTC {
int from = rand.nextInt(numVertices);
int to = rand.nextInt(numVertices);
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
- if (from != to) edges.add(e);
+ if (from != to) {
+ edges.add(e);
+ }
}
return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
Integer, Integer> {
- static ProjectFn INSTANCE = new ProjectFn();
+ static final ProjectFn INSTANCE = new ProjectFn();
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
}
@@ -64,7 +67,7 @@ public class JavaTC {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
@@ -76,6 +79,7 @@ public class JavaTC {
// Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
return new Tuple2<Integer, Integer>(e._2(), e._1());
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 07d32ad659..6651f98d56 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,8 +27,11 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Pattern;
+
+public final class JavaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
-public class JavaWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
@@ -36,29 +39,32 @@ public class JavaWordCount {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
public Iterable<String> call(String s) {
- return Arrays.asList(s.split(" "));
+ return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+ @Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
- for (Tuple2 tuple : output) {
+ for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1 + ": " + tuple._2);
}
System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 628cb892b6..435a86e62a 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -25,30 +25,33 @@ import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
-import java.io.Serializable;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
import scala.Tuple2;
/**
* Example using MLLib ALS from Java.
*/
-public class JavaALS {
+public final class JavaALS {
static class ParseRating extends Function<String, Rating> {
+ private static final Pattern COMMA = Pattern.compile(",");
+
+ @Override
public Rating call(String line) {
- StringTokenizer tok = new StringTokenizer(line, ",");
- int x = Integer.parseInt(tok.nextToken());
- int y = Integer.parseInt(tok.nextToken());
- double rating = Double.parseDouble(tok.nextToken());
+ String[] tok = COMMA.split(line);
+ int x = Integer.parseInt(tok[0]);
+ int y = Integer.parseInt(tok[1]);
+ double rating = Double.parseDouble(tok[2]);
return new Rating(x, y, rating);
}
}
static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+ @Override
public String call(Tuple2<Object, double[]> element) {
- return element._1().toString() + "," + Arrays.toString(element._2());
+ return element._1() + "," + Arrays.toString(element._2());
}
}
@@ -69,7 +72,7 @@ public class JavaALS {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<Rating> ratings = lines.map(new ParseRating());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index cd59a139b9..4b2658f257 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -25,20 +25,22 @@ import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Example using MLLib KMeans from Java.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
static class ParsePoint extends Function<String, double[]> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public double[] call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- int numTokens = tok.countTokens();
- double[] point = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- point[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double[] point = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ point[i] = Double.parseDouble(tok[i]);
}
return point;
}
@@ -62,7 +64,7 @@ public class JavaKMeans {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<double[]> points = lines.map(new ParsePoint());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 258061c8e6..21586ce817 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -27,22 +27,25 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification using ML Lib.
*/
-public class JavaLR {
+public final class JavaLR {
static class ParsePoint extends Function<String, LabeledPoint> {
+ private static final Pattern COMMA = Pattern.compile(",");
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public LabeledPoint call(String line) {
- String[] parts = line.split(",");
+ String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
- StringTokenizer tok = new StringTokenizer(parts[1], " ");
- int numTokens = tok.countTokens();
- double[] x = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- x[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(parts[1]);
+ double[] x = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
}
@@ -59,7 +62,7 @@ public class JavaLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 261813bf2f..b11cfa667e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@@ -36,7 +37,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent;
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*/
-public class JavaFlumeEventCount {
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
@@ -49,10 +53,10 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
+ JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
@@ -63,6 +67,6 @@ public class JavaFlumeEventCount {
}
}).print();
- sc.start();
+ ssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 9a8e4209ed..16b8a948e6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples;
import java.util.Map;
import java.util.HashMap;
+import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -29,6 +30,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
@@ -41,11 +43,16 @@ import scala.Tuple2;
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
* zoo03 my-consumer-group topic1,topic2 1`
*/
-public class JavaKafkaWordCount {
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
@@ -53,8 +60,9 @@ public class JavaKafkaWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
- new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ new Duration(2000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
int numThreads = Integer.parseInt(args[4]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
@@ -63,11 +71,11 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+ JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
+ public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
@@ -75,24 +83,24 @@ public class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
- ssc.start();
+ jssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index def87c199b..1e2efd359c 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import java.util.regex.Pattern;
+
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: NetworkWordCount <master> <hostname> <port>
@@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
* and then run the example
* `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
-public class JavaNetworkWordCount {
+public final class JavaNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaNetworkWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
@@ -48,7 +55,8 @@ public class JavaNetworkWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
@@ -56,18 +64,18 @@ public class JavaNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index c8c7389dd1..e05551ab83 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -31,8 +31,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-public class JavaQueueStream {
- public static void main(String[] args) throws InterruptedException {
+public final class JavaQueueStream {
+ private JavaQueueStream() {
+ }
+
+ public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaQueueStream <master>");
System.exit(1);
@@ -40,7 +43,7 @@ public class JavaQueueStream {
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
@@ -62,14 +65,14 @@ public class JavaQueueStream {
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ public Tuple2<Integer, Integer> call(Integer i) {
return new Tuple2<Integer, Integer>(i % 10, 1);
}
});
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index a119980992..0097dade19 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -33,7 +33,7 @@ object BroadcastTest {
System.setProperty("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(args(0), "Broadcast Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index 92eb96bd8e..b3eb611dd2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -27,7 +27,7 @@ object ExceptionHandlingTest {
}
val sc = new SparkContext(args(0), "ExceptionHandlingTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
if (math.random > 0.75)
throw new Exception("Testing exception handling")
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 42c2e0e8e1..39752fdd0e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -34,7 +34,7 @@ object GroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
index efe2e93b0d..65d67356be 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat
object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val conf = HBaseConfiguration.create()
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index d6a88d3032..c3597d94a2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -22,7 +22,7 @@ import org.apache.spark._
object HdfsTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HdfsTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 17ff3ce764..bddb54b39c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -45,7 +45,7 @@ object LogQuery {
}
val sc = new SparkContext(args(0), "Log Query",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val dataSet =
if (args.length == 2) sc.textFile(args(1))
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index e1afc29f9a..4aef04fc06 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -28,7 +28,7 @@ object MultiBroadcastTest {
}
val sc = new SparkContext(args(0), "Multi-Broadcast Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 37ddfb5db7..73b0e216ca 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -36,7 +36,7 @@ object SimpleSkewedGroupByTest {
var ratio = if (args.length > 5) args(5).toInt else 5.0
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index 9c954b2b5b..31c6d108f3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -34,7 +34,7 @@ object SkewedGroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 814944ba1c..30c86d83e6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -112,7 +112,7 @@ object SparkALS {
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val sc = new SparkContext(host, "SparkALS",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val R = generateR()
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 86dd9ca1b3..ff72532db1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -54,7 +54,7 @@ object SparkHdfsLR {
val inputPath = args(1)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
val lines = sc.textFile(inputPath)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index bc2db39c12..8c99025eaa 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -55,7 +55,7 @@ object SparkKMeans {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLocalKMeans",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 9ed9fe4d76..c54a55bdb4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -49,7 +49,7 @@ object SparkLR {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLR",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index a508c0df57..d203f4d20e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -38,7 +38,7 @@ object SparkPageRank {
}
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = ctx.textFile(args(1), 1)
val links = lines.map{ s =>
val parts = s.split("\\s+")
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index a689e5a360..e5a09ecec0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -29,7 +29,7 @@ object SparkPi {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkPi",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index 8543ce0e32..24e8afa26b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -46,7 +46,7 @@ object SparkTC {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkTC",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
index 72b5c7b88e..4c0de46964 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala
@@ -36,16 +36,18 @@ object WikipediaPageRank {
System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numPartitions> <host> <usePartitioner>")
System.exit(-1)
}
-
- System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val host = args(3)
val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRank")
+
+ sparkConf.setMaster(host).setAppName("WikipediaPageRank")
+ val sc = new SparkContext(sparkConf)
// Parse the Wikipedia page data into a graph
val input = sc.textFile(inputFile)
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index ddf6855325..2cf273a702 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -34,15 +34,19 @@ object WikipediaPageRankStandalone {
System.err.println("Usage: WikipediaPageRankStandalone <inputFile> <threshold> <numIterations> <host> <usePartitioner>")
System.exit(-1)
}
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer")
- System.setProperty("spark.serializer", "spark.bagel.examples.WPRSerializer")
val inputFile = args(0)
val threshold = args(1).toDouble
val numIterations = args(2).toInt
val host = args(3)
val usePartitioner = args(4).toBoolean
- val sc = new SparkContext(host, "WikipediaPageRankStandalone")
+
+ sparkConf.setMaster(host).setAppName("WikipediaPageRankStandalone")
+
+ val sc = new SparkContext(sparkConf)
val input = sc.textFile(inputFile)
val partitioner = new HashPartitioner(sc.defaultParallelism)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index af52b7e9a1..4e0058cd70 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -19,12 +19,14 @@ package org.apache.spark.streaming.examples
import scala.collection.mutable.LinkedList
import scala.util.Random
+import scala.reflect.ClassTag
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
+import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
@@ -82,10 +84,10 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.streaming.examples.FeederActor]]
*/
-class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends Actor with Receiver {
- lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+ lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
@@ -115,7 +117,7 @@ object FeederActor {
val Seq(host, port) = args.toSeq
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = new SparkConf)._1
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
@@ -132,9 +134,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
- * `$ ./run-example spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999`
* and then run the example
- * `$ ./run-example spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
@@ -149,7 +151,7 @@ object ActorWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
/*
* Following is the use of actorStream to plug in custom actor as receiver
@@ -164,7 +166,7 @@ object ActorWordCount {
*/
val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
+ Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
//compute wordcount
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 9f6e163454..ae3709b3d9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
/**
* Produces a count of events received from Flume.
@@ -48,10 +49,10 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+ val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index bc8564b3ba..ea6ea67419 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.StreamingContext._
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
- * `$ ./run-example spark.streaming.examples.HdfsWordCount local[2] localdir`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount local[2] localdir`
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
@@ -40,7 +40,7 @@ object HdfsWordCount {
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 570ba4c81a..31a94bd224 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -24,6 +24,7 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.RawTextHelper._
+import org.apache.spark.streaming.kafka._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -35,7 +36,7 @@ import org.apache.spark.streaming.util.RawTextHelper._
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./run-example spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ * `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
def main(args: Array[String]) {
@@ -48,11 +49,11 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index ff332a0282..325290b66f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,11 +17,6 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.MQTTReceiver
-import org.apache.spark.storage.StorageLevel
-
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.mqtt._
+
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
* Space separated String Message "hello mqtt demo for spark streaming"
@@ -79,9 +79,9 @@ object MQTTPublisher {
* <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
- * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
*/
object MQTTWordCount {
@@ -96,8 +96,8 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
- Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+ StreamingContext.jarOfClass(this.getClass))
+ val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index e2487dca5f..6a32c75373 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
@@ -41,7 +41,7 @@ object NetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index fad512eeba..9d640e716b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -33,7 +33,7 @@ object QueueStream {
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 0b45c30d20..c0706d0724 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -49,7 +49,7 @@ object RawNetworkGrep {
// Create the context
val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Warm up the JVMs on master and slave for JIT compilation to kick in
RawTextHelper.warmUp(ssc.sparkContext)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index cb30c4edb3..002db57d59 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run-example spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
@@ -49,7 +49,7 @@ object StatefulNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 35b6329ab3..3ccdc908e2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -23,6 +23,8 @@ import com.twitter.algebird._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -33,7 +35,7 @@ import org.apache.spark.SparkContext._
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
@@ -60,8 +62,8 @@ object TwitterAlgebirdCMS {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 8bfde2a829..c7e83e76b0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.streaming.dstream.TwitterInputDStream
+import org.apache.spark.streaming.twitter._
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
@@ -49,8 +49,8 @@ object TwitterAlgebirdHLL {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 27aa6b14bf..e2b0418d55 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
@@ -38,8 +39,8 @@ object TwitterPopularTags {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val stream = ssc.twitterStream(None, filters)
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+ val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index c8743b9e25..03902ec353 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -20,9 +20,12 @@ package org.apache.spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
+import akka.util.ByteString
+
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.zeromq._
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
@@ -40,10 +43,11 @@ object SimpleZeroMQPublisher {
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
- val messages: Array[String] = Array("words ", "may ", "count ")
+ implicit def stringToByteString(x: String) = ByteString(x)
+ val messages: List[ByteString] = List("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
- pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+ pubSocket ! ZMQMessage(ByteString(topic) :: messages)
}
acs.awaitTermination()
}
@@ -60,9 +64,9 @@ object SimpleZeroMQPublisher {
* <zeroMQurl> and <topic> describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./run-example spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and run the example as
- * `$ ./run-example spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
+ * `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo`
*/
object ZeroMQWordCount {
def main(args: Array[String]) {
@@ -76,16 +80,15 @@ object ZeroMQWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+ def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
//For this stream, a zeroMQ publisher should be running.
- val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
-
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index de70c50473..4fe57de4a4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -39,8 +39,8 @@ object PageView extends Serializable {
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*
* When running this, you may want to set the root logging level to ERROR in
* conf/log4j.properties to reduce the verbosity of the output.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 8282cc9269..807af199f4 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -25,8 +25,8 @@ import org.apache.spark.SparkContext._
* operators available in Spark streaming.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
- * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10
+ * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*/
object PageViewStream {
def main(args: Array[String]) {
@@ -42,7 +42,7 @@ object PageViewStream {
// Create the context
val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.socketTextStream(host, port)