aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorguifeng <guifengleaf@gmail.com>2017-03-02 21:19:29 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-03-02 21:19:29 -0800
commite24f21b5f8365ed25346e986748b393e0b4be25c (patch)
tree800ceebcd56c5b1b24896685f837c12c7447bacf
parentf37bb143022ea10107877c80c5c73bd77aeda7ff (diff)
downloadspark-e24f21b5f8365ed25346e986748b393e0b4be25c.tar.gz
spark-e24f21b5f8365ed25346e986748b393e0b4be25c.tar.bz2
spark-e24f21b5f8365ed25346e986748b393e0b4be25c.zip
[SPARK-19779][SS] Delete needless tmp file after restart structured streaming job
## What changes were proposed in this pull request? [SPARK-19779](https://issues.apache.org/jira/browse/SPARK-19779) The PR (https://github.com/apache/spark/pull/17012) can to fix restart a Structured Streaming application using hdfs as fileSystem, but also exist a problem that a tmp file of delta file is still reserved in hdfs. And Structured Streaming don't delete the tmp file generated when restart streaming job in future. ## How was this patch tested? unit tests Author: guifeng <guifengleaf@gmail.com> Closes #17124 from gf53520/SPARK-19779.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala7
2 files changed, 10 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 2d29940eb8..ab1204a750 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -283,7 +283,9 @@ private[state] class HDFSBackedStateStoreProvider(
// semantically correct because Structured Streaming requires rerunning a batch should
// generate the same output. (SPARK-19677)
// scalastyle:on
- if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
+ if (fs.exists(finalDeltaFile)) {
+ fs.delete(tempDeltaFile, true)
+ } else if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
}
loadedMaps.put(newVersion, map)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index dc4e935f3d..e848f74e31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming.state
import java.io.{File, IOException}
import java.net.URI
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random
+import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -293,6 +295,11 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
val provider = newStoreProvider(hadoopConf = conf)
provider.getStore(0).commit()
provider.getStore(0).commit()
+
+ // Verify we don't leak temp files
+ val tempFiles = FileUtils.listFiles(new File(provider.id.checkpointLocation),
+ null, true).asScala.filter(_.getName.startsWith("temp-"))
+ assert(tempFiles.isEmpty)
}
test("corrupted file handling") {