Diffstat (limited to 'core/src/main/scala/spark/broadcast/HttpBroadcast.scala')
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala  |  52
1 file changed, 29 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 03986ea756..7eb4ddb74f 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -12,44 +12,47 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 import spark._
 import spark.storage.StorageLevel
-class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging with Serializable {
+private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
+extends Broadcast[T](id) with Logging with Serializable {
   def value = value_
+  def blockId: String = "broadcast_" + id
+
   HttpBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
+    SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
   }
   if (!isLocal) {
-    HttpBroadcast.write(uuid, value_)
+    HttpBroadcast.write(id, value_)
   }
   // Called by JVM when deserializing an object
   private def readObject(in: ObjectInputStream) {
     in.defaultReadObject()
     HttpBroadcast.synchronized {
-      SparkEnv.get.blockManager.getSingle(uuid.toString) match {
+      SparkEnv.get.blockManager.getSingle(blockId) match {
         case Some(x) => value_ = x.asInstanceOf[T]
         case None => {
-          logInfo("Started reading broadcast variable " + uuid)
+          logInfo("Started reading broadcast variable " + id)
           val start = System.nanoTime
-          value_ = HttpBroadcast.read[T](uuid)
-          SparkEnv.get.blockManager.putSingle(
-            uuid.toString, value_, StorageLevel.MEMORY_ONLY_DESER, false)
+          value_ = HttpBroadcast.read[T](id)
+          SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
           val time = (System.nanoTime - start) / 1e9
-          logInfo("Reading broadcast variable " + uuid + " took " + time + " s")
+          logInfo("Reading broadcast variable " + id + " took " + time + " s")
         }
       }
     }
   }
 }
-class HttpBroadcastFactory extends BroadcastFactory {
-  def initialize(isMaster: Boolean) = HttpBroadcast.initialize(isMaster)
-  def newBroadcast[T](value_ : T, isLocal: Boolean) = new HttpBroadcast[T](value_, isLocal)
-  def stop() = HttpBroadcast.stop()
+private[spark] class HttpBroadcastFactory extends BroadcastFactory {
+  def initialize(isMaster: Boolean) { HttpBroadcast.initialize(isMaster) }
+
+  def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
+    new HttpBroadcast[T](value_, isLocal, id)
+
+  def stop() { HttpBroadcast.stop() }
 }
 private object HttpBroadcast extends Logging {
@@ -65,7 +68,7 @@ private object HttpBroadcast extends Logging {
     synchronized {
       if (!initialized) {
         bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-        compress = System.getProperty("spark.compress", "false").toBoolean
+        compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
         if (isMaster) {
           createServer()
         }
@@ -76,9 +79,12 @@ private object HttpBroadcast extends Logging {
   }
   def stop() {
-    if (server != null) {
-      server.stop()
-      server = null
+    synchronized {
+      if (server != null) {
+        server.stop()
+        server = null
+      }
+      initialized = false
     }
   }
@@ -91,8 +97,8 @@ private object HttpBroadcast extends Logging {
logInfo("Broadcast server started at " + serverUri)
}
- def write(uuid: UUID, value: Any) {
- val file = new File(broadcastDir, "broadcast-" + uuid)
+ def write(id: Long, value: Any) {
+ val file = new File(broadcastDir, "broadcast-" + id)
val out: OutputStream = if (compress) {
new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
} else {
@@ -104,8 +110,8 @@ private object HttpBroadcast extends Logging {
     serOut.close()
   }
-  def read[T](uuid: UUID): T = {
-    val url = serverUri + "/broadcast-" + uuid
+  def read[T](id: Long): T = {
+    val url = serverUri + "/broadcast-" + id
     var in = if (compress) {
       new LZFInputStream(new URL(url).openStream()) // Does its own buffering
     } else {
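
For reference, a minimal standalone sketch of the round trip that write() and read() implement in this patch: serialize a value to a broadcast-<id> file, optionally through LZF compression controlled by spark.broadcast.compress, then deserialize it back. Plain java.io object streams stand in for Spark's pluggable serializer, and a local file stands in for the HTTP fetch from serverUri; the object name BroadcastFileSketch and the use of BufferedOutputStream in place of FastBufferedOutputStream are illustrative assumptions, not part of the patch.

import java.io._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}

object BroadcastFileSketch {
  // Same properties the patch reads in HttpBroadcast.initialize().
  val compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt

  // Serialize `value` to dir/broadcast-<id>, LZF-compressed when compress is true.
  def write(dir: File, id: Long, value: Any): File = {
    val file = new File(dir, "broadcast-" + id)
    val out: OutputStream = if (compress) {
      new LZFOutputStream(new FileOutputStream(file)) // does its own buffering
    } else {
      new BufferedOutputStream(new FileOutputStream(file), bufferSize)
    }
    val serOut = new ObjectOutputStream(out) // stand-in for SparkEnv's serializer stream
    serOut.writeObject(value)
    serOut.close()
    file
  }

  // Read the value back, mirroring HttpBroadcast.read (which opens serverUri + "/broadcast-" + id).
  def read[T](file: File): T = {
    val in: InputStream = if (compress) {
      new LZFInputStream(new FileInputStream(file)) // does its own buffering
    } else {
      new BufferedInputStream(new FileInputStream(file), bufferSize)
    }
    val serIn = new ObjectInputStream(in)
    val value = serIn.readObject().asInstanceOf[T]
    serIn.close()
    value
  }
}

Usage, e.g.: val f = BroadcastFileSketch.write(new File("/tmp"), 42L, Seq(1, 2, 3)); val v = BroadcastFileSketch.read[Seq[Int]](f)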