diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-11-02 16:26:24 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-11-02 16:26:24 -0800 |
commit | 2ebd1df3f17993f3cb472ec44c8832213976d99a (patch) | |
tree | 27369ea3bbc025e1c43f282fea96129db7d879d9 /external/flume/src | |
parent | 9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff) | |
download | spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.gz spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.bz2 spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.zip |
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures.
Author: Aaron Davidson <aaron@databricks.com>
Closes #3053 from aarondav/leak and squashes the following commits:
e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Diffstat (limited to 'external/flume/src')
-rw-r--r-- | external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 32a19787a2..475026e8eb 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase { outputStream.register() ssc.start() - writeAndVerify(Seq(channel, channel2), ssc, outputBuffer) - assertChannelIsEmpty(channel) - assertChannelIsEmpty(channel2) - sink.stop() - channel.stop() + try { + writeAndVerify(Seq(channel, channel2), ssc, outputBuffer) + assertChannelIsEmpty(channel) + assertChannelIsEmpty(channel2) + } finally { + sink.stop() + sink2.stop() + channel.stop() + channel2.stop() + } } def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext, |