diff options
author | wm624@hotmail.com <wm624@hotmail.com> | 2016-11-10 10:54:36 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-11-10 10:54:36 +0000 |
commit | 22a9d064e95af71f757113f1869f754cc862df35 (patch) | |
tree | 84ef9a282f54df6be1e44642863db65d2f4f8512 /streaming | |
parent | 96a59109a912db9d5f6fc07dedd9d8a3eee97b96 (diff) | |
download | spark-22a9d064e95af71f757113f1869f754cc862df35.tar.gz spark-22a9d064e95af71f757113f1869f754cc862df35.tar.bz2 spark-22a9d064e95af71f757113f1869f754cc862df35.zip |
[SPARK-14914][CORE] Fix Resource not closed after using, for unit tests and example
## What changes were proposed in this pull request?
This is a follow-up work of #15618.
Close file source;
For any newly created streaming context outside the withContext, explicitly close the context.
## How was this patch tested?
Existing unit tests.
Author: wm624@hotmail.com <wm624@hotmail.com>
Closes #15818 from wangmiao1981/rtest.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 1 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala | 1 |
2 files changed, 2 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 41f16bfa5f..a1e9d1e023 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -815,6 +815,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester val ois = new ObjectInputStreamWithLoader( new ByteArrayInputStream(bos.toByteArray), loader) assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;") + ois.close() } test("SPARK-11267: the race condition of two checkpoints in a batch") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala index a2dbae149f..5f7f7fa5e6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala @@ -123,6 +123,7 @@ class JobGeneratorSuite extends TestSuiteBase { assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted") assert(batchCounter.getNumCompletedBatches < longBatchNumber) waitLatch.countDown() + ssc.stop() } } } |