author     Burak Yavuz <brkyvz@gmail.com>              2016-11-08 15:08:09 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com> 2016-11-08 15:08:09 -0800
commit     6f7ecb0f2975d24a71e4240cf623f5bd8992bbeb
tree       a018e0871a7757de11c8b1e943f8d0749d509675
parent     b6de0c98c70960a97b07615b0b08fbd8f900fbe7
[SPARK-18342] Make rename failures fatal in HDFSBackedStateStore
## What changes were proposed in this pull request?

If the rename operation in the state store fails (`fs.rename` returns `false`), the StateStore should throw an exception and have the task retry. Currently, a failed rename does nothing visible at execution time; only later do snapshot operations fail, and after that every attempt at recovery (executor failure / checkpoint recovery) fails as well.

## How was this patch tested?

Unit test.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15804 from brkyvz/rename-state.
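For background, Hadoop's `FileSystem.rename` reports most failures through its `Boolean` return value rather than by throwing, so a caller that drops the result can silently lose the file it just wrote. A minimal sketch of the check-and-throw pattern this patch adopts (the object name and paths below are invented for illustration):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameCheckExample {
  // Rename and fail loudly: FileSystem.rename signals most failures via
  // its Boolean result, so the result must be checked explicitly.
  def commitFile(fs: FileSystem, temp: Path, dest: Path): Unit = {
    if (!fs.rename(temp, dest)) {
      throw new IOException(s"Failed to rename $temp to $dest")
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.getLocal(new Configuration())
    // Hypothetical paths, for illustration only.
    commitFile(fs, new Path("/tmp/state.delta.tmp"), new Path("/tmp/state.delta"))
  }
}
```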
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala              | 41
2 files changed, 40 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index f1e7f1d113..808713161c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      fs.rename(tempDeltaFile, finalDeltaFile)
+      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+        throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+      }
       loadedMaps.put(newVersion, map)
       finalDeltaFile
     }
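As the new test below shows by intercepting `IllegalStateException` and inspecting `getCause`, the `IOException` thrown above does not reach the caller raw; the surrounding commit path wraps it. A sketch of that wrap-and-rethrow pattern, using a simplified `commit` since the actual `commit()` body is outside this diff:

```scala
import scala.util.control.NonFatal

object CommitWrapExample {
  // Illustrative only: the test's `e.getCause.getMessage` assertion implies
  // commit() wraps lower-level failures; this is not the actual method body.
  def commit(newVersion: Long)(doCommit: () => Unit): Long = {
    try {
      doCommit() // e.g. commitUpdates(...), which may now throw IOException
      newVersion
    } catch {
      case NonFatal(e) =>
        throw new IllegalStateException(s"Error committing version $newVersion", e)
    }
  }
}
```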
@@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider(
     val deltaFiles = allFiles.filter { file =>
       file.version > snapshotFile.version && file.version <= version
-    }
+    }.toList
     verify(
       deltaFiles.size == version - snapshotFile.version,
       s"Unexpected list of delta files for version $version for $this: $deltaFiles"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index fcf300b3c8..504a265161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution.streaming.state

-import java.io.File
+import java.io.{File, IOException}
+import java.net.URI

 import scala.collection.mutable
 import scala.util.Random

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
     }
   }

+  test("SPARK-18342: commit fails when rename fails") {
+    import RenameReturnsFalseFileSystem._
+    val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString
+    val conf = new Configuration()
+    conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
+    val provider = newStoreProvider(dir = dir, hadoopConf = conf)
+    val store = provider.getStore(0)
+    put(store, "a", 0)
+    val e = intercept[IllegalStateException](store.commit())
+    assert(e.getCause.getMessage.contains("Failed to rename"))
+  }
+
   def getDataFromFiles(
       provider: HDFSBackedStateStoreProvider,
       version: Int = -1): Set[(String, Int)] = {
@@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
   def newStoreProvider(
       opId: Long = Random.nextLong,
       partition: Int = 0,
-      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get
+      minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString,
+      hadoopConf: Configuration = new Configuration()
     ): HDFSBackedStateStoreProvider = {
-    val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot)
     new HDFSBackedStateStoreProvider(
@@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester {
       keySchema,
       valueSchema,
       new StateStoreConf(sqlConf),
-      new Configuration())
+      hadoopConf)
   }

   def remove(store: StateStore, condition: String => Boolean): Unit = {
@@ -598,3 +612,20 @@ private[state] object StateStoreSuite {
     }}.toSet
   }
 }
+
+/**
+ * Fake FileSystem to test that the StateStore throws an exception while committing the
+ * delta file, when `fs.rename` returns `false`.
+ */
+class RenameReturnsFalseFileSystem extends RawLocalFileSystem {
+  import RenameReturnsFalseFileSystem._
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def rename(src: Path, dst: Path): Boolean = false
+}
+
+object RenameReturnsFalseFileSystem {
+  val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs"
+}