aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-13 11:54:54 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-02-13 11:54:54 -0800
commit3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529 (patch)
tree4b435e0f79c787b200162585a8e101f08fed0cce /sql/core/src/test
parent0417ce8787245791342d5609446f0e2fc4c219b1 (diff)
downloadspark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.tar.gz
spark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.tar.bz2
spark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.zip
[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors
## What changes were proposed in this pull request? When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors. ## How was this patch tested? New unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16880 from zsxwing/delete-temp-checkpoint.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala26
1 files changed, 26 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 30a957ef81..0470411a0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -670,4 +670,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with Pr
}
}
}
+
+ test("temp checkpoint dir should be deleted if a query is stopped without errors") {
+ import testImplicits._
+ val query = MemoryStream[Int].toDS.writeStream.format("console").start()
+ val checkpointDir = new Path(
+ query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+ val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(fs.exists(checkpointDir))
+ query.stop()
+ assert(!fs.exists(checkpointDir))
+ }
+
+ testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+ import testImplicits._
+ val input = MemoryStream[Int]
+ val query = input.toDS.map(_ / 0).writeStream.format("console").start()
+ val checkpointDir = new Path(
+ query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+ val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(fs.exists(checkpointDir))
+ input.addData(1)
+ intercept[StreamingQueryException] {
+ query.awaitTermination()
+ }
+ assert(fs.exists(checkpointDir))
+ }
}