aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-08-18 20:51:41 -0700
committerReynold Xin <rxin@apache.org>2014-08-18 20:51:41 -0700
commit82577339dd58b5811eab5d10667775e61e37ff51 (patch)
tree9867a722cce4621e418158fd72a03935c3c4ecc4 /core
parent1f1819b20f887b487557c31e54b8bcd95b582dc6 (diff)
downloadspark-82577339dd58b5811eab5d10667775e61e37ff51.tar.gz
spark-82577339dd58b5811eab5d10667775e61e37ff51.tar.bz2
spark-82577339dd58b5811eab5d10667775e61e37ff51.zip
[SPARK-3116] Remove the excessive lockings in TorrentBroadcast
Author: Reynold Xin <rxin@apache.org> Closes #2028 from rxin/torrentBroadcast and squashes the following commits: 92c62a5 [Reynold Xin] Revert the MEMORY_AND_DISK_SER changes. 03a5221 [Reynold Xin] [SPARK-3116] Remove the excessive lockings in TorrentBroadcast
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala66
1 files changed, 27 insertions, 39 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index fe73456ef8..d8be649f96 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -17,8 +17,7 @@
package org.apache.spark.broadcast
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
- ObjectInputStream, ObjectOutputStream, OutputStream}
+import java.io._
import scala.reflect.ClassTag
import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private val broadcastId = BroadcastBlockId(id)
- TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.putSingle(
- broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- }
+ SparkEnv.get.blockManager.putSingle(
+ broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
@transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
// Store meta-info
val metaId = BroadcastBlockId(id, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
- TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.putSingle(
- metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- }
+ SparkEnv.get.blockManager.putSingle(
+ metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// Store individual pieces
for (i <- 0 until totalBlocks) {
val pieceId = BroadcastBlockId(id, "piece" + i)
- TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.putSingle(
- pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- }
+ SparkEnv.get.blockManager.putSingle(
+ pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
}
@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val metaId = BroadcastBlockId(id, "meta")
var attemptId = 10
while (attemptId > 0 && totalBlocks == -1) {
- TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.getSingle(metaId) match {
- case Some(x) =>
- val tInfo = x.asInstanceOf[TorrentInfo]
- totalBlocks = tInfo.totalBlocks
- totalBytes = tInfo.totalBytes
- arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
- hasBlocks = 0
-
- case None =>
- Thread.sleep(500)
- }
+ SparkEnv.get.blockManager.getSingle(metaId) match {
+ case Some(x) =>
+ val tInfo = x.asInstanceOf[TorrentInfo]
+ totalBlocks = tInfo.totalBlocks
+ totalBytes = tInfo.totalBytes
+ arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
+ hasBlocks = 0
+
+ case None =>
+ Thread.sleep(500)
}
attemptId -= 1
}
+
if (totalBlocks == -1) {
return false
}
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
for (pid <- recvOrder) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
- TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.getSingle(pieceId) match {
- case Some(x) =>
- arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
- hasBlocks += 1
- SparkEnv.get.blockManager.putSingle(
- pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ SparkEnv.get.blockManager.getSingle(pieceId) match {
+ case Some(x) =>
+ arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
+ hasBlocks += 1
+ SparkEnv.get.blockManager.putSingle(
+ pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- case None =>
- throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
- }
+ case None =>
+ throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
}
@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
- synchronized {
- SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
- }
+ SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}