aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-13 11:54:54 -0800
committerBurak Yavuz <brkyvz@gmail.com>2017-02-13 11:54:54 -0800
commit3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529 (patch)
tree4b435e0f79c787b200162585a8e101f08fed0cce /sql/core/src
parent0417ce8787245791342d5609446f0e2fc4c219b1 (diff)
downloadspark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.tar.gz
spark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.tar.bz2
spark-3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529.zip
[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors
## What changes were proposed in this pull request? When a query uses a temp checkpoint dir, it's better to delete it if it's stopped without errors. ## How was this patch tested? New unit tests. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16880 from zsxwing/delete-temp-checkpoint.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala24
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala26
3 files changed, 53 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea3719421b..3149ef04f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.streaming
+import java.io.IOException
import java.util.UUID
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
* Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
* [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
* and the results are committed transactionally to the given [[Sink]].
+ *
+ * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
+ * errors
*/
class StreamExecution(
override val sparkSession: SparkSession,
override val name: String,
- checkpointRoot: String,
+ val checkpointRoot: String,
analyzedPlan: LogicalPlan,
val sink: Sink,
val trigger: Trigger,
val triggerClock: Clock,
- val outputMode: OutputMode)
+ val outputMode: OutputMode,
+ deleteCheckpointOnStop: Boolean)
extends StreamingQuery with ProgressReporter with Logging {
import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
def start(): Unit = {
+ logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query checkpoint.")
microBatchThread.setDaemon(true)
microBatchThread.start()
startLatch.await() // Wait until thread started and QueryStart event has been posted
@@ -323,6 +329,20 @@ class StreamExecution(
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
+
+ // Delete the temp checkpoint only when the query didn't fail
+ if (deleteCheckpointOnStop && exception.isEmpty) {
+ val checkpointPath = new Path(checkpointRoot)
+ try {
+ val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ fs.delete(checkpointPath, true)
+ } catch {
+ case NonFatal(e) =>
+ // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
+ // when we cannot delete them.
+ logWarning(s"Cannot delete $checkpointPath", e)
+ }
+ }
} finally {
terminationLatch.countDown()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0b9406b027..38edb40dfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -195,6 +195,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
recoverFromCheckpointLocation: Boolean,
trigger: Trigger,
triggerClock: Clock): StreamingQueryWrapper = {
+ var deleteCheckpointOnStop = false
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
}.orElse {
@@ -203,6 +204,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}.getOrElse {
if (useTempCheckpointLocation) {
+ // Delete the temp checkpoint when a query is being stopped without errors.
+ deleteCheckpointOnStop = true
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
} else {
throw new AnalysisException(
@@ -244,7 +247,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
sink,
trigger,
triggerClock,
- outputMode))
+ outputMode,
+ deleteCheckpointOnStop))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 30a957ef81..0470411a0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -670,4 +670,30 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter with Pr
}
}
}
+
+ test("temp checkpoint dir should be deleted if a query is stopped without errors") {
+ import testImplicits._
+ val query = MemoryStream[Int].toDS.writeStream.format("console").start()
+ val checkpointDir = new Path(
+ query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+ val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(fs.exists(checkpointDir))
+ query.stop()
+ assert(!fs.exists(checkpointDir))
+ }
+
+ testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+ import testImplicits._
+ val input = MemoryStream[Int]
+ val query = input.toDS.map(_ / 0).writeStream.format("console").start()
+ val checkpointDir = new Path(
+ query.asInstanceOf[StreamingQueryWrapper].streamingQuery.checkpointRoot)
+ val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
+ assert(fs.exists(checkpointDir))
+ input.addData(1)
+ intercept[StreamingQueryException] {
+ query.awaitTermination()
+ }
+ assert(fs.exists(checkpointDir))
+ }
}