diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 957a928bc4..f0dd667ea1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} +import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException import com.google.common.base.Charsets -import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf} /** * A client that submits applications to a [[RestSubmissionServer]]. @@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { * Exposed for testing. */ private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = { - try { + import scala.concurrent.ExecutionContext.Implicits.global + val responseFuture = Future { val dataStream = if (connection.getResponseCode == HttpServletResponse.SC_OK) { connection.getInputStream @@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { throw new SubmitRestProtocolException( s"Message received from server was not a response:\n${unexpected.toJson}") } - } catch { + } + + try { Await.result(responseFuture, 10.seconds) } catch { case unreachable @ (_: FileNotFoundException | _: SocketException) => throw new SubmitRestConnectionException("Unable to connect to server", unreachable) case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) => throw new SubmitRestProtocolException("Malformed response received from server", malformed) + case timeout: TimeoutException => + throw new SubmitRestConnectionException("No response from server", timeout) } } |