diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-05-26 13:15:06 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-05-26 13:15:06 -0700 |
commit | e141f644cab34064bda5b551f9a4baa75b995552 (patch) | |
tree | f0c6e3a871398fc6ec5ddadf0ab9e3cbb31df6f2 | |
parent | 69372b48f0edb27d2ae2d3da4a362f7d8be8d97a (diff) | |
parent | ae64920337f1658a5b3a2ce65fd94679cc23822f (diff) | |
download | spark-e141f644cab34064bda5b551f9a4baa75b995552.tar.gz spark-e141f644cab34064bda5b551f9a4baa75b995552.tar.bz2 spark-e141f644cab34064bda5b551f9a4baa75b995552.zip |
Merge pull request #132 from Benky/rb-first-iteration
Little refactoring and unit tests for CacheTrackerActor
-rw-r--r-- | core/src/main/scala/spark/Cache.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 43 | ||||
-rw-r--r-- | core/src/main/scala/spark/MesosScheduler.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 15 | ||||
-rw-r--r-- | core/src/test/scala/spark/CacheTrackerSuite.scala | 97 | ||||
-rw-r--r-- | core/src/test/scala/spark/MesosSchedulerSuite.scala | 28 | ||||
-rw-r--r-- | core/src/test/scala/spark/UtilsSuite.scala (renamed from core/src/test/scala/spark/Utils.scala) | 13 |
7 files changed, 178 insertions, 47 deletions
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala index aeff205884..150fe14e2c 100644 --- a/core/src/main/scala/spark/Cache.scala +++ b/core/src/main/scala/spark/Cache.scala @@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger sealed trait CachePutResponse case class CachePutSuccess(size: Long) extends CachePutResponse -case class CachePutFailure extends CachePutResponse +case class CachePutFailure() extends CachePutResponse /** * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 5b5831b2de..4867829c17 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -56,7 +56,7 @@ class CacheTrackerActor extends DaemonActor with Logging { case AddedToCache(rddId, partition, host, size) => if (size > 0) { - slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size) + slaveUsage.put(host, getCacheUsage(host) + size) logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format( rddId, partition, host, Utils.memoryBytesToString(size), Utils.memoryBytesToString(getCacheAvailable(host)))) @@ -71,10 +71,10 @@ class CacheTrackerActor extends DaemonActor with Logging { logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format( rddId, partition, host, Utils.memoryBytesToString(size), Utils.memoryBytesToString(getCacheAvailable(host)))) - slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size) + slaveUsage.put(host, getCacheUsage(host) - size) // Do a sanity check to make sure usage is greater than 0. - val usage = slaveUsage.getOrElse(host, 0L) + val usage = getCacheUsage(host) if (usage < 0) { logError("Cache usage on %s is negative (%d)".format(host, usage)) } @@ -82,22 +82,19 @@ class CacheTrackerActor extends DaemonActor with Logging { logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host)) } locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host) - + reply('OK) + case MemoryCacheLost(host) => logInfo("Memory cache lost on " + host) // TODO: Drop host from the memory locations list of all RDDs case GetCacheLocations => logInfo("Asked for current cache locations") - val locsCopy = new HashMap[Int, Array[List[String]]] - for ((rddId, array) <- locs) { - locsCopy(rddId) = array.clone() - } - reply(locsCopy) + reply(locs.map{case (rrdId, array) => (rrdId -> array.clone())}) case GetCacheStatus => - val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key => - (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L)) + val status = slaveCapacity.map { case (host,capacity) => + (host, capacity, getCacheUsage(host)) }.toSeq reply(status) @@ -130,9 +127,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { } // Report the cache being started. - trackerActor !? SlaveCacheStarted( - System.getProperty("spark.hostname", Utils.localHostName), - cache.getCapacity) + trackerActor !? SlaveCacheStarted(Utils.getHost, cache.getCapacity) // Remembers which splits are currently being loaded (on worker nodes) val loading = new HashSet[(Int, Int)] @@ -151,20 +146,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { // Get a snapshot of the currently known locations def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = { (trackerActor !? GetCacheLocations) match { - case h: HashMap[_, _] => - h.asInstanceOf[HashMap[Int, Array[List[String]]]] + case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]] - case _ => - throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap") + case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap") } } // Get the usage status of slave caches. Each tuple in the returned sequence // is in the form of (host name, capacity, usage). - def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = { + def getCacheStatus(): Seq[(String, Long, Long)] = { (trackerActor !? GetCacheStatus) match { - case h: Seq[Tuple3[String, Long, Long]] => - h.asInstanceOf[Seq[Tuple3[String, Long, Long]]] + case h: Seq[(String, Long, Long)] => h.asInstanceOf[Seq[(String, Long, Long)]] case _ => throw new SparkException( @@ -202,7 +194,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { } // If we got here, we have to load the split // Tell the master that we're doing so - val host = System.getProperty("spark.hostname", Utils.localHostName) + // TODO: fetch any remote copy of the split that may be available logInfo("Computing partition " + split) var array: Array[T] = null @@ -223,7 +215,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { case CachePutSuccess(size) => { // Tell the master that we added the entry. Don't return until it // replies so it can properly schedule future tasks that use this RDD. - trackerActor !? AddedToCache(rdd.id, split.index, host, size) + trackerActor !? AddedToCache(rdd.id, split.index, Utils.getHost, size) } case _ => null } @@ -234,9 +226,8 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { // Called by the Cache to report that an entry has been dropped from it def dropEntry(datasetId: Any, partition: Int) { datasetId match { - case (cache.keySpaceId, rddId: Int) => - val host = System.getProperty("spark.hostname", Utils.localHostName) - trackerActor !! DroppedFromCache(rddId, partition, host) + //TODO - do we really want to use '!!' when nobody checks returned future? '!' seems to enough here. + case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost) } } diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala index b95f40b877..755e001106 100644 --- a/core/src/main/scala/spark/MesosScheduler.scala +++ b/core/src/main/scala/spark/MesosScheduler.scala @@ -42,7 +42,7 @@ private class MesosScheduler( // Memory used by each executor (in megabytes) val EXECUTOR_MEMORY = { if (System.getenv("SPARK_MEM") != null) { - memoryStringToMb(System.getenv("SPARK_MEM")) + MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM")) // TODO: Might need to add some extra memory for the non-heap parts of the JVM } else { 512 @@ -78,9 +78,7 @@ private class MesosScheduler( // Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first) private val jobOrdering = new Ordering[Job] { - override def compare(j1: Job, j2: Job): Int = { - return j2.runId - j1.runId - } + override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId } def newJobId(): Int = this.synchronized { @@ -156,7 +154,7 @@ private class MesosScheduler( activeJobs(jobId) = myJob activeJobsQueue += myJob logInfo("Adding job with ID " + jobId) - jobTasks(jobId) = new HashSet() + jobTasks(jobId) = HashSet.empty[String] } driver.reviveOffers(); } @@ -376,24 +374,27 @@ private class MesosScheduler( } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} +} +object MesosScheduler { /** - * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. - * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM + * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. + * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM * environment variable. */ def memoryStringToMb(str: String): Int = { val lower = str.toLowerCase if (lower.endsWith("k")) { - (lower.substring(0, lower.length-1).toLong / 1024).toInt + (lower.substring(0, lower.length - 1).toLong / 1024).toInt } else if (lower.endsWith("m")) { - lower.substring(0, lower.length-1).toInt + lower.substring(0, lower.length - 1).toInt } else if (lower.endsWith("g")) { - lower.substring(0, lower.length-1).toInt * 1024 + lower.substring(0, lower.length - 1).toInt * 1024 } else if (lower.endsWith("t")) { - lower.substring(0, lower.length-1).toInt * 1024 * 1024 - } else {// no suffix, so it's just a number in bytes + lower.substring(0, lower.length - 1).toInt * 1024 * 1024 + } else { + // no suffix, so it's just a number in bytes (lower.toLong / 1024 / 1024).toInt } } -} +}
\ No newline at end of file diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index d108c14f6b..cfd6dc8b2a 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -2,11 +2,11 @@ package spark import java.io._ import java.net.InetAddress -import java.util.UUID import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import scala.collection.mutable.ArrayBuffer import scala.util.Random +import java.util.{Locale, UUID} /** * Various utility methods used by Spark. @@ -157,9 +157,12 @@ object Utils { /** * Get the local machine's hostname. */ - def localHostName(): String = { - return InetAddress.getLocalHost().getHostName - } + def localHostName(): String = InetAddress.getLocalHost.getHostName + + /** + * Get current host + */ + def getHost = System.getProperty("spark.hostname", localHostName()) /** * Delete a file or directory and its contents recursively. @@ -184,7 +187,7 @@ object Utils { val GB = 1L << 30 val MB = 1L << 20 val KB = 1L << 10 - val B = 1L + val (value, unit) = { if (size >= 2*GB) { (size.asInstanceOf[Double] / GB, "GB") @@ -196,6 +199,6 @@ object Utils { (size.asInstanceOf[Double], "B") } } - "%.1f%s".format(value, unit) + "%.1f%s".formatLocal(Locale.US, value, unit) } } diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala new file mode 100644 index 0000000000..60290d14ca --- /dev/null +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -0,0 +1,97 @@ +package spark + +import org.scalatest.FunSuite +import collection.mutable.HashMap + +class CacheTrackerSuite extends FunSuite { + + test("CacheTrackerActor slave initialization & cache status") { + System.setProperty("spark.master.port", "1345") + val initialSize = 2L << 20 + + val tracker = new CacheTrackerActor + tracker.start() + + tracker !? SlaveCacheStarted("host001", initialSize) + + assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 0L))) + + tracker !? StopCacheTracker + } + + test("RegisterRDD") { + System.setProperty("spark.master.port", "1345") + val initialSize = 2L << 20 + + val tracker = new CacheTrackerActor + tracker.start() + + tracker !? SlaveCacheStarted("host001", initialSize) + + tracker !? RegisterRDD(1, 3) + tracker !? RegisterRDD(2, 1) + + assert(getCacheLocations(tracker) == Map(1 -> List(List(), List(), List()), 2 -> List(List()))) + + tracker !? StopCacheTracker + } + + test("AddedToCache") { + System.setProperty("spark.master.port", "1345") + val initialSize = 2L << 20 + + val tracker = new CacheTrackerActor + tracker.start() + + tracker !? SlaveCacheStarted("host001", initialSize) + + tracker !? RegisterRDD(1, 2) + tracker !? RegisterRDD(2, 1) + + tracker !? AddedToCache(1, 0, "host001", 2L << 15) + tracker !? AddedToCache(1, 1, "host001", 2L << 11) + tracker !? AddedToCache(2, 0, "host001", 3L << 10) + + assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L))) + + assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001")))) + + tracker !? StopCacheTracker + } + + test("DroppedFromCache") { + System.setProperty("spark.master.port", "1345") + val initialSize = 2L << 20 + + val tracker = new CacheTrackerActor + tracker.start() + + tracker !? SlaveCacheStarted("host001", initialSize) + + tracker !? RegisterRDD(1, 2) + tracker !? RegisterRDD(2, 1) + + tracker !? AddedToCache(1, 0, "host001", 2L << 15) + tracker !? AddedToCache(1, 1, "host001", 2L << 11) + tracker !? AddedToCache(2, 0, "host001", 3L << 10) + + assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L))) + assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001")))) + + tracker !? DroppedFromCache(1, 1, "host001", 2L << 11) + + assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 68608L))) + assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"),List()), 2 -> List(List("host001")))) + + tracker !? StopCacheTracker + } + + /** + * Helper function to get cacheLocations from CacheTracker + */ + def getCacheLocations(tracker: CacheTrackerActor) = tracker !? GetCacheLocations match { + case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]].map { + case (i, arr) => (i -> arr.toList) + } + } +} diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala new file mode 100644 index 0000000000..0e6820cbdc --- /dev/null +++ b/core/src/test/scala/spark/MesosSchedulerSuite.scala @@ -0,0 +1,28 @@ +package spark + +import org.scalatest.FunSuite + +class MesosSchedulerSuite extends FunSuite { + test("memoryStringToMb"){ + + assert(MesosScheduler.memoryStringToMb("1") == 0) + assert(MesosScheduler.memoryStringToMb("1048575") == 0) + assert(MesosScheduler.memoryStringToMb("3145728") == 3) + + assert(MesosScheduler.memoryStringToMb("1024k") == 1) + assert(MesosScheduler.memoryStringToMb("5000k") == 4) + assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K")) + + assert(MesosScheduler.memoryStringToMb("1024m") == 1024) + assert(MesosScheduler.memoryStringToMb("5000m") == 5000) + assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M")) + + assert(MesosScheduler.memoryStringToMb("2g") == 2048) + assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G")) + + assert(MesosScheduler.memoryStringToMb("2t") == 2097152) + assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T")) + + + } +} diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/UtilsSuite.scala index 4e852903be..f31251e509 100644 --- a/core/src/test/scala/spark/Utils.scala +++ b/core/src/test/scala/spark/UtilsSuite.scala @@ -1,7 +1,8 @@ package spark import org.scalatest.FunSuite - +import java.io.{ByteArrayOutputStream, ByteArrayInputStream} +import util.Random class UtilsSuite extends FunSuite { @@ -14,5 +15,15 @@ class UtilsSuite extends FunSuite { assert(Utils.memoryBytesToString(5368709120L) === "5.0GB") } + test("copyStream") { + //input array initialization + val bytes = Array.ofDim[Byte](9000) + Random.nextBytes(bytes) + + val os = new ByteArrayOutputStream() + Utils.copyStream(new ByteArrayInputStream(bytes), os) + + assert(os.toByteArray.toList.equals(bytes.toList)) + } } |