aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-07-16 10:44:54 -0700
committerReynold Xin <rxin@apache.org>2014-07-16 10:44:54 -0700
commitefe2a8b1262a371471f52ca7d47dc34789e80558 (patch)
treef92e3a37fbefb4d503e6765e63e4756a797d45c4 /core/src/main/scala
parent33e64ecacbc44567f9cba2644a30a118653ea5fa (diff)
downloadspark-efe2a8b1262a371471f52ca7d47dc34789e80558.tar.gz
spark-efe2a8b1262a371471f52ca7d47dc34789e80558.tar.bz2
spark-efe2a8b1262a371471f52ca7d47dc34789e80558.zip
Tightening visibility for various Broadcast related classes.
In preparation for SPARK-2521. Author: Reynold Xin <rxin@apache.org> Closes #1438 from rxin/broadcast and squashes the following commits: 432f1cc [Reynold Xin] Tightening visibility for various Broadcast related classes.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala8
5 files changed, 36 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 76956f6a34..15fd30e657 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
* define their own way to get the value.
*/
- private[spark] def getValue(): T
+ protected def getValue(): T
/**
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
* Broadcast class must define their own logic to unpersist their own data.
*/
- private[spark] def doUnpersist(blocking: Boolean)
+ protected def doUnpersist(blocking: Boolean)
/**
* Actually destroy all data and metadata related to this broadcast variable.
* Implementation of Broadcast class must define their own logic to destroy their own
* state.
*/
- private[spark] def doDestroy(blocking: Boolean)
+ protected def doDestroy(blocking: Boolean)
/** Check if this broadcast is valid. If not valid, exception is thrown. */
- private[spark] def assertValid() {
+ protected def assertValid() {
if (!_isValid) {
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4f6cabaff2..487456467b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
- def getValue = value_
+ override protected def getValue() = value_
- val blockId = BroadcastBlockId(id)
+ private val blockId = BroadcastBlockId(id)
/*
* Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
@@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this HTTP broadcast on the executors.
*/
- def doUnpersist(blocking: Boolean) {
+ override protected def doUnpersist(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
}
/**
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
*/
- def doDestroy(blocking: Boolean) {
+ override protected def doDestroy(blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
}
@@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
}
}
-private[spark] object HttpBroadcast extends Logging {
+private[broadcast] object HttpBroadcast extends Logging {
private var initialized = false
private var broadcastDir: File = null
private var compress: Boolean = false
@@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {
def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
- def write(id: Long, value: Any) {
+ private def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
@@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
files += file
}
- def read[T: ClassTag](id: Long): T = {
+ private def read[T: ClassTag](id: Long): T = {
logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
val url = serverUri + "/" + BroadcastBlockId(id).name
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
index d5a031e2bb..c7ef02d572 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
@@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
*/
class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+ override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
HttpBroadcast.initialize(isDriver, conf, securityMgr)
}
- def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+ override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
- def stop() { HttpBroadcast.stop() }
+ override def stop() { HttpBroadcast.stop() }
/**
* Remove all persisted state associated with the HTTP broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver
* @param blocking Whether to block until unbroadcasted
*/
- def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+ override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
HttpBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 734de37ba1..86731b684f 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -20,7 +20,6 @@ package org.apache.spark.broadcast
import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.ClassTag
-import scala.math
import scala.util.Random
import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
@@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
- def getValue = value_
+ override protected def getValue() = value_
- val broadcastId = BroadcastBlockId(id)
+ private val broadcastId = BroadcastBlockId(id)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
- @transient var arrayOfBlocks: Array[TorrentBlock] = null
- @transient var totalBlocks = -1
- @transient var totalBytes = -1
- @transient var hasBlocks = 0
+ @transient private var arrayOfBlocks: Array[TorrentBlock] = null
+ @transient private var totalBlocks = -1
+ @transient private var totalBytes = -1
+ @transient private var hasBlocks = 0
if (!isLocal) {
sendBroadcast()
@@ -70,7 +69,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
/**
* Remove all persisted state associated with this Torrent broadcast on the executors.
*/
- def doUnpersist(blocking: Boolean) {
+ override protected def doUnpersist(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
}
@@ -78,11 +77,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](
* Remove all persisted state associated with this Torrent broadcast on the executors
* and driver.
*/
- def doDestroy(blocking: Boolean) {
+ override protected def doDestroy(blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
}
- def sendBroadcast() {
+ private def sendBroadcast() {
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
@@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
hasBlocks = 0
}
- def receiveBroadcast(): Boolean = {
+ private def receiveBroadcast(): Boolean = {
// Receive meta-info about the size of broadcast data,
// the number of chunks it is divided into, etc.
val metaId = BroadcastBlockId(id, "meta")
@@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
}
-private[spark] object TorrentBroadcast extends Logging {
+private[broadcast] object TorrentBroadcast extends Logging {
private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
private var initialized = false
private var conf: SparkConf = null
@@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
* Remove all persisted blocks associated with this torrent broadcast on the executors.
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
- def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
- SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+ def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+ synchronized {
+ SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+ }
}
}
-private[spark] case class TorrentBlock(
+private[broadcast] case class TorrentBlock(
blockID: Int,
byteArray: Array[Byte])
extends Serializable
-private[spark] case class TorrentInfo(
+private[broadcast] case class TorrentInfo(
@transient arrayOfBlocks: Array[TorrentBlock],
totalBlocks: Int,
totalBytes: Int)
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
index 1de8396a0e..ad0f701d7a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
*/
class TorrentBroadcastFactory extends BroadcastFactory {
- def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+ override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
TorrentBroadcast.initialize(isDriver, conf)
}
- def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+ override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
- def stop() { TorrentBroadcast.stop() }
+ override def stop() { TorrentBroadcast.stop() }
/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver.
* @param blocking Whether to block until unbroadcasted
*/
- def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+ override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
}