aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJiacheng Guo <guojc03@gmail.com>2014-03-09 11:37:44 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-03-09 11:38:40 -0700
commitf6f9d02e85d17da2f742ed0062f1648a9293e73c (patch)
treecbc3e4e183131494fae678153d61a23f13746e4a /core
parent52834d761b059264214dfc6a1f9c70b8bc7ec089 (diff)
downloadspark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.tar.gz
spark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.tar.bz2
spark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.zip
Add timeout for fetch file
Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set. Author: Jiacheng Guo <guojc03@gmail.com> Closes #98 from guojc/master and squashes the following commits: abfe698 [Jiacheng Guo] add space according request 2a37c34 [Jiacheng Guo] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala4
1 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 53458b6660..ac376fc403 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -278,6 +278,10 @@ private[spark] object Utils extends Logging {
uc = new URL(url).openConnection()
}
+ val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
+ uc.setConnectTimeout(timeout)
+ uc.setReadTimeout(timeout)
+ uc.connect()
val in = uc.getInputStream();
val out = new FileOutputStream(tempFile)
Utils.copyStream(in, out, true)