aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-11-27 18:24:39 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-11-27 18:24:39 -0800
commit743a31a7ca4421cbbd5b615b773997a06a7ab4ee (patch)
tree36ad6245012f63fdff398edfdb1a5aad05502486 /core
parentfb6875dd5c9334802580155464cef9ac4d4cc1f0 (diff)
parentdb998a6e14389768f93b1fdd6be7847d5f7604fd (diff)
downloadspark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.tar.gz
spark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.tar.bz2
spark-743a31a7ca4421cbbd5b615b773997a06a7ab4ee.zip
Merge pull request #210 from haitaoyao/http-timeout
add http timeout for httpbroadcast While pulling task bytecode from HttpBroadcast server, there's no timeout value set. This may cause spark executor code hang and other task in the same executor process wait for the lock. I have encountered the issue in my cluster. Here's the stacktrace I captured : https://gist.github.com/haitaoyao/7655830 So add a time out value to ensure the task fail fast.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala10
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 609464e38d..47db720416 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL
+import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
+ private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+
private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).name
val in = {
+ val httpConnection = new URL(url).openConnection()
+ httpConnection.setReadTimeout(httpReadTimeout)
+ val inputStream = httpConnection.getInputStream()
if (compress) {
- compressionCodec.compressedInputStream(new URL(url).openStream())
+ compressionCodec.compressedInputStream(inputStream)
} else {
- new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ new FastBufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()