aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2014-10-27 23:31:46 -0700
committerReynold Xin <rxin@databricks.com>2014-10-27 23:31:46 -0700
commit7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42 (patch)
treeece7756ea2179b68edaf60a6799d67b701b538c5
parent0c34fa5b4b3c1c20d7a2d7df3a8ae757b532dd32 (diff)
downloadspark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.tar.gz
spark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.tar.bz2
spark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.zip
[SPARK-4064]NioBlockTransferService.fetchBlocks may cause spark to hang.
cc @rxin Author: GuoQiang Li <witgo@qq.com> Closes #2929 from witgo/SPARK-4064 and squashes the following commits: 20110f2 [GuoQiang Li] Modify the exception msg 3425225 [GuoQiang Li] review commits 2b07e49 [GuoQiang Li] If we create a lot of big broadcast variables, Spark may hang
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala25
1 files changed, 15 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index 5add4fc433..e3113205be 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
future.onSuccess { case message =>
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-
- for (blockMessage <- blockMessageArray) {
- if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
- listener.onBlockFetchFailure(
- new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
- } else {
- val blockId = blockMessage.getId
- val networkSize = blockMessage.getData.limit()
- listener.onBlockFetchSuccess(
- blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+ // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty.
+ if (blockMessageArray.isEmpty) {
+ listener.onBlockFetchFailure(
+ new SparkException(s"Received empty message from $cmId"))
+ } else {
+ for (blockMessage <- blockMessageArray) {
+ val msgType = blockMessage.getType
+ if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
+ listener.onBlockFetchFailure(
+ new SparkException(s"Unexpected message ${msgType} received from $cmId"))
+ } else {
+ val blockId = blockMessage.getId
+ listener.onBlockFetchSuccess(
+ blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+ }
}
}
}(cm.futureExecContext)