diff options
author | Hyukjin Kwon <gurwls223@gmail.com> | 2016-11-07 12:47:39 -0800 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2016-11-07 12:47:39 -0800 |
commit | 8f0ea011a7294679ec4275b2fef349ef45b6eb81 (patch) | |
tree | 9599da11d56741c69b408771fa59255541f0e57d /streaming/src | |
parent | 0d95662e7fff26669d4f70e88fdac7a4128a4f49 (diff) | |
download | spark-8f0ea011a7294679ec4275b2fef349ef45b6eb81.tar.gz spark-8f0ea011a7294679ec4275b2fef349ef45b6eb81.tar.bz2 spark-8f0ea011a7294679ec4275b2fef349ef45b6eb81.zip |
[SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests
## What changes were proposed in this pull request?
Close `FileStreams`, `ZipFiles` etc to release the resources after using. Not closing the resources will cause IO Exception to be raised while deleting temp files.
## How was this patch tested?
Existing tests
Author: U-FAREAST\tl <tl@microsoft.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Tao LI <tl@microsoft.com>
Closes #15618 from HyukjinKwon/SPARK-14914-1.
Diffstat (limited to 'streaming/src')
5 files changed, 17 insertions, 7 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 3d54abd903..648a5abe0b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1805,6 +1805,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // will be re-processed after recovery List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); + ssc.stop(); Utils.deleteRecursively(tempDir); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index b79cc65d8b..41f16bfa5f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val fileStream = ssc.textFileStream(testDir.toString) // Make value 3 take a large time to process, to ensure that the driver // shuts down in the middle of processing the 3rd batch - CheckpointSuite.batchThreeShouldBlockIndefinitely = true - val mappedStream = fileStream.map(s => { + CheckpointSuite.batchThreeShouldBlockALongTime = true + val mappedStream = fileStream.map { s => val i = s.toInt if (i == 3) { - while (CheckpointSuite.batchThreeShouldBlockIndefinitely) { - Thread.sleep(Long.MaxValue) + if (CheckpointSuite.batchThreeShouldBlockALongTime) { + // It's not a good idea to let the thread run forever + // as resource won't be correctly released + Thread.sleep(6000) } } i - }) + } // Reducing over a large window to ensure that recovery from driver failure // requires reprocessing of all the files seen before the failure @@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } // The original StreamingContext has now been stopped. - CheckpointSuite.batchThreeShouldBlockIndefinitely = false + CheckpointSuite.batchThreeShouldBlockALongTime = false // Create files while the streaming driver is down for (i <- Seq(4, 5, 6)) { @@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester } private object CheckpointSuite extends Serializable { - var batchThreeShouldBlockIndefinitely: Boolean = true + var batchThreeShouldBlockALongTime: Boolean = true } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 60c8e70235..fff2d6fbac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -164,6 +164,7 @@ object MasterFailureTest extends Logging { val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun) fileGeneratingThread.join() + ssc.stop() fs.delete(checkpointDir, true) fs.delete(testDir, true) logInfo("Finished test after " + killCount + " failures") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 851013bb1e..107c3f5dcc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent) getWrittenLogData() shouldEqual expectedWrittenData1 getWriteAheadLogFiles() should have size 1 + tracker1.stop() incrementTime() @@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false) tracker1_.getUnallocatedBlocks(streamId) shouldBe empty tracker1_.hasUnallocatedReceivedBlocks should be (false) + tracker1_.stop() // Restart tracker and verify recovered list of unallocated blocks val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true) @@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite val blockInfos2 = addBlockInfos(tracker2) tracker2.allocateBlocksToBatch(batchTime2) tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker2.stop() // Verify whether log has correct contents val expectedWrittenData2 = expectedWrittenData1 ++ @@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite getWriteAheadLogFiles() should not contain oldestLogFile } printLogFiles("After clean") + tracker3.stop() // Restart tracker and verify recovered state, specifically whether info about the first // batch has been removed, but not the second batch @@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite tracker4.getUnallocatedBlocks(streamId) shouldBe empty tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2 + tracker4.stop() } test("disable write ahead log when checkpoint directory is not set") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 24cb5afee3..4bec52b9fe 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests( assert(getLogFilesInDirectory(testDir).size < logFiles.size) } } + writeAheadLog.close() } test(testPrefix + "handling file errors while reading rotating logs") { |