aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2014-08-16 14:15:58 -0700
committerJosh Rosen <joshrosen@apache.org>2014-08-16 14:15:58 -0700
commit76fa0eaf515fd6771cdd69422b1259485debcae5 (patch)
tree44964bbdc24431aa7de67e9c720c7494ae1eefd4 /core
parent4bdfaa16fce399bd97c98858151246b3b02f350f (diff)
downloadspark-76fa0eaf515fd6771cdd69422b1259485debcae5.tar.gz
spark-76fa0eaf515fd6771cdd69422b1259485debcae5.tar.bz2
spark-76fa0eaf515fd6771cdd69422b1259485debcae5.zip
[SPARK-2677] BasicBlockFetchIterator#next can wait forever
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #1632 from sarutak/SPARK-2677 and squashes the following commits: cddbc7b [Kousuke Saruta] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message d3bd2a8 [Kousuke Saruta] Modified configuration.md for spark.core.connection.ack.timeout e85f88b [Kousuke Saruta] Removed useless synchronized blocks 7ed48be [Kousuke Saruta] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide 9b620a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0dd9ad3 [Kousuke Saruta] Modified typo in ConnectionManagerSuite.scala 7cbb8ca [Kousuke Saruta] Modified to match with scalastyle 8a73974 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 ade279a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 0174d6a [Kousuke Saruta] Modified ConnectionManager.scala to handle the case remote Executor cannot ack a454239 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-2677 9b7b7c1 [Kousuke Saruta] (WIP) Modifying ConnectionManager.scala
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/network/ConnectionManager.scala45
-rw-r--r--core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala44
2 files changed, 78 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 95f96b8463..37d69a9ec4 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
import java.net._
+import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
@@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
var ackMessage: Option[Message] = None
def markDone(ackMessage: Option[Message]) {
- this.synchronized {
- this.ackMessage = ackMessage
- completionHandler(this)
- }
+ this.ackMessage = ackMessage
+ completionHandler(this)
}
}
private val selector = SelectorProvider.provider.openSelector()
+ private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
// default to 30 second timeout waiting for authentication
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+ private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
@@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
}
}
if (bufferMessage.hasAckId()) {
- val sentMessageStatus = messageStatuses.synchronized {
+ messageStatuses.synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) => {
messageStatuses -= bufferMessage.ackId
- status
+ status.markDone(Some(message))
}
case None => {
- throw new Exception("Could not find reference for received ack message " +
- message.id)
+ /**
+ * We can reach this code in the following 2 cases:
+ *
+ * (1) An invalid ack was sent due to buggy code.
+ *
+ * (2) A late-arriving ack for a SendMessageStatus.
+ * To avoid unwanted late-arriving acks
+ * caused by long pauses such as GC, you can set
+ * spark.core.connection.ack.wait.timeout to a larger value than the default.
+ */
+ logWarning(s"Could not find reference for received ack Message ${message.id}")
}
}
}
- sentMessageStatus.markDone(Some(message))
} else {
var ackMessage : Option[Message] = None
try {
@@ -836,9 +845,23 @@ private[spark] class ConnectionManager(
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Message] = {
val promise = Promise[Message]()
+
+ // Timer task that fails the promise when no ack has been handled for this message
+ // by the time the task runs (it is scheduled ackTimeout seconds after the send).
+ val timeoutTask = new TimerTask {
+   override def run(): Unit = {
+     messageStatuses.synchronized {
+       // remove() yields a value only while the status is still registered, i.e. the
+       // ack-handling path has not already removed it; otherwise this is a no-op.
+       messageStatuses.remove(message.id).foreach { _ =>
+         // The `s` interpolator is required on the literal that contains ${ackTimeout};
+         // without it the placeholder text is emitted verbatim in the error message.
+         promise.failure(
+           new IOException("sendMessageReliably failed because ack " +
+             s"was not received within ${ackTimeout} sec"))
+       }
+     }
+   }
+ }
+
val status = new MessageStatus(message, connectionManagerId, s => {
+ timeoutTask.cancel()
s.ackMessage match {
- case None => // Indicates a failure where we either never sent or never got ACK'd
+ case None => // Indicates a failure where we either never sent or never got ACK'd
promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
case Some(ackMessage) =>
if (ackMessage.hasError) {
@@ -852,6 +875,8 @@ private[spark] class ConnectionManager(
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
+
+ ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
sendMessage(connectionManagerId, message)
promise.future
}
diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
index 846537df00..e2f4d4c57c 100644
--- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
@@ -19,14 +19,19 @@ package org.apache.spark.network
import java.io.IOException
import java.nio._
+import java.util.concurrent.TimeoutException
import org.apache.spark.{SecurityManager, SparkConf}
import org.scalatest.FunSuite
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+import scala.concurrent.TimeoutException
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.language.postfixOps
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
/**
* Test the ConnectionManager with various security settings.
@@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {
}
+ // Verifies that sendMessageReliably fails its future with an IOException when the
+ // receiver does not ack within spark.core.connection.ack.wait.timeout, instead of
+ // leaving the caller waiting until Await itself times out.
+ test("sendMessageReliably timeout") {
+   val clientConf = new SparkConf
+   clientConf.set("spark.authenticate", "false")
+   val ackTimeout = 30
+   clientConf.set("spark.core.connection.ack.wait.timeout", ackTimeout.toString)
+
+   val clientSecurityManager = new SecurityManager(clientConf)
+   val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
+
+   val serverConf = new SparkConf
+   serverConf.set("spark.authenticate", "false")
+   val serverSecurityManager = new SecurityManager(serverConf)
+   val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
+
+   // Stop both managers even when the assertion below fails, so a failing run does
+   // not leave them running for the rest of the suite.
+   try {
+     managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+       // Sleep 3x the ack timeout (90 sec) to simulate a slow or hung server.
+       Thread.sleep(ackTimeout * 3 * 1000)
+       None
+     })
+
+     val size = 10 * 1024 * 1024
+     val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+     buffer.flip()
+     val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+
+     val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
+
+     // The future should fail with an IOException after the 30 sec ack timeout.
+     // Await waits up to 2x the ack timeout (60 sec); if the future were still
+     // incomplete by then, Await.result would throw a TimeoutException instead,
+     // which is not an IOException and therefore fails the intercept.
+     intercept[IOException] {
+       Await.result(future, (ackTimeout * 2).seconds)
+     }
+   } finally {
+     manager.stop()
+     managerServer.stop()
+   }
+ }
+
}