diff options
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 18605cac70..9dc26dc6b3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import java.io.InputStream +import java.io.{Closeable, InputStream} import java.util.{List => JList, Map => JMap} import akka.actor.{Props, SupervisorStrategy} @@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver * respectively. `context.awaitTransformation()` allows the current thread to wait for the * termination of a context by `stop()` or by an exception. */ -class JavaStreamingContext(val ssc: StreamingContext) { +class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Create a StreamingContext. @@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = { ssc.stop(stopSparkContext, stopGracefully) } + + override def close(): Unit = stop() + } /** |