From 7120a2979d0a9f0f54a88b2416be7ca10e74f409 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Mon, 12 May 2014 14:16:19 -0700 Subject: SPARK-1798. Tests should clean up temp files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues related to temp files that tests generate – these should be touched up for hygiene but are not urgent. Modules have a log4j.properties which directs the unit-test.log output file to a directory like `[module]/target/unit-test.log`. But this ends up creating `[module]/[module]/target/unit-test.log` instead of former. The `work/` directory is not deleted by "mvn clean", in the parent and in modules. Neither is the `checkpoint/` directory created under the various external modules. Many tests create a temp directory, which is not usually deleted. This can be largely resolved by calling `deleteOnExit()` at creation and trying to call `Utils.deleteRecursively` consistently to clean up, sometimes in an `@After` method. _If anyone seconds the motion, I can create a more significant change that introduces a new test trait along the lines of `LocalSparkContext`, which provides management of temp directories for subclasses to take advantage of._ Author: Sean Owen Closes #732 from srowen/SPARK-1798 and squashes the following commits: 5af578e [Sean Owen] Try to consistently delete test temp dirs and files, and set deleteOnExit() for each b21b356 [Sean Owen] Remove work/ and checkpoint/ dirs with mvn clean bdd0f41 [Sean Owen] Remove duplicate module dir in log4j.properties output path for tests --- .../scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 3 +++ streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 5 ++++- streaming/src/test/resources/log4j.properties | 2 +- .../src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2 ++ .../test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 1 + 5 files changed, 11 insertions(+), 2 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index c48a38590e..b3ed302db6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -21,6 +21,7 @@ import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} +import org.apache.spark.util.Utils import StreamingContext._ import scala.util.Random @@ -380,6 +381,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) override def run() { val localTestDir = Files.createTempDir() + localTestDir.deleteOnExit() var fs = testDir.getFileSystem(new Configuration()) val maxTries = 3 try { @@ -421,6 +423,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) case e: Exception => logWarning("File generating in killing thread", e) } finally { fs.close() + Utils.deleteRecursively(localTestDir) } } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index f9bfb9b744..ce58cb12a4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,7 +17,6 @@ package org.apache.spark.streaming; -import org.apache.spark.streaming.api.java.*; import scala.Tuple2; import org.junit.Assert; @@ -37,6 +36,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.util.Utils; // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; @@ -1606,6 +1607,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(8,7)); File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); ssc.checkpoint(tempDir.getAbsolutePath()); JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); @@ -1627,6 +1629,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa // will be re-processed after recovery List> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3); assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3)); + Utils.deleteRecursively(tempDir); } diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties index 063529a9cb..45d2ec676d 100644 --- a/streaming/src/test/resources/log4j.properties +++ b/streaming/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file # log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=false -log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.file=target/unit-tests.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 25739956cb..d20a7b728c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -232,6 +232,7 @@ class CheckpointSuite extends TestSuiteBase { test("recovery with file input stream") { // Set up the streaming context and input streams val testDir = Files.createTempDir() + testDir.deleteOnExit() var ssc = new StreamingContext(master, framework, Seconds(1)) ssc.checkpoint(checkpointDir) val fileStream = ssc.textFileStream(testDir.toString) @@ -326,6 +327,7 @@ class CheckpointSuite extends TestSuiteBase { ) // To ensure that all the inputs were received correctly assert(expectedOutput.last === output.last) + Utils.deleteRecursively(testDir) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 3fa254065c..cd0aa4d0dc 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -98,6 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val testDir = Files.createTempDir() + testDir.deleteOnExit() val ssc = new StreamingContext(conf, batchDuration) val fileStream = ssc.textFileStream(testDir.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] -- cgit v1.2.3