From 18bd67c24b081f113b34455692451571c466df92 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 12 Oct 2014 13:08:42 -0700 Subject: [SPARK-3887] Send stracktrace in ConnectionManager error replies When reporting that a remote error occurred, the ConnectionManager should also log the stacktrace of the remote exception. This PR accomplishes this by sending the remote exception's stacktrace as the payload in the "negative ACK / error message." Author: Josh Rosen Closes #2741 from JoshRosen/propagate-cm-exceptions-to-sender and squashes the following commits: b5366cc [Josh Rosen] Explicitly encode error messages using UTF-8. cef18b3 [Josh Rosen] [SPARK-3887] Send stracktrace in ConnectionManager error messages. --- .../org/apache/spark/network/nio/ConnectionManager.scala | 10 ++++++---- .../main/scala/org/apache/spark/network/nio/Message.scala | 14 ++++++++++++++ .../apache/spark/network/nio/NioBlockTransferService.scala | 11 ++++------- 3 files changed, 24 insertions(+), 11 deletions(-) (limited to 'core/src/main/scala/org') diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 6b00190c5e..9396b6ba84 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -748,9 +748,7 @@ private[nio] class ConnectionManager( } catch { case e: Exception => { logError(s"Exception was thrown while processing message", e) - val m = Message.createBufferMessage(bufferMessage.id) - m.hasError = true - ackMessage = Some(m) + ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id)) } } finally { sendMessage(connectionManagerId, ackMessage.getOrElse { @@ -913,8 +911,12 @@ private[nio] class ConnectionManager( } case scala.util.Success(ackMessage) => if (ackMessage.hasError) { + val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head + val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit()) + errorMsgByteBuf.get(errorMsgBytes) + val errorMsg = new String(errorMsgBytes, "utf-8") val e = new IOException( - "sendMessageReliably failed with ACK that signalled a remote error") + s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg") if (!promise.tryFailure(e)) { logWarning("Ignore error because promise is completed", e) } diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala index 0b874c2891..3ad04591da 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer +import org.apache.spark.util.Utils private[nio] abstract class Message(val typ: Long, val id: Int) { var senderAddress: InetSocketAddress = null @@ -84,6 +85,19 @@ private[nio] object Message { createBufferMessage(new Array[ByteBuffer](0), ackId) } + /** + * Create a "negative acknowledgment" to notify a sender that an error occurred + * while processing its message. The exception's stacktrace will be formatted + * as a string, serialized into a byte array, and sent as the message payload. + */ + def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = { + val exceptionString = Utils.exceptionString(exception) + val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8")) + val errorMessage = createBufferMessage(serializedExceptionString, ackId) + errorMessage.hasError = true + errorMessage + } + def create(header: MessageChunkHeader): Message = { val newMessage: Message = header.typ match { case BUFFER_MESSAGE => new BufferMessage(header.id, diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala index b389b9a202..5add4fc433 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala @@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa } catch { case e: Exception => { logError("Exception handling buffer message", e) - val errorMessage = Message.createBufferMessage(msg.id) - errorMessage.hasError = true - Some(errorMessage) + Some(Message.createErrorMessage(e, msg.id)) } } case otherMessage: Any => - logError("Unknown type message received: " + otherMessage) - val errorMessage = Message.createBufferMessage(msg.id) - errorMessage.hasError = true - Some(errorMessage) + val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}" + logError(errorMsg) + Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id)) } } -- cgit v1.2.3