aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/RDD.scala21
-rw-r--r--core/src/main/scala/spark/Utils.scala78
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala18
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala33
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala36
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala192
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMessages.scala3
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala8
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerWorker.scala10
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala2
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala8
-rw-r--r--core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala9
-rw-r--r--core/src/test/scala/spark/storage/BlockManagerSuite.scala36
13 files changed, 233 insertions, 221 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index dde131696f..e6c0438d76 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,13 +1,10 @@
package spark
-import java.net.URL
-import java.util.{Date, Random}
-import java.util.{HashMap => JHashMap}
+import java.util.Random
import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -32,7 +29,6 @@ import spark.rdd.MapPartitionsWithIndexRDD
import spark.rdd.PipedRDD
import spark.rdd.SampledRDD
import spark.rdd.ShuffledRDD
-import spark.rdd.SubtractedRDD
import spark.rdd.UnionRDD
import spark.rdd.ZippedRDD
import spark.rdd.ZippedPartitionsRDD2
@@ -141,10 +137,15 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
- /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
- def unpersist(): RDD[T] = {
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ * @return This RDD.
+ */
+ def unpersist(blocking: Boolean = true): RDD[T] = {
logInfo("Removing RDD " + id + " from persistence list")
- sc.env.blockManager.master.removeRdd(id)
+ sc.env.blockManager.master.removeRdd(id, blocking)
sc.persistentRdds.remove(id)
storageLevel = StorageLevel.NONE
this
@@ -269,8 +270,8 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- var multiplier = 3.0
- var initialCount = count()
+ val multiplier = 3.0
+ val initialCount = count()
var maxSelected = 0
if (initialCount > Integer.MAX_VALUE - 1) {
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 84626df553..ec15326014 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,20 +4,26 @@ import java.io._
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import java.util.regex.Pattern
+
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
import scala.io.Source
+
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
import spark.serializer.SerializerInstance
import spark.deploy.SparkHadoopUtil
-import java.util.regex.Pattern
+
/**
* Various utility methods used by Spark.
*/
private object Utils extends Logging {
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -68,7 +74,6 @@ private object Utils extends Logging {
return buf
}
-
private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
// Register the path to be deleted via shutdown hook
@@ -87,19 +92,19 @@ private object Utils extends Logging {
}
}
- // Note: if file is child of some registered path, while not equal to it, then return true; else false
- // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException
- // and incomplete cleanup
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in IOException and incomplete cleanup.
def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-
val absolutePath = file.getAbsolutePath()
-
val retval = shutdownDeletePaths.synchronized {
- shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined
+ shutdownDeletePaths.find { path =>
+ !absolutePath.equals(path) && absolutePath.startsWith(path)
+ }.isDefined
+ }
+ if (retval) {
+ logInfo("path = " + file + ", already present as root for deletion.")
}
-
- if (retval) logInfo("path = " + file + ", already present as root for deletion.")
-
retval
}
@@ -131,7 +136,7 @@ private object Utils extends Logging {
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
- return dir
+ dir
}
/** Copy all data from an InputStream to an OutputStream */
@@ -174,35 +179,30 @@ private object Utils extends Logging {
Utils.copyStream(in, out, true)
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
tempFile.delete()
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
} else {
Files.move(tempFile, targetFile)
}
case "file" | null =>
- val sourceFile = if (uri.isAbsolute) {
- new File(uri)
- } else {
- new File(url)
- }
- if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
- } else {
- // Remove the file if it already exists
- targetFile.delete()
- // Symlink the file locally.
- if (uri.isAbsolute) {
- // url is absolute, i.e. it starts with "file:///". Extract the source
- // file's absolute path from the url.
- val sourceFile = new File(uri)
- logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+ // In the case of a local file, copy the local file to the target directory.
+ // Note the difference between uri vs url.
+ val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+ if (targetFile.exists) {
+ // If the target file already exists, warn the user if
+ if (!Files.equal(sourceFile, targetFile)) {
+ throw new SparkException(
+ "File " + targetFile + " exists and does not match contents of" + " " + url)
} else {
- // url is not absolute, i.e. itself is the path to the source file.
- logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
- FileUtil.symLink(url, targetFile.getAbsolutePath)
+ // Do nothing if the file contents are the same, i.e. this file has been copied
+ // previously.
+ logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+ + targetFile.getAbsolutePath)
}
+ } else {
+ // The file does not exist in the target directory. Copy it there.
+ logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+ Files.copy(sourceFile, targetFile)
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -323,8 +323,6 @@ private object Utils extends Logging {
InetAddress.getByName(address).getHostName
}
-
-
def localHostPort(): String = {
val retval = System.getProperty("spark.hostPort", null)
if (retval == null) {
@@ -382,6 +380,7 @@ private object Utils extends Logging {
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
def parseHostPort(hostPort: String): (String, Int) = {
{
// Check cache first.
@@ -390,7 +389,8 @@ private object Utils extends Logging {
}
val indx: Int = hostPort.lastIndexOf(':')
- // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now.
+ // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
+ // but then hadoop does not support ipv6 right now.
// For now, we assume that if port exists, then it is valid - not check if it is an int > 0
if (-1 == indx) {
val retval = (hostPort, 0)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index a5d6285c99..e071917c00 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -40,17 +40,25 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true
+ var compare:Int = 0
if (s1Needy && !s2Needy) {
- res = true
+ return true
} else if (!s1Needy && s2Needy) {
- res = false
+ return false
} else if (s1Needy && s2Needy) {
- res = minShareRatio1 <= minShareRatio2
+ compare = minShareRatio1.compareTo(minShareRatio2)
+ } else {
+ compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+ }
+
+ if (compare < 0) {
+ return true
+ } else if (compare > 0) {
+ return false
} else {
- res = taskToWeightRatio1 <= taskToWeightRatio2
+ return s1.name < s2.name
}
- return res
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index d35c43f194..9b39d3aadf 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -3,8 +3,7 @@ package spark.storage
import java.io.{InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
import akka.actor.{ActorSystem, Cancellable, Props}
import akka.dispatch.{Await, Future}
@@ -15,7 +14,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.{Logging, SizeEstimator, SparkEnv, SparkException, Utils}
+import spark.{Logging, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -95,9 +94,11 @@ private[spark] class BlockManager(
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
- private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
- private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+ private val nettyPort: Int = {
+ val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
+ val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+ if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+ }
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
@@ -825,9 +826,23 @@ private[spark] class BlockManager(
}
/**
+ * Remove all blocks belonging to the given RDD.
+ * @return The number of blocks removed.
+ */
+ def removeRdd(rddId: Int): Int = {
+ // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps
+ // from RDD.id to blocks.
+ logInfo("Removing RDD " + rddId)
+ val rddPrefix = "rdd_" + rddId + "_"
+ val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1)
+ blocksToRemove.foreach(blockId => removeBlock(blockId, false))
+ blocksToRemove.size
+ }
+
+ /**
* Remove a block from both memory and disk.
*/
- def removeBlock(blockId: String) {
+ def removeBlock(blockId: String, tellMaster: Boolean = true) {
logInfo("Removing block " + blockId)
val info = blockInfo.get(blockId).orNull
if (info != null) info.synchronized {
@@ -839,7 +854,7 @@ private[spark] class BlockManager(
"the disk or memory store")
}
blockInfo.remove(blockId)
- if (info.tellMaster) {
+ if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info)
}
} else {
@@ -950,7 +965,7 @@ private[spark] object BlockManager extends Logging {
}
def getHeartBeatFrequencyFromSystemProperties: Long =
- System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+ System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
def getDisableHeartBeatsForTesting: Boolean =
System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ac26c16867..58888b1ebb 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,19 +1,11 @@
package spark.storage
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.dispatch.Await
+import akka.actor.ActorRef
+import akka.dispatch.{Await, Future}
import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import akka.util.Duration
-import spark.{Logging, SparkException, Utils}
+import spark.{Logging, SparkException}
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
@@ -91,15 +83,13 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
/**
* Remove all blocks belonging to the given RDD.
*/
- def removeRdd(rddId: Int) {
- val rddBlockPrefix = "rdd_" + rddId + "_"
- // Get the list of blocks in block manager, and remove ones that are part of this RDD.
- // The runtime complexity is linear to the number of blocks persisted in the cluster.
- // It could be expensive if the cluster is large and has a lot of blocks persisted.
- getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) =>
- if (blockId.startsWith(rddBlockPrefix)) {
- removeBlock(blockId)
- }
+ def removeRdd(rddId: Int, blocking: Boolean) {
+ val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
+ future onFailure {
+ case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+ }
+ if (blocking) {
+ Await.result(future, timeout)
}
}
@@ -114,7 +104,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
}
def getStorageStatus: Array[StorageStatus] = {
- askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+ askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
}
/** Stop the driver actor, called only on the Spark driver node */
@@ -151,7 +141,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
- throw new Exception("BlockManagerMaster returned null")
+ throw new SparkException("BlockManagerMaster returned null")
}
return result.asInstanceOf[T]
} catch {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 9b64f95df8..0d4384ba1f 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -2,15 +2,16 @@ package spark.storage
import java.util.{HashMap => JHashMap}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable
import scala.collection.JavaConversions._
-import scala.util.Random
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.util.{Duration, Timeout}
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.util.Duration
import akka.util.duration._
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkException}
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -21,13 +22,16 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
// Mapping from block manager id to the block manager's information.
private val blockManagerInfo =
- new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+ new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
// Mapping from executor ID to block manager ID.
- private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
+ private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
// Mapping from block id to the set of block managers that have the block.
- private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+ private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
+
+ val akkaTimeout = Duration.create(
+ System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
initLogging()
@@ -35,7 +39,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
- "5000").toLong
+ "60000").toLong
var timeoutCheckingTask: Cancellable = null
@@ -50,28 +54,34 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
+ sender ! true
case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+ // TODO: Ideally we want to handle all the message replies in receive instead of in the
+ // individual private methods.
updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
- getLocations(blockId)
+ sender ! getLocations(blockId)
case GetLocationsMultipleBlockIds(blockIds) =>
- getLocationsMultipleBlockIds(blockIds)
+ sender ! getLocationsMultipleBlockIds(blockIds)
case GetPeers(blockManagerId, size) =>
- getPeersDeterministic(blockManagerId, size)
- /*getPeers(blockManagerId, size)*/
+ sender ! getPeers(blockManagerId, size)
case GetMemoryStatus =>
- getMemoryStatus
+ sender ! memoryStatus
case GetStorageStatus =>
- getStorageStatus
+ sender ! storageStatus
+
+ case RemoveRdd(rddId) =>
+ sender ! removeRdd(rddId)
case RemoveBlock(blockId) =>
- removeBlock(blockId)
+ removeBlockFromWorkers(blockId)
+ sender ! true
case RemoveExecutor(execId) =>
removeExecutor(execId)
@@ -81,7 +91,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel
+ timeoutCheckingTask.cancel()
}
context.stop(self)
@@ -89,13 +99,36 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
expireDeadHosts()
case HeartBeat(blockManagerId) =>
- heartBeat(blockManagerId)
+ sender ! heartBeat(blockManagerId)
case other =>
- logInfo("Got unknown message: " + other)
+ logWarning("Got unknown message: " + other)
+ }
+
+ private def removeRdd(rddId: Int): Future[Seq[Int]] = {
+ // First remove the metadata for the given RDD, and then asynchronously remove the blocks
+ // from the slaves.
+
+ val prefix = "rdd_" + rddId + "_"
+ // Find all blocks for the given RDD, remove the block from both blockLocations and
+ // the blockManagerInfo that is tracking the blocks.
+ val blocks = blockLocations.keySet().filter(_.startsWith(prefix))
+ blocks.foreach { blockId =>
+ val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
+ bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+ blockLocations.remove(blockId)
+ }
+
+ // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
+ // The dispatcher is used as an implicit argument into the Future sequence construction.
+ import context.dispatcher
+ val removeMsg = RemoveRdd(rddId)
+ Future.sequence(blockManagerInfo.values.map { bm =>
+ bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+ }.toSeq)
}
- def removeBlockManager(blockManagerId: BlockManagerId) {
+ private def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
// Remove the block manager from blockManagerIdByExecutor.
@@ -106,7 +139,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
- val locations = blockLocations.get(blockId)._2
+ val locations = blockLocations.get(blockId)
locations -= blockManagerId
if (locations.size == 0) {
blockLocations.remove(locations)
@@ -114,11 +147,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
- def expireDeadHosts() {
+ private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
- val toRemove = new HashSet[BlockManagerId]
+ val toRemove = new mutable.HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
@@ -129,31 +162,26 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
toRemove.foreach(removeBlockManager)
}
- def removeExecutor(execId: String) {
+ private def removeExecutor(execId: String) {
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
- sender ! true
}
- def heartBeat(blockManagerId: BlockManagerId) {
+ private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
if (!blockManagerInfo.contains(blockManagerId)) {
- if (blockManagerId.executorId == "<driver>" && !isLocal) {
- sender ! true
- } else {
- sender ! false
- }
+ blockManagerId.executorId == "<driver>" && !isLocal
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
- sender ! true
+ true
}
}
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
- private def removeBlock(blockId: String) {
- val block = blockLocations.get(blockId)
- if (block != null) {
- block._2.foreach { blockManagerId: BlockManagerId =>
+ private def removeBlockFromWorkers(blockId: String) {
+ val locations = blockLocations.get(blockId)
+ if (locations != null) {
+ locations.foreach { blockManagerId: BlockManagerId =>
val blockManager = blockManagerInfo.get(blockManagerId)
if (blockManager.isDefined) {
// Remove the block from the slave's BlockManager.
@@ -163,23 +191,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
}
}
}
- sender ! true
}
// Return a map from the block manager id to max memory and remaining memory.
- private def getMemoryStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
+ blockManagerInfo.map { case(blockManagerId, info) =>
(blockManagerId, (info.maxMem, info.remainingMem))
}.toMap
- sender ! res
}
- private def getStorageStatus() {
- val res = blockManagerInfo.map { case(blockManagerId, info) =>
+ private def storageStatus: Array[StorageStatus] = {
+ blockManagerInfo.map { case(blockManagerId, info) =>
import collection.JavaConverters._
StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
- }
- sender ! res
+ }.toArray
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
@@ -188,7 +213,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
} else if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
- // A block manager of the same host name already exists
+ // A block manager of the same executor already exists.
+ // This should never happen. Let's just quit.
logError("Got two different block manager registrations on " + id.executorId)
System.exit(1)
case None =>
@@ -197,7 +223,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
id, System.currentTimeMillis(), maxMemSize, slaveActor)
}
- sender ! true
}
private def updateBlockInfo(
@@ -226,12 +251,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
- var locations: HashSet[BlockManagerId] = null
+ var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
- locations = blockLocations.get(blockId)._2
+ locations = blockLocations.get(blockId)
} else {
- locations = new HashSet[BlockManagerId]
- blockLocations.put(blockId, (storageLevel.replication, locations))
+ locations = new mutable.HashSet[BlockManagerId]
+ blockLocations.put(blockId, locations)
}
if (storageLevel.isValid) {
@@ -247,70 +272,24 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! true
}
- private def getLocations(blockId: String) {
- val startTimeMs = System.currentTimeMillis()
- val tmp = " " + blockId + " "
- if (blockLocations.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockLocations.get(blockId)._2)
- sender ! res.toSeq
- } else {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- sender ! res
- }
- }
-
- private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
- def getLocations(blockId: String): Seq[BlockManagerId] = {
- val tmp = blockId
- if (blockLocations.containsKey(blockId)) {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(blockLocations.get(blockId)._2)
- return res.toSeq
- } else {
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- return res.toSeq
- }
- }
-
- var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
- for (blockId <- blockIds) {
- res.append(getLocations(blockId))
- }
- sender ! res.toSeq
+ private def getLocations(blockId: String): Seq[BlockManagerId] = {
+ if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
- private def getPeers(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
- res.appendAll(peers)
- res -= blockManagerId
- val rand = new Random(System.currentTimeMillis())
- while (res.length > size) {
- res.remove(rand.nextInt(res.length))
- }
- sender ! res.toSeq
+ private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+ blockIds.map(blockId => getLocations(blockId))
}
- private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
- var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
- var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+ private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
+ val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
val selfIndex = peers.indexOf(blockManagerId)
if (selfIndex == -1) {
- throw new Exception("Self index for " + blockManagerId + " not found")
+ throw new SparkException("Self index for " + blockManagerId + " not found")
}
// Note that this logic will select the same node multiple times if there aren't enough peers
- var index = selfIndex
- while (res.size < size) {
- index += 1
- if (index == selfIndex) {
- throw new Exception("More peer expected than available")
- }
- res += peers(index % peers.size)
- }
- sender ! res.toSeq
+ Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
}
}
@@ -384,6 +363,13 @@ object BlockManagerMasterActor {
}
}
+ def removeBlock(blockId: String) {
+ if (_blocks.containsKey(blockId)) {
+ _remainingMem += _blocks.get(blockId).memSize
+ _blocks.remove(blockId)
+ }
+ }
+
def remainingMem: Long = _remainingMem
def lastSeenMs: Long = _lastSeenMs
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index cff48d9909..0010726c8d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -16,6 +16,9 @@ sealed trait ToBlockManagerSlave
private[spark]
case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+// Remove all blocks belonging to a specific RDD.
+private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
//////////////////////////////////////////////////////////////////////////////////
// Messages from slaves to the master.
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index f570cdc52d..b264d1deb5 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -11,6 +11,12 @@ import spark.{Logging, SparkException, Utils}
*/
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
override def receive = {
- case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+
+ case RemoveBlock(blockId) =>
+ blockManager.removeBlock(blockId)
+
+ case RemoveRdd(rddId) =>
+ val numBlocksRemoved = blockManager.removeRdd(rddId)
+ sender ! numBlocksRemoved
}
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 15225f93a6..3057ade233 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -2,13 +2,7 @@ package spark.storage
import java.nio.ByteBuffer
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import spark.{Logging, Utils, SparkEnv}
+import spark.{Logging, Utils}
import spark.network._
/**
@@ -88,8 +82,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
- private val DATA_TRANSFER_TIME_OUT_MS: Long = 500
- private val REQUEST_RETRY_INTERVAL_MS: Long = 1000
initLogging()
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index cd79bd2bda..e93cc3b485 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -29,7 +29,7 @@ private[spark] object AkkaUtils {
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
// 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index a761dd77c5..3f69e99780 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -106,9 +106,9 @@ class RDDSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
rdd.count
- assert(sc.persistentRdds.isEmpty == false)
+ assert(sc.persistentRdds.isEmpty === false)
rdd.unpersist()
- assert(sc.persistentRdds.isEmpty == true)
+ assert(sc.persistentRdds.isEmpty === true)
failAfter(Span(3000, Millis)) {
try {
@@ -116,12 +116,12 @@ class RDDSuite extends FunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case e: Exception =>
+ case _ => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
}
- assert(sc.getRDDStorageInfo.isEmpty == true)
+ assert(sc.getRDDStorageInfo.isEmpty === true)
}
test("caching with failures") {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index a39418b716..c861597c6b 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
}
}
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
def resourceOffer(rootPool: Pool): Int = {
val taskSetQueue = rootPool.getSortedTaskSetQueue()
- for (taskSet <- taskSetQueue)
- {
+ /* Just for Test*/
+ for (manager <- taskSetQueue) {
+ logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+ }
+ for (taskSet <- taskSetQueue) {
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
case Some(task) =>
return taskSet.stageId
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index bff2475686..b9d5f9668e 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -15,10 +15,10 @@ import org.scalatest.time.SpanSugar._
import spark.JavaSerializer
import spark.KryoSerializer
import spark.SizeEstimator
-import spark.Utils
import spark.util.AkkaUtils
import spark.util.ByteBufferInputStream
+
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
var store2: BlockManager = null
@@ -124,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingle("a1") != None, "a1 was not in store")
@@ -170,7 +170,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
- store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+ store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@@ -218,7 +218,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
- master.removeRdd(0)
+ master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
store.getSingle("rdd_0_0") should be (None)
@@ -232,6 +232,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.getSingle("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
+
+ store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+ master.removeRdd(0, blocking = true)
+ store.getSingle("rdd_0_0") should be (None)
+ master.getLocations("rdd_0_0") should have size 0
+ store.getSingle("rdd_0_1") should be (None)
+ master.getLocations("rdd_0_1") should have size 0
}
test("reregistration on heart beat") {
@@ -262,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
- store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+ store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -280,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
@@ -490,9 +498,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") != None, "list3 was not in store")
@@ -501,7 +509,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list2") != None, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1") != None, "list1 was not in store")
assert(store.get("list1").get.size == 2)
assert(store.get("list2") != None, "list2 was not in store")
@@ -516,9 +524,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val list3 = List(new Array[Byte](200), new Array[Byte](200))
val list4 = List(new Array[Byte](200), new Array[Byte](200))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
// At this point LRU should not kick in because list3 is only on disk
assert(store.get("list1") != None, "list2 was not in store")
assert(store.get("list1").get.size === 2)
@@ -533,7 +541,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.get("list3") != None, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
+ store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2") != None, "list3 was not in store")
assert(store.get("list2").get.size === 2)