aboutsummaryrefslogtreecommitdiff
path: root/external/flume/src
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-02 16:26:24 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-02 16:26:24 -0800
commit2ebd1df3f17993f3cb472ec44c8832213976d99a (patch)
tree27369ea3bbc025e1c43f282fea96129db7d879d9 /external/flume/src
parent9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff)
downloadspark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.gz
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.bz2
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.zip
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures. Author: Aaron Davidson <aaron@databricks.com> Closes #3053 from aarondav/leak and squashes the following commits: e676d18 [Aaron Davidson] Typo! 8f96475 [Aaron Davidson] Keep original ssc semantics 7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Diffstat (limited to 'external/flume/src')
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala15
1 files changed, 10 insertions, 5 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 32a19787a2..475026e8eb 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
outputStream.register()
ssc.start()
- writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
- assertChannelIsEmpty(channel)
- assertChannelIsEmpty(channel2)
- sink.stop()
- channel.stop()
+ try {
+ writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ assertChannelIsEmpty(channel2)
+ } finally {
+ sink.stop()
+ sink2.stop()
+ channel.stop()
+ channel2.stop()
+ }
}
def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,