aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-11-29 13:41:21 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-11-29 13:41:21 +0530
commit5618af6803ab21ffc30f6d736a25ea690c03b06c (patch)
tree4cae89be7bed4a816e9b6e432aa8ebfc3211c1a6 /core
parent1bc83ca79187979f58385d3f28236111217174e0 (diff)
parent743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff)
downloadspark-5618af6803ab21ffc30f6d736a25ea690c03b06c.tar.gz
spark-5618af6803ab21ffc30f6d736a25ea690c03b06c.tar.bz2
spark-5618af6803ab21ffc30f6d736a25ea690c03b06c.zip
Merge branch 'master' into wip-scala-2.10
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala10
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 609464e38d..47db720416 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -19,6 +19,7 @@ package org.apache.spark.broadcast
import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
import java.net.URL
+import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup)
+ private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt
+
private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
@@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/" + BroadcastBlockId(id).name
val in = {
+ val httpConnection = new URL(url).openConnection()
+ httpConnection.setReadTimeout(httpReadTimeout)
+ val inputStream = httpConnection.getInputStream()
if (compress) {
- compressionCodec.compressedInputStream(new URL(url).openStream())
+ compressionCodec.compressedInputStream(inputStream)
} else {
- new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ new FastBufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()