aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-05 11:43:00 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-05 11:49:42 -0800
commit79f52809c836d08023aa5ca99a467d3a311a7359 (patch)
tree3ae603f0187bcc285aa4844f220c1f3254ad1923 /streaming
parentd43ad3ef2c3d4b690ba1d053729daefb507cd23c (diff)
downloadspark-79f52809c836d08023aa5ca99a467d3a311a7359.tar.gz
spark-79f52809c836d08023aa5ca99a467d3a311a7359.tar.bz2
spark-79f52809c836d08023aa5ca99a467d3a311a7359.zip
Removing SPARK_EXAMPLES_JAR in the code
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala25
2 files changed, 21 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 304986f187..b3a7cf08b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -595,6 +595,12 @@ object StreamingContext {
new PairDStreamFunctions[K, V](stream)
}
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
+
protected[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
// Set the default cleaner delay to an hour if not already set.
// This should be sufficient for even 1 second batch intervals.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b79173c6aa..7dec4b3ad7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,29 +17,27 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Integer => JInt}
import java.io.InputStream
-import java.util.{Map => JMap, List => JList}
+import java.lang.{Integer => JInt}
+import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import akka.actor.{Props, SupervisorStrategy}
+import akka.util.ByteString
+import akka.zeromq.Subscribe
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import twitter4j.Status
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import akka.zeromq.Subscribe
-import akka.util.ByteString
-
import twitter4j.auth.Authorization
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.SparkConf
import org.apache.spark.streaming.scheduler.StreamingListener
/**
@@ -716,5 +714,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* Sstops the execution of the streams.
*/
def stop() = ssc.stop()
+}
+object JavaStreamingContext {
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext.
+ */
+ def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
}