-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 7
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 10
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala | 6
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala | 15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 6
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 8
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 14
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 22
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala | 5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 2
33 files changed, 116 insertions(+), 153 deletions(-)
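
Every hunk below follows the same cleanup pattern: ad-hoc File.createTempFile(...) plus deleteOnExit() calls (and Guava's Files.createTempDir()) are replaced with paths rooted in Utils.createTempDir(), which Spark registers for recursive deletion at JVM shutdown. Since deleteOnExit() cannot remove non-empty directories, the old approach tended to leak files. A minimal sketch of the before/after pattern (prefixes and suffixes here are illustrative only):

    import java.io.File
    import org.apache.spark.util.Utils

    // Before: a stray file in java.io.tmpdir; deleteOnExit() only helps if the
    // JVM exits cleanly and cannot remove non-empty directories.
    val oldStyle = File.createTempFile("some-test", ".tmp")
    oldStyle.deleteOnExit()

    // After: the file lives inside a directory from Utils.createTempDir(), which
    // Spark tracks and deletes recursively when the JVM shuts down.
    val newStyle = File.createTempFile("some-test", ".tmp", Utils.createTempDir())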
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 4e58aa0ed4..5668b53fc6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.master.{RecoveryState, SparkCuratorUtil}
+import org.apache.spark.util.Utils
/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -405,8 +406,7 @@ private object SparkDocker {
private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
val ipPromise = promise[String]()
- val outFile = File.createTempFile("fault-tolerance-test", "")
- outFile.deleteOnExit()
+ val outFile = File.createTempFile("fault-tolerance-test", "", Utils.createTempDir())
val outStream: FileWriter = new FileWriter(outFile)
def findIpAndLog(line: String): Unit = {
if (line.startsWith("CONTAINER_IP=")) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 91aa70870a..fa56bb09e4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -288,7 +288,7 @@ private[spark] object Utils extends Logging {
} catch { case e: SecurityException => dir = null; }
}
- dir
+ dir.getCanonicalFile
}
/**
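
The one-line change to Utils.scala above makes the temp-directory helper return a canonical path. A plausible reason, stated here as an assumption rather than taken from the commit, is that on macOS the default temp directory sits behind the /var to /private/var symlink, so a returned path and its canonicalized form can otherwise compare unequal in tests:

    import java.io.File

    // Illustration (assumed environment, not part of the diff): on macOS the JVM
    // temp dir usually lives under /var, which is a symlink to /private/var, so
    // the raw and canonical forms of the same path differ.
    val tmp = new File(System.getProperty("java.io.tmpdir"))
    println(tmp.getAbsolutePath)   // e.g. /var/folders/...
    println(tmp.getCanonicalPath)  // e.g. /private/var/folders/...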
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 3b10b3a042..32abc65385 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -33,8 +33,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
override def beforeEach() {
super.beforeEach()
- checkpointDir = File.createTempFile("temp", "")
- checkpointDir.deleteOnExit()
+ checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
checkpointDir.delete()
sc = new SparkContext("local", "test")
sc.setCheckpointDir(checkpointDir.toString)
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 43fbd3ff3f..62cb7649c0 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -21,6 +21,8 @@ import java.io.File
import org.scalatest.FunSuite
+import org.apache.spark.util.Utils
+
class SecurityManagerSuite extends FunSuite {
test("set security with conf") {
@@ -160,8 +162,7 @@ class SecurityManagerSuite extends FunSuite {
}
test("ssl off setup") {
- val file = File.createTempFile("SSLOptionsSuite", "conf")
- file.deleteOnExit()
+ val file = File.createTempFile("SSLOptionsSuite", "conf", Utils.createTempDir())
System.setProperty("spark.ssl.configFile", file.getAbsolutePath)
val conf = new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b8e3e83b5a..b07c4d93db 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -79,13 +79,14 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}
-
+
test("addFile works") {
- val file1 = File.createTempFile("someprefix1", "somesuffix1")
+ val dir = Utils.createTempDir()
+
+ val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val absolutePath1 = file1.getAbsolutePath
- val pluto = Utils.createTempDir()
- val file2 = File.createTempFile("someprefix2", "somesuffix2", pluto)
+ val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val relativePath = file2.getParent + "/../" + file2.getParentFile.getName + "/" + file2.getName
val absolutePath2 = file2.getAbsolutePath
@@ -129,7 +130,7 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}
-
+
test("addFile recursive works") {
val pluto = Utils.createTempDir()
val neptune = Utils.createTempDir(pluto.getAbsolutePath)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 46d745c4ec..4561e5b8e9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -402,8 +402,10 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
val archives = "file:/archive1,archive2" // spark.yarn.dist.archives
val pyFiles = "py-file1,py-file2" // spark.submit.pyFiles
+ val tmpDir = Utils.createTempDir()
+
// Test jars and files
- val f1 = File.createTempFile("test-submit-jars-files", "")
+ val f1 = File.createTempFile("test-submit-jars-files", "", tmpDir)
val writer1 = new PrintWriter(f1)
writer1.println("spark.jars " + jars)
writer1.println("spark.files " + files)
@@ -420,7 +422,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps("spark.files") should be(Utils.resolveURIs(files))
// Test files and archives (Yarn)
- val f2 = File.createTempFile("test-submit-files-archives", "")
+ val f2 = File.createTempFile("test-submit-files-archives", "", tmpDir)
val writer2 = new PrintWriter(f2)
writer2.println("spark.yarn.dist.files " + files)
writer2.println("spark.yarn.dist.archives " + archives)
@@ -437,7 +439,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
sysProps2("spark.yarn.dist.archives") should be(Utils.resolveURIs(archives))
// Test python files
- val f3 = File.createTempFile("test-submit-python-files", "")
+ val f3 = File.createTempFile("test-submit-python-files", "", tmpDir)
val writer3 = new PrintWriter(f3)
writer3.println("spark.submit.pyFiles " + pyFiles)
writer3.close()
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 1a9a0e857e..aea76c1adc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
-import org.apache.spark._
import org.scalatest.FunSuite
import scala.collection.Map
@@ -30,6 +29,9 @@ import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
+import org.apache.spark._
+import org.apache.spark.util.Utils
+
class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("basic pipe") {
@@ -141,7 +143,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
- new File("tasks").delete()
+ Utils.deleteRecursively(new File("tasks"))
} else {
assert(true)
}
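
The PipedRDDSuite hunk swaps File.delete() for Utils.deleteRecursively because the test's "tasks" directory is non-empty (it holds the symlinks checked a few lines earlier), and File.delete() silently fails on non-empty directories. A self-contained sketch of what a recursive delete has to do; this is an assumed helper for illustration, not Spark's actual Utils.deleteRecursively:

    import java.io.{File, IOException}

    // Walk the tree bottom-up; a directory can only be deleted once it is empty.
    def deleteRecursively(f: File): Unit = {
      if (f.isDirectory) {
        Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
      }
      if (!f.delete() && f.exists()) {
        throw new IOException(s"Failed to delete: ${f.getAbsolutePath}")
      }
    }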
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index c21c92b63a..78bbc4ec2c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -16,16 +16,18 @@
*/
package org.apache.spark.storage
-import org.scalatest.FunSuite
import java.io.File
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
class BlockObjectWriterSuite extends FunSuite {
test("verify write metrics") {
- val file = new File("somefile")
- file.deleteOnExit()
+ val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -47,8 +49,7 @@ class BlockObjectWriterSuite extends FunSuite {
}
test("verify write metrics on revert") {
- val file = new File("somefile")
- file.deleteOnExit()
+ val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
@@ -71,8 +72,7 @@ class BlockObjectWriterSuite extends FunSuite {
}
test("Reopening a closed block writer") {
- val file = new File("somefile")
- file.deleteOnExit()
+ val file = new File(Utils.createTempDir(), "somefile")
val writeMetrics = new ShuffleWriteMetrics()
val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
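
In BlockObjectWriterSuite the tests previously wrote to new File("somefile"), a file in the process's working directory, and relied on deleteOnExit(). Rooting the path in Utils.createTempDir() keeps the working directory clean. Note that new File(dir, name) only constructs a path; the writer under test still creates the file itself. A small sketch of that distinction:

    import java.io.File
    import org.apache.spark.util.Utils

    // new File(dir, name) is just a path; nothing is created on disk yet.
    val file = new File(Utils.createTempDir(), "somefile")
    assert(!file.exists())
    // The DiskBlockObjectWriter under test is what actually creates the file.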
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 4dc5b6103d..43b6a405cb 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.logging.{RollingFileAppender, SizeBasedRollingPolic
class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
- val testFile = new File("FileAppenderSuite-test-" + System.currentTimeMillis).getAbsoluteFile
+ val testFile = new File(Utils.createTempDir(), "FileAppenderSuite-test").getAbsoluteFile
before {
cleanup()
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index b91428efad..5d93086082 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -122,7 +122,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
test("reading offset bytes of a file") {
val tmpDir2 = Utils.createTempDir()
- tmpDir2.deleteOnExit()
val f1Path = tmpDir2 + "/f1"
val f1 = new FileOutputStream(f1Path)
f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8))
@@ -151,7 +150,6 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
test("reading offset bytes across multiple files") {
val tmpDir = Utils.createTempDir()
- tmpDir.deleteOnExit()
val files = (1 to 3).map(i => new File(tmpDir, i.toString))
Files.write("0123456789", files(0), UTF_8)
Files.write("abcdefghij", files(1), UTF_8)
@@ -357,7 +355,8 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
}
test("loading properties from file") {
- val outFile = File.createTempFile("test-load-spark-properties", "test")
+ val tmpDir = Utils.createTempDir()
+ val outFile = File.createTempFile("test-load-spark-properties", "test", tmpDir)
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
@@ -370,7 +369,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
- outFile.delete()
+ Utils.deleteRecursively(tmpDir)
}
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index fc53c23abd..3cd960d1fd 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -25,16 +25,15 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
-import com.google.common.io.Files
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
-import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.util.Utils
class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
@@ -60,7 +59,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
- tempDirectory = Files.createTempDir()
+ tempDirectory = Utils.createTempDir()
ssc.checkpoint(tempDirectory.getAbsolutePath)
}
@@ -68,10 +67,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
if (ssc != null) {
ssc.stop()
}
- if (tempDirectory != null && tempDirectory.exists()) {
- FileUtils.deleteDirectory(tempDirectory)
- tempDirectory = null
- }
+ Utils.deleteRecursively(tempDirectory)
tearDownKafka()
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index b61d9f0fbe..8d15150458 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -19,13 +19,12 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
-import com.google.common.io.Files
-
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
class GraphSuite extends FunSuite with LocalSparkContext {
@@ -369,8 +368,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
test("checkpoint") {
- val checkpointDir = Files.createTempDir()
- checkpointDir.deleteOnExit()
+ val checkpointDir = Utils.createTempDir()
withSpark { sc =>
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index fbef5b25ba..14f5e9ed4f 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -21,11 +21,9 @@ import java.io._
import java.net.URLClassLoader
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Await
import scala.concurrent.duration._
import scala.tools.nsc.interpreter.SparkILoop
-import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.SparkContext
@@ -196,8 +194,7 @@ class ReplSuite extends FunSuite {
}
test("interacting with files") {
- val tempDir = Files.createTempDir()
- tempDir.deleteOnExit()
+ val tempDir = Utils.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
out.write("What's up?\n")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
index 80c7dfd376..528e38a50a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.rules
-import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
/**
* A collection of generators that build custom bytecode at runtime for performing the evaluation
@@ -52,7 +52,7 @@ package object codegen {
@DeveloperApi
object DumpByteCode {
import scala.sys.process._
- val dumpDirectory = util.getTempFilePath("sparkSqlByteCode")
+ val dumpDirectory = Utils.createTempDir()
dumpDirectory.mkdir()
def apply(obj: Any): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index d8da45ae70..feed50f9a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -19,20 +19,9 @@ package org.apache.spark.sql.catalyst
import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
-import org.apache.spark.util.{Utils => SparkUtils}
+import org.apache.spark.util.Utils
package object util {
- /**
- * Returns a path to a temporary file that probably does not exist.
- * Note, there is always the race condition that someone created this
- * file since the last time we checked. Thus, this shouldn't be used
- * for anything security conscious.
- */
- def getTempFilePath(prefix: String, suffix: String = ""): File = {
- val tempFile = File.createTempFile(prefix, suffix)
- tempFile.delete()
- tempFile
- }
def fileToString(file: File, encoding: String = "UTF-8") = {
val inStream = new FileInputStream(file)
@@ -56,7 +45,7 @@ package object util {
def resourceToString(
resource:String,
encoding: String = "UTF-8",
- classLoader: ClassLoader = SparkUtils.getSparkClassLoader) = {
+ classLoader: ClassLoader = Utils.getSparkClassLoader) = {
val inStream = classLoader.getResourceAsStream(resource)
val outStream = new ByteArrayOutputStream
try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index d6ea6679c5..9d17516e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -23,7 +23,6 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.util.Utils
@@ -67,8 +66,9 @@ private[sql] trait ParquetTest {
* @todo Probably this method should be moved to a more general place
*/
protected def withTempPath(f: File => Unit): Unit = {
- val file = util.getTempFilePath("parquetTest").getCanonicalFile
- try f(file) finally if (file.exists()) Utils.deleteRecursively(file)
+ val path = Utils.createTempDir()
+ path.delete()
+ try f(path) finally Utils.deleteRecursively(path)
}
/**
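
The reworked withTempPath above captures an idiom repeated in many of the SQL hunks that follow (JsonSuite, CliSuite, SaveLoadSuite, TestHive): create a temp directory to obtain a unique, shutdown-managed path, then delete it immediately so the code under test receives a path that does not yet exist, since Hadoop-style writers such as saveAsTextFile and saveAsParquetFile refuse to write into an existing location. A standalone sketch of the idiom:

    import java.io.File
    import org.apache.spark.util.Utils

    // A unique path that Spark will clean up recursively on shutdown...
    val path: File = Utils.createTempDir()
    // ...made non-existent so the writer can create it itself.
    path.delete()
    assert(!path.exists())
    // path.getCanonicalPath can now be handed to saveAsTextFile / saveAsParquetFile.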
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 23f424c0bf..fe618e0e8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import java.io.File
+import org.apache.spark.util.Utils
+
import scala.beans.{BeanInfo, BeanProperty}
import org.apache.spark.rdd.RDD
@@ -98,13 +100,13 @@ class UserDefinedTypeSuite extends QueryTest {
test("UDTs with Parquet") {
- val tempDir = File.createTempFile("parquet", "test")
+ val tempDir = Utils.createTempDir()
tempDir.delete()
pointsRDD.saveAsParquetFile(tempDir.getCanonicalPath)
}
test("Repartition UDTs with Parquet") {
- val tempDir = File.createTempFile("parquet", "test")
+ val tempDir = Utils.createTempDir()
tempDir.delete()
pointsRDD.repartition(1).saveAsParquetFile(tempDir.getCanonicalPath)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 320b80d80e..706c966ee0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -22,7 +22,6 @@ import java.sql.{Date, Timestamp}
import org.scalactic.Tolerance._
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.json.JsonRDD.{compatibleType, enforceCorrectType}
import org.apache.spark.sql.sources.LogicalRelation
@@ -31,6 +30,7 @@ import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils
class JsonSuite extends QueryTest {
import org.apache.spark.sql.json.TestJsonData._
@@ -554,8 +554,9 @@ class JsonSuite extends QueryTest {
}
test("jsonFile should be based on JSONRelation") {
- val file = getTempFilePath("json")
- val path = file.toString
+ val dir = Utils.createTempDir()
+ dir.delete()
+ val path = dir.getCanonicalPath
sparkContext.parallelize(1 to 100).map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
val jsonDF = jsonFile(path, 0.49)
@@ -580,8 +581,9 @@ class JsonSuite extends QueryTest {
}
test("Loading a JSON dataset from a text file") {
- val file = getTempFilePath("json")
- val path = file.toString
+ val dir = Utils.createTempDir()
+ dir.delete()
+ val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
val jsonDF = jsonFile(path)
@@ -611,8 +613,9 @@ class JsonSuite extends QueryTest {
}
test("Loading a JSON dataset from a text file with SQL") {
- val file = getTempFilePath("json")
- val path = file.toString
+ val dir = Utils.createTempDir()
+ dir.delete()
+ val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
sql(
@@ -637,8 +640,9 @@ class JsonSuite extends QueryTest {
}
test("Applying schemas") {
- val file = getTempFilePath("json")
- val path = file.toString
+ val dir = Utils.createTempDir()
+ dir.delete()
+ val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
val schema = StructType(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 60355414a4..2975a7fee4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.spark.sql.AnalysisException
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.util.Utils
class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
@@ -32,7 +31,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
var path: File = null
override def beforeAll(): Unit = {
- path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")
}
@@ -42,7 +41,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
}
after {
- if (path.exists()) Utils.deleteRecursively(path)
+ Utils.deleteRecursively(path)
}
test("CREATE TEMPORARY TABLE AS SELECT") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index b5b16f9546..80efe9728f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.util.Utils
class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
@@ -32,7 +31,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
var path: File = null
override def beforeAll: Unit = {
- path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")
sql(
@@ -48,7 +47,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
override def afterAll: Unit = {
dropTempTable("jsonTable")
dropTempTable("jt")
- if (path.exists()) Utils.deleteRecursively(path)
+ Utils.deleteRecursively(path)
}
test("Simple INSERT OVERWRITE a JSONRelation") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 607488ccfd..43bc8eb2d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.{SaveMode, SQLConf, DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -39,7 +38,8 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
override def beforeAll(): Unit = {
originalDefaultSource = conf.defaultDataSourceName
- path = util.getTempFilePath("datasource").getCanonicalFile
+ path = Utils.createTempDir()
+ path.delete()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
df = jsonRDD(rdd)
@@ -52,7 +52,7 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
after {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
- if (path.exists()) Utils.deleteRecursively(path)
+ Utils.deleteRecursively(path)
}
def checkLoad(): Unit = {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8bca4b33b3..75738fa22b 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.util.Utils
class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
def runCliWithin(
@@ -38,8 +38,10 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
queriesAndExpectedAnswers: (String, String)*) {
val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
- val warehousePath = getTempFilePath("warehouse")
- val metastorePath = getTempFilePath("metastore")
+ val warehousePath = Utils.createTempDir()
+ warehousePath.delete()
+ val metastorePath = Utils.createTempDir()
+ metastorePath.delete()
val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
val command = {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index aff96e21a5..bf20acecb1 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -37,7 +37,6 @@ import org.apache.thrift.transport.TSocket
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.util.Utils
@@ -447,8 +446,10 @@ abstract class HiveThriftServer2Test extends FunSuite with BeforeAndAfterAll wit
}
private def startThriftServer(port: Int, attempt: Int) = {
- warehousePath = util.getTempFilePath("warehouse")
- metastorePath = util.getTempFilePath("metastore")
+ warehousePath = Utils.createTempDir()
+ warehousePath.delete()
+ metastorePath = Utils.createTempDir()
+ metastorePath.delete()
logPath = null
logTailingProcess = null
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 4859991e23..b4aee78046 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.CacheTableCommand
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
@@ -69,22 +68,19 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
hiveconf.set("hive.plan.serialization.format", "javaXML")
- lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
- lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath
+ lazy val warehousePath = Utils.createTempDir()
+ lazy val metastorePath = Utils.createTempDir()
/** Sets up the system initially or after a RESET command */
protected def configure(): Unit = {
+ warehousePath.delete()
+ metastorePath.delete()
setConf("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
- setConf("hive.metastore.warehouse.dir", warehousePath)
- Utils.registerShutdownDeleteDir(new File(warehousePath))
- Utils.registerShutdownDeleteDir(new File(metastorePath))
+ setConf("hive.metastore.warehouse.dir", warehousePath.toString)
}
- val testTempDir = File.createTempFile("testTempFiles", "spark.hive.tmp")
- testTempDir.delete()
- testTempDir.mkdir()
- Utils.registerShutdownDeleteDir(testTempDir)
+ val testTempDir = Utils.createTempDir()
// For some hive test case which contain ${system:test.tmp.dir}
System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index d4b175fa44..381cd2a291 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -21,12 +21,11 @@ import java.io.File
import org.scalatest.BeforeAndAfter
-import com.google.common.io.Files
-
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.{QueryTest, _}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/* Implicits */
import org.apache.spark.sql.hive.test.TestHive._
@@ -112,7 +111,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
test("SPARK-4203:random partition directory order") {
sql("CREATE TABLE tmp_table (key int, value string)")
- val tmpDir = Files.createTempDir()
+ val tmpDir = Utils.createTempDir()
sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}' ")
sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='2') SELECT 'blarr' FROM tmp_table")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 5d6a6f3b64..ff2e6ea9ea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.hive
import java.io.File
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.BeforeAndAfterEach
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidInputException
-import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql._
import org.apache.spark.util.Utils
import org.apache.spark.sql.types._
@@ -34,8 +35,6 @@ import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
-import scala.collection.mutable.ArrayBuffer
-
/**
* Tests for persisting tables created though the data sources API into the metastore.
*/
@@ -43,11 +42,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
override def afterEach(): Unit = {
reset()
- if (tempPath.exists()) Utils.deleteRecursively(tempPath)
+ Utils.deleteRecursively(tempPath)
}
val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
- var tempPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
+ var tempPath: File = Utils.createTempDir()
+ tempPath.delete()
test ("persistent JSON table") {
sql(
@@ -154,7 +154,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}
test("check change without refresh") {
- val tempDir = File.createTempFile("sparksql", "json")
+ val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
@@ -192,7 +192,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}
test("drop, change, recreate") {
- val tempDir = File.createTempFile("sparksql", "json")
+ val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1904f5faef..d891c4e890 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
@@ -579,13 +580,8 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
var partitionedTableDirWithKeyAndComplexTypes: File = null
override def beforeAll(): Unit = {
- partitionedTableDir = File.createTempFile("parquettests", "sparksql")
- partitionedTableDir.delete()
- partitionedTableDir.mkdir()
-
- normalTableDir = File.createTempFile("parquettests", "sparksql")
- normalTableDir.delete()
- normalTableDir.mkdir()
+ partitionedTableDir = Utils.createTempDir()
+ normalTableDir = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDir, s"p=$p")
@@ -601,9 +597,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
.toDF()
.saveAsParquetFile(new File(normalTableDir, "normal").getCanonicalPath)
- partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
- partitionedTableDirWithKey.delete()
- partitionedTableDirWithKey.mkdir()
+ partitionedTableDirWithKey = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKey, s"p=$p")
@@ -613,9 +607,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
.saveAsParquetFile(partDir.getCanonicalPath)
}
- partitionedTableDirWithKeyAndComplexTypes = File.createTempFile("parquettests", "sparksql")
- partitionedTableDirWithKeyAndComplexTypes.delete()
- partitionedTableDirWithKeyAndComplexTypes.mkdir()
+ partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
@@ -625,9 +617,7 @@ abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll
}.toDF().saveAsParquetFile(partDir.getCanonicalPath)
}
- partitionedTableDirWithComplexTypes = File.createTempFile("parquettests", "sparksql")
- partitionedTableDirWithComplexTypes.delete()
- partitionedTableDirWithComplexTypes.mkdir()
+ partitionedTableDirWithComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 8ea91eca68..91a2b2bba4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -222,7 +222,7 @@ class CheckpointSuite extends TestSuiteBase {
}
test("recovery with saveAsHadoopFiles operation") {
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
@@ -245,7 +245,7 @@ class CheckpointSuite extends TestSuiteBase {
}
test("recovery with saveAsNewAPIHadoopFiles operation") {
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
@@ -283,7 +283,7 @@ class CheckpointSuite extends TestSuiteBase {
//
// After SPARK-5079 is addressed, should be able to remove this test since a strengthened
// version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
- val tempDir = Files.createTempDir()
+ val tempDir = Utils.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 6500608bba..26435d8515 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -20,15 +20,13 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.util.Utils
-import java.io.File
-
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
class FailureSuite extends TestSuiteBase with Logging {
- val directory = Utils.createTempDir().getAbsolutePath
+ val directory = Utils.createTempDir()
val numBatches = 30
override def batchDuration = Milliseconds(1000)
@@ -36,16 +34,16 @@ class FailureSuite extends TestSuiteBase with Logging {
override def useManualClock = false
override def afterFunction() {
- Utils.deleteRecursively(new File(directory))
+ Utils.deleteRecursively(directory)
super.afterFunction()
}
test("multiple failures with map") {
- MasterFailureTest.testMap(directory, numBatches, batchDuration)
+ MasterFailureTest.testMap(directory.getAbsolutePath, numBatches, batchDuration)
}
test("multiple failures with updateStateByKey") {
- MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
+ MasterFailureTest.testUpdateStateByKey(directory.getAbsolutePath, numBatches, batchDuration)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 818f551dbe..18a477f920 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -25,8 +25,6 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{ActorSystem, Props}
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
@@ -39,7 +37,7 @@ import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
-import org.apache.spark.util.{AkkaUtils, ManualClock}
+import org.apache.spark.util.{AkkaUtils, ManualClock, Utils}
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._
@@ -76,7 +74,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
- tempDirectory = Files.createTempDir()
+ tempDirectory = Utils.createTempDir()
manualClock.setTime(0)
}
@@ -93,10 +91,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
actorSystem.awaitTermination()
actorSystem = null
- if (tempDirectory != null && tempDirectory.exists()) {
- FileUtils.deleteDirectory(tempDirectory)
- tempDirectory = null
- }
+ Utils.deleteRecursively(tempDirectory)
}
test("BlockManagerBasedBlockHandler - store blocks") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index a3a0fd5187..42fad769f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -24,8 +24,6 @@ import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.util.Random
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
@@ -51,15 +49,12 @@ class ReceivedBlockTrackerSuite
before {
conf = new SparkConf().setMaster("local[2]").setAppName("ReceivedBlockTrackerSuite")
- checkpointDirectory = Files.createTempDir()
+ checkpointDirectory = Utils.createTempDir()
}
after {
allReceivedBlockTrackers.foreach { _.stop() }
- if (checkpointDirectory != null && checkpointDirectory.exists()) {
- FileUtils.deleteDirectory(checkpointDirectory)
- checkpointDirectory = null
- }
+ Utils.deleteRecursively(checkpointDirectory)
}
test("block addition, and block to batch allocation") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index e8c34a9ee4..aa20ad0b53 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.Semaphore
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import com.google.common.io.Files
import org.scalatest.concurrent.Timeouts
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -34,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
+import org.apache.spark.util.Utils
/** Testsuite for testing the network receiver behavior */
class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
@@ -222,7 +222,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")
val batchDuration = Milliseconds(500)
- val tempDirectory = Files.createTempDir()
+ val tempDirectory = Utils.createTempDir()
val logDirectory1 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 0))
val logDirectory2 = new File(checkpointDirToLogDir(tempDirectory.getAbsolutePath, 1))
val allLogFiles1 = new mutable.HashSet[String]()
@@ -251,7 +251,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
}
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
- tempDirectory.deleteOnExit()
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
val receiverStream1 = ssc.receiverStream(receiver1)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index b5a2db8f62..4194f36499 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -50,7 +50,7 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
if (hasBash) test(name)(fn) else ignore(name)(fn)
bashTest("shell script escaping") {
- val scriptFile = File.createTempFile("script.", ".sh")
+ val scriptFile = File.createTempFile("script.", ".sh", Utils.createTempDir())
val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6")
try {
val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ")