aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala')
-rw-r--r--external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala11
1 files changed, 3 insertions, 8 deletions
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 8a747a5e29..f8b34074f1 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -52,7 +52,6 @@ class DirectKafkaStreamSuite
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
- private var sc: SparkContext = _
private var ssc: StreamingContext = _
private var testDir: File = _
@@ -72,11 +71,7 @@ class DirectKafkaStreamSuite
after {
if (ssc != null) {
- ssc.stop()
- sc = null
- }
- if (sc != null) {
- sc.stop()
+ ssc.stop(stopSparkContext = true)
}
if (testDir != null) {
Utils.deleteRecursively(testDir)
@@ -276,7 +271,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@@ -319,7 +314,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()