diff options
Diffstat (limited to 'external/kafka-0-10')
2 files changed, 31 insertions, 12 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index e73823e898..8273c2b49f 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kafka010 -import java.io.File +import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress import java.util.{Map => JMap, Properties} @@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging { if (server != null) { server.shutdown() + server.awaitShutdown() server = null } - brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } if (zkUtils != null) { zkUtils.close() @@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging { def shutdown() { factory.shutdown() - Utils.deleteRecursively(snapshotDir) - Utils.deleteRecursively(logDir) + // The directories are not closed even if the ZooKeeper server is shut down. + // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures + // on Windows if the directory deletion failure throws an exception. + try { + Utils.deleteRecursively(snapshotDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + try { + Utils.deleteRecursively(logDir) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index fde3714d3d..88a312a189 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -53,7 +53,6 @@ class DirectKafkaStreamSuite .setMaster("local[4]") .setAppName(this.getClass.getSimpleName) - private var sc: SparkContext = _ private var ssc: StreamingContext = _ private var testDir: File = _ @@ -73,11 +72,7 @@ class DirectKafkaStreamSuite after { if (ssc != null) { - ssc.stop() - sc = null - } - if (sc != null) { - sc.stop() + ssc.stop(stopSparkContext = true) } if (testDir != null) { Utils.deleteRecursively(testDir) @@ -372,7 +367,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum) } @@ -411,7 +406,7 @@ class DirectKafkaStreamSuite sendData(i) } - eventually(timeout(10 seconds), interval(50 milliseconds)) { + eventually(timeout(20 seconds), interval(50 milliseconds)) { assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum) } ssc.stop() |