From 6f7ecb0f2975d24a71e4240cf623f5bd8992bbeb Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 8 Nov 2016 15:08:09 -0800 Subject: [SPARK-18342] Make rename failures fatal in HDFSBackedStateStore ## What changes were proposed in this pull request? If the rename operation in the state store fails (`fs.rename` returns `false`), the StateStore should throw an exception and have the task retry. Currently, if a rename fails, the failure is silently ignored and nothing happens immediately during execution. However, subsequent snapshot operations fail, and any later attempt at recovery (executor failure / checkpoint recovery) also fails. ## How was this patch tested? Unit test Author: Burak Yavuz Closes #15804 from brkyvz/rename-state. --- .../state/HDFSBackedStateStoreProvider.scala | 6 ++-- .../streaming/state/StateStoreSuite.scala | 41 +++++++++++++++++++--- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index f1e7f1d113..808713161c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -254,7 +254,9 @@ private[state] class HDFSBackedStateStoreProvider( private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = { synchronized { val finalDeltaFile = deltaFile(newVersion) - fs.rename(tempDeltaFile, finalDeltaFile) + if (!fs.rename(tempDeltaFile, finalDeltaFile)) { + throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile") + } loadedMaps.put(newVersion, map) finalDeltaFile } @@ -525,7 +527,7 @@ private[state] class HDFSBackedStateStoreProvider( val deltaFiles = allFiles.filter { file => file.version > snapshotFile.version && file.version <= 
version - } + }.toList verify( deltaFiles.size == version - snapshotFile.version, s"Unexpected list of delta files for version $version for $this: $deltaFiles" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index fcf300b3c8..504a265161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.File +import java.io.{File, IOException} +import java.net.URI import scala.collection.mutable import scala.util.Random import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -455,6 +456,18 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth } } + test("SPARK-18342: commit fails when rename fails") { + import RenameReturnsFalseFileSystem._ + val dir = scheme + "://" + Utils.createDirectory(tempDir, Random.nextString(5)).toString + val conf = new Configuration() + conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName) + val provider = newStoreProvider(dir = dir, hadoopConf = conf) + val store = provider.getStore(0) + put(store, "a", 0) + val e = intercept[IllegalStateException](store.commit()) + assert(e.getCause.getMessage.contains("Failed to rename")) + } + def getDataFromFiles( provider: HDFSBackedStateStoreProvider, version: Int = -1): Set[(String, Int)] = { @@ -524,9 +537,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth def newStoreProvider( opId: Long = 
Random.nextLong, partition: Int = 0, - minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get + minDeltasForSnapshot: Int = SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, + dir: String = Utils.createDirectory(tempDir, Random.nextString(5)).toString, + hadoopConf: Configuration = new Configuration() ): HDFSBackedStateStoreProvider = { - val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString val sqlConf = new SQLConf() sqlConf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, minDeltasForSnapshot) new HDFSBackedStateStoreProvider( @@ -534,7 +548,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth keySchema, valueSchema, new StateStoreConf(sqlConf), - new Configuration()) + hadoopConf) } def remove(store: StateStore, condition: String => Boolean): Unit = { @@ -598,3 +612,20 @@ private[state] object StateStoreSuite { }}.toSet } } + +/** + * Fake FileSystem to test that the StateStore throws an exception while committing the + * delta file, when `fs.rename` returns `false`. + */ +class RenameReturnsFalseFileSystem extends RawLocalFileSystem { + import RenameReturnsFalseFileSystem._ + override def getUri: URI = { + URI.create(s"$scheme:///") + } + + override def rename(src: Path, dst: Path): Boolean = false +} + +object RenameReturnsFalseFileSystem { + val scheme = s"StateStoreSuite${math.abs(Random.nextInt)}fs" +} -- cgit v1.2.3