diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-11-27 18:24:39 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-11-27 18:24:39 -0800 |
commit | 743a31a7ca4421cbbd5b615b773997a06a7ab4ee (patch) | |
tree | 36ad6245012f63fdff398edfdb1a5aad05502486 | |
parent | fb6875dd5c9334802580155464cef9ac4d4cc1f0 (diff) | |
parent | db998a6e14389768f93b1fdd6be7847d5f7604fd (diff) | |
download | spark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.tar.gz spark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.tar.bz2 spark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.zip |
Merge pull request #210 from haitaoyao/http-timeout
add http timeout for httpbroadcast
While pulling task bytecode from the HttpBroadcast server, there is no timeout value set. This may cause the Spark executor code to hang, and other tasks in the same executor process to wait on the lock. I have encountered this issue in my cluster. Here's the stack trace I captured: https://gist.github.com/haitaoyao/7655830
So add a timeout value to ensure the task fails fast.
-rw-r--r-- | core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 609464e38d..47db720416 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -19,6 +19,7 @@ package org.apache.spark.broadcast import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} import java.net.URL +import java.util.concurrent.TimeUnit import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream @@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging { private val files = new TimeStampedHashSet[String] private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup) + private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt + private lazy val compressionCodec = CompressionCodec.createCodec() def initialize(isDriver: Boolean) { @@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging { def read[T](id: Long): T = { val url = serverUri + "/" + BroadcastBlockId(id).name val in = { + val httpConnection = new URL(url).openConnection() + httpConnection.setReadTimeout(httpReadTimeout) + val inputStream = httpConnection.getInputStream() if (compress) { - compressionCodec.compressedInputStream(new URL(url).openStream()) + compressionCodec.compressedInputStream(inputStream) } else { - new FastBufferedInputStream(new URL(url).openStream(), bufferSize) + new FastBufferedInputStream(inputStream, bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() |