From efe2a8b1262a371471f52ca7d47dc34789e80558 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 16 Jul 2014 10:44:54 -0700 Subject: Tightening visibility for various Broadcast related classes. In preparation for SPARK-2521. Author: Reynold Xin Closes #1438 from rxin/broadcast and squashes the following commits: 432f1cc [Reynold Xin] Tightening visibility for various Broadcast related classes. --- .../org/apache/spark/broadcast/Broadcast.scala | 8 +++--- .../org/apache/spark/broadcast/HttpBroadcast.scala | 14 ++++----- .../spark/broadcast/HttpBroadcastFactory.scala | 8 +++--- .../apache/spark/broadcast/TorrentBroadcast.scala | 33 +++++++++++----------- .../spark/broadcast/TorrentBroadcastFactory.scala | 8 +++--- 5 files changed, 36 insertions(+), 35 deletions(-) (limited to 'core/src/main/scala') diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala index 76956f6a34..15fd30e657 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala @@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable { * Actually get the broadcasted value. Concrete implementations of Broadcast class must * define their own way to get the value. */ - private[spark] def getValue(): T + protected def getValue(): T /** * Actually unpersist the broadcasted value on the executors. Concrete implementations of * Broadcast class must define their own logic to unpersist their own data. */ - private[spark] def doUnpersist(blocking: Boolean) + protected def doUnpersist(blocking: Boolean) /** * Actually destroy all data and metadata related to this broadcast variable. * Implementation of Broadcast class must define their own logic to destroy their own * state. */ - private[spark] def doDestroy(blocking: Boolean) + protected def doDestroy(blocking: Boolean) /** Check if this broadcast is valid. If not valid, exception is thrown. */ - private[spark] def assertValid() { + protected def assertValid() { if (!_isValid) { throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString)) } diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4f6cabaff2..487456467b 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag]( @transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { - def getValue = value_ + override protected def getValue() = value_ - val blockId = BroadcastBlockId(id) + private val blockId = BroadcastBlockId(id) /* * Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster @@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag]( /** * Remove all persisted state associated with this HTTP broadcast on the executors. */ - def doUnpersist(blocking: Boolean) { + override protected def doUnpersist(blocking: Boolean) { HttpBroadcast.unpersist(id, removeFromDriver = false, blocking) } /** * Remove all persisted state associated with this HTTP broadcast on the executors and driver. */ - def doDestroy(blocking: Boolean) { + override protected def doDestroy(blocking: Boolean) { HttpBroadcast.unpersist(id, removeFromDriver = true, blocking) } @@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag]( } } -private[spark] object HttpBroadcast extends Logging { +private[broadcast] object HttpBroadcast extends Logging { private var initialized = false private var broadcastDir: File = null private var compress: Boolean = false @@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging { def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name) - def write(id: Long, value: Any) { + private def write(id: Long, value: Any) { val file = getFile(id) val out: OutputStream = { if (compress) { @@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging { files += file } - def read[T: ClassTag](id: Long): T = { + private def read[T: ClassTag](id: Long): T = { logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id) val url = serverUri + "/" + BroadcastBlockId(id).name diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala index d5a031e2bb..c7ef02d572 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala @@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf} * [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism. */ class HttpBroadcastFactory extends BroadcastFactory { - def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { + override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { HttpBroadcast.initialize(isDriver, conf, securityMgr) } - def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = + override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = new HttpBroadcast[T](value_, isLocal, id) - def stop() { HttpBroadcast.stop() } + override def stop() { HttpBroadcast.stop() } /** * Remove all persisted state associated with the HTTP broadcast with the given ID. * @param removeFromDriver Whether to remove state from the driver * @param blocking Whether to block until unbroadcasted */ - def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { + override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { HttpBroadcast.unpersist(id, removeFromDriver, blocking) } } diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 734de37ba1..86731b684f 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -20,7 +20,6 @@ package org.apache.spark.broadcast import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} import scala.reflect.ClassTag -import scala.math import scala.util.Random import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} @@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag]( @transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { - def getValue = value_ + override protected def getValue() = value_ - val broadcastId = BroadcastBlockId(id) + private val broadcastId = BroadcastBlockId(id) TorrentBroadcast.synchronized { SparkEnv.get.blockManager.putSingle( broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false) } - @transient var arrayOfBlocks: Array[TorrentBlock] = null - @transient var totalBlocks = -1 - @transient var totalBytes = -1 - @transient var hasBlocks = 0 + @transient private var arrayOfBlocks: Array[TorrentBlock] = null + @transient private var totalBlocks = -1 + @transient private var totalBytes = -1 + @transient private var hasBlocks = 0 if (!isLocal) { sendBroadcast() @@ -70,7 +69,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( /** * Remove all persisted state associated with this Torrent broadcast on the executors. */ - def doUnpersist(blocking: Boolean) { + override protected def doUnpersist(blocking: Boolean) { TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking) } @@ -78,11 +77,11 @@ private[spark] class TorrentBroadcast[T: ClassTag]( * Remove all persisted state associated with this Torrent broadcast on the executors * and driver. */ - def doDestroy(blocking: Boolean) { + override protected def doDestroy(blocking: Boolean) { TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking) } - def sendBroadcast() { + private def sendBroadcast() { val tInfo = TorrentBroadcast.blockifyObject(value_) totalBlocks = tInfo.totalBlocks totalBytes = tInfo.totalBytes @@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( hasBlocks = 0 } - def receiveBroadcast(): Boolean = { + private def receiveBroadcast(): Boolean = { // Receive meta-info about the size of broadcast data, // the number of chunks it is divided into, etc. val metaId = BroadcastBlockId(id, "meta") @@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( } -private[spark] object TorrentBroadcast extends Logging { +private[broadcast] object TorrentBroadcast extends Logging { private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 private var initialized = false private var conf: SparkConf = null @@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging { * Remove all persisted blocks associated with this torrent broadcast on the executors. * If removeFromDriver is true, also remove these persisted blocks on the driver. */ - def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized { - SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) + def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = { + synchronized { + SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) + } } } -private[spark] case class TorrentBlock( +private[broadcast] case class TorrentBlock( blockID: Int, byteArray: Array[Byte]) extends Serializable -private[spark] case class TorrentInfo( +private[broadcast] case class TorrentInfo( @transient arrayOfBlocks: Array[TorrentBlock], totalBlocks: Int, totalBytes: Int) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 1de8396a0e..ad0f701d7a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf} */ class TorrentBroadcastFactory extends BroadcastFactory { - def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { + override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { TorrentBroadcast.initialize(isDriver, conf) } - def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = + override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = new TorrentBroadcast[T](value_, isLocal, id) - def stop() { TorrentBroadcast.stop() } + override def stop() { TorrentBroadcast.stop() } /** * Remove all persisted state associated with the torrent broadcast with the given ID. * @param removeFromDriver Whether to remove state from the driver. * @param blocking Whether to block until unbroadcasted */ - def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { + override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { TorrentBroadcast.unpersist(id, removeFromDriver, blocking) } } -- cgit v1.2.3