diff options
Diffstat (limited to 'external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala')
-rw-r--r-- | external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index e73823e898..8273c2b49f 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.io.File +import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress import java.util.{Map => JMap, Properties} @@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging { if (server != null) { server.shutdown() + server.awaitShutdown() server = null } - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } if (zkUtils != null) { zkUtils.close() @@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging { def shutdown() { factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) + // The directories are not closed even if the ZooKeeper server is shut down. + // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures + // on Windows if the directory deletion failure throws an exception. + try { + Utils.deleteRecursively(snapshotDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + try { + Utils.deleteRecursively(logDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } } |