aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@apache.org>2014-10-12 13:08:42 -0700
committerReynold Xin <rxin@apache.org>2014-10-12 13:08:42 -0700
commit18bd67c24b081f113b34455692451571c466df92 (patch)
treef87ffcccf33a189ad4e848e984439ef642bf3f05 /core/src/main/scala/org
parent69c67abaa9d4bb4b95792d1862bc65efc764c194 (diff)
downloadspark-18bd67c24b081f113b34455692451571c466df92.tar.gz
spark-18bd67c24b081f113b34455692451571c466df92.tar.bz2
spark-18bd67c24b081f113b34455692451571c466df92.zip
[SPARK-3887] Send stracktrace in ConnectionManager error replies
When reporting that a remote error occurred, the ConnectionManager should also log the stacktrace of the remote exception. This PR accomplishes this by sending the remote exception's stacktrace as the payload in the "negative ACK / error message." Author: Josh Rosen <joshrosen@apache.org> Closes #2741 from JoshRosen/propagate-cm-exceptions-to-sender and squashes the following commits: b5366cc [Josh Rosen] Explicitly encode error messages using UTF-8. cef18b3 [Josh Rosen] [SPARK-3887] Send stracktrace in ConnectionManager error messages.
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/Message.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala11
3 files changed, 24 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 6b00190c5e..9396b6ba84 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
- val m = Message.createBufferMessage(bufferMessage.id)
- m.hasError = true
- ackMessage = Some(m)
+ ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
@@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
+ val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
+ val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
+ errorMsgByteBuf.get(errorMsgBytes)
+ val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
- "sendMessageReliably failed with ACK that signalled a remote error")
+ s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 0b874c2891..3ad04591da 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.util.Utils
private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
@@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}
+ /**
+ * Create a "negative acknowledgment" to notify a sender that an error occurred
+ * while processing its message. The exception's stacktrace will be formatted
+ * as a string, serialized into a byte array, and sent as the message payload.
+ */
+ def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
+ val exceptionString = Utils.exceptionString(exception)
+ val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+ val errorMessage = createBufferMessage(serializedExceptionString, ackId)
+ errorMessage.hasError = true
+ errorMessage
+ }
+
def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index b389b9a202..5add4fc433 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ Some(Message.createErrorMessage(e, msg.id))
}
}
case otherMessage: Any =>
- logError("Unknown type message received: " + otherMessage)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
+ logError(errorMsg)
+ Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}