author    Richard Benkovsky <richard.benkovsky@gooddata.com>  2012-05-20 08:02:30 +0200
committer Richard Benkovsky <richard.benkovsky@gooddata.com>  2012-05-22 11:04:54 +0200
commit 8f2f736d5311968f7fa0baa93b1bbc8d7aeed4e1 (patch)
tree   e849fad4306856362430de855c95412e424025d4 /core
parent 518506a7c504ea84d93e6354f8cc7e9187c48317 (diff)
Little refactoring
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Cache.scala          2
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala  39
-rw-r--r--  core/src/main/scala/spark/Utils.scala          5
3 files changed, 20 insertions, 26 deletions
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index aeff205884..150fe14e2c 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger
sealed trait CachePutResponse
case class CachePutSuccess(size: Long) extends CachePutResponse
-case class CachePutFailure extends CachePutResponse
+case class CachePutFailure() extends CachePutResponse
/**
* An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
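
The Cache.scala change is not just cosmetic: case classes without a parameter list were deprecated in Scala 2.9 and rejected by later compilers, so CachePutFailure needs an explicit empty parameter list (a case object would also have worked here, since the class carries no state). A minimal sketch of the resulting hierarchy and how it matches:

    sealed trait CachePutResponse
    case class CachePutSuccess(size: Long) extends CachePutResponse
    case class CachePutFailure() extends CachePutResponse // explicit () required by newer compilers

    // Construction and matching both use the empty parameter list:
    def describe(r: CachePutResponse): String = r match {
      case CachePutSuccess(size) => "stored %d bytes".format(size)
      case CachePutFailure()     => "put failed"
    }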
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 5b5831b2de..0719f14a39 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -56,7 +56,7 @@ class CacheTrackerActor extends DaemonActor with Logging {
case AddedToCache(rddId, partition, host, size) =>
if (size > 0) {
- slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
+ slaveUsage.put(host, getCacheUsage(host) + size)
logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
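
The repeated slaveUsage.getOrElse(host, 0L) lookups in this and the next hunk are folded into a getCacheUsage helper. Its definition is not part of this diff; inferred from the call sites (an assumption, not the committed code), it presumably sits next to the getCacheAvailable the log lines already use:

    // Inferred shape of the helpers inside CacheTrackerActor (not shown in this diff):
    import scala.collection.mutable.HashMap

    private val slaveCapacity = new HashMap[String, Long] // host -> cache capacity
    private val slaveUsage    = new HashMap[String, Long] // host -> bytes in use

    def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
    def getCacheAvailable(host: String): Long = slaveCapacity.getOrElse(host, 0L) - getCacheUsage(host)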
@@ -71,10 +71,10 @@ class CacheTrackerActor extends DaemonActor with Logging {
logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
- slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
+ slaveUsage.put(host, getCacheUsage(host) - size)
// Do a sanity check to make sure usage is greater than 0.
- val usage = slaveUsage.getOrElse(host, 0L)
+ val usage = getCacheUsage(host)
if (usage < 0) {
logError("Cache usage on %s is negative (%d)".format(host, usage))
}
@@ -89,15 +89,11 @@ class CacheTrackerActor extends DaemonActor with Logging {
case GetCacheLocations =>
logInfo("Asked for current cache locations")
- val locsCopy = new HashMap[Int, Array[List[String]]]
- for ((rddId, array) <- locs) {
- locsCopy(rddId) = array.clone()
- }
- reply(locsCopy)
+ reply(locs.map { case (rddId, array) => (rddId -> array.clone()) })
case GetCacheStatus =>
- val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key =>
- (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L))
+ val status = slaveCapacity.map { case (host, capacity) =>
+ (host, capacity, getCacheUsage(host))
}.toSeq
reply(status)
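
Worth noting about the GetCacheLocations rewrite: .map on a mutable HashMap builds a brand-new HashMap, so the one-liner still replies with a defensive copy, exactly like the loop it replaces, and array.clone() stays safe as a shallow copy because the List[String] elements are immutable. A small self-contained sketch of that behavior:

    import scala.collection.mutable.HashMap

    val locs = HashMap(1 -> Array(List("hostA"), List("hostB")))
    // map yields a fresh HashMap whose value arrays are cloned
    val snapshot = locs.map { case (rddId, array) => (rddId -> array.clone()) }

    locs(1)(0) = List("hostC")              // mutate the original
    assert(snapshot(1)(0) == List("hostA")) // the snapshot is unaffected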
@@ -130,9 +126,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// Report the cache being started.
- trackerActor !? SlaveCacheStarted(
- System.getProperty("spark.hostname", Utils.localHostName),
- cache.getCapacity)
+ trackerActor !? SlaveCacheStarted(Utils.getHost, cache.getCapacity)
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[(Int, Int)]
@@ -151,20 +145,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
(trackerActor !? GetCacheLocations) match {
- case h: HashMap[_, _] =>
- h.asInstanceOf[HashMap[Int, Array[List[String]]]]
+ case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]]
- case _ =>
- throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
+ case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
}
}
// Get the usage status of slave caches. Each tuple in the returned sequence
// is in the form of (host name, capacity, usage).
- def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = {
+ def getCacheStatus(): Seq[(String, Long, Long)] = {
(trackerActor !? GetCacheStatus) match {
- case h: Seq[Tuple3[String, Long, Long]] =>
- h.asInstanceOf[Seq[Tuple3[String, Long, Long]]]
+ case h: Seq[(String, Long, Long)] => h.asInstanceOf[Seq[(String, Long, Long)]]
case _ =>
throw new SparkException(
@@ -202,7 +193,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// If we got here, we have to load the split
// Tell the master that we're doing so
- val host = System.getProperty("spark.hostname", Utils.localHostName)
+
// TODO: fetch any remote copy of the split that may be available
logInfo("Computing partition " + split)
var array: Array[T] = null
@@ -223,7 +214,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
case CachePutSuccess(size) => {
// Tell the master that we added the entry. Don't return until it
// replies so it can properly schedule future tasks that use this RDD.
- trackerActor !? AddedToCache(rdd.id, split.index, host, size)
+ trackerActor !? AddedToCache(rdd.id, split.index, Utils.getHost, size)
}
case _ => null
}
@@ -234,9 +225,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Called by the Cache to report that an entry has been dropped from it
def dropEntry(datasetId: Any, partition: Int) {
datasetId match {
- case (cache.keySpaceId, rddId: Int) =>
- val host = System.getProperty("spark.hostname", Utils.localHostName)
- trackerActor !! DroppedFromCache(rddId, partition, host)
+ case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost)
}
}
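
A caveat on the getCacheStatus match above: JVM type erasure means case h: Seq[(String, Long, Long)] only verifies that the reply is a Seq; the tuple element type cannot be checked at runtime (the compiler flags the pattern with an unchecked warning), which is why the explicit asInstanceOf cast remains. The same pattern in isolation:

    // Only "is it a Seq?" is testable at runtime; the element type
    // is recovered by an unchecked cast.
    val reply: Any = Seq(("hostA", 1024L, 512L))
    val status: Seq[(String, Long, Long)] = reply match {
      case h: Seq[_] => h.asInstanceOf[Seq[(String, Long, Long)]]
      case _         => throw new RuntimeException("unexpected reply")
    }
    println(status.head._1) // hostA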
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index a1bb054bde..cfd6dc8b2a 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -160,6 +160,11 @@ object Utils {
def localHostName(): String = InetAddress.getLocalHost.getHostName
/**
+ * Get current host
+ */
+ def getHost = System.getProperty("spark.hostname", localHostName())
+
+ /**
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(file: File) {
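
The new Utils.getHost collapses the three copies of System.getProperty("spark.hostname", Utils.localHostName) that CacheTracker previously carried. Since it is a def, the property is re-read on every call, so an override takes effect immediately. A usage sketch (the host name below is a made-up example):

    System.setProperty("spark.hostname", "worker-7.example.com") // hypothetical override
    println(Utils.getHost) // -> worker-7.example.com

    System.clearProperty("spark.hostname")
    println(Utils.getHost) // -> InetAddress.getLocalHost.getHostName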