diff options
Diffstat (limited to 'streaming/src')
5 files changed, 14 insertions, 27 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 8ea91eca68..91a2b2bba4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -222,7 +222,7 @@ class CheckpointSuite extends TestSuiteBase { } test("recovery with saveAsHadoopFiles operation") { - val tempDir = Files.createTempDir() + val tempDir = Utils.createTempDir() try { testCheckpointedOperation( Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), @@ -245,7 +245,7 @@ class CheckpointSuite extends TestSuiteBase { } test("recovery with saveAsNewAPIHadoopFiles operation") { - val tempDir = Files.createTempDir() + val tempDir = Utils.createTempDir() try { testCheckpointedOperation( Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), @@ -283,7 +283,7 @@ class CheckpointSuite extends TestSuiteBase { // // After SPARK-5079 is addressed, should be able to remove this test since a strengthened // version of the other saveAsHadoopFile* tests would prevent regressions for this issue. - val tempDir = Files.createTempDir() + val tempDir = Utils.createTempDir() try { testCheckpointedOperation( Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala index 6500608bba..26435d8515 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -20,15 +20,13 @@ package org.apache.spark.streaming import org.apache.spark.Logging import org.apache.spark.util.Utils -import java.io.File - /** * This testsuite tests master failures at random times while the stream is running using * the real clock. */ class FailureSuite extends TestSuiteBase with Logging { - val directory = Utils.createTempDir().getAbsolutePath + val directory = Utils.createTempDir() val numBatches = 30 override def batchDuration = Milliseconds(1000) @@ -36,16 +34,16 @@ class FailureSuite extends TestSuiteBase with Logging { override def useManualClock = false override def afterFunction() { - Utils.deleteRecursively(new File(directory)) + Utils.deleteRecursively(directory) super.afterFunction() } test("multiple failures with map") { - MasterFailureTest.testMap(directory, numBatches, batchDuration) + MasterFailureTest.testMap(directory.getAbsolutePath, numBatches, batchDuration) } test("multiple failures with updateStateByKey") { - MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration) + MasterFailureTest.testUpdateStateByKey(directory.getAbsolutePath, numBatches, batchDuration) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 818f551dbe..18a477f920 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -25,8 +25,6 @@ import scala.concurrent.duration._ import scala.language.postfixOps import akka.actor.{ActorSystem, Props} -import com.google.common.io.Files -import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} import org.scalatest.concurrent.Eventually._ @@ -39,7 +37,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ -import org.apache.spark.util.{AkkaUtils, ManualClock} +import org.apache.spark.util.{AkkaUtils, ManualClock, Utils} import WriteAheadLogBasedBlockHandler._ import WriteAheadLogSuite._ @@ -76,7 +74,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche new NioBlockTransferService(conf, securityMgr), securityMgr, 0) blockManager.initialize("app-id") - tempDirectory = Files.createTempDir() + tempDirectory = Utils.createTempDir() manualClock.setTime(0) } @@ -93,10 +91,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche actorSystem.awaitTermination() actorSystem = null - if (tempDirectory != null && tempDirectory.exists()) { - FileUtils.deleteDirectory(tempDirectory) - tempDirectory = null - } + Utils.deleteRecursively(tempDirectory) } test("BlockManagerBasedBlockHandler - store blocks") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index a3a0fd5187..42fad769f0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -24,8 +24,6 @@ import scala.concurrent.duration._ import scala.language.{implicitConversions, postfixOps} import scala.util.Random -import com.google.common.io.Files -import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} import org.scalatest.concurrent.Eventually._ @@ -51,15 +49,12 @@ class ReceivedBlockTrackerSuite before { conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite") - checkpointDirectory = Files.createTempDir() + checkpointDirectory = Utils.createTempDir() } after { allReceivedBlockTrackers.foreach { _.stop() } - if (checkpointDirectory != null && checkpointDirectory.exists()) { - FileUtils.deleteDirectory(checkpointDirectory) - checkpointDirectory = null - } + Utils.deleteRecursively(checkpointDirectory) } test("block addition, and block to batch allocation") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index e8c34a9ee4..aa20ad0b53 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -24,7 +24,6 @@ import java.util.concurrent.Semaphore import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.io.Files import org.scalatest.concurrent.Timeouts import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -34,6 +33,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._ +import org.apache.spark.util.Utils /** Testsuite for testing the network receiver behavior */ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { @@ -222,7 +222,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { .set("spark.streaming.receiver.writeAheadLog.enable", "true") .set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1") val batchDuration = Milliseconds(500) - val tempDirectory = Files.createTempDir() + val tempDirectory = Utils.createTempDir() val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0)) val logDirectory2 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 1)) val allLogFiles1 = new mutable.HashSet[String]() @@ -251,7 +251,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable { } withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc => - tempDirectory.deleteOnExit() val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true)) val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true)) val receiverStream1 = ssc.receiverStream(receiver1) |