diff options
Diffstat (limited to 'core/src/main/scala/spark/util/AkkaUtils.scala')
-rw-r--r-- | core/src/main/scala/spark/util/AkkaUtils.scala | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index 30aec5a663..e93cc3b485 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -11,7 +11,7 @@ import cc.spray.{SprayCanRootService, HttpService} import cc.spray.can.server.HttpServer import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler import akka.dispatch.Await -import spark.SparkException +import spark.{Utils, SparkException} import java.util.concurrent.TimeoutException /** @@ -29,24 +29,30 @@ private[spark] object AkkaUtils { def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt - val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt + val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt + val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" + // 10 seconds is the default akka timeout, but in a cluster, we need higher by default. + val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt + val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.stdout-loglevel = "ERROR" akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" - akka.remote.log-remote-lifecycle-events = on akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d akka.remote.netty.connection-timeout = %ds akka.remote.netty.message-frame-size = %d MiB akka.remote.netty.execution-pool-size = %d akka.actor.default-dispatcher.throughput = %d - """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize)) + akka.remote.log-remote-lifecycle-events = %s + akka.remote.netty.write-timeout = %ds + """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize, + lifecycleEvents, akkaWriteTimeout)) - val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader) + val actorSystem = ActorSystem(name, akkaConf) // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. @@ -58,8 +64,9 @@ private[spark] object AkkaUtils { /** * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to * handle requests. Returns the bound port or throws a SparkException on failure. + * TODO: Not changing ip to host here - is it required ? */ - def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, + def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, name: String = "HttpServer"): ActorRef = { val ioWorker = new IoWorker(actorSystem).start() val httpService = actorSystem.actorOf(Props(new HttpService(route))) |