Diffstat (limited to 'core/src/main/scala')
 core/src/main/scala/spark/CacheTracker.scala                                 |  6
 core/src/main/scala/spark/MapOutputTracker.scala                             |  6
 core/src/main/scala/spark/api/java/function/Function.java                    |  8
 core/src/main/scala/spark/api/java/function/Function2.java                   |  9
 core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java         | 12
 core/src/main/scala/spark/api/java/function/PairFunction.java                | 12
 core/src/main/scala/spark/deploy/JsonProtocol.scala                          |  2
 core/src/main/scala/spark/deploy/client/Client.scala                         |  4
 core/src/main/scala/spark/deploy/master/Master.scala                         |  2
 core/src/main/scala/spark/deploy/master/MasterWebUI.scala                    | 45
 core/src/main/scala/spark/deploy/worker/Worker.scala                         |  2
 core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala                    | 43
 core/src/main/scala/spark/network/ConnectionManager.scala                    |  6
 core/src/main/scala/spark/network/ConnectionManagerTest.scala                |  4
 core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala |  4
 core/src/main/scala/spark/storage/BlockManager.scala                         |  6
 core/src/main/scala/spark/storage/BlockManagerMaster.scala                   |  5
 core/src/main/scala/spark/storage/BlockManagerMasterActor.scala              |  4
 core/src/main/scala/spark/util/AkkaUtils.scala                               | 37
 19 files changed, 110 insertions(+), 107 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 04c26b2e40..a73438208a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -5,12 +5,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.actor._
-import akka.dispatch._
+import scala.concurrent.Await
import akka.pattern.ask
import akka.remote._
-import akka.util.Duration
+import scala.concurrent.duration.Duration
import akka.util.Timeout
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.storage.BlockManager
import spark.storage.StorageLevel
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 70eb9f702e..08d2956782 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -8,12 +8,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.actor._
-import akka.dispatch._
+import scala.concurrent.Await
import akka.pattern.ask
import akka.remote._
-import akka.util.Duration
+import scala.concurrent.duration.Duration
import akka.util.Timeout
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
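
Akka 2.1 targets Scala 2.10, which absorbed the futures API: Await, Future, and the duration DSL move from akka.dispatch/akka.util into scala.concurrent. A minimal sketch of the ask-and-await pattern both trackers rely on, under the new imports (the actor and message are placeholders, not names from this commit):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object TrackerClientSketch {
      // Ask an actor and block for the reply. Timeout still comes from
      // akka.util, but the duration DSL (10.seconds) and Await are now
      // scala.concurrent, matching the rewritten imports above.
      def askTracker(tracker: ActorRef, message: Any): Any = {
        implicit val timeout = Timeout(10.seconds)
        val future = tracker ? message
        Await.result(future, timeout.duration)
      }
    }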
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
index dae8295f21..9f6ab9a592 100644
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ b/core/src/main/scala/spark/api/java/function/Function.java
@@ -1,7 +1,7 @@
package spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -15,8 +15,8 @@ import java.io.Serializable;
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
public abstract R call(T t) throws Exception;
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
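
Scala 2.10 replaces the deprecated ClassManifest with ClassTag, and the companion's factory method is apply rather than fromClass; ClassTag$.MODULE$.apply(...) is simply how Java spells that companion call. The same substitution repeats in Function2, PairFlatMapFunction, and PairFunction below. The Scala equivalent of the new returnType() body is:

    import scala.reflect.ClassTag

    object ReturnTypeSketch {
      // Equivalent of ClassTag$.MODULE$.apply(Object.class): build an
      // erased ClassTag from a runtime Class via the companion's apply.
      val anyTag: ClassTag[AnyRef] = ClassTag(classOf[AnyRef])
    }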
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
index 69bf12c8c9..b32c178aa3 100644
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/spark/api/java/function/Function2.java
@@ -1,7 +1,7 @@
package spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import java.io.Serializable;
@@ -14,8 +14,9 @@ public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
public abstract R call(T1 t1, T2 t2) throws Exception;
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
+
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
index b3cc4df6aa..296a3b5044 100644
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
@@ -1,8 +1,8 @@
package spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -19,11 +19,11 @@ public abstract class PairFlatMapFunction<T, K, V>
public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
index 9fc6df4b88..e3f94788e2 100644
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFunction.java
@@ -1,8 +1,8 @@
package spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -18,11 +18,11 @@ public abstract class PairFunction<T, K, V>
public abstract Tuple2<K, V> call(T t) throws Exception;
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 732fa08064..12069947fc 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -2,7 +2,7 @@ package spark.deploy
import master.{JobInfo, WorkerInfo}
import worker.ExecutorRunner
-import cc.spray.json._
+import spray.json._
/**
* spray-json helper class containing implicit conversion to json for marshalling responses
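
Here only the package root changes: spray dropped the cc. prefix, so cc.spray.json becomes spray.json. A hedged sketch of a protocol object under the new import; the Status case class is illustrative, while the real file defines writers for JobInfo, WorkerInfo, and the deploy state messages:

    import spray.json._

    // Illustrative payload only; JsonProtocol.scala serializes the
    // deploy module's own messages, not this class.
    case class Status(host: String, port: Int, alive: Boolean)

    object StatusJsonProtocol extends DefaultJsonProtocol {
      implicit val statusFormat: RootJsonFormat[Status] = jsonFormat3(Status)
    }

    // Usage (with StatusJsonProtocol._ in scope):
    //   Status("localhost", 7077, alive = true).toJson.compactPrint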
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 90fe9508cd..0aee759796 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -3,7 +3,7 @@ package spark.deploy.client
import spark.deploy._
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.AskTimeoutException
import spark.{SparkException, Logging}
import akka.remote.RemoteClientLifeCycleEvent
@@ -11,7 +11,7 @@ import akka.remote.RemoteClientShutdown
import spark.deploy.RegisterJob
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
-import akka.dispatch.Await
+import scala.concurrent.Await
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6ecebe626a..e034312c12 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -49,7 +49,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def startWebUi() {
- val webUi = new MasterWebUI(context.system, self)
+ val webUi = new MasterWebUI(self)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a4dadb6ef9 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -1,39 +1,42 @@
package spark.deploy.master
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
+import akka.actor.{ActorRef, ActorContext, ActorRefFactory}
+import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.directives._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
+import scala.concurrent.duration._
+import spray.routing.Directives
+import spray.routing.directives._
+import spray.httpx.TwirlSupport._
+import spray.httpx.SprayJsonSupport._
+import spray.http.MediaTypes._
import spark.deploy._
import spark.deploy.JsonProtocol._
private[spark]
-class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives {
+ import context.dispatcher
+
+ val actorSystem = context.system
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
+
implicit val timeout = Timeout(1 seconds)
-
+
val handler = {
get {
(path("") & parameters('format ?)) {
case Some(js) if js.equalsIgnoreCase("json") =>
- val future = master ? RequestMasterState
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ val future = (master ? RequestMasterState).mapTo[MasterState]
+ respondWithMediaType(`application/json`) { ctx =>
ctx.complete(future.mapTo[MasterState])
}
case _ =>
- completeWith {
- val future = master ? RequestMasterState
+ complete {
+ val future = (master ? RequestMasterState).mapTo[MasterState]
future.map {
- masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ masterState => spark.deploy.master.html.index.render(masterState)
}
}
} ~
@@ -50,15 +53,13 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
}
}
}
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ respondWithMediaType(`application/json`) { ctx =>
ctx.complete(jobInfo.mapTo[JobInfo])
}
case (jobId, _) =>
- completeWith {
- val future = master ? RequestMasterState
- future.map { state =>
- val masterState = state.asInstanceOf[MasterState]
-
+ complete {
+ val future = (master ? RequestMasterState).mapTo[MasterState]
+ future.map { masterState =>
masterState.activeJobs.find(_.id == jobId) match {
case Some(job) => spark.deploy.master.html.job_details.render(job)
case _ => masterState.completedJobs.find(_.id == jobId) match {
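
Two changes recur throughout this route: spray renamed completeWith to complete, and the untyped Future[Any] returned by ask is now narrowed once with mapTo[MasterState] instead of being cast with asInstanceOf at each use site. A minimal sketch of the resulting pattern, assuming an implicit Timeout and the dispatcher's ExecutionContext as in the class above:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext
    import spray.routing.Directives._
    import spark.deploy.{MasterState, RequestMasterState}

    object RouteSketch {
      // complete accepts a Future directly; mapTo types the reply once,
      // so downstream code maps over MasterState instead of casting.
      def stateRoute(master: ActorRef)
                    (implicit timeout: Timeout, ec: ExecutionContext) =
        complete {
          (master ? RequestMasterState).mapTo[MasterState].map(_.toString)
        }
    }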
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 7c9e588ea2..ec25a19e7b 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -100,7 +100,7 @@ private[spark] class Worker(
}
def startWebUi() {
- val webUi = new WorkerWebUI(context.system, self)
+ val webUi = new WorkerWebUI(self)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..7dd1781900 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -1,46 +1,49 @@
package spark.deploy.worker
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
+import akka.actor.{ActorRef, ActorContext}
+import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
+import scala.concurrent.duration._
+import spray.routing.Directives
+import spray.httpx.TwirlSupport._
+import spray.httpx.SprayJsonSupport._
+import spray.http.MediaTypes._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
- val RESOURCE_DIR = "spark/deploy/worker/webui"
+class WorkerWebUI(worker: ActorRef)(implicit val context: ActorContext) extends Directives {
+ import context.dispatcher
+
+ val actorSystem = context.system
+ val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
+
implicit val timeout = Timeout(1 seconds)
-
+
val handler = {
get {
- (path("") & parameters('format ?)) {
+ (path("") & parameters('format ?)) {
case Some(js) if js.equalsIgnoreCase("json") => {
- val future = worker ? RequestWorkerState
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
- ctx.complete(future.mapTo[WorkerState])
+ val future = (worker ? RequestWorkerState).mapTo[WorkerState]
+ respondWithMediaType(`application/json`) { ctx =>
+ ctx.complete(future)
}
}
case _ =>
- completeWith{
- val future = worker ? RequestWorkerState
+ complete {
+ val future = (worker ? RequestWorkerState).mapTo[WorkerState]
future.map { workerState =>
- spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ spark.deploy.worker.html.index(workerState)
}
}
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
- respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
- getFromFileName("work/" + jobId + "/" + executorId + "/" + logType)
+ respondWithMediaType(`text/plain`) {
+ getFromFile("work/" + jobId + "/" + executorId + "/" + logType)
}
}
} ~
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 36c01ad629..04b303afe0 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -14,9 +14,9 @@ import scala.collection.mutable.SynchronizedQueue
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
-import akka.dispatch.{Await, Promise, ExecutionContext, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Promise, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
private[spark] case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
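
Again a package move: Await, Promise, ExecutionContext, and Future now come from scala.concurrent. The practical wrinkle is that the standard-library Future { ... } needs an implicit ExecutionContext in scope; a small sketch, building one from a plain thread pool roughly the way ConnectionManager supplies its own:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future, Promise}
    import scala.concurrent.duration._

    object FutureSketch {
      // scala.concurrent.Future requires an explicit execution context;
      // here one is wrapped around a fixed-size thread pool.
      implicit val ec: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

      def demo(): String = {
        val p = Promise[String]()
        Future { p.success("ack") }        // runs on the pool above
        Await.result(p.future, 1.second)
      }
    }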
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 533e4610f3..1b5b0935c2 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -8,8 +8,8 @@ import scala.io.Source
import java.nio.ByteBuffer
import java.net.InetAddress
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import scala.concurrent.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index eeaae23dc8..03dc5f4b9b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -3,11 +3,11 @@ package spark.scheduler.cluster
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.ask
import spark.{SparkException, Logging, TaskState}
-import akka.dispatch.Await
+import scala.concurrent.Await
import java.util.concurrent.atomic.AtomicInteger
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 7a8ac10cdd..7a1344668f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -8,9 +8,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index a3d8671834..46e1860d09 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -4,10 +4,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.dispatch.Await
+import scala.concurrent.Await
import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.{Logging, SparkException, Utils}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f4d026da33..9bf9161c31 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -7,8 +7,7 @@ import scala.collection.JavaConversions._
import scala.util.Random
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.{Logging, Utils}
@@ -42,6 +41,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
+ import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
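
The one non-import change in this file is import context.dispatcher: in Akka 2.1, scheduler.schedule takes an implicit ExecutionContext, and the actor's own dispatcher is the conventional source. A stripped-down actor showing the same requirement (names illustrative):

    import akka.actor.{Actor, Cancellable}
    import scala.concurrent.duration._

    class HeartbeatSketch extends Actor {
      import context.dispatcher  // implicit ExecutionContext for schedule()

      private var task: Cancellable = _

      override def preStart() {
        // Without the dispatcher import this no longer compiles on Akka 2.1.
        task = context.system.scheduler.schedule(
          0.seconds, 60.seconds, self, "Tick")
      }

      def receive = {
        case "Tick" => // expire dead hosts, etc.
      }
    }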
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index e67cb0336d..6fd9aa70fc 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,16 +1,16 @@
package spark.util
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{Props, ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.ask
import akka.remote.RemoteActorRefProvider
-import cc.spray.Route
-import cc.spray.io.IoWorker
-import cc.spray.{SprayCanRootService, HttpService}
-import cc.spray.can.server.HttpServer
-import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
-import akka.dispatch.Await
+import spray.routing.Route
+import spray.io.IOExtension
+import spray.routing.HttpServiceActor
+import spray.can.server.{HttpServer, ServerSettings}
+import spray.io.SingletonHandler
+import scala.concurrent.Await
import spark.SparkException
import java.util.concurrent.TimeoutException
@@ -23,11 +23,11 @@ private[spark] object AkkaUtils {
* ActorSystem itself and its port (which is hard to get from Akka).
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+ val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val akkaConf = ConfigFactory.parseString("""
+ val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -44,7 +44,7 @@ private[spark] object AkkaUtils {
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
- val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+ val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
return (actorSystem, boundPort)
}
@@ -54,14 +54,13 @@ private[spark] object AkkaUtils {
* handle requests. Throws a SparkException if this fails.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
- val ioWorker = new IoWorker(actorSystem).start()
- val httpService = actorSystem.actorOf(Props(new HttpService(route)))
- val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
- val server = actorSystem.actorOf(
- Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
- actorSystem.registerOnTermination { ioWorker.stop() }
+ val ioWorker = IOExtension(actorSystem).ioBridge()
+ val httpService = actorSystem.actorOf(Props(HttpServiceActor(route)))
+ val server = actorSystem.actorOf(
+ Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = "HttpServer")
+ actorSystem.registerOnTermination { actorSystem.stop(ioWorker) }
val timeout = 3.seconds
- val future = server.ask(HttpServer.Bind(ip, port))(timeout)
+ val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
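
The server bootstrap is where spray's reorganization bites hardest: the per-server IoWorker and the HttpService/SprayCanRootService pair give way to the system-wide IOExtension bridge and a single HttpServiceActor, with ServerSettings() now passed explicitly. Condensing the new body of startSprayServer as it appears in this diff, minus the surrounding error handling:

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import spray.can.server.{HttpServer, ServerSettings}
    import spray.io.{IOExtension, SingletonHandler}
    import spray.routing.{HttpServiceActor, Route}

    object SprayServerSketch {
      def startServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
        // One IO bridge per ActorSystem, one HttpServiceActor per Route.
        val ioBridge = IOExtension(actorSystem).ioBridge()
        val httpService = actorSystem.actorOf(Props(HttpServiceActor(route)))
        val server = actorSystem.actorOf(
          Props(new HttpServer(ioBridge, SingletonHandler(httpService), ServerSettings())),
          name = "HttpServer")
        // Bind synchronously; the real code matches on HttpServer.Bound
        // and wraps failures in a SparkException.
        Await.result(server.ask(HttpServer.Bind(ip, port))(3.seconds), 3.seconds)
      }
    }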