diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-29 13:41:21 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-29 13:41:21 +0530 |
commit | 5618af6803ab21ffc30f6d736a25ea690c03b06c (patch) | |
tree | 4cae89be7bed4a816e9b6e432aa8ebfc3211c1a6 /core | |
parent | 1bc83ca79187979f58385d3f28236111217174e0 (diff) | |
parent | 743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff) | |
download | spark-5618af6803ab21ffc30f6d736a25ea690c03b06c.tar.gz spark-5618af6803ab21ffc30f6d736a25ea690c03b06c.tar.bz2 spark-5618af6803ab21ffc30f6d736a25ea690c03b06c.zip |
Merge branch 'master' into wip-scala-2.10
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 609464e38d..47db720416 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -19,6 +19,7 @@ package org.apache.spark.broadcast import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream} import java.net.URL +import java.util.concurrent.TimeUnit import it.unimi.dsi.fastutil.io.FastBufferedInputStream import it.unimi.dsi.fastutil.io.FastBufferedOutputStream @@ -83,6 +84,8 @@ private object HttpBroadcast extends Logging { private val files = new TimeStampedHashSet[String] private val cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup) + private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5,TimeUnit.MINUTES).toInt + private lazy val compressionCodec = CompressionCodec.createCodec() def initialize(isDriver: Boolean) { @@ -138,10 +141,13 @@ private object HttpBroadcast extends Logging { def read[T](id: Long): T = { val url = serverUri + "/" + BroadcastBlockId(id).name val in = { + val httpConnection = new URL(url).openConnection() + httpConnection.setReadTimeout(httpReadTimeout) + val inputStream = httpConnection.getInputStream() if (compress) { - compressionCodec.compressedInputStream(new URL(url).openStream()) + compressionCodec.compressedInputStream(inputStream) } else { - new FastBufferedInputStream(new URL(url).openStream(), bufferSize) + new FastBufferedInputStream(inputStream, bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() |