diff options
Diffstat (limited to 'core/src/main/scala/spark/broadcast/HttpBroadcast.scala')
-rw-r--r-- | core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index 03986ea756..7eb4ddb74f 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -12,44 +12,47 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import spark._ import spark.storage.StorageLevel -class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean) -extends Broadcast[T] with Logging with Serializable { +private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) +extends Broadcast[T](id) with Logging with Serializable { def value = value_ + def blockId: String = "broadcast_" + id + HttpBroadcast.synchronized { - SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false) } if (!isLocal) { - HttpBroadcast.write(uuid, value_) + HttpBroadcast.write(id, value_) } // Called by JVM when deserializing an object private def readObject(in: ObjectInputStream) { in.defaultReadObject() HttpBroadcast.synchronized { - SparkEnv.get.blockManager.getSingle(uuid.toString) match { + SparkEnv.get.blockManager.getSingle(blockId) match { case Some(x) => value_ = x.asInstanceOf[T] case None => { - logInfo("Started reading broadcast variable " + uuid) + logInfo("Started reading broadcast variable " + id) val start = System.nanoTime - value_ = HttpBroadcast.read[T](uuid) - SparkEnv.get.blockManager.putSingle( - uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false) + value_ = HttpBroadcast.read[T](id) + SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false) val time = (System.nanoTime - start) / 1e9 - logInfo("Reading broadcast variable " + uuid + " took " + time + " s") + logInfo("Reading broadcast variable " + id + " took " + time + " s") } } } } } -class HttpBroadcastFactory extends BroadcastFactory { - def initialize(isMaster: Boolean) = HttpBroadcast.initialize(isMaster) - def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal) - def stop() = HttpBroadcast.stop() +private[spark] class HttpBroadcastFactory extends BroadcastFactory { + def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) } + + def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = + new HttpBroadcast[T](value_, isLocal, id) + + def stop() { HttpBroadcast.stop() } } private object HttpBroadcast extends Logging { @@ -65,7 +68,7 @@ private object HttpBroadcast extends Logging { synchronized { if (!initialized) { bufferSize = System.getProperty("spark.buffer.size", "65536").toInt - compress = System.getProperty("spark.compress", "false").toBoolean + compress = System.getProperty("spark.broadcast.compress", "true").toBoolean if (isMaster) { createServer() } @@ -76,9 +79,12 @@ private object HttpBroadcast extends Logging { } def stop() { - if (server != null) { - server.stop() - server = null + synchronized { + if (server != null) { + server.stop() + server = null + } + initialized = false } } @@ -91,8 +97,8 @@ private object HttpBroadcast extends Logging { logInfo("Broadcast server started at " + serverUri) } - def write(uuid: UUID, value: Any) { - val file = new File(broadcastDir, "broadcast-" + uuid) + def write(id: Long, value: Any) { + val file = new File(broadcastDir, "broadcast-" + id) val out: OutputStream = if (compress) { new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering } else { @@ -104,8 +110,8 @@ private object HttpBroadcast extends Logging { serOut.close() } - def read[T](uuid: UUID): T = { - val url = serverUri + "/broadcast-" + uuid + def read[T](id: Long): T = { + val url = serverUri + "/broadcast-" + id var in = if (compress) { new LZFInputStream(new URL(url).openStream()) // Does its own buffering } else { |