author     Matei Zaharia <matei@eecs.berkeley.edu>    2013-03-03 17:20:07 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2013-03-03 17:20:07 -0800
commit     04fb81ffe51df16fd950253223041695e602144e (patch)
tree       17d774b1b888c821888700b92a0a412443302cbd /core/src
parent     6cf4be4b3991d30b2902f895a7082361c736e88d (diff)
parent     44134e12bb5cf45b16ca48d68e6a509bab69d256 (diff)
Merge pull request #506 from rxin/spark-706
Fixed SPARK-706: Failures in block manager put lead to read tasks hanging.
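Before this change a put() that threw an exception left its BlockInfo permanently pending, so any reader blocked in waitForReady() hung forever. The patch turns that wait into a handshake that can also report failure. Below is a minimal, self-contained sketch of the handshake (not part of the diff; the class name BlockEntry is illustrative, not the actual BlockManager code):

final class BlockEntry {
  private var pending = true
  private var failed  = false

  /** Returns true once the writer succeeded, false if the put failed. */
  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed
  }

  /** Writer finished successfully: publish the block and wake all readers. */
  def markReady(): Unit = synchronized {
    pending = false
    failed  = false
    notifyAll()
  }

  /** Writer failed: wake all readers so they can bail out instead of hanging. */
  def markFailure(): Unit = synchronized {
    pending = false
    failed  = true
    notifyAll()
  }
}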
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala      | 151
-rw-r--r--  core/src/test/scala/spark/storage/BlockManagerSuite.scala | 20
2 files changed, 120 insertions, 51 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 4964060b1c..5849045a55 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -40,21 +40,36 @@ class BlockManager(
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
var pending: Boolean = true
var size: Long = -1L
+ var failed: Boolean = false
- /** Wait for this BlockInfo to be marked as ready (i.e. block is finished writing) */
- def waitForReady() {
+ /**
+ * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing).
+ * Return true if the block is available, false otherwise.
+ */
+ def waitForReady(): Boolean = {
if (pending) {
synchronized {
while (pending) this.wait()
}
}
+ !failed
}
/** Mark this BlockInfo as ready (i.e. block is finished writing) */
def markReady(sizeInBytes: Long) {
- pending = false
- size = sizeInBytes
synchronized {
+ pending = false
+ failed = false
+ size = sizeInBytes
+ this.notifyAll()
+ }
+ }
+
+ /** Mark this BlockInfo as no longer pending, but failed (the block write did not succeed). */
+ def markFailure() {
+ synchronized {
+ failed = true
+ pending = false
this.notifyAll()
}
}
@@ -277,7 +292,14 @@ class BlockManager(
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
- info.waitForReady() // In case the block is still being put() by another thread
+
+ // If another thread is writing the block, wait for it to become ready.
+ if (!info.waitForReady()) {
+ // If we get here, the block write failed.
+ logWarning("Block " + blockId + " was marked as failure.")
+ return None
+ }
+
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -362,7 +384,14 @@ class BlockManager(
val info = blockInfo.get(blockId).orNull
if (info != null) {
info.synchronized {
- info.waitForReady() // In case the block is still being put() by another thread
+
+ // If another thread is writing the block, wait for it to become ready.
+ if (!info.waitForReady()) {
+ // If we get here, the block write failed.
+ logWarning("Block " + blockId + " was marked as failure.")
+ return None
+ }
+
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@@ -423,12 +452,11 @@ class BlockManager(
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
if (data != null) {
- logDebug("Data is not null: " + data)
return Some(dataDeserialize(blockId, data))
}
- logDebug("Data is null")
+ logDebug("The value of block " + blockId + " is null")
}
- logDebug("Data not found")
+ logDebug("Block " + blockId + " not found")
return None
}
@@ -474,9 +502,8 @@ class BlockManager(
}
val oldBlock = blockInfo.get(blockId).orNull
- if (oldBlock != null) {
+ if (oldBlock != null && oldBlock.waitForReady()) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
- oldBlock.waitForReady()
return oldBlock.size
}
@@ -504,31 +531,45 @@ class BlockManager(
logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (level.useMemory) {
- // Save it just to memory first, even if it also has useDisk set to true; we will later
- // drop it to disk if the memory store can't hold it.
- val res = memoryStore.putValues(blockId, values, level, true)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case Left(newIterator) => valuesAfterPut = newIterator
- }
- } else {
- // Save directly to disk.
- val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
- val res = diskStore.putValues(blockId, values, level, askForBytes)
- size = res.size
- res.data match {
- case Right(newBytes) => bytesAfterPut = newBytes
- case _ =>
+ try {
+ if (level.useMemory) {
+ // Save it just to memory first, even if it also has useDisk set to true; we will later
+ // drop it to disk if the memory store can't hold it.
+ val res = memoryStore.putValues(blockId, values, level, true)
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case Left(newIterator) => valuesAfterPut = newIterator
+ }
+ } else {
+ // Save directly to disk.
+ // Don't get back the bytes unless we replicate them.
+ val askForBytes = level.replication > 1
+ val res = diskStore.putValues(blockId, values, level, askForBytes)
+ size = res.size
+ res.data match {
+ case Right(newBytes) => bytesAfterPut = newBytes
+ case _ =>
+ }
}
- }
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- myInfo.markReady(size)
- if (tellMaster) {
- reportBlockStatus(blockId, myInfo)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady(size)
+ if (tellMaster) {
+ reportBlockStatus(blockId, myInfo)
+ }
+ } catch {
+ // If we failed to put the block in memory or on disk, notify other possible readers
+ // that it has failed, and then remove it from the block info map.
+ case e: Exception => {
+ // Note that the remove must happen before markFailure otherwise another thread
+ // could've inserted a new BlockInfo before we remove it.
+ blockInfo.remove(blockId)
+ myInfo.markFailure()
+ logWarning("Putting block " + blockId + " failed", e)
+ throw e
+ }
}
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -598,28 +639,38 @@ class BlockManager(
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
- if (level.useMemory) {
- // Store it only in memory at first, even if useDisk is also set to true
- bytes.rewind()
- memoryStore.putBytes(blockId, bytes, level)
- } else {
- bytes.rewind()
- diskStore.putBytes(blockId, bytes, level)
- }
+ try {
+ if (level.useMemory) {
+ // Store it only in memory at first, even if useDisk is also set to true
+ bytes.rewind()
+ memoryStore.putBytes(blockId, bytes, level)
+ } else {
+ bytes.rewind()
+ diskStore.putBytes(blockId, bytes, level)
+ }
- // Now that the block is in either the memory or disk store, let other threads read it,
- // and tell the master about it.
- myInfo.markReady(bytes.limit)
- if (tellMaster) {
- reportBlockStatus(blockId, myInfo)
+ // Now that the block is in either the memory or disk store, let other threads read it,
+ // and tell the master about it.
+ myInfo.markReady(bytes.limit)
+ if (tellMaster) {
+ reportBlockStatus(blockId, myInfo)
+ }
+ } catch {
+ // If we failed to put the block in memory or on disk, notify other possible readers
+ // that it has failed, and then remove it from the block info map.
+ case e: Exception => {
+ // Note that the remove must happen before markFailure otherwise another thread
+ // could've inserted a new BlockInfo before we remove it.
+ blockInfo.remove(blockId)
+ myInfo.markFailure()
+ logWarning("Putting block " + blockId + " failed", e)
+ throw e
+ }
}
}
// If replication had started, then wait for it to finish
if (level.replication > 1) {
- if (replicationFuture == null) {
- throw new Exception("Unexpected")
- }
Await.ready(replicationFuture, Duration.Inf)
}
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index 2d177bbf67..61e793b31f 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -12,6 +12,7 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers._
import org.scalatest.time.SpanSugar._
+import spark.JavaSerializer
import spark.KryoSerializer
import spark.SizeEstimator
import spark.util.ByteBufferInputStream
@@ -262,7 +263,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
t1.join()
t2.join()
t3.join()
-
+
store.dropFromMemory("a1", null)
store.dropFromMemory("a2", null)
store.waitForAsyncReregister()
@@ -582,4 +583,21 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
System.clearProperty("spark.rdd.compress")
}
}
+
+ test("block store put failure") {
+ // Use the Java serializer so that putting a non-serializable object triggers an error.
+ store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+
+ // The put should fail since a1 is not serializable.
+ class UnserializableClass
+ val a1 = new UnserializableClass
+ intercept[java.io.NotSerializableException] {
+ store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+ }
+
+ // Make sure getting a1 doesn't hang and returns None.
+ failAfter(1 second) {
+ assert(store.getSingle("a1") == None, "a1 should not be in store")
+ }
+ }
}