aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala')
-rw-r--r--external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala32
1 files changed, 28 insertions, 4 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index e73823e898..8273c2b49f 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.kafka010
-import java.io.File
+import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap, Properties}
@@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
if (server != null) {
server.shutdown()
+ server.awaitShutdown()
server = null
}
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+ // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+ // in some cases. It leads to test failures on Windows if the directory deletion failure
+ // throws an exception.
+ brokerConf.logDirs.foreach { f =>
+ try {
+ Utils.deleteRecursively(new File(f))
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ }
if (zkUtils != null) {
zkUtils.close()
@@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
def shutdown() {
factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
+ // The directories are not closed even if the ZooKeeper server is shut down.
+ // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+ // on Windows if the directory deletion failure throws an exception.
+ try {
+ Utils.deleteRecursively(snapshotDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
+ try {
+ Utils.deleteRecursively(logDir)
+ } catch {
+ case e: IOException if Utils.isWindows =>
+ logWarning(e.getMessage)
+ }
}
}
}