Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 3
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 5
-rw-r--r--  streaming/src/test/resources/log4j.properties | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 1

5 files changed, 11 insertions, 2 deletions
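
Apart from the log4j.properties hunk, which repoints the streaming test log at target/unit-tests.log, the changes below all apply one pattern: every test that creates a temporary directory now registers a JVM-exit hook with deleteOnExit() and, where the test body allows it, also deletes the directory explicitly when it is done, since deleteOnExit() behaves like File.delete() at shutdown and cannot remove a non-empty directory. A minimal sketch of that pattern, assuming Guava's Files.createTempDir() and Spark's Utils.deleteRecursively exactly as they appear in the hunks below; the TempDirExample and withTempDir names are illustrative only and not part of the patch:

import java.io.File
import com.google.common.io.Files
import org.apache.spark.util.Utils

object TempDirExample {
  // Run a test body against a fresh temp dir and always clean it up.
  def withTempDir(body: File => Unit): Unit = {
    val testDir = Files.createTempDir()
    testDir.deleteOnExit()              // safety net if the JVM exits before the finally block runs
    try {
      body(testDir)                     // e.g. ssc.textFileStream(testDir.toString)
    } finally {
      Utils.deleteRecursively(testDir)  // eager, recursive cleanup on the normal path
    }
  }
}
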
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index c48a38590e..b3ed302db6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -21,6 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
import StreamingContext._
import scala.util.Random
@@ -380,6 +381,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
override def run() {
val localTestDir = Files.createTempDir()
+ localTestDir.deleteOnExit()
var fs = testDir.getFileSystem(new Configuration())
val maxTries = 3
try {
@@ -421,6 +423,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
case e: Exception => logWarning("File generating in killing thread", e)
} finally {
fs.close()
+ Utils.deleteRecursively(localTestDir)
}
}
}
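
In the finally block above, the recursive delete runs after fs.close(), so an exception from close() would skip it and leave only the deleteOnExit() safety net. A small sketch of one way to keep the two cleanup steps independent (illustrative only; cleanupQuietly is a hypothetical helper, not something this patch adds):

object CleanupExample {
  // Run each cleanup action, reporting failures instead of letting
  // one failed action skip the remaining ones.
  def cleanupQuietly(actions: (() => Unit)*): Unit =
    actions.foreach { action =>
      try action()
      catch { case e: Exception => System.err.println(s"cleanup failed: $e") }
    }

  // In the thread's finally block this would read:
  //   cleanupQuietly(() => fs.close(), () => Utils.deleteRecursively(localTestDir))
}
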
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index f9bfb9b744..ce58cb12a4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.streaming;
-import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -37,6 +36,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.Utils;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1606,6 +1607,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1627,6 +1629,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+ Utils.deleteRecursively(tempDir);
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 063529a9cb..45d2ec676d 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 25739956cb..d20a7b728c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -232,6 +232,7 @@ class CheckpointSuite extends TestSuiteBase {
test("recovery with file input stream") {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
@@ -326,6 +327,7 @@ class CheckpointSuite extends TestSuiteBase {
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
+ Utils.deleteRecursively(testDir)
}
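
CheckpointSuite applies the same sequence around its file-input-stream recovery test: the temp dir feeding textFileStream is registered for deletion at exit and removed recursively once the recovered output has been checked. A self-contained sketch of that shape, with an illustrative app name and local master standing in for the suite's own master, framework and checkpointDir values:

import com.google.common.io.Files
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.util.Utils

object CheckpointCleanupSketch {
  def main(args: Array[String]): Unit = {
    // Both the directory the file stream reads from and the checkpoint
    // directory are temporary, and both are cleaned up afterwards.
    val testDir = Files.createTempDir()
    val checkpointDir = Files.createTempDir()
    testDir.deleteOnExit()
    checkpointDir.deleteOnExit()
    val ssc = new StreamingContext("local[2]", "checkpoint-cleanup-sketch", Seconds(1))
    try {
      ssc.checkpoint(checkpointDir.getAbsolutePath)
      val fileStream = ssc.textFileStream(testDir.toString)
      fileStream.print()
      // ... start the context, write files into testDir, and assert on the output ...
    } finally {
      ssc.stop()
      Utils.deleteRecursively(testDir)
      Utils.deleteRecursively(checkpointDir)
    }
  }
}
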
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 3fa254065c..cd0aa4d0dc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -98,6 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
val ssc = new StreamingContext(conf, batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]