diff options
author | GuoQiang Li <witgo@qq.com> | 2014-10-27 23:31:46 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-10-27 23:31:46 -0700 |
commit | 7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42 (patch) | |
tree | ece7756ea2179b68edaf60a6799d67b701b538c5 | |
parent | 0c34fa5b4b3c1c20d7a2d7df3a8ae757b532dd32 (diff) | |
download | spark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.tar.gz spark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.tar.bz2 spark-7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42.zip |
[SPARK-4064]NioBlockTransferService.fetchBlocks may cause spark to hang.
cc @rxin
Author: GuoQiang Li <witgo@qq.com>
Closes #2929 from witgo/SPARK-4064 and squashes the following commits:
20110f2 [GuoQiang Li] Modify the exception msg
3425225 [GuoQiang Li] review commits
2b07e49 [GuoQiang Li] If we create a lot of big broadcast variables, Spark may hang
-rw-r--r-- | core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala | 25 |
1 files changed, 15 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index 5add4fc433..e3113205be 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa future.onSuccess { case message => val bufferMessage = message.asInstanceOf[BufferMessage] val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - - for (blockMessage <- blockMessageArray) { - if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { - listener.onBlockFetchFailure( - new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId")) - } else { - val blockId = blockMessage.getId - val networkSize = blockMessage.getData.limit() - listener.onBlockFetchSuccess( - blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + // SPARK-4064: In some cases(eg. Remote block was removed) blockMessageArray may be empty. + if (blockMessageArray.isEmpty) { + listener.onBlockFetchFailure( + new SparkException(s"Received empty message from $cmId")) + } else { + for (blockMessage <- blockMessageArray) { + val msgType = blockMessage.getType + if (msgType != BlockMessage.TYPE_GOT_BLOCK) { + listener.onBlockFetchFailure( + new SparkException(s"Unexpected message ${msgType} received from $cmId")) + } else { + val blockId = blockMessage.getId + listener.onBlockFetchSuccess( + blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData)) + } } } }(cm.futureExecContext) |