aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-09-12 22:50:37 -0700
committerReynold Xin <rxin@apache.org>2014-09-12 22:50:37 -0700
commitfeaa3706f17e44efcdac9f0a543a5b91232771ce (patch)
tree13a8a31e2ce29ae253bb295bedc5e4dbe29b7225 /streaming
parente11eeb71fa3a5fe7ddacb94d5b93b173d4d901a8 (diff)
downloadspark-feaa3706f17e44efcdac9f0a543a5b91232771ce.tar.gz
spark-feaa3706f17e44efcdac9f0a543a5b91232771ce.tar.bz2
spark-feaa3706f17e44efcdac9f0a543a5b91232771ce.zip
SPARK-3470 [CORE] [STREAMING] Add Closeable / close() to Java context objects
... that expose a stop() lifecycle method. This doesn't add `AutoCloseable`, which is Java 7+ only. But it should be possible to use try-with-resources on a `Closeable` in Java 7, as long as the `close()` does not throw a checked exception, and these don't. Q.E.D. Author: Sean Owen <sowen@cloudera.com> Closes #2346 from srowen/SPARK-3470 and squashes the following commits: 612c21d [Sean Owen] Add Closeable / close() to Java context objects that expose a stop() lifecycle method
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala7
1 files changed, 5 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 18605cac70..9dc26dc6b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
-import java.io.InputStream
+import java.io.{Closeable, InputStream}
import java.util.{List => JList, Map => JMap}
import akka.actor.{Props, SupervisorStrategy}
@@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
* termination of a context by `stop()` or by an exception.
*/
-class JavaStreamingContext(val ssc: StreamingContext) {
+class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create a StreamingContext.
@@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
ssc.stop(stopSparkContext, stopGracefully)
}
+
+ override def close(): Unit = stop()
+
}
/**