author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-11-27 20:51:58 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-11-27 20:51:58 -0800
commit     f410a111ada489f9fd8971e047c9927f434773be (patch)
tree       b842c0643969c41e0c3f6558edd3274cfb1126f3 /core
parent     7d71b9a56a4d644ccabb56dd282e84e2a49ef144 (diff)
parent     935c468b71c2b0da131ad996f92e80ea0874a1a3 (diff)
Merge branch 'master' of github.com:mesos/spark
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala       | 31
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                |  1
-rw-r--r--  core/src/main/scala/spark/Utils.scala                   |  8
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala       | 15
-rw-r--r--  core/src/test/scala/spark/MapOutputTrackerSuite.scala   | 55
5 files changed, 92 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 6f80f6ac90..50c4183c0e 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -2,6 +2,10 @@ package spark
import java.io._
import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
import akka.actor._
import akka.dispatch._
@@ -11,16 +15,13 @@ import akka.util.Duration
import akka.util.Timeout
import akka.util.duration._
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import scheduler.MapStatus
+import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
- extends MapOutputTrackerMessage
+ extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
@@ -88,14 +89,14 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
-
+
def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
var array = mapStatuses.get(shuffleId)
array.synchronized {
array(mapId) = status
}
}
-
+
def registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
@@ -110,7 +111,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var array = mapStatuses.get(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId).address == bmAddress) {
+ if (array(mapId) != null && array(mapId).address == bmAddress) {
array(mapId) = null
}
}
@@ -119,10 +120,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
}
-
+
// Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int]
-
+
// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
val statuses = mapStatuses.get(shuffleId)
@@ -149,7 +150,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
val host = System.getProperty("spark.hostname", Utils.localHostName)
val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes)
-
+
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized {
@@ -258,8 +259,10 @@ private[spark] object MapOutputTracker {
* sizes up to 35 GB with at most 10% error.
*/
def compressSize(size: Long): Byte = {
- if (size <= 1L) {
+ if (size == 0) {
0
+ } else if (size <= 1L) {
+ 1
} else {
math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
}
@@ -270,7 +273,7 @@ private[spark] object MapOutputTracker {
*/
def decompressSize(compressedSize: Byte): Long = {
if (compressedSize == 0) {
- 1
+ 0
} else {
math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
}
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4c6ec6cc6e..9f2b0c42c7 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -68,7 +68,6 @@ object SparkEnv extends Logging {
isMaster: Boolean,
isLocal: Boolean
) : SparkEnv = {
-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 567c4b1475..c8799e6de3 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -199,7 +199,13 @@ private object Utils extends Logging {
/**
* Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
*/
- def localIpAddress(): String = InetAddress.getLocalHost.getHostAddress
+ def localIpAddress(): String = {
+ val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
+ if (defaultIpOverride != null)
+ defaultIpOverride
+ else
+ InetAddress.getLocalHost.getHostAddress
+ }
private var customHostname: Option[String] = None
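The Utils.localIpAddress change above adds an escape hatch: setting the SPARK_LOCAL_IP
environment variable overrides the JDK-resolved address, which helps when
InetAddress.getLocalHost picks an unwanted interface (e.g. on multi-homed hosts). A
minimal sketch of the same fallback pattern, runnable outside Spark:

    // Environment override wins; otherwise fall back to the JDK's best guess.
    import java.net.InetAddress

    object LocalIp {
      def localIpAddress(): String = {
        val ipOverride = System.getenv("SPARK_LOCAL_IP")
        if (ipOverride != null) ipOverride
        else InetAddress.getLocalHost.getHostAddress
      }

      def main(args: Array[String]) {
        // SPARK_LOCAL_IP=10.0.0.5 scala LocalIp  => prints 10.0.0.5
        // scala LocalIp                          => prints the resolved host address
        println(localIpAddress())
      }
    }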
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index dfdb22024e..cb29a6b8b4 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -43,6 +43,21 @@ private[spark] class Executor extends Logging {
urlClassLoader = createClassLoader()
Thread.currentThread.setContextClassLoader(urlClassLoader)
+ // Make any thread terminations due to uncaught exceptions kill the entire
+ // executor process to avoid surprising stalls.
+ Thread.setDefaultUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler {
+ override def uncaughtException(thread: Thread, exception: Throwable) {
+ try {
+ logError("Uncaught exception in thread " + thread, exception)
+ System.exit(1)
+ } catch {
+ case t: Throwable => System.exit(2)
+ }
+ }
+ }
+ )
+
// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
SparkEnv.set(env)
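The handler registered above is the standard JVM pattern for fail-fast daemons: by
default an uncaught exception kills only the offending thread, so an executor could limp
along with a dead worker thread. A self-contained sketch of the pattern (illustrative,
not the Spark class itself):

    // Once installed, any thread dying from an uncaught exception exits the JVM.
    object CrashOnUncaught {
      def main(args: Array[String]) {
        Thread.setDefaultUncaughtExceptionHandler(
          new Thread.UncaughtExceptionHandler {
            override def uncaughtException(thread: Thread, exception: Throwable) {
              try {
                System.err.println("Uncaught exception in " + thread + ": " + exception)
                System.exit(1)
              } catch {
                // If even the logging throws, still make sure the process exits.
                case t: Throwable => System.exit(2)
              }
            }
          }
        )
        // This thread's death now takes the whole process down with exit code 1.
        new Thread(new Runnable {
          def run() { throw new RuntimeException("boom") }
        }).start()
      }
    }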
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index 4e9717d871..5b4b198960 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -2,10 +2,14 @@ package spark
import org.scalatest.FunSuite
+import akka.actor._
+import spark.scheduler.MapStatus
+import spark.storage.BlockManagerId
+
class MapOutputTrackerSuite extends FunSuite {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
- assert(MapOutputTracker.compressSize(1L) === 0)
+ assert(MapOutputTracker.compressSize(1L) === 1)
assert(MapOutputTracker.compressSize(2L) === 8)
assert(MapOutputTracker.compressSize(10L) === 25)
assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
@@ -15,11 +19,58 @@ class MapOutputTrackerSuite extends FunSuite {
}
test("decompressSize") {
- assert(MapOutputTracker.decompressSize(0) === 1)
+ assert(MapOutputTracker.decompressSize(0) === 0)
for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
"size " + size + " decompressed to " + size2 + ", which is out of range")
}
}
+
+ test("master start and stop") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.stop()
+ }
+
+ test("master register and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize10000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000)))
+ val statuses = tracker.getServerStatuses(10, 0)
+ assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
+ (new BlockManagerId("hostB", 1000), size10000)))
+ tracker.stop()
+ }
+
+ test("master register and unregister and fetch") {
+ val actorSystem = ActorSystem("test")
+ val tracker = new MapOutputTracker(actorSystem, true)
+ tracker.registerShuffle(10, 2)
+ val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+ val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+ val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+ val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+ tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+ Array(compressedSize1000, compressedSize1000, compressedSize1000)))
+ tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+ Array(compressedSize10000, compressedSize1000, compressedSize1000)))
+
+ // As if we had two simultaneous fetch failures
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+
+ // The remaining reduce task might try to grab the output despite the shuffle failure;
+ // this should cause it to fail, and the scheduler will ignore the failure due to the
+ // stage already being aborted.
+ intercept[Exception] { tracker.getServerStatuses(10, 1) }
+ }
}
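The double unregisterMapOutput call in the last test exercises the null guard added to
MapOutputTracker in this merge: the first call sets the slot to null, and without the
new array(mapId) != null check the second call would throw a NullPointerException when
reading array(mapId).address. An illustrative reduction of that race, using a plain case
class in place of MapStatus (names here are hypothetical, not Spark's):

    // Why the slot must be null-checked before dereferencing .address.
    case class Status(address: String)

    object DoubleUnregister {
      val slots = new Array[Status](1)

      def unregister(mapId: Int, address: String) {
        slots.synchronized {
          // Without the `!= null` guard, a repeated unregister NPEs here.
          if (slots(mapId) != null && slots(mapId).address == address) {
            slots(mapId) = null
          }
        }
      }

      def main(args: Array[String]) {
        slots(0) = Status("hostA")
        unregister(0, "hostA") // clears the slot
        unregister(0, "hostA") // now a safe no-op
      }
    }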