author     Sean Owen <sowen@cloudera.com>          2014-05-12 14:16:19 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-05-12 14:16:19 -0700
commit     7120a2979d0a9f0f54a88b2416be7ca10e74f409 (patch)
tree       d3db2f178f003fc79cee2ec3fe60508e56f29f8d
parent     1e4a65e69489ff877e6da6f78b1c1306335e373c (diff)
SPARK-1798. Tests should clean up temp files
Three issues related to temp files that tests generate; these should be touched up for hygiene but are not urgent:

1. Modules have a log4j.properties that directs the unit-tests.log output to a path like `[module]/target/unit-tests.log`, but since tests run from the module directory this ends up creating `[module]/[module]/target/unit-tests.log` instead of the intended path.

2. The `work/` directory is not deleted by `mvn clean`, either in the parent or in the modules. Neither is the `checkpoint/` directory created under the various external modules.

3. Many tests create a temp directory that is usually not deleted. This can be largely resolved by calling `deleteOnExit()` at creation and consistently calling `Utils.deleteRecursively` to clean up, sometimes in an `@After` method.

_If anyone seconds the motion, I can create a more significant change that introduces a new test trait along the lines of `LocalSparkContext`, which provides management of temp directories for subclasses to take advantage of (a rough sketch of that idea follows below)._

Author: Sean Owen <sowen@cloudera.com>

Closes #732 from srowen/SPARK-1798 and squashes the following commits:

5af578e [Sean Owen] Try to consistently delete test temp dirs and files, and set deleteOnExit() for each
b21b356 [Sean Owen] Remove work/ and checkpoint/ dirs with mvn clean
bdd0f41 [Sean Owen] Remove duplicate module dir in log4j.properties output path for tests
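The `deleteOnExit()` plus `Utils.deleteRecursively` pattern described above is applied suite by suite in the diff below. A minimal sketch of the test trait floated in the commit message, assuming ScalaTest's `BeforeAndAfterEach` together with the existing Guava `Files.createTempDir()` and `Utils.deleteRecursively` helpers, might look like the following; the trait name `TempDirectory` and its details are illustrative only and are not part of this change:

```scala
// Hypothetical sketch only; this trait is proposed in the commit message but not implemented here.
import java.io.File

import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, Suite}

import org.apache.spark.util.Utils

trait TempDirectory extends BeforeAndAfterEach { self: Suite =>

  private var _tempDir: File = _

  /** A fresh temp directory, recreated before each test case. */
  protected def tempDir: File = _tempDir

  override def beforeEach() {
    super.beforeEach()
    _tempDir = Files.createTempDir()
    _tempDir.deleteOnExit()  // fallback in case afterEach never runs
  }

  override def afterEach() {
    try {
      Utils.deleteRecursively(_tempDir)
    } finally {
      super.afterEach()
    }
  }
}
```

Suites would then mix in such a trait and use `tempDir` directly, instead of each suite calling `Files.createTempDir()` and cleaning up by hand as the suites below do.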
-rw-r--r--  bagel/src/test/resources/log4j.properties | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 18
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 18
-rw-r--r--  core/src/test/resources/log4j.properties | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala | 56
-rw-r--r--  core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 30
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala | 17
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 1
-rw-r--r--  external/flume/src/test/resources/log4j.properties | 2
-rw-r--r--  external/kafka/src/test/resources/log4j.properties | 2
-rw-r--r--  external/mqtt/src/test/resources/log4j.properties | 2
-rw-r--r--  external/twitter/src/test/resources/log4j.properties | 2
-rw-r--r--  external/zeromq/src/test/resources/log4j.properties | 2
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 3
-rw-r--r--  graphx/src/test/resources/log4j.properties | 2
-rw-r--r--  mllib/src/test/resources/log4j.properties | 2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala | 17
-rw-r--r--  pom.xml | 15
-rw-r--r--  repl/src/test/resources/log4j.properties | 2
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala | 24
-rw-r--r--  repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 3
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 5
-rw-r--r--  streaming/src/test/resources/log4j.properties | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 1
35 files changed, 193 insertions, 114 deletions
diff --git a/bagel/src/test/resources/log4j.properties b/bagel/src/test/resources/log4j.properties
index 5cdcf35b23..30b4baa4d7 100644
--- a/bagel/src/test/resources/log4j.properties
+++ b/bagel/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=bagel/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ae0215482..885c6829a2 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -43,6 +43,7 @@ private[spark] object TestUtils {
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
val tempDir = Files.createTempDir()
+ tempDir.deleteOnExit()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8f7594ada2..0631e54237 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -586,15 +586,17 @@ private[spark] object Utils extends Logging {
* Don't follow directories if they are symlinks.
*/
def deleteRecursively(file: File) {
- if ((file.isDirectory) && !isSymlink(file)) {
- for (child <- listFilesSafely(file)) {
- deleteRecursively(child)
+ if (file != null) {
+ if ((file.isDirectory) && !isSymlink(file)) {
+ for (child <- listFilesSafely(file)) {
+ deleteRecursively(child)
+ }
}
- }
- if (!file.delete()) {
- // Delete can also fail if the file simply did not exist
- if (file.exists()) {
- throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ if (!file.delete()) {
+ // Delete can also fail if the file simply did not exist
+ if (file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ }
}
}
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 1912015827..3dd79243ab 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,7 +18,6 @@
package org.apache.spark;
import java.io.*;
-import java.lang.StringBuilder;
import java.util.*;
import scala.Tuple2;
@@ -49,16 +48,20 @@ import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.StatCounter;
+import org.apache.spark.util.Utils;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
private transient JavaSparkContext sc;
+ private transient File tempDir;
@Before
public void setUp() {
sc = new JavaSparkContext("local", "JavaAPISuite");
+ tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
}
@After
@@ -67,6 +70,7 @@ public class JavaAPISuite implements Serializable {
sc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port");
+ Utils.deleteRecursively(tempDir);
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -611,7 +615,6 @@ public class JavaAPISuite implements Serializable {
@Test
public void textFiles() throws IOException {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
rdd.saveAsTextFile(outputDir);
@@ -630,7 +633,6 @@ public class JavaAPISuite implements Serializable {
byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
- File tempDir = Files.createTempDir();
String tempDirName = tempDir.getAbsolutePath();
DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
ds.write(content1);
@@ -653,7 +655,6 @@ public class JavaAPISuite implements Serializable {
@Test
public void textFilesCompressed() throws IOException {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
rdd.saveAsTextFile(outputDir, DefaultCodec.class);
@@ -667,7 +668,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void sequenceFile() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -697,7 +697,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void writeWithNewAPIHadoopFile() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -728,7 +727,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -758,7 +756,6 @@ public class JavaAPISuite implements Serializable {
@Test
public void objectFilesOfInts() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
rdd.saveAsObjectFile(outputDir);
@@ -771,7 +768,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void objectFilesOfComplexTypes() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -788,7 +784,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void hadoopFile() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -818,7 +813,6 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
public void hadoopFileCompressed() {
- File tempDir = Files.createTempDir();
String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -948,7 +942,6 @@ public class JavaAPISuite implements Serializable {
@Test
public void checkpointAndComputation() {
- File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
@@ -960,7 +953,6 @@ public class JavaAPISuite implements Serializable {
@Test
public void checkpointAndRestore() {
- File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath());
Assert.assertEquals(false, rdd.isCheckpointed());
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index f6fef03689..26b73a1b39 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=core/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2555b7c05..64933f4b10 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -35,6 +35,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
override def beforeEach() {
super.beforeEach()
checkpointDir = File.createTempFile("temp", "")
+ checkpointDir.deleteOnExit()
checkpointDir.delete()
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
@@ -42,9 +43,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
override def afterEach() {
super.afterEach()
- if (checkpointDir != null) {
- checkpointDir.delete()
- }
+ Utils.deleteRecursively(checkpointDir)
}
test("basic checkpointing") {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index d651fbbac4..7e18f45de7 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -24,9 +24,11 @@ import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
class FileServerSuite extends FunSuite with LocalSparkContext {
+ @transient var tmpDir: File = _
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _
@@ -38,15 +40,18 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
override def beforeAll() {
super.beforeAll()
- val tmpDir = new File(Files.createTempDir(), "test")
- tmpDir.mkdir()
- val textFile = new File(tmpDir, "FileServerSuite.txt")
+ tmpDir = Files.createTempDir()
+ tmpDir.deleteOnExit()
+ val testTempDir = new File(tmpDir, "test")
+ testTempDir.mkdir()
+
+ val textFile = new File(testTempDir, "FileServerSuite.txt")
val pw = new PrintWriter(textFile)
pw.println("100")
pw.close()
- val jarFile = new File(tmpDir, "test.jar")
+ val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")
@@ -70,6 +75,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
tmpJarUrl = jarFile.toURI.toURL.toString
}
+ override def afterAll() {
+ super.afterAll()
+ Utils.deleteRecursively(tmpDir)
+ }
+
test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc.addFile(tmpFile.toString)
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index b9b668d3cc..1f2206b1f0 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -30,12 +30,24 @@ import org.apache.hadoop.mapreduce.Job
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
class FileSuite extends FunSuite with LocalSparkContext {
+ var tempDir: File = _
+
+ override def beforeEach() {
+ super.beforeEach()
+ tempDir = Files.createTempDir()
+ tempDir.deleteOnExit()
+ }
+
+ override def afterEach() {
+ super.afterEach()
+ Utils.deleteRecursively(tempDir)
+ }
test("text files") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
nums.saveAsTextFile(outputDir)
@@ -49,7 +61,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("text files (compressed)") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val normalDir = new File(tempDir, "output_normal").getAbsolutePath
val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
val codec = new DefaultCodec()
@@ -71,7 +82,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("SequenceFiles") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
nums.saveAsSequenceFile(outputDir)
@@ -82,7 +92,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("SequenceFile (compressed)") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val normalDir = new File(tempDir, "output_normal").getAbsolutePath
val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
val codec = new DefaultCodec()
@@ -104,7 +113,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("SequenceFile with writable key") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
nums.saveAsSequenceFile(outputDir)
@@ -115,7 +123,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("SequenceFile with writable value") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
nums.saveAsSequenceFile(outputDir)
@@ -126,7 +133,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("SequenceFile with writable key and value") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsSequenceFile(outputDir)
@@ -137,7 +143,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("implicit conversions in reading SequenceFiles") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
nums.saveAsSequenceFile(outputDir)
@@ -154,7 +159,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("object files of ints") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
nums.saveAsObjectFile(outputDir)
@@ -165,7 +169,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("object files of complex types") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
nums.saveAsObjectFile(outputDir)
@@ -177,7 +180,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](
@@ -189,7 +191,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("read SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
nums.saveAsSequenceFile(outputDir)
@@ -200,7 +201,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
test("file caching") {
sc = new SparkContext("local", "test")
- val tempDir = Files.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
out.write("What's up?\n")
@@ -214,67 +214,61 @@ class FileSuite extends FunSuite with LocalSparkContext {
test ("prevent user from overwriting the empty directory (old Hadoop API)") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsTextFile(tempdir.getPath)
+ randomRDD.saveAsTextFile(tempDir.getPath)
}
}
test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
- randomRDD.saveAsTextFile(tempdir.getPath + "/output")
- assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+ randomRDD.saveAsTextFile(tempDir.getPath + "/output")
}
}
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
}
test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
- assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+ assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
}
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
- job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+ job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
randomRDD.saveAsHadoopDataset(job)
- assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+ assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
}
test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
- val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
- job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+ job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
- assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+ assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
}
}
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 33d6de9a76..d5ebfb3f3f 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.FunSuite
import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
/**
* Tests the correctness of
@@ -67,6 +68,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
test("Correctness of WholeTextFileRecordReader.") {
val dir = Files.createTempDir()
+ dir.deleteOnExit()
println(s"Local disk address is ${dir.toString}.")
WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
@@ -86,7 +88,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
s"file $filename contents can not match.")
}
- dir.delete()
+ Utils.deleteRecursively(dir)
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 8f3e6bd21b..1230565ea5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -236,11 +236,13 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
+ emptyDir.deleteOnExit()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.partitions.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ emptyDir.delete()
}
test("keys and values") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 95f5bcd855..21e3db34b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
import scala.collection.mutable
import scala.io.Source
-import scala.util.Try
import com.google.common.io.Files
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -30,6 +29,8 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}
+import java.io.File
+
/**
* Test whether EventLoggingListener logs events properly.
*
@@ -43,11 +44,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
- private val testDir = Files.createTempDir()
- private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+ private var testDir: File = _
+ private var logDirPath: Path = _
+
+ before {
+ testDir = Files.createTempDir()
+ testDir.deleteOnExit()
+ logDirPath = Utils.getFilePath(testDir, "spark-events")
+ }
after {
- Try { fileSystem.delete(logDirPath, true) }
+ Utils.deleteRecursively(testDir)
}
test("Parse names of special files") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d1fe1fc348..d81499ac6a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -17,9 +17,7 @@
package org.apache.spark.scheduler
-import java.io.PrintWriter
-
-import scala.util.Try
+import java.io.{File, PrintWriter}
import com.google.common.io.Files
import org.json4s.jackson.JsonMethods._
@@ -39,11 +37,15 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
- private val testDir = Files.createTempDir()
+ private var testDir: File = _
+
+ before {
+ testDir = Files.createTempDir()
+ testDir.deleteOnExit()
+ }
after {
- Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
- Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+ Utils.deleteRecursively(testDir)
}
test("Simple replay") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 42bfbf1bdf..2167718fd2 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,18 +23,16 @@ import scala.collection.mutable
import scala.language.reflectiveCalls
import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
-class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
- val rootDir0 = Files.createTempDir()
- rootDir0.deleteOnExit()
- val rootDir1 = Files.createTempDir()
- rootDir1.deleteOnExit()
- val rootDirs = rootDir0.getName + "," + rootDir1.getName
- println("Created root dirs: " + rootDirs)
+ private var rootDir0: File = _
+ private var rootDir1: File = _
+ private var rootDirs: String = _
// This suite focuses primarily on consolidation features,
// so we coerce consolidation if not already enabled.
@@ -48,6 +46,22 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
var diskBlockManager: DiskBlockManager = _
+ override def beforeAll() {
+ super.beforeAll()
+ rootDir0 = Files.createTempDir()
+ rootDir0.deleteOnExit()
+ rootDir1 = Files.createTempDir()
+ rootDir1.deleteOnExit()
+ rootDirs = rootDir0.getName + "," + rootDir1.getName
+ println("Created root dirs: " + rootDirs)
+ }
+
+ override def afterAll() {
+ super.afterAll()
+ Utils.deleteRecursively(rootDir0)
+ Utils.deleteRecursively(rootDir1)
+ }
+
override def beforeEach() {
diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
shuffleBlockManager.idToSegmentMap.clear()
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index f675e1e5b4..44332fc8db 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -17,10 +17,9 @@
package org.apache.spark.util
-import java.io.IOException
+import java.io.{File, IOException}
import scala.io.Source
-import scala.util.Try
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
@@ -38,12 +37,18 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
"org.apache.spark.io.LZFCompressionCodec",
"org.apache.spark.io.SnappyCompressionCodec"
)
- private val testDir = Files.createTempDir()
- private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
- private val logDirPathString = logDirPath.toString
+ private var testDir: File = _
+ private var logDirPath: Path = _
+ private var logDirPathString: String = _
+
+ before {
+ testDir = Files.createTempDir()
+ logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+ logDirPathString = logDirPath.toString
+ }
after {
- Try { fileSystem.delete(logDirPath, true) }
+ Utils.deleteRecursively(testDir)
}
test("Simple logging") {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eb7fb63182..cf9e20d347 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -112,6 +112,7 @@ class UtilsSuite extends FunSuite {
test("reading offset bytes of a file") {
val tmpDir2 = Files.createTempDir()
+ tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
index d1bd73a843..45d2ec676d 100644
--- a/external/flume/src/test/resources/log4j.properties
+++ b/external/flume/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=external/flume/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
index 38910d1130..45d2ec676d 100644
--- a/external/kafka/src/test/resources/log4j.properties
+++ b/external/kafka/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=external/kafka/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/external/mqtt/src/test/resources/log4j.properties b/external/mqtt/src/test/resources/log4j.properties
index d0462c7336..45d2ec676d 100644
--- a/external/mqtt/src/test/resources/log4j.properties
+++ b/external/mqtt/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=external/mqtt/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/external/twitter/src/test/resources/log4j.properties b/external/twitter/src/test/resources/log4j.properties
index c918335fcd..45d2ec676d 100644
--- a/external/twitter/src/test/resources/log4j.properties
+++ b/external/twitter/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=external/twitter/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
index 304683dd0b..45d2ec676d 100644
--- a/external/zeromq/src/test/resources/log4j.properties
+++ b/external/zeromq/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=external/zeromq/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index feabca6733..84d3b6f243 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
+import org.apache.spark.util.Utils;
/**
* Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
@@ -249,6 +250,7 @@ public class Java8APISuite implements Serializable {
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
String outputDir = new File(tempDir, "output").getAbsolutePath();
List<Tuple2<Integer, String>> pairs = Arrays.asList(
new Tuple2<Integer, String>(1, "a"),
@@ -265,6 +267,7 @@ public class Java8APISuite implements Serializable {
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
.mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
Assert.assertEquals(pairs, readRDD.collect());
+ Utils.deleteRecursively(tempDir);
}
@Test
diff --git a/graphx/src/test/resources/log4j.properties b/graphx/src/test/resources/log4j.properties
index 85e57f0c4b..26b73a1b39 100644
--- a/graphx/src/test/resources/log4j.properties
+++ b/graphx/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=graphx/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/mllib/src/test/resources/log4j.properties b/mllib/src/test/resources/log4j.properties
index 4265ba6e5d..ddfc4ac6b2 100644
--- a/mllib/src/test/resources/log4j.properties
+++ b/mllib/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=mllib/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 3f64baf6fe..3d05fb6898 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -32,6 +32,7 @@ import com.google.common.io.Files
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils._
+import org.apache.spark.util.Utils
class MLUtilsSuite extends FunSuite with LocalSparkContext {
@@ -67,6 +68,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
|-1 2:4.0 4:5.0 6:6.0
""".stripMargin
val tempDir = Files.createTempDir()
+ tempDir.deleteOnExit()
val file = new File(tempDir.getPath, "part-00000")
Files.write(lines, file, Charsets.US_ASCII)
val path = tempDir.toURI.toString
@@ -90,7 +92,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
assert(multiclassPoints(1).label === -1.0)
assert(multiclassPoints(2).label === -1.0)
- deleteQuietly(tempDir)
+ Utils.deleteRecursively(tempDir)
}
test("saveAsLibSVMFile") {
@@ -107,7 +109,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
.toSet
val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
assert(lines === expected)
- deleteQuietly(tempDir)
+ Utils.deleteRecursively(tempDir)
}
test("appendBias") {
@@ -158,16 +160,5 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
}
}
- /** Delete a file/directory quietly. */
- def deleteQuietly(f: File) {
- if (f.isDirectory) {
- f.listFiles().foreach(deleteQuietly)
- }
- try {
- f.delete()
- } catch {
- case _: Throwable =>
- }
- }
}
diff --git a/pom.xml b/pom.xml
index dd1d262881..5542a32a91 100644
--- a/pom.xml
+++ b/pom.xml
@@ -796,6 +796,21 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>work</directory>
+ </fileset>
+ <fileset>
+ <directory>checkpoint</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
diff --git a/repl/src/test/resources/log4j.properties b/repl/src/test/resources/log4j.properties
index a6d33e69d2..9c4896e496 100644
--- a/repl/src/test/resources/log4j.properties
+++ b/repl/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=repl/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 336df988a1..c0af7ceb6d 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.repl
import java.io.File
-import java.net.URLClassLoader
+import java.net.{URL, URLClassLoader}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
@@ -26,21 +26,35 @@ import org.scalatest.FunSuite
import com.google.common.io.Files
import org.apache.spark.TestUtils
+import org.apache.spark.util.Utils
class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
val childClassNames = List("ReplFakeClass1", "ReplFakeClass2")
val parentClassNames = List("ReplFakeClass1", "ReplFakeClass2", "ReplFakeClass3")
- val tempDir1 = Files.createTempDir()
- val tempDir2 = Files.createTempDir()
- val url1 = "file://" + tempDir1
- val urls2 = List(tempDir2.toURI.toURL).toArray
+ var tempDir1: File = _
+ var tempDir2: File = _
+ var url1: String = _
+ var urls2: Array[URL] = _
override def beforeAll() {
+ super.beforeAll()
+ tempDir1 = Files.createTempDir()
+ tempDir1.deleteOnExit()
+ tempDir2 = Files.createTempDir()
+ tempDir2.deleteOnExit()
+ url1 = "file://" + tempDir1
+ urls2 = List(tempDir2.toURI.toURL).toArray
childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
parentClassNames.foreach(TestUtils.createCompiledClass(_, tempDir2, "2"))
}
+ override def afterAll() {
+ super.afterAll()
+ Utils.deleteRecursively(tempDir1)
+ Utils.deleteRecursively(tempDir2)
+ }
+
test("child first") {
val parentLoader = new URLClassLoader(urls2, null)
val classLoader = new ExecutorClassLoader(url1, parentLoader, true)
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 566d96e16e..95460aa205 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -26,6 +26,7 @@ import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.util.Utils
class ReplSuite extends FunSuite {
@@ -178,6 +179,7 @@ class ReplSuite extends FunSuite {
test("interacting with files") {
val tempDir = Files.createTempDir()
+ tempDir.deleteOnExit()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
out.write("What's up?\n")
@@ -196,6 +198,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Long = 3", output)
assertContains("res1: Long = 3", output)
assertContains("res2: Long = 3", output)
+ Utils.deleteRecursively(tempDir)
}
test("local-cluster mode") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
index 73d87963b3..4f0b85f262 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
@@ -29,6 +29,7 @@ class InsertIntoSuite extends QueryTest {
test("insertInto() created parquet file") {
val testFilePath = File.createTempFile("sparkSql", "pqt")
testFilePath.delete()
+ testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
testFile.registerAsTable("createAndInsertTest")
@@ -76,11 +77,14 @@ class InsertIntoSuite extends QueryTest {
sql("SELECT * FROM createAndInsertTest"),
testData.collect().toSeq
)
+
+ testFilePath.delete()
}
test("INSERT INTO parquet table") {
val testFilePath = File.createTempFile("sparkSql", "pqt")
testFilePath.delete()
+ testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
testFile.registerAsTable("createAndInsertSQLTest")
@@ -126,23 +130,31 @@ class InsertIntoSuite extends QueryTest {
sql("SELECT * FROM createAndInsertSQLTest"),
testData.collect().toSeq
)
+
+ testFilePath.delete()
}
test("Double create fails when allowExisting = false") {
val testFilePath = File.createTempFile("sparkSql", "pqt")
testFilePath.delete()
+ testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
intercept[RuntimeException] {
createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
}
+
+ testFilePath.delete()
}
test("Double create does not fail when allowExisting = true") {
val testFilePath = File.createTempFile("sparkSql", "pqt")
testFilePath.delete()
+ testFilePath.deleteOnExit()
val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
+
+ testFilePath.delete()
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 3ad66a3d7f..fa7d010459 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -99,6 +99,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
val hiveFilesTemp = File.createTempFile("catalystHiveFiles", "")
hiveFilesTemp.delete()
hiveFilesTemp.mkdir()
+ hiveFilesTemp.deleteOnExit()
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index c48a38590e..b3ed302db6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -21,6 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.util.Utils
import StreamingContext._
import scala.util.Random
@@ -380,6 +381,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
override def run() {
val localTestDir = Files.createTempDir()
+ localTestDir.deleteOnExit()
var fs = testDir.getFileSystem(new Configuration())
val maxTries = 3
try {
@@ -421,6 +423,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
case e: Exception => logWarning("File generating in killing thread", e)
} finally {
fs.close()
+ Utils.deleteRecursively(localTestDir)
}
}
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index f9bfb9b744..ce58cb12a4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.streaming;
-import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -37,6 +36,8 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.util.Utils;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1606,6 +1607,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(8,7));
File tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
@@ -1627,6 +1629,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
+ Utils.deleteRecursively(tempDir);
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index 063529a9cb..45d2ec676d 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootCategory=INFO, file
# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
-log4j.appender.file.file=streaming/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 25739956cb..d20a7b728c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -232,6 +232,7 @@ class CheckpointSuite extends TestSuiteBase {
test("recovery with file input stream") {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
@@ -326,6 +327,7 @@ class CheckpointSuite extends TestSuiteBase {
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
+ Utils.deleteRecursively(testDir)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 3fa254065c..cd0aa4d0dc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -98,6 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val testDir = Files.createTempDir()
+ testDir.deleteOnExit()
val ssc = new StreamingContext(conf, batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]