aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/Message.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala11
-rw-r--r--core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala6
4 files changed, 28 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 6b00190c5e..9396b6ba84 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
- val m = Message.createBufferMessage(bufferMessage.id)
- m.hasError = true
- ackMessage = Some(m)
+ ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
@@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
+ val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
+ val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
+ errorMsgByteBuf.get(errorMsgBytes)
+ val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
- "sendMessageReliably failed with ACK that signalled a remote error")
+ s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 0b874c2891..3ad04591da 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.util.Utils
private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
@@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}
+ /**
+ * Create a "negative acknowledgment" to notify a sender that an error occurred
+ * while processing its message. The exception's stacktrace will be formatted
+ * as a string, serialized into a byte array, and sent as the message payload.
+ */
+ def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
+ val exceptionString = Utils.exceptionString(exception)
+ val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+ val errorMessage = createBufferMessage(serializedExceptionString, ackId)
+ errorMessage.hasError = true
+ errorMessage
+ }
+
def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index b389b9a202..5add4fc433 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ Some(Message.createErrorMessage(e, msg.id))
}
}
case otherMessage: Any =>
- logError("Unknown type message received: " + otherMessage)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
+ logError(errorMsg)
+ Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
index 9f49587cdc..b70734dfe3 100644
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
@@ -27,6 +27,7 @@ import scala.language.postfixOps
import org.scalatest.FunSuite
import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.util.Utils
/**
* Test the ConnectionManager with various security settings.
@@ -236,7 +237,7 @@ class ConnectionManagerSuite extends FunSuite {
val manager = new ConnectionManager(0, conf, securityManager)
val managerServer = new ConnectionManager(0, conf, securityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- throw new Exception
+ throw new Exception("Custom exception text")
})
val size = 10 * 1024 * 1024
@@ -246,9 +247,10 @@ class ConnectionManagerSuite extends FunSuite {
val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
- intercept[IOException] {
+ val exception = intercept[IOException] {
Await.result(future, 1 second)
}
+ assert(Utils.exceptionString(exception).contains("Custom exception text"))
manager.stop()
managerServer.stop()