aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 16:01:36 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-06-29 16:02:21 -0700
commit3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22 (patch)
tree5c78ba46ea5b2eaf470c13b074e36f80cefb30eb
parent14bfad11837ae3041170a839547a286db99f5208 (diff)
downloadspark-3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22.tar.gz
spark-3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22.tar.bz2
spark-3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22.zip
Allow binding to a free port and change Akka logging to use SLF4J. Also
fixes various bugs in the previous code when running on Mesos.
-rw-r--r--core/src/main/scala/spark/CacheTracker.scala2
-rw-r--r--core/src/main/scala/spark/Executor.scala2
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala2
-rw-r--r--core/src/main/scala/spark/SparkContext.scala8
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala38
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMaster.scala5
6 files changed, 38 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 65e3803144..19870408d3 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -116,7 +116,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
logInfo("Registered CacheTrackerActor actor")
actor
} else {
- val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+ val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
actorSystem.actorFor(url)
}
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index af9eb9c878..3d70cf1737 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -43,7 +43,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
RemoteActor.classLoader = getClass.getClassLoader
// Initialize Spark environment (using system properties read above)
- env = SparkEnv.createFromSystemProperties(false, false)
+ env = SparkEnv.createFromSystemProperties(slaveInfo.getHostname(), 0, false, false)
SparkEnv.set(env)
// Old stuff that isn't yet using env
Broadcast.initialize(false)
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d18ecb921d..0c97cd44a1 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -51,7 +51,7 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
logInfo("Registered MapOutputTrackerActor actor")
actor
} else {
- val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+ val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
actorSystem.actorFor(url)
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0272040080..fc364b5307 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,17 @@ class SparkContext(
System.setProperty("spark.master.host", Utils.localIpAddress)
}
if (System.getProperty("spark.master.port") == null) {
- System.setProperty("spark.master.port", "7077")
+ System.setProperty("spark.master.port", "0")
}
private val isLocal = master.startsWith("local") // TODO: better check for local
// Create the Spark execution environment (cache, map output tracker, etc)
- val env = SparkEnv.createFromSystemProperties(true, isLocal)
+ val env = SparkEnv.createFromSystemProperties(
+ System.getProperty("spark.master.host"),
+ System.getProperty("spark.master.port").toInt,
+ true,
+ isLocal)
SparkEnv.set(env)
Broadcast.initialize(true)
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 974cb5f401..5dcf25f997 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,6 +1,8 @@
package spark
import akka.actor.ActorSystem
+import akka.actor.ActorSystemImpl
+import akka.remote.RemoteActorRefProvider
import com.typesafe.config.ConfigFactory
@@ -36,21 +38,33 @@ object SparkEnv {
env.get()
}
- def createFromSystemProperties(isMaster: Boolean, isLocal: Boolean): SparkEnv = {
- val host = System.getProperty("spark.master.host")
- val port = System.getProperty("spark.master.port").toInt
- if (port == 0) {
- throw new IllegalArgumentException("Setting spark.master.port to 0 is not yet supported")
- }
+ def createFromSystemProperties(
+ hostname: String,
+ port: Int,
+ isMaster: Boolean,
+ isLocal: Boolean
+ ) : SparkEnv = {
+
val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- """.format(host, port))
+ akka.daemonic = on
+ akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+ akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+ akka.remote.netty.hostname = "%s"
+ akka.remote.netty.port = %d
+ """.format(hostname, port))
+
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+ // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
+ // figure out which port number Akka actually bound to and set spark.master.port to it.
+ // Unfortunately Akka doesn't yet provide an API for this except if you cast objects as below.
+ if (isMaster && port == 0) {
+ val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+ val port = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+ System.setProperty("spark.master.port", port.toString)
+ }
+
val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 5fe0e22dd0..97a5b0cb45 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -348,8 +348,9 @@ object BlockManagerMaster extends Logging {
Props(new BlockManagerMaster(isLocal)), name = AKKA_ACTOR_NAME)
logInfo("Registered BlockManagerMaster Actor")
} else {
- val url = "akka://spark@%s:%s/%s".format(
+ val url = "akka://spark@%s:%s/user/%s".format(
DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+ logInfo("Connecting to BlockManagerMaster: " + url)
masterActor = actorSystem.actorFor(url)
}
}
@@ -425,7 +426,7 @@ object BlockManagerMaster extends Logging {
try {
communicate(msg)
- logInfo("Heartbeat sent successfully")
+ logDebug("Heartbeat sent successfully")
logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
return true
} catch {