diff options
author | Jacek Lewandowski <lewandowski.jacek@gmail.com> | 2015-11-17 15:57:43 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-11-17 16:00:00 -0800 |
commit | b362d50fca30693f97bd859984157bb8a76d48a1 (patch) | |
tree | a884803cf7565affd6db7359fa830473aa5e51ec | |
parent | 52c734b589277267be07e245c959199db92aa189 (diff) | |
download | spark-b362d50fca30693f97bd859984157bb8a76d48a1.tar.gz spark-b362d50fca30693f97bd859984157bb8a76d48a1.tar.bz2 spark-b362d50fca30693f97bd859984157bb8a76d48a1.zip |
[SPARK-11726] Throw exception on timeout when waiting for REST server response
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Closes #9692 from jacek-lewandowski/SPARK-11726.
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 957a928bc4..f0dd667ea1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} +import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException import com.google.common.base.Charsets -import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf} /** * A client that submits applications to a [[RestSubmissionServer]]. @@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { * Exposed for testing. */ private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = { - try { + import scala.concurrent.ExecutionContext.Implicits.global + val responseFuture = Future { val dataStream = if (connection.getResponseCode == HttpServletResponse.SC_OK) { connection.getInputStream @@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { throw new SubmitRestProtocolException( s"Message received from server was not a response:\n${unexpected.toJson}") } - } catch { + } + + try { Await.result(responseFuture, 10.seconds) } catch { case unreachable @ (_: FileNotFoundException | _: SocketException) => throw new SubmitRestConnectionException("Unable to connect to server", unreachable) case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => throw new SubmitRestProtocolException("Malformed response received from server", malformed) + case timeout: TimeoutException => + throw new SubmitRestConnectionException("No response from server", timeout) } } |