aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJacek Lewandowski <lewandowski.jacek@gmail.com>2015-11-17 15:57:43 -0800
committerAndrew Or <andrew@databricks.com>2015-11-17 16:00:00 -0800
commitb362d50fca30693f97bd859984157bb8a76d48a1 (patch)
treea884803cf7565affd6db7359fa830473aa5e51ec /core
parent52c734b589277267be07e245c959199db92aa189 (diff)
downloadspark-b362d50fca30693f97bd859984157bb8a76d48a1.tar.gz
spark-b362d50fca30693f97bd859984157bb8a76d48a1.tar.bz2
spark-b362d50fca30693f97bd859984157bb8a76d48a1.zip
[SPARK-11726] Throw exception on timeout when waiting for REST server response
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com> Closes #9692 from jacek-lewandowski/SPARK-11726.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala14
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 957a928bc4..f0dd667ea1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -19,16 +19,19 @@ package org.apache.spark.deploy.rest
import java.io.{DataOutputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
+import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Future}
import scala.io.Source
import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets
-import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf}
/**
* A client that submits applications to a [[RestSubmissionServer]].
@@ -225,7 +228,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
* Exposed for testing.
*/
private[rest] def readResponse(connection: HttpURLConnection): SubmitRestProtocolResponse = {
- try {
+ import scala.concurrent.ExecutionContext.Implicits.global
+ val responseFuture = Future {
val dataStream =
if (connection.getResponseCode == HttpServletResponse.SC_OK) {
connection.getInputStream
@@ -251,11 +255,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
throw new SubmitRestProtocolException(
s"Message received from server was not a response:\n${unexpected.toJson}")
}
- } catch {
+ }
+
+ try { Await.result(responseFuture, 10.seconds) } catch {
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
+ case timeout: TimeoutException =>
+ throw new SubmitRestConnectionException("No response from server", timeout)
}
}