diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 03:05:52 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-06 03:05:52 -0800 |
commit | 3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (patch) | |
tree | 98b744beacdb06925eba8413288c473b13f49174 /core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | |
parent | d0fd3b9ad238294346eb3465c489eabd41fb2380 (diff) | |
parent | a2e7e0497484554f86bd71e93705eb0422b1512b (diff) | |
download | spark-3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686.tar.gz spark-3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686.tar.bz2 spark-3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686.zip |
Merge remote-tracking branch 'apache/master' into project-refactor
Conflicts:
examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
Diffstat (limited to 'core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 073a0a5029..9530938278 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -83,13 +83,13 @@ extends Broadcast[T](id) with Logging with Serializable { case None => val start = System.nanoTime logInfo("Started reading broadcast variable " + id) - + // Initialize @transient variables that will receive garbage values from the master. resetWorkerVariables() if (receiveBroadcast(id)) { value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) - + // Store the merged copy in cache so that the next worker doesn't need to rebuild it. // This creates a tradeoff between memory usage and latency. // Storing copy doubles the memory footprint; not storing doubles deserialization cost. @@ -122,14 +122,14 @@ extends Broadcast[T](id) with Logging with Serializable { while (attemptId > 0 && totalBlocks == -1) { TorrentBroadcast.synchronized { SparkEnv.get.blockManager.getSingle(metaId) match { - case Some(x) => + case Some(x) => val tInfo = x.asInstanceOf[TorrentInfo] totalBlocks = tInfo.totalBlocks totalBytes = tInfo.totalBytes arrayOfBlocks = new Array[TorrentBlock](totalBlocks) hasBlocks = 0 - - case None => + + case None => Thread.sleep(500) } } @@ -145,13 +145,13 @@ extends Broadcast[T](id) with Logging with Serializable { val pieceId = BroadcastHelperBlockId(broadcastId, "piece" + pid) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.getSingle(pieceId) match { - case Some(x) => + case Some(x) => arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock] hasBlocks += 1 SparkEnv.get.blockManager.putSingle( pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, true) - - case None => + + case None => throw new SparkException("Failed to get " + pieceId + " of " + broadcastId) } } @@ -166,21 +166,22 @@ private object TorrentBroadcast extends Logging { private var initialized = false - - def initialize(_isDriver: Boolean) { + private var conf: SparkConf = null + def initialize(_isDriver: Boolean, conf: SparkConf) { + TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests synchronized { if (!initialized) { initialized = true } } } - + def stop() { initialized = false } - val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024 - + lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024 + def blockifyObject[T](obj: T): TorrentInfo = { val byteArray = Utils.serialize[T](obj) val bais = new ByteArrayInputStream(byteArray) @@ -209,7 +210,7 @@ extends Logging { } def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock], - totalBytes: Int, + totalBytes: Int, totalBlocks: Int): T = { var retByteArray = new Array[Byte](totalBytes) for (i <- 0 until totalBlocks) { @@ -222,23 +223,23 @@ extends Logging { } private[spark] case class TorrentBlock( - blockID: Int, - byteArray: Array[Byte]) + blockID: Int, + byteArray: Array[Byte]) extends Serializable private[spark] case class TorrentInfo( @transient arrayOfBlocks : Array[TorrentBlock], - totalBlocks: Int, - totalBytes: Int) + totalBlocks: Int, + totalBytes: Int) extends Serializable { - - @transient var hasBlocks = 0 + + @transient var hasBlocks = 0 } private[spark] class TorrentBroadcastFactory extends BroadcastFactory { - - def initialize(isDriver: Boolean) { TorrentBroadcast.initialize(isDriver) } + + def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) } def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) = new TorrentBroadcast[T](value_, isLocal, id) |