diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-03-31 12:17:25 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-03-31 12:17:25 -0700 |
commit | e785402826dcd984d9312470464714ba6c908a49 (patch) | |
tree | f1c4bf7418c03a24860e7bcd710c9f0ce72d0ae4 | |
parent | 3cfbeb70b1feb1f3a8c4d0b2d2f3715a356c80f2 (diff) | |
download | spark-e785402826dcd984d9312470464714ba6c908a49.tar.gz spark-e785402826dcd984d9312470464714ba6c908a49.tar.bz2 spark-e785402826dcd984d9312470464714ba6c908a49.zip |
[SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the `java.io.tmpdir` folder
## What changes were proposed in this pull request?
If I press `CTRL-C` when running these tests, the temp files will be left in `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and add a name prefix for them.
## How was this patch tested?
Existing Jenkins tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #12093 from zsxwing/temp-file.
6 files changed, 24 insertions, 22 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 4ca739450c..b5be7ef47e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts { } val testThread = Thread.currentThread() - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath try { startedTest.foreach { action => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index 54ce98d195..29bd3e018e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with @volatile var query: StreamExecution = null try { val df = ds.toDF - val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath + val metadataRoot = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath query = sqlContext .streams .startQuery( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index c1bab9b577..102473d7d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter { import testImplicits._ - private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath after { sqlContext.streams.active.foreach(_.stop()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 7f31611383..8cf5dedabc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { val inputData = MemoryStream[Int] val df = inputData.toDF() - val outputDir = Utils.createTempDir("stream.output").getCanonicalPath - val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath val query = df.write diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 89de15acf5..054f5c9fa2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from text files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema)) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from json files with inferring schema") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") // Add a file so that we can infer its schema stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}") @@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("read from parquet files") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema)) val filtered = fileSource.toDF().filter($"value" contains "keep") @@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("file stream source without schema") { - val src = Utils.createTempDir("streaming.src") + val src = Utils.createTempDir(namePrefix = "streaming.src") // Only "text" doesn't need a schema createFileStreamSource("text", src.getCanonicalPath) @@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("fault tolerance") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val filtered = textSource.toDF().filter($"value" contains "keep") @@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ import testImplicits._ test("file source stress test") { - val src = Utils.createTempDir("streaming.src") - val tmp = Utils.createTempDir("streaming.tmp") + val src = Utils.createTempDir(namePrefix = "streaming.src") + val tmp = Utils.createTempDir(namePrefix = "streaming.tmp") val textSource = createFileStreamSource("text", src.getCanonicalPath) val ds = textSource.toDS[String]().map(_.toInt + 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 5a1bfb3a00..3916430cdf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext { test("fault tolerance stress test") { val numRecords = 10000 - val inputDir = Utils.createTempDir("stream.input").getCanonicalPath - val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath - val outputDir = Utils.createTempDir("stream.output").getCanonicalPath - val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath + val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath + val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath @volatile var continue = true |