author     Sean Owen <sowen@cloudera.com>             2014-10-09 18:21:59 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-09 18:21:59 -0700
commit     363baacaded56047bcc63276d729ab911e0336cf (patch)
tree       c6114d94f6b6f04c386df0414f17d3a0e6c87a33 /core
parent     2837bf8548db7e9d43f6eefedf5a73feb22daedb (diff)
download   spark-363baacaded56047bcc63276d729ab911e0336cf.tar.gz
           spark-363baacaded56047bcc63276d729ab911e0336cf.tar.bz2
           spark-363baacaded56047bcc63276d729ab911e0336cf.zip
SPARK-3811 [CORE] More robust / standard Utils.deleteRecursively, Utils.createTempDir
I noticed a few issues with how temp directories are created and deleted:

*Minor*

* Guava's `Files.createTempDir()` plus `File.deleteOnExit()` is used in many tests to make a temp dir, but `Utils.createTempDir()` seems to be the standard Spark mechanism
* The call to `File.deleteOnExit()` can be pushed into `Utils.createTempDir()` as part of this replacement
* _I messed up the message in an exception in `Utils` in SPARK-3794; fixed here_

*Bit Less Minor*

* `Utils.deleteRecursively()` fails immediately if any `IOException` occurs, instead of trying to delete any remaining files and subdirectories. I've observed this leave temp dirs around. I suggest changing it to continue in the face of an exception and, at the end, throw one of the possibly several exceptions that occurred.
* `Utils.createTempDir()` adds a new JVM shutdown hook every time it is called, even when the new dir sits under an already-registered parent dir, because that check only runs inside the hook itself. `Utils` already manages a set of all dirs to delete on shutdown, `shutdownDeletePaths`, so a single hook can be registered to delete all of them on exit. This is how Tachyon temp paths are cleaned up in `TachyonBlockManager`. (A minimal sketch of this pattern follows below.)

I noticed a few other things that might be changed but wanted to ask first:

* Shouldn't the set of dirs to delete be `File`, not just `String` paths?
* `Utils` manages the set of `TachyonFile` that have been registered for deletion, but the shutdown hook is managed in `TachyonBlockManager`. Should this logic live together, outside `Utils`? It is specific to Tachyon, and looks slightly odd to import in such a generic place.

Author: Sean Owen <sowen@cloudera.com>

Closes #2670 from srowen/SPARK-3811 and squashes the following commits:

071ae60 [Sean Owen] Update per @vanzin's review
da0146d [Sean Owen] Make Utils.deleteRecursively try to delete all paths even when an exception occurs; use one shutdown hook instead of one per method call to delete temp dirs
3a0faa4 [Sean Owen] Standardize on Utils.createTempDir instead of Files.createTempDir
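The single-hook pattern described above, in isolation, looks roughly like the sketch below. This is a hand-written illustration, not the actual Spark code: `java.nio.file.Files.createTempDirectory` stands in for `Utils.createTempDir`'s own directory-creation loop, a trivial `deleteQuietly` helper stands in for `Utils.deleteRecursively`, and stderr stands in for Spark's logging. The names `TempDirShutdownSketch` and `deleteQuietly` are made up for this example.

import java.io.File
import scala.collection.mutable

object TempDirShutdownSketch {

  // One shared registry of temp-dir paths to delete when the JVM exits,
  // instead of registering a new shutdown hook for every createTempDir() call.
  private val shutdownDeletePaths = new mutable.HashSet[String]()

  // A single shutdown hook walks the registry; failures are reported, not rethrown.
  Runtime.getRuntime.addShutdownHook(new Thread("delete temp dirs") {
    override def run(): Unit = {
      shutdownDeletePaths.synchronized {
        shutdownDeletePaths.foreach { dirPath =>
          try {
            deleteQuietly(new File(dirPath))
          } catch {
            case e: Exception =>
              System.err.println(s"Exception while deleting temp dir $dirPath: $e")
          }
        }
      }
    }
  })

  // Create a temp dir and register it for deletion at shutdown.
  def createTempDir(): File = {
    val dir = java.nio.file.Files.createTempDirectory("spark-test-").toFile
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths += dir.getAbsolutePath
    }
    dir
  }

  // Stand-in for Utils.deleteRecursively: best-effort recursive delete.
  private def deleteQuietly(f: File): Unit = {
    Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteQuietly)
    f.delete()
  }
}

With this in place, a test calls `val tempDir = TempDirShutdownSketch.createTempDir()` instead of pairing Guava's `Files.createTempDir()` with `File.deleteOnExit()`, which is exactly the substitution the test changes below make against `Utils.createTempDir()`.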
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala                             5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                           55
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala                       4
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                             4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala               3
-rw-r--r--  core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala  6
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala            21
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala   4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala         4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala        17
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala                  3
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala                      28
12 files changed, 89 insertions, 65 deletions
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 8ca731038e..e72826dc25 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import com.google.common.io.Files
+import org.apache.spark.util.Utils
+
/**
* Utilities for tests. Included in main codebase since it's used by multiple
* projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
- val tempDir = Files.createTempDir()
- tempDir.deleteOnExit()
+ val tempDir = Utils.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 3d307b3c16..07477dd460 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -168,6 +168,20 @@ private[spark] object Utils extends Logging {
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
+ // Add a shutdown hook to delete the temp dirs when the JVM exits
+ Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ logDebug("Shutdown hook called")
+ shutdownDeletePaths.foreach { dirPath =>
+ try {
+ Utils.deleteRecursively(new File(dirPath))
+ } catch {
+ case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+ }
+ }
+ }
+ })
+
// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
val absolutePath = file.getAbsolutePath()
@@ -252,14 +266,6 @@ private[spark] object Utils extends Logging {
}
registerShutdownDeleteDir(dir)
-
- // Add a shutdown hook to delete the temp dir when the JVM exits
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
- override def run() {
- // Attempt to delete if some patch which is parent of this is not already registered.
- if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
- }
- })
dir
}
@@ -666,15 +672,30 @@ private[spark] object Utils extends Logging {
*/
def deleteRecursively(file: File) {
if (file != null) {
- if (file.isDirectory() && !isSymlink(file)) {
- for (child <- listFilesSafely(file)) {
- deleteRecursively(child)
+ try {
+ if (file.isDirectory && !isSymlink(file)) {
+ var savedIOException: IOException = null
+ for (child <- listFilesSafely(file)) {
+ try {
+ deleteRecursively(child)
+ } catch {
+ // In case of multiple exceptions, only last one will be thrown
+ case ioe: IOException => savedIOException = ioe
+ }
+ }
+ if (savedIOException != null) {
+ throw savedIOException
+ }
+ shutdownDeletePaths.synchronized {
+ shutdownDeletePaths.remove(file.getAbsolutePath)
+ }
}
- }
- if (!file.delete()) {
- // Delete can also fail if the file simply did not exist
- if (file.exists()) {
- throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ } finally {
+ if (!file.delete()) {
+ // Delete can also fail if the file simply did not exist
+ if (file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ }
}
}
}
@@ -713,7 +734,7 @@ private[spark] object Utils extends Logging {
*/
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
if (!dir.isDirectory) {
- throw new IllegalArgumentException("$dir is not a directory!")
+ throw new IllegalArgumentException(s"$dir is not a directory!")
}
val filesAndDirs = dir.listFiles()
val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
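Because the `deleteRecursively` hunk above interleaves removed and added lines, the resulting control flow can be hard to follow. The sketch below restates the post-patch behavior in standalone form: keep deleting the remaining children when one fails, rethrow only the last `IOException` at the end, and always attempt to delete the path itself in a `finally` block. The symlink check and the `shutdownDeletePaths` bookkeeping of the real method are omitted here, and the wrapper object name is made up for the example.

import java.io.{File, IOException}

object DeleteRecursivelySketch {
  def deleteRecursively(file: File): Unit = {
    if (file != null) {
      try {
        if (file.isDirectory) { // the real method also skips symlinked directories
          var savedIOException: IOException = null
          for (child <- Option(file.listFiles()).getOrElse(Array.empty[File])) {
            try {
              deleteRecursively(child)
            } catch {
              // Keep going; if several children fail, only the last exception is rethrown
              case ioe: IOException => savedIOException = ioe
            }
          }
          if (savedIOException != null) {
            throw savedIOException
          }
        }
      } finally {
        // Always try to remove this path, even if some child could not be deleted
        if (!file.delete() && file.exists()) {
          throw new IOException("Failed to delete: " + file.getAbsolutePath)
        }
      }
    }
  }
}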
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 7e18f45de7..a8867020e4 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.io._
import java.util.jar.{JarEntry, JarOutputStream}
-import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
@@ -41,8 +40,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
override def beforeAll() {
super.beforeAll()
- tmpDir = Files.createTempDir()
- tmpDir.deleteOnExit()
+ tmpDir = Utils.createTempDir()
val testTempDir = new File(tmpDir, "test")
testTempDir.mkdir()
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 4a53d25012..a2b74c4419 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
import scala.io.Source
-import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -39,8 +38,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
override def beforeEach() {
super.beforeEach()
- tempDir = Files.createTempDir()
- tempDir.deleteOnExit()
+ tempDir = Utils.createTempDir()
}
override def afterEach() {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4cba90e8f2..1cdf50d5c0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.Utils
import org.scalatest.FunSuite
import org.scalatest.Matchers
-import com.google.common.io.Files
class SparkSubmitSuite extends FunSuite with Matchers {
def beforeAll() {
@@ -332,7 +331,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
}
def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
- val tmpDir = Files.createTempDir()
+ val tmpDir = Utils.createTempDir()
val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index d5ebfb3f3f..12d1c7b2fa 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -23,8 +23,6 @@ import java.io.FileOutputStream
import scala.collection.immutable.IndexedSeq
-import com.google.common.io.Files
-
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
@@ -66,9 +64,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
* 3) Does the contents be the same.
*/
test("Correctness of WholeTextFileRecordReader.") {
-
- val dir = Files.createTempDir()
- dir.deleteOnExit()
+ val dir = Utils.createTempDir()
println(s"Local disk address is ${dir.toString}.")
WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 75b0119190..3620e251cc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -24,13 +24,14 @@ import org.apache.hadoop.util.Progressable
import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.Random
-import com.google.common.io.Files
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
TaskAttemptContext => NewTaskAttempContext}
import org.apache.spark.{Partitioner, SharedSparkContext}
import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
+
import org.scalatest.FunSuite
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
@@ -381,14 +382,16 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
}
test("zero-partition RDD") {
- val emptyDir = Files.createTempDir()
- emptyDir.deleteOnExit()
- val file = sc.textFile(emptyDir.getAbsolutePath)
- assert(file.partitions.size == 0)
- assert(file.collect().toList === Nil)
- // Test that a shuffle on the file works, because this used to be a bug
- assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
- emptyDir.delete()
+ val emptyDir = Utils.createTempDir()
+ try {
+ val file = sc.textFile(emptyDir.getAbsolutePath)
+ assert(file.partitions.isEmpty)
+ assert(file.collect().toList === Nil)
+ // Test that a shuffle on the file works, because this used to be a bug
+ assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+ } finally {
+ Utils.deleteRecursively(emptyDir)
+ }
}
test("keys and values") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 3efa854318..abc300fcff 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
import scala.collection.mutable
import scala.io.Source
-import com.google.common.io.Files
import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -51,8 +50,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
private var logDirPath: Path = _
before {
- testDir = Files.createTempDir()
- testDir.deleteOnExit()
+ testDir = Utils.createTempDir()
logDirPath = Utils.getFilePath(testDir, "spark-events")
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 48114feee6..e05f373392 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
import java.io.{File, PrintWriter}
-import com.google.common.io.Files
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -39,8 +38,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
private var testDir: File = _
before {
- testDir = Files.createTempDir()
- testDir.deleteOnExit()
+ testDir = Utils.createTempDir()
}
after {
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index e4522e00a6..bc5c74c126 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,22 +19,13 @@ package org.apache.spark.storage
import java.io.{File, FileWriter}
-import org.apache.spark.network.nio.NioBlockTransferService
-import org.apache.spark.shuffle.hash.HashShuffleManager
-
-import scala.collection.mutable
import scala.language.reflectiveCalls
-import akka.actor.Props
-import com.google.common.io.Files
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
private val testConf = new SparkConf(false)
@@ -48,10 +39,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
override def beforeAll() {
super.beforeAll()
- rootDir0 = Files.createTempDir()
- rootDir0.deleteOnExit()
- rootDir1 = Files.createTempDir()
- rootDir1.deleteOnExit()
+ rootDir0 = Utils.createTempDir()
+ rootDir1 = Utils.createTempDir()
rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
index c3dd156b40..dc2a05631d 100644
--- a/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
import scala.io.Source
-import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -44,7 +43,7 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
private var logDirPathString: String = _
before {
- testDir = Files.createTempDir()
+ testDir = Utils.createTempDir()
logDirPath = Utils.getFilePath(testDir, "test-file-logger")
logDirPathString = logDirPath.toString
}
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index e63d9d085e..0344da60da 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -112,7 +112,7 @@ class UtilsSuite extends FunSuite {
}
test("reading offset bytes of a file") {
- val tmpDir2 = Files.createTempDir()
+ val tmpDir2 = Utils.createTempDir()
tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
@@ -141,7 +141,7 @@ class UtilsSuite extends FunSuite {
}
test("reading offset bytes across multiple files") {
- val tmpDir = Files.createTempDir()
+ val tmpDir = Utils.createTempDir()
tmpDir.deleteOnExit()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
Files.write("0123456789", files(0), Charsets.UTF_8)
@@ -308,4 +308,28 @@ class UtilsSuite extends FunSuite {
}
}
+ test("deleteRecursively") {
+ val tempDir1 = Utils.createTempDir()
+ assert(tempDir1.exists())
+ Utils.deleteRecursively(tempDir1)
+ assert(!tempDir1.exists())
+
+ val tempDir2 = Utils.createTempDir()
+ val tempFile1 = new File(tempDir2, "foo.txt")
+ Files.touch(tempFile1)
+ assert(tempFile1.exists())
+ Utils.deleteRecursively(tempFile1)
+ assert(!tempFile1.exists())
+
+ val tempDir3 = new File(tempDir2, "subdir")
+ assert(tempDir3.mkdir())
+ val tempFile2 = new File(tempDir3, "bar.txt")
+ Files.touch(tempFile2)
+ assert(tempFile2.exists())
+ Utils.deleteRecursively(tempDir2)
+ assert(!tempDir2.exists())
+ assert(!tempDir3.exists())
+ assert(!tempFile2.exists())
+ }
+
}