author     Sean Owen <sowen@cloudera.com>             2014-10-09 18:21:59 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-09 18:21:59 -0700
commit     363baacaded56047bcc63276d729ab911e0336cf (patch)
tree       c6114d94f6b6f04c386df0414f17d3a0e6c87a33 /streaming
parent     2837bf8548db7e9d43f6eefedf5a73feb22daedb (diff)
SPARK-3811 [CORE] More robust / standard Utils.deleteRecursively, Utils.createTempDir
I noticed a few issues with how temp directories are created and deleted:

*Minor*

* Guava's `Files.createTempDir()` plus `File.deleteOnExit()` is used in many tests to make a temp dir, but `Utils.createTempDir()` seems to be the standard Spark mechanism
* The call to `File.deleteOnExit()` can be pushed into `Utils.createTempDir()` as part of this replacement
* _I messed up the message in an exception in `Utils` in SPARK-3794; fixed here_

*Bit Less Minor*

* `Utils.deleteRecursively()` fails immediately if any `IOException` occurs, instead of trying to delete any remaining files and subdirectories. I've observed this leave temp dirs around. I suggest changing it to continue in the face of an exception, and to throw one of the (possibly several) exceptions that occurred at the end.
* `Utils.createTempDir()` adds a JVM shutdown hook every time the method is called, even when the new dir is inside a dir that is already registered for deletion, since that check only happens inside the hook. However, `Utils` already manages a set of all dirs to delete on shutdown, called `shutdownDeletePaths`. A single hook can be registered to delete all of these on exit. This is how Tachyon temp paths are cleaned up in `TachyonBlockManager`.

I noticed a few other things that might be changed but wanted to ask first:

* Shouldn't the set of dirs to delete be `File`, not just `String` paths?
* `Utils` manages the set of `TachyonFile` that have been registered for deletion, but the shutdown hook is managed in `TachyonBlockManager`. Should this logic not live together, rather than in `Utils`? It's more specific to Tachyon, and looks slightly odd to import in such a generic place.

Author: Sean Owen <sowen@cloudera.com>

Closes #2670 from srowen/SPARK-3811 and squashes the following commits:

071ae60 [Sean Owen] Update per @vanzin's review
da0146d [Sean Owen] Make Utils.deleteRecursively try to delete all paths even when an exception occurs; use one shutdown hook instead of one per method call to delete temp dirs
3a0faa4 [Sean Owen] Standardize on Utils.createTempDir instead of Files.createTempDir
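To make the `deleteRecursively` change concrete, here is a minimal sketch of the idea, not Spark's actual `Utils` code: deletion continues past an `IOException` in a subtree and one of the saved exceptions is rethrown at the end, so a single undeletable file no longer strands its siblings.

```scala
import java.io.{File, IOException}

object RobustDelete {
  def deleteRecursively(file: File): Unit = {
    if (file != null) {
      var savedIOException: IOException = null
      if (file.isDirectory) {
        // listFiles() can return null on an I/O error; treat that as "no children".
        val children = Option(file.listFiles()).getOrElse(Array.empty[File])
        for (child <- children) {
          try {
            deleteRecursively(child)
          } catch {
            // Remember the failure, but keep deleting the remaining entries.
            case ioe: IOException => savedIOException = ioe
          }
        }
        // After trying everything under this dir, surface one saved failure.
        if (savedIOException != null) {
          throw savedIOException
        }
      }
      if (!file.delete() && file.exists()) {
        throw new IOException("Failed to delete: " + file.getAbsolutePath)
      }
    }
  }
}
```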
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala    3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala  3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala      5
4 files changed, 5 insertions(+), 9 deletions(-)
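The single-shutdown-hook scheme described in the commit message can also be sketched. The object and method names below are assumptions for illustration (only `shutdownDeletePaths` is named in the message); the point is one shared registry of paths plus one hook, instead of one hook per `createTempDir()` call.

```scala
import java.io.File
import scala.collection.mutable

object TempDirManager {
  // Shared registry of paths to delete at JVM exit.
  private val shutdownDeletePaths = new mutable.HashSet[String]()

  // Exactly one hook, registered when this object is first used; it sweeps
  // the whole registry rather than handling a single dir.
  Runtime.getRuntime.addShutdownHook(new Thread("delete temp dirs") {
    override def run(): Unit = {
      shutdownDeletePaths.synchronized {
        shutdownDeletePaths.foreach(path => deleteRecursively(new File(path)))
      }
    }
  })

  // Called by createTempDir(); registering is now just a set insertion.
  def registerShutdownDeleteDir(dir: File): Unit = {
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths += dir.getAbsolutePath
    }
  }

  // Simplified best-effort recursive delete, to keep the sketch self-contained.
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }
}
```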
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8511390cb1..e5592e52b0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -231,8 +231,7 @@ class CheckpointSuite extends TestSuiteBase {
// failure, are re-processed or not.
test("recovery with file input stream") {
// Set up the streaming context and input streams
- val testDir = Files.createTempDir()
- testDir.deleteOnExit()
+ val testDir = Utils.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952a74fd5f..a44a45a3e9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -98,8 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
// Set up the streaming context and input streams
- val testDir = Files.createTempDir()
- testDir.deleteOnExit()
+ val testDir = Utils.createTempDir()
val ssc = new StreamingContext(conf, batchDuration)
val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index c53c017060..5dbb723200 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -352,8 +352,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
extends Thread with Logging {
override def run() {
- val localTestDir = Files.createTempDir()
- localTestDir.deleteOnExit()
+ val localTestDir = Utils.createTempDir()
var fs = testDir.getFileSystem(new Configuration())
val maxTries = 3
try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 759baacaa4..9327ff4822 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -24,12 +24,12 @@ import scala.collection.mutable.SynchronizedBuffer
import scala.reflect.ClassTag
import org.scalatest.{BeforeAndAfter, FunSuite}
-import com.google.common.io.Files
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
/**
* This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -120,9 +120,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Directory where the checkpoint data will be saved
lazy val checkpointDir = {
- val dir = Files.createTempDir()
+ val dir = Utils.createTempDir()
logDebug(s"checkpointDir: $dir")
- dir.deleteOnExit()
dir.toString
}