aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10')
-rw-r--r--external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala32
-rw-r--r--external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala11
2 files changed, 31 insertions, 12 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index e73823e898..8273c2b49f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka010
-import java.io.File
+import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
@@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
if (server != null) {
server.shutdown()
+ server.awaitShutdown()
server = null
}
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+ // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+ // in some cases. It leads to test failures on Windows if the directory deletion failure
+ // throws an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
if (zkUtils != null) {
zkUtils.close()
@@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
def shutdown() {
factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+ // on Windows if the directory deletion failure throws an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index fde3714d3d..88a312a189 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -53,7 +53,6 @@ class DirectKafkaStreamSuite
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
- private var sc: SparkContext = _
private var ssc: StreamingContext = _
private var testDir: File = _
@@ -73,11 +72,7 @@ class DirectKafkaStreamSuite
after {
if (ssc != null) {
- ssc.stop()
- sc = null
- }
- if (sc != null) {
- sc.stop()
+ ssc.stop(stopSparkContext = true)
}
if (testDir != null) {
Utils.deleteRecursively(testDir)
@@ -372,7 +367,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@@ -411,7 +406,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20 seconds), interval(50 milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()