diff options
author | Sean Owen <sowen@cloudera.com> | 2014-10-09 18:21:59 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-10-09 18:21:59 -0700 |
commit | 363baacaded56047bcc63276d729ab911e0336cf (patch) | |
tree | c6114d94f6b6f04c386df0414f17d3a0e6c87a33 /core | |
parent | 2837bf8548db7e9d43f6eefedf5a73feb22daedb (diff) | |
download | spark-363baacaded56047bcc63276d729ab911e0336cf.tar.gz spark-363baacaded56047bcc63276d729ab911e0336cf.tar.bz2 spark-363baacaded56047bcc63276d729ab911e0336cf.zip |
SPARK-3811 [CORE] More robust / standard Utils.deleteRecursively, Utils.createTempDir
I noticed a few issues with how temp directories are created and deleted:
*Minor*
* Guava's `Files.createTempDir()` plus `File.deleteOnExit()` is used in many tests to make a temp dir, but `Utils.createTempDir()` seems to be the standard Spark mechanism
* Call to `File.deleteOnExit()` could be pushed into `Utils.createTempDir()` as well, along with this replacement
* _I messed up the message in an exception in `Utils` in SPARK-3794; fixed here_
*Bit Less Minor*
* `Utils.deleteRecursively()` fails immediately if any `IOException` occurs, instead of trying to delete any remaining files and subdirectories. I've observed this leave temp dirs around. I suggest changing it to continue in the face of an exception and throw one of the possibly several exceptions that occur at the end.
* `Utils.createTempDir()` will add a JVM shutdown hook every time the method is called. Even if the subdir is the parent of another parent dir, since this check is inside the hook. However `Utils` manages a set of all dirs to delete on shutdown already, called `shutdownDeletePaths`. A single hook can be registered to delete all of these on exit. This is how Tachyon temp paths are cleaned up in `TachyonBlockManager`.
I noticed a few other things that might be changed but wanted to ask first:
* Shouldn't the set of dirs to delete be `File`, not just `String` paths?
* `Utils` manages the set of `TachyonFile` that have been registered for deletion, but the shutdown hook is managed in `TachyonBlockManager`. Should this logic not live together, and not in `Utils`? it's more specific to Tachyon, and looks a slight bit odd to import in such a generic place.
Author: Sean Owen <sowen@cloudera.com>
Closes #2670 from srowen/SPARK-3811 and squashes the following commits:
071ae60 [Sean Owen] Update per @vanzin's review
da0146d [Sean Owen] Make Utils.deleteRecursively try to delete all paths even when an exception occurs; use one shutdown hook instead of one per method call to delete temp dirs
3a0faa4 [Sean Owen] Standardize on Utils.createTempDir instead of Files.createTempDir
Diffstat (limited to 'core')
12 files changed, 89 insertions, 65 deletions
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 8ca731038e..e72826dc25 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -26,6 +26,8 @@ import scala.collection.JavaConversions._ import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} import com.google.common.io.Files +import org.apache.spark.util.Utils + /** * Utilities for tests. Included in main codebase since it's used by multiple * projects. @@ -42,8 +44,7 @@ private[spark] object TestUtils { * in order to avoid interference between tests. */ def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = { - val tempDir = Files.createTempDir() - tempDir.deleteOnExit() + val tempDir = Utils.createTempDir() val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value) val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis())) createJar(files, jarFile) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3d307b3c16..07477dd460 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -168,6 +168,20 @@ private[spark] object Utils extends Logging { private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() + // Add a shutdown hook to delete the temp dirs when the JVM exits + Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") { + override def run(): Unit = Utils.logUncaughtExceptions { + logDebug("Shutdown hook called") + shutdownDeletePaths.foreach { dirPath => + try { + Utils.deleteRecursively(new File(dirPath)) + } catch { + case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) + } + } + } + }) + // Register the path to be deleted via shutdown hook def registerShutdownDeleteDir(file: File) { val absolutePath = file.getAbsolutePath() @@ -252,14 +266,6 @@ private[spark] object Utils extends Logging { } registerShutdownDeleteDir(dir) - - // Add a shutdown hook to delete the temp dir when the JVM exits - Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) { - override def run() { - // Attempt to delete if some patch which is parent of this is not already registered. - if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir) - } - }) dir } @@ -666,15 +672,30 @@ private[spark] object Utils extends Logging { */ def deleteRecursively(file: File) { if (file != null) { - if (file.isDirectory() && !isSymlink(file)) { - for (child <- listFilesSafely(file)) { - deleteRecursively(child) + try { + if (file.isDirectory && !isSymlink(file)) { + var savedIOException: IOException = null + for (child <- listFilesSafely(file)) { + try { + deleteRecursively(child) + } catch { + // In case of multiple exceptions, only last one will be thrown + case ioe: IOException => savedIOException = ioe + } + } + if (savedIOException != null) { + throw savedIOException + } + shutdownDeletePaths.synchronized { + shutdownDeletePaths.remove(file.getAbsolutePath) + } } - } - if (!file.delete()) { - // Delete can also fail if the file simply did not exist - if (file.exists()) { - throw new IOException("Failed to delete: " + file.getAbsolutePath) + } finally { + if (!file.delete()) { + // Delete can also fail if the file simply did not exist + if (file.exists()) { + throw new IOException("Failed to delete: " + file.getAbsolutePath) + } } } } @@ -713,7 +734,7 @@ private[spark] object Utils extends Logging { */ def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = { if (!dir.isDirectory) { - throw new IllegalArgumentException("$dir is not a directory!") + throw new IllegalArgumentException(s"$dir is not a directory!") } val filesAndDirs = dir.listFiles() val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000) diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index 7e18f45de7..a8867020e4 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark import java.io._ import java.util.jar.{JarEntry, JarOutputStream} -import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.SparkContext._ @@ -41,8 +40,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { override def beforeAll() { super.beforeAll() - tmpDir = Files.createTempDir() - tmpDir.deleteOnExit() + tmpDir = Utils.createTempDir() val testTempDir = new File(tmpDir, "test") testTempDir.mkdir() diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 4a53d25012..a2b74c4419 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -21,7 +21,6 @@ import java.io.{File, FileWriter} import scala.io.Source -import com.google.common.io.Files import org.apache.hadoop.io._ import org.apache.hadoop.io.compress.DefaultCodec import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat} @@ -39,8 +38,7 @@ class FileSuite extends FunSuite with LocalSparkContext { override def beforeEach() { super.beforeEach() - tempDir = Files.createTempDir() - tempDir.deleteOnExit() + tempDir = Utils.createTempDir() } override def afterEach() { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 4cba90e8f2..1cdf50d5c0 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.deploy.SparkSubmit._ import org.apache.spark.util.Utils import org.scalatest.FunSuite import org.scalatest.Matchers -import com.google.common.io.Files class SparkSubmitSuite extends FunSuite with Matchers { def beforeAll() { @@ -332,7 +331,7 @@ class SparkSubmitSuite extends FunSuite with Matchers { } def forConfDir(defaults: Map[String, String]) (f: String => Unit) = { - val tmpDir = Files.createTempDir() + val tmpDir = Utils.createTempDir() val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf") val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf)) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala index d5ebfb3f3f..12d1c7b2fa 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala @@ -23,8 +23,6 @@ import java.io.FileOutputStream import scala.collection.immutable.IndexedSeq -import com.google.common.io.Files - import org.scalatest.BeforeAndAfterAll import org.scalatest.FunSuite @@ -66,9 +64,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll { * 3) Does the contents be the same. */ test("Correctness of WholeTextFileRecordReader.") { - - val dir = Files.createTempDir() - dir.deleteOnExit() + val dir = Utils.createTempDir() println(s"Local disk address is ${dir.toString}.") WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) => diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 75b0119190..3620e251cc 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -24,13 +24,14 @@ import org.apache.hadoop.util.Progressable import scala.collection.mutable.{ArrayBuffer, HashSet} import scala.util.Random -import com.google.common.io.Files import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext} import org.apache.spark.{Partitioner, SharedSparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.util.Utils + import org.scalatest.FunSuite class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { @@ -381,14 +382,16 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { } test("zero-partition RDD") { - val emptyDir = Files.createTempDir() - emptyDir.deleteOnExit() - val file = sc.textFile(emptyDir.getAbsolutePath) - assert(file.partitions.size == 0) - assert(file.collect().toList === Nil) - // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) - emptyDir.delete() + val emptyDir = Utils.createTempDir() + try { + val file = sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.isEmpty) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } finally { + Utils.deleteRecursively(emptyDir) + } } test("keys and values") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 3efa854318..abc300fcff 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import scala.collection.mutable import scala.io.Source -import com.google.common.io.Files import org.apache.hadoop.fs.{FileStatus, Path} import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -51,8 +50,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter { private var logDirPath: Path = _ before { - testDir = Files.createTempDir() - testDir.deleteOnExit() + testDir = Utils.createTempDir() logDirPath = Utils.getFilePath(testDir, "spark-events") } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 48114feee6..e05f373392 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.scheduler import java.io.{File, PrintWriter} -import com.google.common.io.Files import org.json4s.jackson.JsonMethods._ import org.scalatest.{BeforeAndAfter, FunSuite} @@ -39,8 +38,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { private var testDir: File = _ before { - testDir = Files.createTempDir() - testDir.deleteOnExit() + testDir = Utils.createTempDir() } after { diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index e4522e00a6..bc5c74c126 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -19,22 +19,13 @@ package org.apache.spark.storage import java.io.{File, FileWriter} -import org.apache.spark.network.nio.NioBlockTransferService -import org.apache.spark.shuffle.hash.HashShuffleManager - -import scala.collection.mutable import scala.language.reflectiveCalls -import akka.actor.Props -import com.google.common.io.Files import org.mockito.Mockito.{mock, when} import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} import org.apache.spark.SparkConf -import org.apache.spark.scheduler.LiveListenerBus -import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.util.{AkkaUtils, Utils} -import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.util.Utils class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll { private val testConf = new SparkConf(false) @@ -48,10 +39,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before override def beforeAll() { super.beforeAll() - rootDir0 = Files.createTempDir() - rootDir0.deleteOnExit() - rootDir1 = Files.createTempDir() - rootDir1.deleteOnExit() + rootDir0 = Utils.createTempDir() + rootDir1 = Utils.createTempDir() rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath } diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala index c3dd156b40..dc2a05631d 100644 --- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala @@ -21,7 +21,6 @@ import java.io.{File, IOException} import scala.io.Source -import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.scalatest.{BeforeAndAfter, FunSuite} @@ -44,7 +43,7 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter { private var logDirPathString: String = _ before { - testDir = Files.createTempDir() + testDir = Utils.createTempDir() logDirPath = Utils.getFilePath(testDir, "test-file-logger") logDirPathString = logDirPath.toString } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index e63d9d085e..0344da60da 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -112,7 +112,7 @@ class UtilsSuite extends FunSuite { } test("reading offset bytes of a file") { - val tmpDir2 = Files.createTempDir() + val tmpDir2 = Utils.createTempDir() tmpDir2.deleteOnExit() val f1Path = tmpDir2 + "/f1" val f1 = new FileOutputStream(f1Path) @@ -141,7 +141,7 @@ class UtilsSuite extends FunSuite { } test("reading offset bytes across multiple files") { - val tmpDir = Files.createTempDir() + val tmpDir = Utils.createTempDir() tmpDir.deleteOnExit() val files = (1 to 3).map(i => new File(tmpDir, i.toString)) Files.write("0123456789", files(0), Charsets.UTF_8) @@ -308,4 +308,28 @@ class UtilsSuite extends FunSuite { } } + test("deleteRecursively") { + val tempDir1 = Utils.createTempDir() + assert(tempDir1.exists()) + Utils.deleteRecursively(tempDir1) + assert(!tempDir1.exists()) + + val tempDir2 = Utils.createTempDir() + val tempFile1 = new File(tempDir2, "foo.txt") + Files.touch(tempFile1) + assert(tempFile1.exists()) + Utils.deleteRecursively(tempFile1) + assert(!tempFile1.exists()) + + val tempDir3 = new File(tempDir2, "subdir") + assert(tempDir3.mkdir()) + val tempFile2 = new File(tempDir3, "bar.txt") + Files.touch(tempFile2) + assert(tempFile2.exists()) + Utils.deleteRecursively(tempDir2) + assert(!tempDir2.exists()) + assert(!tempDir3.exists()) + assert(!tempFile2.exists()) + } + } |