path: root/streaming
author     Sean Owen <sowen@cloudera.com>   2015-03-20 14:16:21 +0000
committer  Sean Owen <sowen@cloudera.com>   2015-03-20 14:16:21 +0000
commit     6f80c3e8880340597f161f87e64697bec86cc586 (patch)
tree       c019ca07ed3b4dd178c102aac00418485da5e679 /streaming
parent     d08e3eb3dc455970b685a7b8b7e00c537c89a8e4 (diff)
SPARK-6338 [CORE] Use standard temp dir mechanisms in tests to avoid orphaned temp files
Use `Utils.createTempDir()` to replace other temp file mechanisms used in some tests, to further ensure they are cleaned up, and simplify

Author: Sean Owen <sowen@cloudera.com>

Closes #5029 from srowen/SPARK-6338 and squashes the following commits:

27b740a [Sean Owen] Fix hive-thriftserver tests that don't expect an existing dir
4a212fa [Sean Owen] Standardize a bit more temp dir management
9004081 [Sean Owen] Revert some added recursive-delete calls
57609e4 [Sean Owen] Use Utils.createTempDir() to replace other temp file mechanisms used in some tests, to further ensure they are cleaned up, and simplify
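For orientation, the pattern the streaming suites below converge on looks roughly like the following. This is an illustrative sketch only, not code from this commit: the suite name is hypothetical, and it assumes the code sits inside the Spark source tree, since `Utils` is `private[spark]`.

```scala
package org.apache.spark.streaming

import java.io.File

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.util.Utils

// Hypothetical suite illustrating the standardized temp-dir handling.
class TempDirUsageSuite extends FunSuite with BeforeAndAfter {

  private var tempDir: File = _

  before {
    // Utils.createTempDir() creates the directory and registers it for
    // deletion at JVM shutdown, so interrupted test runs no longer leave
    // orphaned temp directories behind.
    tempDir = Utils.createTempDir()
  }

  after {
    // Explicit per-test cleanup; the old null/exists guards are dropped
    // because Utils.deleteRecursively tolerates a missing directory.
    Utils.deleteRecursively(tempDir)
  }

  test("work happens under the managed temp dir") {
    val out = new File(tempDir, "output.txt")
    assert(out.getParentFile.exists())
  }
}
```

Where an API wants a path string rather than a `java.io.File`, the suites keep the `File` and call `getAbsolutePath` at the call site, as `FailureSuite` does below with `MasterFailureTest`.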
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala            6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala              10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala  9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala              5
5 files changed, 14 insertions(+), 27 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8ea91eca68..91a2b2bba4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -222,7 +222,7 @@ class CheckpointSuite extends TestSuiteBase {
}
test("recovery with saveAsHadoopFiles operation") {
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
@@ -245,7 +245,7 @@ class CheckpointSuite extends TestSuiteBase {
}
test("recovery with saveAsNewAPIHadoopFiles operation") {
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
@@ -283,7 +283,7 @@ class CheckpointSuite extends TestSuiteBase {
//
// After SPARK-5079 is addressed, should be able to remove this test since a strengthened
// version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6500608bba..26435d8515 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -20,15 +20,13 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import java.io.File
-
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
class FailureSuite extends TestSuiteBase with Logging {
- val directory = Utils.createTempDir().getAbsolutePath
+ val directory = Utils.createTempDir()
val numBatches = 30
override def batchDuration = Milliseconds(1000)
@@ -36,16 +34,16 @@ class FailureSuite extends TestSuiteBase with Logging {
override def useManualClock = false
override def afterFunction() {
- Utils.deleteRecursively(new File(directory))
+ Utils.deleteRecursively(directory)
super.afterFunction()
}
test("multiple failures with map") {
- MasterFailureTest.testMap(directory, numBatches, batchDuration)
+ MasterFailureTest.testMap(directory.getAbsolutePath, numBatches, batchDuration)
}
test("multiple failures with updateStateByKey") {
- MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
+ MasterFailureTest.testUpdateStateByKey(directory.getAbsolutePath, numBatches, batchDuration)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 818f551dbe..18a477f920 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -25,8 +25,6 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{ActorSystem, Props}
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
@@ -39,7 +37,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
-import org.apache.spark.util.{AkkaUtils, ManualClock}
+import org.apache.spark.util.{AkkaUtils, ManualClock, Utils}
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._
@@ -76,7 +74,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
- tempDirectory = Files.createTempDir()
+ tempDirectory = Utils.createTempDir()
manualClock.setTime(0)
}
@@ -93,10 +91,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
actorSystem.awaitTermination()
actorSystem = null
- if (tempDirectory != null && tempDirectory.exists()) {
- FileUtils.deleteDirectory(tempDirectory)
- tempDirectory = null
- }
+ Utils.deleteRecursively(tempDirectory)
}
test("BlockManagerBasedBlockHandler - store blocks") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index a3a0fd5187..42fad769f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -24,8 +24,6 @@ import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.util.Random
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
@@ -51,15 +49,12 @@ class ReceivedBlockTrackerSuite
before {
conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
- checkpointDirectory = Files.createTempDir()
+ checkpointDirectory = Utils.createTempDir()
}
after {
allReceivedBlockTrackers.foreach { _.stop() }
- if (checkpointDirectory != null && checkpointDirectory.exists()) {
- FileUtils.deleteDirectory(checkpointDirectory)
- checkpointDirectory = null
- }
+ Utils.deleteRecursively(checkpointDirectory)
}
test("block addition, and block to batch allocation") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index e8c34a9ee4..aa20ad0b53 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import com.google.common.io.Files
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -34,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
+import org.apache.spark.util.Utils
/** Testsuite for testing the network receiver behavior */
class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
@@ -222,7 +222,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
val batchDuration = Milliseconds(500)
- val tempDirectory = Files.createTempDir()
+ val tempDirectory = Utils.createTempDir()
val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))
val logDirectory2 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 1))
val allLogFiles1 = new mutable.HashSet[String]()
@@ -251,7 +251,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
}
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
- tempDirectory.deleteOnExit()
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiverStream1 = ssc.receiverStream(receiver1)