From a73b25826be808a1be1de8b61c4c7d2df2bcd5aa Mon Sep 17 00:00:00 2001 From: root Date: Sun, 7 Oct 2012 04:19:54 +0000 Subject: Made Akka thread pool and message batch sizes configurable --- core/src/main/scala/spark/util/AkkaUtils.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index f670ccb709..b466b5239c 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -23,6 +23,8 @@ private[spark] object AkkaUtils { * ActorSystem itself and its port (which is hard to get from Akka). */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt + val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] @@ -31,9 +33,9 @@ private[spark] object AkkaUtils { akka.remote.netty.hostname = "%s" akka.remote.netty.port = %d akka.remote.netty.connection-timeout = 1s - akka.remote.netty.execution-pool-size = 8 - akka.actor.default-dispatcher.throughput = 30 - """.format(host, port)) + akka.remote.netty.execution-pool-size = %d + akka.actor.default-dispatcher.throughput = %d + """.format(host, port, akkaThreads, akkaBatchSize)) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) -- cgit v1.2.3