diff options
author | Jiacheng Guo <guojc03@gmail.com> | 2014-03-09 11:37:44 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-09 11:38:40 -0700 |
commit | f6f9d02e85d17da2f742ed0062f1648a9293e73c (patch) | |
tree | cbc3e4e183131494fae678153d61a23f13746e4a /core | |
parent | 52834d761b059264214dfc6a1f9c70b8bc7ec089 (diff) | |
download | spark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.tar.gz spark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.tar.bz2 spark-f6f9d02e85d17da2f742ed0062f1648a9293e73c.zip |
Add timeout for fetch file
Currently, when fetch a file, the connection's connect timeout
and read timeout is based on the default jvm setting, in this change, I change it to
use spark.worker.timeout. This can be usefull, when the
connection status between worker is not perfect. And prevent
prematurely remove task set.
Author: Jiacheng Guo <guojc03@gmail.com>
Closes #98 from guojc/master and squashes the following commits:
abfe698 [Jiacheng Guo] add space according request
2a37c34 [Jiacheng Guo] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 53458b6660..ac376fc403 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -278,6 +278,10 @@ private[spark] object Utils extends Logging { uc = new URL(url).openConnection() } + val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 + uc.setConnectTimeout(timeout) + uc.setReadTimeout(timeout) + uc.connect() val in = uc.getInputStream(); val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) |