aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorwm624@hotmail.com <wm624@hotmail.com>2016-11-10 10:54:36 +0000
committerSean Owen <sowen@cloudera.com>2016-11-10 10:54:36 +0000
commit22a9d064e95af71f757113f1869f754cc862df35 (patch)
tree84ef9a282f54df6be1e44642863db65d2f4f8512 /external
parent96a59109a912db9d5f6fc07dedd9d8a3eee97b96 (diff)
downloadspark-22a9d064e95af71f757113f1869f754cc862df35.tar.gz
spark-22a9d064e95af71f757113f1869f754cc862df35.tar.bz2
spark-22a9d064e95af71f757113f1869f754cc862df35.zip
[SPARK-14914][CORE] Fix Resource not closed after using, for unit tests and example
## What changes were proposed in this pull request? This is a follow-up work of #15618. Close file source; For any newly created streaming context outside the withContext, explicitly close the context. ## How was this patch tested? Existing unit tests. Author: wm624@hotmail.com <wm624@hotmail.com> Closes #15818 from wangmiao1981/rtest.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala2
-rw-r--r--external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala2
-rw-r--r--external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala1
3 files changed, 5 insertions, 0 deletions
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 02aec43c3b..c81836da3c 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -272,6 +272,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
@@ -324,6 +325,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
// Test to verify the offset ranges can be recovered from the checkpoints
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index ab1c5055a2..8a747a5e29 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -184,6 +184,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
@@ -230,6 +231,7 @@ class DirectKafkaStreamSuite
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
+ ssc.stop()
}
// Test to verify the offset ranges can be recovered from the checkpoints
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6a35ac14a8..426cd83b4d 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -80,5 +80,6 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(result.synchronized { sent === result })
}
+ ssc.stop()
}
}