aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/Message.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala11
3 files changed, 24 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 6b00190c5e..9396b6ba84 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
- val m = Message.createBufferMessage(bufferMessage.id)
- m.hasError = true
- ackMessage = Some(m)
+ ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
@@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
+ val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
+ val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
+ errorMsgByteBuf.get(errorMsgBytes)
+ val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
- "sendMessageReliably failed with ACK that signalled a remote error")
+ s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index 0b874c2891..3ad04591da 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.util.Utils
private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
@@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}
+ /**
+ * Create a "negative acknowledgment" to notify a sender that an error occurred
+ * while processing its message. The exception's stacktrace will be formatted
+ * as a string, serialized into a byte array, and sent as the message payload.
+ */
+ def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
+ val exceptionString = Utils.exceptionString(exception)
+ val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
+ val errorMessage = createBufferMessage(serializedExceptionString, ackId)
+ errorMessage.hasError = true
+ errorMessage
+ }
+
def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index b389b9a202..5add4fc433 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ Some(Message.createErrorMessage(e, msg.id))
}
}
case otherMessage: Any =>
- logError("Unknown type message received: " + otherMessage)
- val errorMessage = Message.createBufferMessage(msg.id)
- errorMessage.hasError = true
- Some(errorMessage)
+ val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
+ logError(errorMsg)
+ Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}