Author:    Shixiong Zhu <shixiong@databricks.com>    2016-03-31 12:17:25 -0700
Committer: Andrew Or <andrew@databricks.com>         2016-03-31 12:17:25 -0700
commit e785402826dcd984d9312470464714ba6c908a49 (patch)
tree   f1c4bf7418c03a24860e7bcd710c9f0ce72d0ae4
parent 3cfbeb70b1feb1f3a8c4d0b2d2f3715a356c80f2 (diff)
[SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the `java.io.tmpdir` folder
## What changes were proposed in this pull request?

If I press `CTRL-C` while running these tests, the temp files are left in the `sql/core` folder and I have to delete them manually, which is annoying. This PR moves the temp files to the `java.io.tmpdir` folder and adds a name prefix to them.

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12093 from zsxwing/temp-file.
 sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala                            |  2 +-
 sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala |  3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala  |  3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala         |  4 ++--
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala       | 26 +++++++++++++-------------
 sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala             |  8 ++++----
 6 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 4ca739450c..b5be7ef47e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
val testThread = Thread.currentThread()
- val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
+ val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
try {
startedTest.foreach { action =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 54ce98d195..29bd3e018e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@volatile var query: StreamExecution = null
try {
val df = ds.toDF
- val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
+ val metadataRoot =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
query = sqlContext
.streams
.startQuery(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index c1bab9b577..102473d7d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._
- private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
+ private def newMetadataDir =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
after {
sqlContext.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 7f31611383..8cf5dedabc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
- val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
- val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
val query =
df.write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 89de15acf5..054f5c9fa2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from text files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files with inferring schema") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
@@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from parquet files") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileSource.toDF().filter($"value" contains "keep")
@@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("file stream source without schema") {
- val src = Utils.createTempDir("streaming.src")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
@@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("fault tolerance") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQLContext {
import testImplicits._
test("file source stress test") {
- val src = Utils.createTempDir("streaming.src")
- val tmp = Utils.createTempDir("streaming.tmp")
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val ds = textSource.toDS[String]().map(_.toInt + 1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 5a1bfb3a00..3916430cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
test("fault tolerance stress test") {
val numRecords = 10000
- val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
- val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
- val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
- val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+ val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
+ val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
@volatile
var continue = true