From e55cc4bae52a3de728939244780abc662713b768 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 10 Apr 2014 00:37:21 -0700 Subject: SPARK-1446: Spark examples should not do a System.exit Spark examples should exit nice using SparkContext.stop() method, rather than System.exit System.exit can cause issues like in SPARK-1407 Author: Sandeep Closes #370 from techaddict/1446 and squashes the following commits: e9234cf [Sandeep] SPARK-1446: Spark examples should not do a System.exit Spark examples should exit nice using SparkContext.stop() method, rather than System.exit System.exit can cause issues like in SPARK-1407 --- .../java/org/apache/spark/examples/JavaHdfsLR.java | 2 +- .../java/org/apache/spark/examples/JavaLogQuery.java | 2 +- .../java/org/apache/spark/examples/JavaPageRank.java | 2 +- .../main/java/org/apache/spark/examples/JavaTC.java | 2 +- .../org/apache/spark/examples/JavaWordCount.java | 6 +++--- .../org/apache/spark/mllib/examples/JavaALS.java | 2 +- .../org/apache/spark/mllib/examples/JavaKMeans.java | 2 +- .../java/org/apache/spark/mllib/examples/JavaLR.java | 2 +- .../org/apache/spark/examples/BroadcastTest.scala | 2 +- .../org/apache/spark/examples/CassandraCQLTest.scala | 2 +- .../spark/examples/ExceptionHandlingTest.scala | 2 +- .../org/apache/spark/examples/GroupByTest.scala | 5 ++--- .../scala/org/apache/spark/examples/HBaseTest.scala | 6 +++--- .../scala/org/apache/spark/examples/HdfsTest.scala | 2 +- .../scala/org/apache/spark/examples/LocalALS.scala | 2 +- .../apache/spark/examples/MultiBroadcastTest.scala | 2 +- .../spark/examples/SimpleSkewedGroupByTest.scala | 7 +++---- .../apache/spark/examples/SkewedGroupByTest.scala | 7 +++---- .../scala/org/apache/spark/examples/SparkALS.scala | 4 ++-- .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- .../org/apache/spark/examples/SparkKMeans.scala | 20 ++++++++++---------- .../scala/org/apache/spark/examples/SparkLR.scala | 2 +- .../org/apache/spark/examples/SparkPageRank.scala | 3 +-- .../scala/org/apache/spark/examples/SparkTC.scala | 2 +- .../apache/spark/examples/SparkTachyonHdfsLR.scala | 2 +- .../org/apache/spark/examples/SparkTachyonPi.scala | 6 +++--- .../examples/bagel/WikipediaPageRankStandalone.scala | 2 +- .../apache/spark/examples/mllib/TallSkinnyPCA.scala | 2 +- .../apache/spark/examples/mllib/TallSkinnySVD.scala | 2 +- .../spark/streaming/examples/HdfsWordCount.scala | 1 - .../spark/streaming/examples/KafkaWordCount.scala | 5 ++--- .../spark/streaming/examples/MQTTWordCount.scala | 2 +- .../spark/streaming/examples/QueueStream.scala | 11 +++++------ .../examples/StatefulNetworkWordCount.scala | 2 +- .../spark/streaming/examples/ZeroMQWordCount.scala | 2 +- 35 files changed, 60 insertions(+), 67 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index 6b49244ba4..bd96274021 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -138,6 +138,6 @@ public final class JavaHdfsLR { System.out.print("Final w: "); printWeights(w); - System.exit(0); + sc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 617e4a6d04..2a4278d3c3 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -126,6 +126,6 @@ public final class JavaLogQuery { for (Tuple2 t : output) { System.out.println(t._1() + "\t" + t._2()); } - System.exit(0); + jsc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index 8513ba07e7..e31f676f5f 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -117,6 +117,6 @@ public final class JavaPageRank { System.out.println(tuple._1() + " has rank: " + tuple._2() + "."); } - System.exit(0); + ctx.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 6cfe25c80e..1d776940f0 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -96,6 +96,6 @@ public final class JavaTC { } while (nextCount != oldCount); System.out.println("TC has " + tc.count() + " edges."); - System.exit(0); + sc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 3ae1d8f7ca..87c1b80981 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -48,14 +48,14 @@ public final class JavaWordCount { return Arrays.asList(SPACE.split(s)); } }); - + JavaPairRDD ones = words.mapToPair(new PairFunction() { @Override public Tuple2 call(String s) { return new Tuple2(s, 1); } }); - + JavaPairRDD counts = ones.reduceByKey(new Function2() { @Override public Integer call(Integer i1, Integer i2) { @@ -67,6 +67,6 @@ public final class JavaWordCount { for (Tuple2 tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } - System.exit(0); + ctx.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java index 64a3a04fb7..c516199d61 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java @@ -85,6 +85,6 @@ public final class JavaALS { outputDir + "/productFeatures"); System.out.println("Final user/product features written to " + outputDir); - System.exit(0); + sc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java index 7b0ec36424..7461609ab9 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java @@ -79,6 +79,6 @@ public final class JavaKMeans { double cost = model.computeCost(points.rdd()); System.out.println("Cost: " + cost); - System.exit(0); + sc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java index cd8879ff88..e3ab87cc72 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java @@ -77,6 +77,6 @@ public final class JavaLR { System.out.print("Final w: " + model.weights()); - System.exit(0); + sc.stop(); } } diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 4d2f45df85..c8c916bb45 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -56,6 +56,6 @@ object BroadcastTest { println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) } - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index ee283ce6ab..1f8d7cb599 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -58,7 +58,7 @@ import org.apache.spark.SparkContext._ prod_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2); */ - + /** * This example demonstrates how to read and write to cassandra column family created using CQL3 * using Spark. diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala index fdb976dfc6..be7d39549a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala @@ -34,6 +34,6 @@ object ExceptionHandlingTest { } } - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index 36534e5935..29114c6dab 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -28,7 +28,7 @@ object GroupByTest { "Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) } - + var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 @@ -52,7 +52,6 @@ object GroupByTest { println(pairs1.groupByKey(numReducers).count) - System.exit(0) + sc.stop() } } - diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index 65d67356be..700121d16d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -30,7 +30,7 @@ object HBaseTest { val conf = HBaseConfiguration.create() - // Other options for configuring scan behavior are available. More information available at + // Other options for configuring scan behavior are available. More information available at // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html conf.set(TableInputFormat.INPUT_TABLE, args(1)) @@ -41,12 +41,12 @@ object HBaseTest { admin.createTable(tableDesc) } - val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], + val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala index c3597d94a2..dd6d520513 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala @@ -32,6 +32,6 @@ object HdfsTest { val end = System.currentTimeMillis() println("Iteration " + iter + " took " + (end-start) + " ms") } - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 0095cb8425..37ad4bd099 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -120,7 +120,7 @@ object LocalALS { } } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) - + val R = generateR() // Initialize m and u randomly diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index 4aef04fc06..97321ab8f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -51,6 +51,6 @@ object MultiBroadcastTest { // Collect the small RDD so we can print the observed sizes locally. observedSizes.collect().foreach(i => println(i)) - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 1fdb324b89..d05eedd31c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -27,7 +27,7 @@ object SimpleSkewedGroupByTest { System.err.println("Usage: SimpleSkewedGroupByTest " + "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]") System.exit(1) - } + } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 @@ -58,14 +58,13 @@ object SimpleSkewedGroupByTest { }.cache // Enforce that everything has been calculated and in cache pairs1.count - + println("RESULT: " + pairs1.groupByKey(numReducers).count) // Print how many keys each reducer got (for debugging) // println("RESULT: " + pairs1.groupByKey(numReducers) // .map{case (k,v) => (k, v.size)} // .collectAsMap) - System.exit(0) + sc.stop() } } - diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index 966478fe4a..fd9f043247 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -27,7 +27,7 @@ object SkewedGroupByTest { System.err.println( "Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) - } + } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 @@ -53,10 +53,9 @@ object SkewedGroupByTest { }.cache() // Enforce that everything has been calculated and in cache pairs1.count() - + println(pairs1.groupByKey(numReducers).count()) - System.exit(0) + sc.stop() } } - diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index f59ab7e7cc..68f151a2c4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -112,7 +112,7 @@ object SparkALS { val sc = new SparkContext(host, "SparkALS", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - + val R = generateR() // Initialize m and u randomly @@ -137,6 +137,6 @@ object SparkALS { println() } - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 038afbcba8..d8de8745c1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -52,7 +52,7 @@ object SparkHdfsLR { val inputPath = args(1) val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", - System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) )) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 9fe2465235..1a8b21618e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -28,16 +28,16 @@ import org.apache.spark.SparkContext._ object SparkKMeans { val R = 1000 // Scaling factor val rand = new Random(42) - + def parseVector(line: String): Vector = { new Vector(line.split(' ').map(_.toDouble)) } - + def closestPoint(p: Vector, centers: Array[Vector]): Int = { var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity - + for (i <- 0 until centers.length) { val tempDist = p.squaredDist(centers(i)) if (tempDist < closest) { @@ -45,7 +45,7 @@ object SparkKMeans { bestIndex = i } } - + bestIndex } @@ -60,22 +60,22 @@ object SparkKMeans { val data = lines.map(parseVector _).cache() val K = args(2).toInt val convergeDist = args(3).toDouble - + val kPoints = data.takeSample(withReplacement = false, K, 42).toArray var tempDist = 1.0 while(tempDist > convergeDist) { val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - + val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)} - + val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap() - + tempDist = 0.0 for (i <- 0 until K) { tempDist += kPoints(i).squaredDist(newPoints(i)) } - + for (newP <- newPoints) { kPoints(newP._1) = newP._2 } @@ -84,6 +84,6 @@ object SparkKMeans { println("Final centers:") kPoints.foreach(println) - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index c54a55bdb4..3a2699d4d9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -66,6 +66,6 @@ object SparkLR { } println("Final w: " + w) - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index d203f4d20e..45b6e10f3e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -57,7 +57,6 @@ object SparkPageRank { val output = ranks.collect() output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + ".")) - System.exit(0) + ctx.stop() } } - diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index 24e8afa26b..eb47cf027c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -70,6 +70,6 @@ object SparkTC { } while (nextCount != oldCount) println("TC has " + tc.count() + " edges.") - System.exit(0) + spark.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index 53b303d658..5698d47464 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -75,6 +75,6 @@ object SparkTachyonHdfsLR { } println("Final w: " + w) - System.exit(0) + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala index ce78f0876e..2b207fd8d3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -34,10 +34,10 @@ object SparkTachyonPi { } val spark = new SparkContext(args(0), "SparkTachyonPi", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) - + val slices = if (args.length > 1) args(1).toInt else 2 val n = 100000 * slices - + val rdd = spark.parallelize(1 to n, slices) rdd.persist(StorageLevel.OFF_HEAP) val count = rdd.map { i => @@ -46,7 +46,7 @@ object SparkTachyonPi { if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) - + spark.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 7aac6a1359..dee3cb6c0a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -79,7 +79,7 @@ object WikipediaPageRankStandalone { val time = (System.currentTimeMillis - startTime) / 1000.0 println("Completed %d iterations in %f seconds: %f seconds per iteration" .format(numIterations, time, time / numIterations)) - System.exit(0) + sc.stop() } def parseArticle(line: String): (String, Array[String]) = { diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala index a177435e60..61b9655cd3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.examples.mllib - + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.Vectors diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala index 49d09692c8..9aeebf58ea 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.examples.mllib - + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.linalg.Vectors diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala index 954bcc9b6e..1c0ce3111e 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala @@ -53,4 +53,3 @@ object HdfsWordCount { ssc.awaitTermination() } } - diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 6bccd1d884..cca0be2cbb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -61,7 +61,7 @@ object KafkaWordCount { val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() - + ssc.start() ssc.awaitTermination() } @@ -83,7 +83,7 @@ object KafkaWordCountProducer { val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") - + val config = new ProducerConfig(props) val producer = new Producer[String, String](config) @@ -102,4 +102,3 @@ object KafkaWordCountProducer { } } - diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 0a68ac84c2..656222e0c1 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -26,7 +26,7 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.mqtt._ /** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" */ object MQTTPublisher { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala index 4d4968ba6a..612ecf7b78 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala @@ -24,7 +24,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ object QueueStream { - + def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: QueueStream ") @@ -37,23 +37,22 @@ object QueueStream { val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - // Create the queue through which RDDs can be pushed to + // Create the queue through which RDDs can be pushed to // a QueueInputDStream val rddQueue = new SynchronizedQueue[RDD[Int]]() - + // Create the QueueInputDStream and use it do some processing val inputStream = ssc.queueStream(rddQueue) val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) - reducedStream.print() + reducedStream.print() ssc.start() - + // Create and push some RDDs into for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) Thread.sleep(1000) } ssc.stop() - System.exit(0) } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala index c2d84a8e08..14f65a2f8d 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -58,7 +58,7 @@ object StatefulNetworkWordCount { ssc.checkpoint(".") // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(1), args(2).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 35f8f885f8..445d202858 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -60,7 +60,7 @@ object SimpleZeroMQPublisher { * To work with zeroMQ, some native libraries have to be installed. * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] * (http://www.zeromq.org/intro:get-the-software) - * + * * Usage: ZeroMQWordCount * In local mode, should be 'local[n]' with n > 1 * and describe where zeroMq publisher is running. -- cgit v1.2.3