author     Hyukjin Kwon <gurwls223@gmail.com>        2016-11-07 12:47:39 -0800
committer  Mridul Muralidharan <mridul@gmail.com>    2016-11-07 12:47:39 -0800
commit     8f0ea011a7294679ec4275b2fef349ef45b6eb81
tree       9599da11d56741c69b408771fa59255541f0e57d /streaming/src
parent     0d95662e7fff26669d4f70e88fdac7a4128a4f49
[SPARK-14914][CORE] Fix resources not closed after use, mostly for unit tests
## What changes were proposed in this pull request?

Close `FileStream`s, `ZipFile`s, etc. to release the resources after use. Leaving these resources open causes an IOException to be raised when the temp files are later deleted.

## How was this patch tested?

Existing tests.

Author: U-FAREAST\tl <tl@microsoft.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Tao LI <tl@microsoft.com>

Closes #15618 from HyukjinKwon/SPARK-14914-1.
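For readers skimming the patch, the underlying pattern is simply close-in-`finally`: open the stream or archive, use it, and release it even when a read or assertion throws, so that later cleanup of the temp directory does not hit an IOException. Below is a minimal sketch of that pattern in Scala; the `readFirstLine` and `listEntries` helpers are illustrative only and are not code from this patch.

```scala
import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.zip.ZipFile

import scala.collection.JavaConverters._

object CloseAfterUse {

  // Read the first line of a file, always closing the underlying stream.
  // Leaving the FileInputStream open can make a later delete of `file` fail.
  def readFirstLine(file: File): String = {
    val in = new FileInputStream(file)
    try {
      new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)).readLine()
    } finally {
      in.close()
    }
  }

  // List the entry names of a zip archive, closing the ZipFile when done.
  // The entries are materialized with toList before close() is called.
  def listEntries(zip: File): List[String] = {
    val zf = new ZipFile(zip)
    try {
      zf.entries().asScala.map(_.getName).toList
    } finally {
      zf.close()
    }
  }
}
```

The `ssc.stop()`, `tracker*.stop()`, and `writeAheadLog.close()` calls added in the diff below apply the same idea to streaming contexts, block trackers, and write-ahead logs before their temp directories are deleted.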
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                |  1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala           | 16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala         |  1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala |  5
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala   |  1
5 files changed, 17 insertions(+), 7 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 3d54abd903..648a5abe0b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1805,6 +1805,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+ ssc.stop();
Utils.deleteRecursively(tempDir);
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index b79cc65d8b..41f16bfa5f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val fileStream = ssc.textFileStream(testDir.toString)
// Make value 3 take a large time to process, to ensure that the driver
// shuts down in the middle of processing the 3rd batch
- CheckpointSuite.batchThreeShouldBlockIndefinitely = true
- val mappedStream = fileStream.map(s => {
+ CheckpointSuite.batchThreeShouldBlockALongTime = true
+ val mappedStream = fileStream.map { s =>
val i = s.toInt
if (i == 3) {
- while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
- Thread.sleep(Long.MaxValue)
+ if (CheckpointSuite.batchThreeShouldBlockALongTime) {
+ // It's not a good idea to let the thread run forever
+ // as resources won't be correctly released
+ Thread.sleep(6000)
}
}
i
- })
+ }
// Reducing over a large window to ensure that recovery from driver failure
// requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
// The original StreamingContext has now been stopped.
- CheckpointSuite.batchThreeShouldBlockIndefinitely = false
+ CheckpointSuite.batchThreeShouldBlockALongTime = false
// Create files while the streaming driver is down
for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
private object CheckpointSuite extends Serializable {
- var batchThreeShouldBlockIndefinitely: Boolean = true
+ var batchThreeShouldBlockALongTime: Boolean = true
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 60c8e70235..fff2d6fbac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)
fileGeneratingThread.join()
+ ssc.stop()
fs.delete(checkpointDir, true)
fs.delete(testDir, true)
logInfo("Finished test after " + killCount + " failures")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013bb1e..107c3f5dcc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -134,6 +134,7 @@ class ReceivedBlockTrackerSuite
val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
+ tracker1.stop()
incrementTime()
@@ -141,6 +142,7 @@ class ReceivedBlockTrackerSuite
val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
tracker1_.hasUnallocatedReceivedBlocks should be (false)
+ tracker1_.stop()
// Restart tracker and verify recovered list of unallocated blocks
val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@ class ReceivedBlockTrackerSuite
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+ tracker2.stop()
// Verify whether log has correct contents
val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@ class ReceivedBlockTrackerSuite
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")
+ tracker3.stop()
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
@@ -200,6 +204,7 @@ class ReceivedBlockTrackerSuite
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
+ tracker4.stop()
}
test("disable write ahead log when checkpoint directory is not set") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 24cb5afee3..4bec52b9fe 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
+ writeAheadLog.close()
}
test(testPrefix + "handling file errors while reading rotating logs") {