author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-05-26 13:15:06 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-05-26 13:15:06 -0700
commit    e141f644cab34064bda5b551f9a4baa75b995552 (patch)
tree      f0c6e3a871398fc6ec5ddadf0ab9e3cbb31df6f2 /core
parent    69372b48f0edb27d2ae2d3da4a362f7d8be8d97a (diff)
parent    ae64920337f1658a5b3a2ce65fd94679cc23822f (diff)
Merge pull request #132 from Benky/rb-first-iteration
Little refactoring and unit tests for CacheTrackerActor
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Cache.scala               |  2
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala        | 43
-rw-r--r--  core/src/main/scala/spark/MesosScheduler.scala      | 27
-rw-r--r--  core/src/main/scala/spark/Utils.scala               | 15
-rw-r--r--  core/src/test/scala/spark/CacheTrackerSuite.scala   | 97
-rw-r--r--  core/src/test/scala/spark/MesosSchedulerSuite.scala | 28
-rw-r--r--  core/src/test/scala/spark/UtilsSuite.scala (renamed from core/src/test/scala/spark/Utils.scala) | 13
7 files changed, 178 insertions, 47 deletions
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index aeff205884..150fe14e2c 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger
sealed trait CachePutResponse
case class CachePutSuccess(size: Long) extends CachePutResponse
-case class CachePutFailure extends CachePutResponse
+case class CachePutFailure() extends CachePutResponse
/**
* An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
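
Note on the hunk above: Scala 2.9 deprecates case classes declared without a parameter list, which is why CachePutFailure gains empty parentheses. A minimal sketch of matching on the response hierarchy (the Cache API that produces these values lies outside this hunk's context, so the surrounding method is illustrative only):

    // Illustrative only: pattern-matching on the CachePutResponse hierarchy.
    def describe(resp: CachePutResponse): String = resp match {
      case CachePutSuccess(size) => "stored " + size + " bytes"
      case CachePutFailure()     => "put rejected"
    }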
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 5b5831b2de..4867829c17 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -56,7 +56,7 @@ class CacheTrackerActor extends DaemonActor with Logging {
case AddedToCache(rddId, partition, host, size) =>
if (size > 0) {
- slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
+ slaveUsage.put(host, getCacheUsage(host) + size)
logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
@@ -71,10 +71,10 @@ class CacheTrackerActor extends DaemonActor with Logging {
logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
- slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
+ slaveUsage.put(host, getCacheUsage(host) - size)
// Do a sanity check to make sure usage is greater than 0.
- val usage = slaveUsage.getOrElse(host, 0L)
+ val usage = getCacheUsage(host)
if (usage < 0) {
logError("Cache usage on %s is negative (%d)".format(host, usage))
}
@@ -82,22 +82,19 @@ class CacheTrackerActor extends DaemonActor with Logging {
logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
}
locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
-
+ reply('OK)
+
case MemoryCacheLost(host) =>
logInfo("Memory cache lost on " + host)
// TODO: Drop host from the memory locations list of all RDDs
case GetCacheLocations =>
logInfo("Asked for current cache locations")
- val locsCopy = new HashMap[Int, Array[List[String]]]
- for ((rddId, array) <- locs) {
- locsCopy(rddId) = array.clone()
- }
- reply(locsCopy)
+ reply(locs.map { case (rddId, array) => rddId -> array.clone() })
case GetCacheStatus =>
- val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key =>
- (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L))
+ val status = slaveCapacity.map { case (host, capacity) =>
+ (host, capacity, getCacheUsage(host))
}.toSeq
reply(status)
@@ -130,9 +127,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// Report the cache being started.
- trackerActor !? SlaveCacheStarted(
- System.getProperty("spark.hostname", Utils.localHostName),
- cache.getCapacity)
+ trackerActor !? SlaveCacheStarted(Utils.getHost, cache.getCapacity)
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[(Int, Int)]
@@ -151,20 +146,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
(trackerActor !? GetCacheLocations) match {
- case h: HashMap[_, _] =>
- h.asInstanceOf[HashMap[Int, Array[List[String]]]]
+ case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]]
- case _ =>
- throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
+ case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
}
}
// Get the usage status of slave caches. Each tuple in the returned sequence
// is in the form of (host name, capacity, usage).
- def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = {
+ def getCacheStatus(): Seq[(String, Long, Long)] = {
(trackerActor !? GetCacheStatus) match {
- case h: Seq[Tuple3[String, Long, Long]] =>
- h.asInstanceOf[Seq[Tuple3[String, Long, Long]]]
+ case h: Seq[(String, Long, Long)] => h.asInstanceOf[Seq[(String, Long, Long)]]
case _ =>
throw new SparkException(
@@ -202,7 +194,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// If we got here, we have to load the split
// Tell the master that we're doing so
- val host = System.getProperty("spark.hostname", Utils.localHostName)
+
// TODO: fetch any remote copy of the split that may be available
logInfo("Computing partition " + split)
var array: Array[T] = null
@@ -223,7 +215,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
case CachePutSuccess(size) => {
// Tell the master that we added the entry. Don't return until it
// replies so it can properly schedule future tasks that use this RDD.
- trackerActor !? AddedToCache(rdd.id, split.index, host, size)
+ trackerActor !? AddedToCache(rdd.id, split.index, Utils.getHost, size)
}
case _ => null
}
@@ -234,9 +226,8 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Called by the Cache to report that an entry has been dropped from it
def dropEntry(datasetId: Any, partition: Int) {
datasetId match {
- case (cache.keySpaceId, rddId: Int) =>
- val host = System.getProperty("spark.hostname", Utils.localHostName)
- trackerActor !! DroppedFromCache(rddId, partition, host)
+ // TODO: Do we really want to use '!!' when nobody checks the returned future? '!' seems to be enough here.
+ case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost)
}
}
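
The hunks above fold repeated slaveUsage.getOrElse(host, 0L) lookups into getCacheUsage, whose definition sits outside the diff's context lines. A minimal sketch of the helpers this code appears to rely on, assuming slaveUsage and slaveCapacity are HashMap[String, Long] fields of CacheTrackerActor:

    // Assumed helpers; their actual definitions are not shown by this patch.
    private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
    private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
    private def getCacheAvailable(host: String): Long = getCacheCapacity(host) - getCacheUsage(host)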
diff --git a/core/src/main/scala/spark/MesosScheduler.scala b/core/src/main/scala/spark/MesosScheduler.scala
index b95f40b877..755e001106 100644
--- a/core/src/main/scala/spark/MesosScheduler.scala
+++ b/core/src/main/scala/spark/MesosScheduler.scala
@@ -42,7 +42,7 @@ private class MesosScheduler(
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
- memoryStringToMb(System.getenv("SPARK_MEM"))
+ MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
@@ -78,9 +78,7 @@ private class MesosScheduler(
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] {
- override def compare(j1: Job, j2: Job): Int = {
- return j2.runId - j1.runId
- }
+ override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
}
def newJobId(): Int = this.synchronized {
@@ -156,7 +154,7 @@ private class MesosScheduler(
activeJobs(jobId) = myJob
activeJobsQueue += myJob
logInfo("Adding job with ID " + jobId)
- jobTasks(jobId) = new HashSet()
+ jobTasks(jobId) = HashSet.empty[String]
}
driver.reviveOffers();
}
@@ -376,24 +374,27 @@ private class MesosScheduler(
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+}
+object MesosScheduler {
/**
- * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
- * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
+ * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
+ * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable.
*/
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
- (lower.substring(0, lower.length-1).toLong / 1024).toInt
+ (lower.substring(0, lower.length - 1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
- lower.substring(0, lower.length-1).toInt
+ lower.substring(0, lower.length - 1).toInt
} else if (lower.endsWith("g")) {
- lower.substring(0, lower.length-1).toInt * 1024
+ lower.substring(0, lower.length - 1).toInt * 1024
} else if (lower.endsWith("t")) {
- lower.substring(0, lower.length-1).toInt * 1024 * 1024
- } else {// no suffix, so it's just a number in bytes
+ lower.substring(0, lower.length - 1).toInt * 1024 * 1024
+ } else {
+ // no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
}
}
-}
+}
\ No newline at end of file
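
Hoisting memoryStringToMb into a companion object lets callers (and the new MesosSchedulerSuite below) use it without constructing a scheduler. Its semantics are truncating integer arithmetic: a bare number is bytes, and the k/m/g/t suffixes denote KB, MB, GB, and TB. Expected values, derived from the divisions above:

    MesosScheduler.memoryStringToMb("1048575") // 0       (bytes, just under 1 MB)
    MesosScheduler.memoryStringToMb("5000k")   // 4       (5000 / 1024, truncated)
    MesosScheduler.memoryStringToMb("2g")      // 2048
    MesosScheduler.memoryStringToMb("2t")      // 2097152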
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index d108c14f6b..cfd6dc8b2a 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -2,11 +2,11 @@ package spark
import java.io._
import java.net.InetAddress
-import java.util.UUID
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
+import java.util.{Locale, UUID}
/**
* Various utility methods used by Spark.
@@ -157,9 +157,12 @@ object Utils {
/**
* Get the local machine's hostname.
*/
- def localHostName(): String = {
- return InetAddress.getLocalHost().getHostName
- }
+ def localHostName(): String = InetAddress.getLocalHost.getHostName
+
+ /**
+ * Get the current host name: the spark.hostname system property if set, else the local hostname.
+ */
+ def getHost = System.getProperty("spark.hostname", localHostName())
/**
* Delete a file or directory and its contents recursively.
@@ -184,7 +187,7 @@ object Utils {
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10
- val B = 1L
+
val (value, unit) = {
if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
@@ -196,6 +199,6 @@ object Utils {
(size.asInstanceOf[Double], "B")
}
}
- "%.1f%s".format(value, unit)
+ "%.1f%s".formatLocal(Locale.US, value, unit)
}
}
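
Switching from format to formatLocal(Locale.US, ...) pins the decimal separator: under a locale such as Locale.GERMANY, "%.1f" renders 5.0 as "5,0", which would break any caller comparing the resulting strings (including the memoryBytesToString assertions in UtilsSuite below). For example:

    "%.1f%s".formatLocal(java.util.Locale.US, 5.0, "GB") // "5.0GB" regardless of default locale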
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
new file mode 100644
index 0000000000..60290d14ca
--- /dev/null
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -0,0 +1,97 @@
+package spark
+
+import org.scalatest.FunSuite
+import collection.mutable.HashMap
+
+class CacheTrackerSuite extends FunSuite {
+
+ test("CacheTrackerActor slave initialization & cache status") {
+ System.setProperty("spark.master.port", "1345")
+ val initialSize = 2L << 20
+
+ val tracker = new CacheTrackerActor
+ tracker.start()
+
+ tracker !? SlaveCacheStarted("host001", initialSize)
+
+ assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 0L)))
+
+ tracker !? StopCacheTracker
+ }
+
+ test("RegisterRDD") {
+ System.setProperty("spark.master.port", "1345")
+ val initialSize = 2L << 20
+
+ val tracker = new CacheTrackerActor
+ tracker.start()
+
+ tracker !? SlaveCacheStarted("host001", initialSize)
+
+ tracker !? RegisterRDD(1, 3)
+ tracker !? RegisterRDD(2, 1)
+
+ assert(getCacheLocations(tracker) == Map(1 -> List(List(), List(), List()), 2 -> List(List())))
+
+ tracker !? StopCacheTracker
+ }
+
+ test("AddedToCache") {
+ System.setProperty("spark.master.port", "1345")
+ val initialSize = 2L << 20
+
+ val tracker = new CacheTrackerActor
+ tracker.start()
+
+ tracker !? SlaveCacheStarted("host001", initialSize)
+
+ tracker !? RegisterRDD(1, 2)
+ tracker !? RegisterRDD(2, 1)
+
+ tracker !? AddedToCache(1, 0, "host001", 2L << 15)
+ tracker !? AddedToCache(1, 1, "host001", 2L << 11)
+ tracker !? AddedToCache(2, 0, "host001", 3L << 10)
+
+ assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
+
+ assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
+
+ tracker !? StopCacheTracker
+ }
+
+ test("DroppedFromCache") {
+ System.setProperty("spark.master.port", "1345")
+ val initialSize = 2L << 20
+
+ val tracker = new CacheTrackerActor
+ tracker.start()
+
+ tracker !? SlaveCacheStarted("host001", initialSize)
+
+ tracker !? RegisterRDD(1, 2)
+ tracker !? RegisterRDD(2, 1)
+
+ tracker !? AddedToCache(1, 0, "host001", 2L << 15)
+ tracker !? AddedToCache(1, 1, "host001", 2L << 11)
+ tracker !? AddedToCache(2, 0, "host001", 3L << 10)
+
+ assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
+ assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
+
+ tracker !? DroppedFromCache(1, 1, "host001", 2L << 11)
+
+ assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 68608L)))
+ assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"),List()), 2 -> List(List("host001"))))
+
+ tracker !? StopCacheTracker
+ }
+
+ /**
+ * Helper function to get cacheLocations from CacheTracker
+ */
+ def getCacheLocations(tracker: CacheTrackerActor) = tracker !? GetCacheLocations match {
+ case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]].map {
+ case (i, arr) => (i -> arr.toList)
+ }
+ }
+}
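
The usage figures asserted in the suite follow directly from the shifted sizes:

    (2L << 15) + (2L << 11) + (3L << 10) // 65536 + 4096 + 3072 = 72704
    72704L - (2L << 11)                  // 68608 after DroppedFromCache(1, 1, ...)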
diff --git a/core/src/test/scala/spark/MesosSchedulerSuite.scala b/core/src/test/scala/spark/MesosSchedulerSuite.scala
new file mode 100644
index 0000000000..0e6820cbdc
--- /dev/null
+++ b/core/src/test/scala/spark/MesosSchedulerSuite.scala
@@ -0,0 +1,28 @@
+package spark
+
+import org.scalatest.FunSuite
+
+class MesosSchedulerSuite extends FunSuite {
+ test("memoryStringToMb") {
+ assert(MesosScheduler.memoryStringToMb("1") == 0)
+ assert(MesosScheduler.memoryStringToMb("1048575") == 0)
+ assert(MesosScheduler.memoryStringToMb("3145728") == 3)
+
+ assert(MesosScheduler.memoryStringToMb("1024k") == 1)
+ assert(MesosScheduler.memoryStringToMb("5000k") == 4)
+ assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
+
+ assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
+ assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
+ assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
+
+ assert(MesosScheduler.memoryStringToMb("2g") == 2048)
+ assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
+
+ assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
+ assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
+ }
+}
diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 4e852903be..f31251e509 100644
--- a/core/src/test/scala/spark/Utils.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -1,7 +1,8 @@
package spark
import org.scalatest.FunSuite
-
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
+import util.Random
class UtilsSuite extends FunSuite {
@@ -14,5 +15,15 @@ class UtilsSuite extends FunSuite {
assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
}
+ test("copyStream") {
+ // Initialize the input array with random bytes.
+ val bytes = Array.ofDim[Byte](9000)
+ Random.nextBytes(bytes)
+
+ val os = new ByteArrayOutputStream()
+ Utils.copyStream(new ByteArrayInputStream(bytes), os)
+
+ assert(os.toByteArray.toList.equals(bytes.toList))
+ }
}
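
The copyStream test pushes 9000 random bytes through the helper, more than a typical single 8192-byte buffer, so both the multi-read path and the final partial chunk get exercised. A rough sketch of the kind of loop Utils.copyStream is assumed to implement (the real method is not shown in this diff and may take extra parameters, such as whether to close the streams):

    // Assumed shape of Utils.copyStream; illustrative, not the actual definition.
    def copyStream(in: java.io.InputStream, out: java.io.OutputStream) {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    }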