path: root/streaming
author    Sean Owen <sowen@cloudera.com>    2014-05-12 14:16:19 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-05-12 14:16:19 -0700
commit    7120a2979d0a9f0f54a88b2416be7ca10e74f409 (patch)
tree      d3db2f178f003fc79cee2ec3fe60508e56f29f8d /streaming
parent    1e4a65e69489ff877e6da6f78b1c1306335e373c (diff)
SPARK-1798. Tests should clean up temp files
Three issues related to temp files that tests generate; these should be touched up for hygiene but are not urgent.

Modules have a log4j.properties which directs the unit-tests.log output file to a directory like `[module]/target/unit-tests.log`. But this ends up creating `[module]/[module]/target/unit-tests.log` instead of the former.

The `work/` directory is not deleted by "mvn clean", in the parent and in modules. Neither is the `checkpoint/` directory created under the various external modules.

Many tests create a temp directory, which is not usually deleted. This can be largely resolved by calling `deleteOnExit()` at creation and trying to call `Utils.deleteRecursively` consistently to clean up, sometimes in an `@After` method.

_If anyone seconds the motion, I can create a more significant change that introduces a new test trait along the lines of `LocalSparkContext`, which provides management of temp directories for subclasses to take advantage of._

Author: Sean Owen <sowen@cloudera.com>

Closes #732 from srowen/SPARK-1798 and squashes the following commits:

5af578e [Sean Owen] Try to consistently delete test temp dirs and files, and set deleteOnExit() for each
b21b356 [Sean Owen] Remove work/ and checkpoint/ dirs with mvn clean
bdd0f41 [Sean Owen] Remove duplicate module dir in log4j.properties output path for tests
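As a standalone illustration of the cleanup pattern described above, here is a minimal sketch; the suite name and test body are hypothetical, while `Files.createTempDir` (Guava) and `Utils.deleteRecursively` are the calls this patch actually uses:

    import java.io.File

    import com.google.common.io.Files
    import org.scalatest.{BeforeAndAfter, FunSuite}

    import org.apache.spark.util.Utils

    // Hypothetical suite illustrating the pattern: register deleteOnExit() as a
    // backstop at creation, then eagerly delete the directory after each test.
    class TempDirHygieneExample extends FunSuite with BeforeAndAfter {
      private var testDir: File = _

      before {
        testDir = Files.createTempDir()
        testDir.deleteOnExit()  // backstop in case the test dies before cleanup runs
      }

      after {
        Utils.deleteRecursively(testDir)  // eager cleanup so dirs don't accumulate
      }

      test("writes into a temp dir that is cleaned up afterwards") {
        val f = new File(testDir, "out.txt")
        assert(f.createNewFile())
      }
    }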
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala  |  3
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java              |  5
-rw-r--r--  streaming/src/test/resources/log4j.properties                                     |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala         |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala       |  1
5 files changed, 11 insertions, 2 deletions
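For reference, the test trait floated in the commit message might look roughly like the sketch below. This is an assumption, not part of this patch: the `TempDirectory` name and the `BeforeAndAfterEach` wiring are illustrative only.

    import java.io.File

    import com.google.common.io.Files
    import org.scalatest.{BeforeAndAfterEach, Suite}

    import org.apache.spark.util.Utils

    // Hypothetical trait along the lines of LocalSparkContext: subclasses get a
    // fresh temp directory per test case, deleted automatically afterwards.
    trait TempDirectory extends BeforeAndAfterEach { self: Suite =>
      private var _tempDir: File = _

      /** The temp directory available to the current test case. */
      protected def tempDir: File = _tempDir

      override def beforeEach() {
        super.beforeEach()
        _tempDir = Files.createTempDir()
        _tempDir.deleteOnExit()
      }

      override def afterEach() {
        try {
          Utils.deleteRecursively(_tempDir)
        } finally {
          super.afterEach()
        }
      }
    }

A suite would then mix it in (e.g. `class SomeSuite extends TestSuiteBase with TempDirectory`) and use `tempDir` instead of repeating the create/delete boilerplate seen in the hunks below.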
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index c48a38590e..b3ed302db6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -21,6 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
import StreamingContext._
import scala.util.Random
@@ -380,6 +381,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
override def run() {
val localTestDir = Files.createTempDir()
+ localTestDir.deleteOnExit()
var fs = testDir.getFileSystem(new Configuration())
val maxTries = 3
try {
@@ -421,6 +423,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
case e: Exception => logWarning("File generating in killing thread", e)
} finally {
fs.close()
+ Utils.deleteRecursively(localTestDir)
}
}
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index f9bfb9b744..ce58cb12a4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.streaming;
-import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -37,6 +36,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.Utils;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1606,6 +1607,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1627,6 +1629,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+ Utils.deleteRecursively(tempDir);
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 063529a9cb..45d2ec676d 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 25739956cb..d20a7b728c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -232,6 +232,7 @@ class CheckpointSuite extends TestSuiteBase {
test("recovery with file input stream") {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
@@ -326,6 +327,7 @@ class CheckpointSuite extends TestSuiteBase {
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
+ Utils.deleteRecursively(testDir)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 3fa254065c..cd0aa4d0dc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -98,6 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
val ssc = new StreamingContext(conf, batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]