aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-01-05 19:25:09 -0800
committerReynold Xin <rxin@apache.org>2014-01-05 19:25:09 -0800
commit5b0986a1d675f0f9d7d14b3d48fdadcb4f7055b7 (patch)
tree76adbc116b690647a42372633e2b501e4fa8bec0
parentf4b924f6623525d0f9287039b963fd5f7eaf1a7d (diff)
parent79f52809c836d08023aa5ca99a467d3a311a7359 (diff)
downloadspark-5b0986a1d675f0f9d7d14b3d48fdadcb4f7055b7.tar.gz
spark-5b0986a1d675f0f9d7d14b3d48fdadcb4f7055b7.tar.bz2
spark-5b0986a1d675f0f9d7d14b3d48fdadcb4f7055b7.zip
Merge pull request #334 from pwendell/examples-fix
Removing SPARK_EXAMPLES_JAR in the code This re-writes all of the examples to use the `SparkContext.jarOfClass` mechanism for loading the examples jar. This necessary for environments like YARN and the Standalone mode where example programs will be submit from inside the cluster rather than at the client using `./spark-example`. This still leaves SPARK_EXAMPLES_JAR in place in the shell scripts for setting up the classpath if `./spark-example` is run.
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala6
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaKMeans.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaPageRank.java3
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaTC.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/JavaWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java2
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java2
-rw-r--r--examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java2
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java3
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java3
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java3
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LogQuery.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkALS.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkPi.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkTC.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala25
47 files changed, 75 insertions, 54 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 5be5317f40..e93b10fd7e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -431,4 +431,10 @@ object JavaSparkContext {
implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index be0d38589c..12f3355bc4 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -106,7 +106,7 @@ public class JavaHdfsLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 5a6afe7eae..63465a3bbf 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -74,7 +74,7 @@ public class JavaKMeans {
System.exit(1);
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
String path = args[1];
int K = Integer.parseInt(args[2]);
double convergeDist = Double.parseDouble(args[3]);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 407cd7ccfa..74e4d9291a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -104,7 +104,7 @@ public class JavaLogQuery {
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 89aed8f279..f774f6a04e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,6 +17,7 @@
package org.apache.spark.examples;
+import org.apache.spark.SparkContext;
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -53,7 +54,7 @@ public class JavaPageRank {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
// Loads in input file. It should be in format of:
// URL neighbor URL
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 4a2380caf5..5558ab7c03 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -36,7 +36,7 @@ public class JavaSparkPi {
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 17f21f6b77..99e6ba347c 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -64,7 +64,7 @@ public class JavaTC {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index bd6383e13d..8a071caf13 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -36,7 +36,7 @@ public class JavaWordCount {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 45a0d237da..5e1a77baaa 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -68,7 +68,7 @@ public class JavaALS {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<Rating> ratings = lines.map(new ParseRating());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index cd59a139b9..1f12f518a0 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -62,7 +62,7 @@ public class JavaKMeans {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<double[]> points = lines.map(new ParsePoint());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 258061c8e6..593e4df111 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -59,7 +59,7 @@ public class JavaLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 261813bf2f..64ac72474b 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -50,7 +50,8 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 75b588e4b8..0a56e7abdf 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -54,7 +54,8 @@ public class JavaKafkaWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
- new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ new Duration(2000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
int numThreads = Integer.parseInt(args[4]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index def87c199b..ec6f6a8c56 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -48,7 +48,8 @@ public class JavaNetworkWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index c8c7389dd1..4b9fd52713 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -40,7 +40,7 @@ public class JavaQueueStream {
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index a119980992..0097dade19 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -33,7 +33,7 @@ object BroadcastTest {
System.setProperty("spark.broadcast.blockSize", blockSize)
val sc = new SparkContext(args(0), "Broadcast Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
index 92eb96bd8e..b3eb611dd2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala
@@ -27,7 +27,7 @@ object ExceptionHandlingTest {
}
val sc = new SparkContext(args(0), "ExceptionHandlingTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
if (math.random > 0.75)
throw new Exception("Testing exception handling")
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 42c2e0e8e1..39752fdd0e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -34,7 +34,7 @@ object GroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
index efe2e93b0d..65d67356be 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat
object HBaseTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HBaseTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val conf = HBaseConfiguration.create()
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index d6a88d3032..c3597d94a2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -22,7 +22,7 @@ import org.apache.spark._
object HdfsTest {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "HdfsTest",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 17ff3ce764..bddb54b39c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -45,7 +45,7 @@ object LogQuery {
}
val sc = new SparkContext(args(0), "Log Query",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val dataSet =
if (args.length == 2) sc.textFile(args(1))
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index e1afc29f9a..4aef04fc06 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -28,7 +28,7 @@ object MultiBroadcastTest {
}
val sc = new SparkContext(args(0), "Multi-Broadcast Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 37ddfb5db7..73b0e216ca 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -36,7 +36,7 @@ object SimpleSkewedGroupByTest {
var ratio = if (args.length > 5) args(5).toInt else 5.0
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index 9c954b2b5b..31c6d108f3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -34,7 +34,7 @@ object SkewedGroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
val sc = new SparkContext(args(0), "GroupBy Test",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 814944ba1c..30c86d83e6 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -112,7 +112,7 @@ object SparkALS {
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val sc = new SparkContext(host, "SparkALS",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val R = generateR()
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 86dd9ca1b3..ff72532db1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -54,7 +54,7 @@ object SparkHdfsLR {
val inputPath = args(1)
val conf = SparkHadoopUtil.get.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(),
InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
val lines = sc.textFile(inputPath)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index bc2db39c12..8c99025eaa 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -55,7 +55,7 @@ object SparkKMeans {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLocalKMeans",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
index 9ed9fe4d76..c54a55bdb4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala
@@ -49,7 +49,7 @@ object SparkLR {
System.exit(1)
}
val sc = new SparkContext(args(0), "SparkLR",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index a508c0df57..d203f4d20e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -38,7 +38,7 @@ object SparkPageRank {
}
var iters = args(2).toInt
val ctx = new SparkContext(args(0), "PageRank",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val lines = ctx.textFile(args(1), 1)
val links = lines.map{ s =>
val parts = s.split("\\s+")
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index a689e5a360..e5a09ecec0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -29,7 +29,7 @@ object SparkPi {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkPi",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index 8543ce0e32..24e8afa26b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -46,7 +46,7 @@ object SparkTC {
System.exit(1)
}
val spark = new SparkContext(args(0), "SparkTC",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 3641517934..546495357f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -151,7 +151,7 @@ object ActorWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
/*
* Following is the use of actorStream to plug in custom actor as receiver
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 9f6e163454..5ef1928294 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -48,7 +48,7 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index 61be1ce4b1..1486d77d8a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -40,7 +40,7 @@ object HdfsWordCount {
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 8dc8a3531a..172091be2e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -48,7 +48,7 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index ea138f55e8..2d02ef77c0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -96,7 +96,7 @@ object MQTTWordCount {
val Seq(master, brokerUrl, topic) = args.toSeq
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
- Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ StreamingContext.jarOfClass(this.getClass))
val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(x => x.toString.split(" "))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index ce8df8c502..74d76ec26c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -41,7 +41,7 @@ object NetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index fad512eeba..9d640e716b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -33,7 +33,7 @@ object QueueStream {
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 0b45c30d20..c0706d0724 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -49,7 +49,7 @@ object RawNetworkGrep {
// Create the context
val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Warm up the JVMs on master and slave for JIT compilation to kick in
RawTextHelper.warmUp(ssc.sparkContext)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index e55d71edfc..f43c8ab61d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -49,7 +49,7 @@ object StatefulNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
ssc.checkpoint(".")
// Create a NetworkInputDStream on target ip:port and count the
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 35b6329ab3..9d21d3178f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -60,7 +60,7 @@ object TwitterAlgebirdCMS {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 8bfde2a829..5111e6f62a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -49,7 +49,7 @@ object TwitterAlgebirdHLL {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 27aa6b14bf..7a3df687b7 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -38,7 +38,7 @@ object TwitterPopularTags {
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
val stream = ssc.twitterStream(None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 2948aa7cc4..89d3042123 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -78,7 +78,7 @@ object ZeroMQWordCount {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 968b578487..0569846f18 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -42,7 +42,7 @@ object PageViewStream {
// Create the context
val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.socketTextStream(host, port)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 304986f187..b3a7cf08b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -595,6 +595,12 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+
protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second batch intervals.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b79173c6aa..7dec4b3ad7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,29 +17,27 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Integer => JInt}
import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
+import java.lang.{Integer => JInt}
+import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import twitter4j.Status
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
import twitter4j.auth.Authorization
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@@ -716,5 +714,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Sstops the execution of the streams.
*/
def stop() = ssc.stop()
+}
+object JavaStreamingContext {
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}