author | Sean Owen <sowen@cloudera.com> | 2014-05-12 14:16:19 -0700
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-12 14:16:19 -0700
commit | 7120a2979d0a9f0f54a88b2416be7ca10e74f409 (patch)
tree | d3db2f178f003fc79cee2ec3fe60508e56f29f8d /core
parent | 1e4a65e69489ff877e6da6f78b1c1306335e373c (diff)
download | spark-7120a2979d0a9f0f54a88b2416be7ca10e74f409.tar.gz, spark-7120a2979d0a9f0f54a88b2416be7ca10e74f409.tar.bz2, spark-7120a2979d0a9f0f54a88b2416be7ca10e74f409.zip
SPARK-1798. Tests should clean up temp files
Three issues related to temp files that tests generate – these should be touched up for hygiene but are not urgent.
Modules have a log4j.properties that directs the unit-test.log output file to a path like `[module]/target/unit-test.log`. But this ends up creating `[module]/[module]/target/unit-test.log` instead of the former, because the tests already run with the module directory as the working directory.
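The fix is just to make the appender path module-relative. For illustration, the change to core's test `log4j.properties` (repeated from the diff below) looks like this:

```properties
# core/src/test/resources/log4j.properties (after the change)
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=target/unit-tests.log
```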
The `work/` directory is not deleted by `mvn clean`, either in the parent or in the modules. Neither is the `checkpoint/` directory created under the various external modules.
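One way to make `mvn clean` remove these is a `maven-clean-plugin` fileset. This is only a sketch of the idea; the actual pom.xml changes are outside `core/` and not shown in this diff:

```xml
<!-- Hypothetical maven-clean-plugin configuration; not part of the core/ diff below. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-clean-plugin</artifactId>
  <configuration>
    <filesets>
      <fileset>
        <directory>work</directory>
      </fileset>
      <fileset>
        <directory>checkpoint</directory>
      </fileset>
    </filesets>
  </configuration>
</plugin>
```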
Many tests create a temp directory, which is usually not deleted. This can largely be resolved by calling `deleteOnExit()` when the directory is created and by calling `Utils.deleteRecursively` consistently to clean up, sometimes in an `@After` method; the pattern is sketched below.
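The per-suite pattern, roughly as applied to `FileSuite` in the diff below (the suite name here is only a placeholder):

```scala
import java.io.File

import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.util.Utils

// Sketch of the cleanup pattern used in the test suites below.
class ExampleSuite extends FunSuite with BeforeAndAfterEach {
  private var tempDir: File = _

  override def beforeEach() {
    super.beforeEach()
    tempDir = Files.createTempDir()   // fresh directory under java.io.tmpdir (Guava)
    tempDir.deleteOnExit()            // backstop in case the JVM exits before afterEach runs
  }

  override def afterEach() {
    super.afterEach()
    Utils.deleteRecursively(tempDir)  // null-safe after the Utils.scala change below
  }

  test("writes into tempDir") {
    val out = new File(tempDir, "output")
    assert(!out.exists())
  }
}
```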
_If anyone seconds the motion, I can create a more significant change that introduces a new test trait along the lines of `LocalSparkContext`, which provides management of temp directories for subclasses to take advantage of._
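Such a trait might look roughly like the following; the `TempDirectory` name and shape are hypothetical, and nothing like it is added in this commit:

```scala
import java.io.File

import com.google.common.io.Files
import org.scalatest.{BeforeAndAfterEach, Suite}

import org.apache.spark.util.Utils

// Hypothetical trait along the lines suggested above; this PR does not add it.
trait TempDirectory extends BeforeAndAfterEach { self: Suite =>
  protected var tempDir: File = _

  override def beforeEach() {
    super.beforeEach()
    tempDir = Files.createTempDir()
    tempDir.deleteOnExit()
  }

  override def afterEach() {
    try {
      Utils.deleteRecursively(tempDir)
    } finally {
      super.afterEach()
    }
  }
}
```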
Author: Sean Owen <sowen@cloudera.com>
Closes #732 from srowen/SPARK-1798 and squashes the following commits:
5af578e [Sean Owen] Try to consistently delete test temp dirs and files, and set deleteOnExit() for each
b21b356 [Sean Owen] Remove work/ and checkpoint/ dirs with mvn clean
bdd0f41 [Sean Owen] Remove duplicate module dir in log4j.properties output path for tests
Diffstat (limited to 'core')
14 files changed, 116 insertions, 85 deletions
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ae0215482..885c6829a2 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -43,6 +43,7 @@ private[spark] object TestUtils {
    */
   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
     val tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
     createJar(files, jarFile)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8f7594ada2..0631e54237 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -586,15 +586,17 @@ private[spark] object Utils extends Logging {
    * Don't follow directories if they are symlinks.
    */
   def deleteRecursively(file: File) {
-    if ((file.isDirectory) && !isSymlink(file)) {
-      for (child <- listFilesSafely(file)) {
-        deleteRecursively(child)
+    if (file != null) {
+      if ((file.isDirectory) && !isSymlink(file)) {
+        for (child <- listFilesSafely(file)) {
+          deleteRecursively(child)
+        }
       }
-    }
-    if (!file.delete()) {
-      // Delete can also fail if the file simply did not exist
-      if (file.exists()) {
-        throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      if (!file.delete()) {
+        // Delete can also fail if the file simply did not exist
+        if (file.exists()) {
+          throw new IOException("Failed to delete: " + file.getAbsolutePath)
+        }
       }
     }
   }
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 1912015827..3dd79243ab 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,7 +18,6 @@
 package org.apache.spark;
 
 import java.io.*;
-import java.lang.StringBuilder;
 import java.util.*;
 
 import scala.Tuple2;
@@ -49,16 +48,20 @@ import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.StatCounter;
+import org.apache.spark.util.Utils;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
 public class JavaAPISuite implements Serializable {
   private transient JavaSparkContext sc;
+  private transient File tempDir;
 
   @Before
   public void setUp() {
     sc = new JavaSparkContext("local", "JavaAPISuite");
+    tempDir = Files.createTempDir();
+    tempDir.deleteOnExit();
   }
 
   @After
@@ -67,6 +70,7 @@ public class JavaAPISuite implements Serializable {
     sc = null;
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port");
+    Utils.deleteRecursively(tempDir);
   }
 
   static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -611,7 +615,6 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void textFiles() throws IOException {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
     rdd.saveAsTextFile(outputDir);
@@ -630,7 +633,6 @@ public class JavaAPISuite implements Serializable {
     byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
     byte[] content2 = "spark is also easy to use.\n".getBytes("utf-8");
 
-    File tempDir = Files.createTempDir();
     String tempDirName = tempDir.getAbsolutePath();
     DataOutputStream ds = new DataOutputStream(new FileOutputStream(tempDirName + "/part-00000"));
     ds.write(content1);
@@ -653,7 +655,6 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void textFilesCompressed() throws IOException {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
     rdd.saveAsTextFile(outputDir, DefaultCodec.class);
@@ -667,7 +668,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void sequenceFile() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -697,7 +697,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void writeWithNewAPIHadoopFile() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -728,7 +727,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void readWithNewAPIHadoopFile() throws IOException {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -758,7 +756,6 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void objectFilesOfInts() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
     rdd.saveAsObjectFile(outputDir);
@@ -771,7 +768,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void objectFilesOfComplexTypes() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -788,7 +784,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void hadoopFile() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -818,7 +813,6 @@ public class JavaAPISuite implements Serializable {
   @SuppressWarnings("unchecked")
   @Test
   public void hadoopFileCompressed() {
-    File tempDir = Files.createTempDir();
     String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
     List<Tuple2<Integer, String>> pairs = Arrays.asList(
       new Tuple2<Integer, String>(1, "a"),
@@ -948,7 +942,6 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void checkpointAndComputation() {
-    File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
@@ -960,7 +953,6 @@ public class JavaAPISuite implements Serializable {
 
   @Test
   public void checkpointAndRestore() {
-    File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index f6fef03689..26b73a1b39 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -19,7 +19,7 @@ log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
-log4j.appender.file.file=core/target/unit-tests.log
+log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index d2555b7c05..64933f4b10 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -35,6 +35,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   override def beforeEach() {
     super.beforeEach()
     checkpointDir = File.createTempFile("temp", "")
+    checkpointDir.deleteOnExit()
     checkpointDir.delete()
     sc = new SparkContext("local", "test")
     sc.setCheckpointDir(checkpointDir.toString)
@@ -42,9 +43,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
   override def afterEach() {
     super.afterEach()
-    if (checkpointDir != null) {
-      checkpointDir.delete()
-    }
+    Utils.deleteRecursively(checkpointDir)
   }
 
   test("basic checkpointing") {
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index d651fbbac4..7e18f45de7 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -24,9 +24,11 @@ import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
 
 class FileServerSuite extends FunSuite with LocalSparkContext {
 
+  @transient var tmpDir: File = _
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
@@ -38,15 +40,18 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
 
   override def beforeAll() {
     super.beforeAll()
-    val tmpDir = new File(Files.createTempDir(), "test")
-    tmpDir.mkdir()
-    val textFile = new File(tmpDir, "FileServerSuite.txt")
+    tmpDir = Files.createTempDir()
+    tmpDir.deleteOnExit()
+    val testTempDir = new File(tmpDir, "test")
+    testTempDir.mkdir()
+
+    val textFile = new File(testTempDir, "FileServerSuite.txt")
     val pw = new PrintWriter(textFile)
     pw.println("100")
     pw.close()
-    val jarFile = new File(tmpDir, "test.jar")
+    val jarFile = new File(testTempDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
     System.setProperty("spark.authenticate", "false")
@@ -70,6 +75,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     tmpJarUrl = jarFile.toURI.toURL.toString
   }
 
+  override def afterAll() {
+    super.afterAll()
+    Utils.deleteRecursively(tmpDir)
+  }
+
   test("Distributing files locally") {
     sc = new SparkContext("local[4]", "test")
     sc.addFile(tmpFile.toString)
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index b9b668d3cc..1f2206b1f0 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -30,12 +30,24 @@ import org.apache.hadoop.mapreduce.Job
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
 
 class FileSuite extends FunSuite with LocalSparkContext {
+  var tempDir: File = _
+
+  override def beforeEach() {
+    super.beforeEach()
+    tempDir = Files.createTempDir()
+    tempDir.deleteOnExit()
+  }
+
+  override def afterEach() {
+    super.afterEach()
+    Utils.deleteRecursively(tempDir)
+  }
 
   test("text files") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
     nums.saveAsTextFile(outputDir)
@@ -49,7 +61,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("text files (compressed)") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val normalDir = new File(tempDir, "output_normal").getAbsolutePath
     val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
     val codec = new DefaultCodec()
@@ -71,7 +82,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("SequenceFiles") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
     nums.saveAsSequenceFile(outputDir)
@@ -82,7 +92,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("SequenceFile (compressed)") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val normalDir = new File(tempDir, "output_normal").getAbsolutePath
     val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
     val codec = new DefaultCodec()
@@ -104,7 +113,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("SequenceFile with writable key") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
     nums.saveAsSequenceFile(outputDir)
@@ -115,7 +123,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("SequenceFile with writable value") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
     nums.saveAsSequenceFile(outputDir)
@@ -126,7 +133,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("SequenceFile with writable key and value") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
     nums.saveAsSequenceFile(outputDir)
@@ -137,7 +143,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("implicit conversions in reading SequenceFiles") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
     nums.saveAsSequenceFile(outputDir)
@@ -154,7 +159,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("object files of ints") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
     nums.saveAsObjectFile(outputDir)
@@ -165,7 +169,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("object files of complex types") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
     nums.saveAsObjectFile(outputDir)
@@ -177,7 +180,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
     nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](
@@ -189,7 +191,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
   test("read SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
     nums.saveAsSequenceFile(outputDir)
@@ -200,7 +201,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test("file caching") {
     sc = new SparkContext("local", "test")
-    val tempDir = Files.createTempDir()
     val out = new FileWriter(tempDir + "/input")
     out.write("Hello world!\n")
     out.write("What's up?\n")
@@ -214,67 +214,61 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   test ("prevent user from overwriting the empty directory (old Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsTextFile(tempdir.getPath)
+      randomRDD.saveAsTextFile(tempDir.getPath)
     }
   }
 
   test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
-    randomRDD.saveAsTextFile(tempdir.getPath + "/output")
-    assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+    randomRDD.saveAsTextFile(tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsTextFile(tempdir.getPath + "/output")
+      randomRDD.saveAsTextFile(tempDir.getPath + "/output")
     }
   }
 
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
   }
 
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
-    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
-    assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
+    randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath + "/output")
+    assert(new File(tempDir.getPath + "/output/part-r-00000").exists() === true)
     intercept[FileAlreadyExistsException] {
-      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
+      randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
   }
 
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new JobConf()
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
     job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
-    job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+    job.set("mapred.output.dir", tempDir.getPath + "/outputDataset_old")
     randomRDD.saveAsHadoopDataset(job)
-    assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+    assert(new File(tempDir.getPath + "/outputDataset_old/part-00000").exists() === true)
   }
 
   test ("save Hadoop Dataset through new Hadoop API") {
     sc = new SparkContext("local", "test")
-    val tempdir = Files.createTempDir()
     val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new Job(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
     job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
-    job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+    job.getConfiguration.set("mapred.output.dir", tempDir.getPath + "/outputDataset_new")
     randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
-    assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+    assert(new File(tempDir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 33d6de9a76..d5ebfb3f3f 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.FunSuite
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.SparkContext
+import org.apache.spark.util.Utils
 
 /**
  * Tests the correctness of
@@ -67,6 +68,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
 
   test("Correctness of WholeTextFileRecordReader.") {
     val dir = Files.createTempDir()
+    dir.deleteOnExit()
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
@@ -86,7 +88,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
         s"file $filename contents can not match.")
     }
 
-    dir.delete()
+    Utils.deleteRecursively(dir)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 8f3e6bd21b..1230565ea5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -236,11 +236,13 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
 
   test("zero-partition RDD") {
     val emptyDir = Files.createTempDir()
+    emptyDir.deleteOnExit()
     val file = sc.textFile(emptyDir.getAbsolutePath)
     assert(file.partitions.size == 0)
     assert(file.collect().toList === Nil)
     // Test that a shuffle on the file works, because this used to be a bug
     assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+    emptyDir.delete()
   }
 
   test("keys and values") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 95f5bcd855..21e3db34b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable
 import scala.io.Source
-import scala.util.Try
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -30,6 +29,8 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{JsonProtocol, Utils}
 
+import java.io.File
+
 /**
  * Test whether EventLoggingListener logs events properly.
 *
@@ -43,11 +44,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
-  private val testDir = Files.createTempDir()
-  private val logDirPath = Utils.getFilePath(testDir, "spark-events")
+  private var testDir: File = _
+  private var logDirPath: Path = _
+
+  before {
+    testDir = Files.createTempDir()
+    testDir.deleteOnExit()
+    logDirPath = Utils.getFilePath(testDir, "spark-events")
+  }
 
   after {
-    Try { fileSystem.delete(logDirPath, true) }
+    Utils.deleteRecursively(testDir)
   }
 
   test("Parse names of special files") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d1fe1fc348..d81499ac6a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.PrintWriter
-
-import scala.util.Try
+import java.io.{File, PrintWriter}
 
 import com.google.common.io.Files
 import org.json4s.jackson.JsonMethods._
@@ -39,11 +37,15 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
   )
-  private val testDir = Files.createTempDir()
+  private var testDir: File = _
+
+  before {
+    testDir = Files.createTempDir()
+    testDir.deleteOnExit()
+  }
 
   after {
-    Try { fileSystem.delete(Utils.getFilePath(testDir, "events.txt"), true) }
-    Try { fileSystem.delete(Utils.getFilePath(testDir, "test-replay"), true) }
+    Utils.deleteRecursively(testDir)
   }
 
   test("Simple replay") {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 42bfbf1bdf..2167718fd2 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,18 +23,16 @@ import scala.collection.mutable
 import scala.language.reflectiveCalls
 
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
-class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
+class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
-  val rootDir0 = Files.createTempDir()
-  rootDir0.deleteOnExit()
-  val rootDir1 = Files.createTempDir()
-  rootDir1.deleteOnExit()
-  val rootDirs = rootDir0.getName + "," + rootDir1.getName
-  println("Created root dirs: " + rootDirs)
+  private var rootDir0: File = _
+  private var rootDir1: File = _
+  private var rootDirs: String = _
 
   // This suite focuses primarily on consolidation features,
   // so we coerce consolidation if not already enabled.
@@ -48,6 +46,22 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
 
   var diskBlockManager: DiskBlockManager = _
 
+  override def beforeAll() {
+    super.beforeAll()
+    rootDir0 = Files.createTempDir()
+    rootDir0.deleteOnExit()
+    rootDir1 = Files.createTempDir()
+    rootDir1.deleteOnExit()
+    rootDirs = rootDir0.getName + "," + rootDir1.getName
+    println("Created root dirs: " + rootDirs)
+  }
+
+  override def afterAll() {
+    super.afterAll()
+    Utils.deleteRecursively(rootDir0)
+    Utils.deleteRecursively(rootDir1)
+  }
+
   override def beforeEach() {
     diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
     shuffleBlockManager.idToSegmentMap.clear()
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index f675e1e5b4..44332fc8db 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.util
 
-import java.io.IOException
+import java.io.{File, IOException}
 
 import scala.io.Source
-import scala.util.Try
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
@@ -38,12 +37,18 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
     "org.apache.spark.io.LZFCompressionCodec",
     "org.apache.spark.io.SnappyCompressionCodec"
  )
-  private val testDir = Files.createTempDir()
-  private val logDirPath = Utils.getFilePath(testDir, "test-file-logger")
-  private val logDirPathString = logDirPath.toString
+  private var testDir: File = _
+  private var logDirPath: Path = _
+  private var logDirPathString: String = _
+
+  before {
+    testDir = Files.createTempDir()
+    logDirPath = Utils.getFilePath(testDir, "test-file-logger")
+    logDirPathString = logDirPath.toString
+  }
 
   after {
-    Try { fileSystem.delete(logDirPath, true) }
+    Utils.deleteRecursively(testDir)
  }
 
   test("Simple logging") {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index eb7fb63182..cf9e20d347 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -112,6 +112,7 @@ class UtilsSuite extends FunSuite {
 
   test("reading offset bytes of a file") {
     val tmpDir2 = Files.createTempDir()
+    tmpDir2.deleteOnExit()
     val f1Path = tmpDir2 + "/f1"
     val f1 = new FileOutputStream(f1Path)
     f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))