diff options
author | Sean Owen <sowen@cloudera.com> | 2014-10-01 11:28:22 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-10-01 11:28:22 -0700 |
commit | dcb2f73f1cf1f6efd5175267e135ad6cf4bf6e3d (patch) | |
tree | 3722ed4f7abbff0587f584f827e98cc10ead4897 /examples | |
parent | abf588f47a26d0066f0b75d52b200a87bb085064 (diff) | |
download | spark-dcb2f73f1cf1f6efd5175267e135ad6cf4bf6e3d.tar.gz spark-dcb2f73f1cf1f6efd5175267e135ad6cf4bf6e3d.tar.bz2 spark-dcb2f73f1cf1f6efd5175267e135ad6cf4bf6e3d.zip |
SPARK-2626 [DOCS] Stop SparkContext in all examples
Call SparkContext.stop() in all examples (and touch up minor nearby code style issues while at it)
Author: Sean Owen <sowen@cloudera.com>
Closes #2575 from srowen/SPARK-2626 and squashes the following commits:
5b2baae [Sean Owen] Call SparkContext.stop() in all examples (and touch up minor nearby code style issues while at it)
Diffstat (limited to 'examples')
11 files changed, 34 insertions, 11 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 11157d7573..0f07cb4098 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -31,7 +31,6 @@ import java.util.List; * Usage: JavaSparkPi [slices] */ public final class JavaSparkPi { - public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); @@ -61,5 +60,7 @@ public final class JavaSparkPi { }); System.out.println("Pi is roughly " + 4.0 * count / n); + + jsc.stop(); } } diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index 898297dc65..01c77bd443 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -61,7 +61,8 @@ public class JavaSparkSQL { // Load a text file and convert each line to a Java Bean. JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { - public Person call(String line) throws Exception { + @Override + public Person call(String line) { String[] parts = line.split(","); Person person = new Person(); @@ -82,6 +83,7 @@ public class JavaSparkSQL { // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List<String> teenagerNames = teenagers.map(new Function<Row, String>() { + @Override public String call(Row row) { return "Name: " + row.getString(0); } @@ -104,6 +106,7 @@ public class JavaSparkSQL { JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); teenagerNames = teenagers2.map(new Function<Row, String>() { + @Override public String call(Row row) { return "Name: " + row.getString(0); } @@ -136,6 +139,7 @@ public class JavaSparkSQL { // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. teenagerNames = teenagers3.map(new Function<Row, String>() { + @Override public String call(Row row) { return "Name: " + row.getString(0); } }).collect(); for (String name: teenagerNames) { @@ -162,6 +166,7 @@ public class JavaSparkSQL { JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2"); List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() { + @Override public String call(Row row) { return "Name: " + row.getString(0) + ", City: " + row.getString(1); } @@ -169,5 +174,7 @@ public class JavaSparkSQL { for (String name: nameAndCity) { System.out.println(name); } + + ctx.stop(); } } diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py index cfda8d8327..4626bbb7e3 100644 --- a/examples/src/main/python/avro_inputformat.py +++ b/examples/src/main/python/avro_inputformat.py @@ -78,3 +78,5 @@ if __name__ == "__main__": output = avro_rdd.map(lambda x: x[0]).collect() for k in output: print k + + sc.stop() diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py index c9b08f878a..fa4c20ab20 100644 --- a/examples/src/main/python/parquet_inputformat.py +++ b/examples/src/main/python/parquet_inputformat.py @@ -57,3 +57,5 @@ if __name__ == "__main__": output = parquet_rdd.map(lambda x: x[1]).collect() for k in output: print k + + sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 71f53af68f..11d5c92c59 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -136,5 +136,7 @@ object CassandraCQLTest { classOf[CqlOutputFormat], job.getConfiguration() ) + + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 91ba364a34..ec689474ae 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -126,6 +126,8 @@ object CassandraTest { } }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], classOf[ColumnFamilyOutputFormat], job.getConfiguration) + + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index efd91bb054..15f6678648 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -44,11 +44,11 @@ object GroupByTest { arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 - }.cache + }.cache() // Enforce that everything has been calculated and in cache - pairs1.count + pairs1.count() - println(pairs1.groupByKey(numReducers).count) + println(pairs1.groupByKey(numReducers).count()) sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala index 4c655b84fd..74620ad007 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala @@ -79,5 +79,7 @@ object LogQuery { .reduceByKey((a, b) => a.merge(b)) .collect().foreach{ case (user, query) => println("%s\t%s".format(user, query))} + + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index 235c3bf820..e4db3ec513 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -21,7 +21,6 @@ import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ import scala.xml.{XML,NodeSeq} @@ -78,9 +77,9 @@ object WikipediaPageRank { (id, new PRVertex(1.0 / numVertices, outEdges)) }) if (usePartitioner) { - vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache + vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache() } else { - vertices = vertices.cache + vertices = vertices.cache() } println("Done parsing input file.") @@ -100,7 +99,9 @@ object WikipediaPageRank { (result .filter { case (id, vertex) => vertex.value >= threshold } .map { case (id, vertex) => "%s\t%s\n".format(id, vertex.value) } - .collect.mkString) + .collect().mkString) println(top) + + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index d56d64c564..2e98b2dc30 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -51,7 +51,7 @@ object RDDRelation { val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10") println("Result of RDD.map:") - rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println) + rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println) // Queries can also be written using a LINQ-like Scala DSL. rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println) @@ -68,5 +68,7 @@ object RDDRelation { // These files can also be registered as tables. parquetFile.registerTempTable("parquetFile") sql("SELECT * FROM parquetFile").collect().foreach(println) + + sc.stop() } } diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index 3423fac0ad..e26f213e8a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -39,7 +39,7 @@ object HiveFromSpark { // Queries are expressed in HiveQL println("Result of 'SELECT *': ") - sql("SELECT * FROM src").collect.foreach(println) + sql("SELECT * FROM src").collect().foreach(println) // Aggregation queries are also supported. val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0) @@ -61,5 +61,7 @@ object HiveFromSpark { // Queries can then join RDD data with data stored in Hive. println("Result of SELECT *:") sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) + + sc.stop() } } |