aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala7
2 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8e178bc848..23f7e6be81 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -17,6 +17,7 @@
package org.apache.spark.api.java
+import java.io.Closeable
import java.util
import java.util.{Map => JMap}
@@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*/
-class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+class JavaSparkContext(val sc: SparkContext)
+ extends JavaSparkContextVarargsWorkaround with Closeable {
+
/**
* Create a JavaSparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
@@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.stop()
}
+ override def close(): Unit = stop()
+
/**
* Get Spark's home location from either a value set through the constructor,
* or the spark.home Java property, or the SPARK_HOME environment variable
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 18605cac70..9dc26dc6b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import java.io.InputStream
+import java.io.{Closeable, InputStream}
import java.util.{List => JList, Map => JMap}
import akka.actor.{Props, SupervisorStrategy}
@@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
* termination of a context by `stop()` or by an exception.
*/
-class JavaStreamingContext(val ssc: StreamingContext) {
+class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create a StreamingContext.
@@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}
+
+ override def close(): Unit = stop()
+
}
/**