diff options
author | folone <folone@gmail.com> | 2013-01-14 09:52:11 +0100 |
---|---|---|
committer | folone <folone@gmail.com> | 2013-01-14 09:52:11 +0100 |
commit | 25c0739bad7222d45b4818c7bf6987521a3509d2 (patch) | |
tree | ef3f8c92b805f99db3b178853be376968fc8b6f4 /core | |
parent | cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd (diff) | |
download | spark-25c0739bad7222d45b4818c7bf6987521a3509d2.tar.gz spark-25c0739bad7222d45b4818c7bf6987521a3509d2.tar.bz2 spark-25c0739bad7222d45b4818c7bf6987521a3509d2.zip |
Moved to scala 2.10.0. Notable changes are:
- akka 2.0.3 → 2.1.0
- spray 1.0-M1 → 1.1-M7
For now the repl subproject is commented out, as scala reflection api changed very much since the introduction of macros.
Diffstat (limited to 'core')
20 files changed, 115 insertions, 113 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 04c26b2e40..a73438208a 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -5,12 +5,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import akka.actor._ -import akka.dispatch._ +import scala.concurrent.Await import akka.pattern.ask import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ import spark.storage.BlockManager import spark.storage.StorageLevel diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 70eb9f702e..08d2956782 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -8,12 +8,12 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet import akka.actor._ -import akka.dispatch._ +import scala.concurrent.Await import akka.pattern.ask import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ import spark.scheduler.MapStatus import spark.storage.BlockManagerId diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java index dae8295f21..9f6ab9a592 100644 --- a/core/src/main/scala/spark/api/java/function/Function.java +++ b/core/src/main/scala/spark/api/java/function/Function.java @@ -1,7 +1,7 @@ package spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -15,8 +15,8 @@ import java.io.Serializable; public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable { public abstract R call(T t) throws Exception; - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java index 69bf12c8c9..b32c178aa3 100644 --- a/core/src/main/scala/spark/api/java/function/Function2.java +++ b/core/src/main/scala/spark/api/java/function/Function2.java @@ -1,7 +1,7 @@ package spark.api.java.function; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction2; import java.io.Serializable; @@ -14,8 +14,9 @@ public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R> public abstract R call(T1 t1, T2 t2) throws Exception; - public ClassManifest<R> returnType() { - return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<R> returnType() { + return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class); } + } diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java index b3cc4df6aa..296a3b5044 100644 --- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java @@ -1,8 +1,8 @@ package spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -19,11 +19,11 @@ public abstract class PairFlatMapFunction<T, K, V> public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception; - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java index 9fc6df4b88..e3f94788e2 100644 --- a/core/src/main/scala/spark/api/java/function/PairFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFunction.java @@ -1,8 +1,8 @@ package spark.api.java.function; import scala.Tuple2; -import scala.reflect.ClassManifest; -import scala.reflect.ClassManifest$; +import scala.reflect.ClassTag; +import scala.reflect.ClassTag$; import scala.runtime.AbstractFunction1; import java.io.Serializable; @@ -18,11 +18,11 @@ public abstract class PairFunction<T, K, V> public abstract Tuple2<K, V> call(T t) throws Exception; - public ClassManifest<K> keyType() { - return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<K> keyType() { + return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class); } - public ClassManifest<V> valueType() { - return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class); + public ClassTag<V> valueType() { + return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class); } } diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala index 732fa08064..12069947fc 100644 --- a/core/src/main/scala/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -2,7 +2,7 @@ package spark.deploy import master.{JobInfo, WorkerInfo} import worker.ExecutorRunner -import cc.spray.json._ +import spray.json._ /** * spray-json helper class containing implicit conversion to json for marshalling responses diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 90fe9508cd..0aee759796 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -3,7 +3,7 @@ package spark.deploy.client import spark.deploy._ import akka.actor._ import akka.pattern.ask -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.AskTimeoutException import spark.{SparkException, Logging} import akka.remote.RemoteClientLifeCycleEvent @@ -11,7 +11,7 @@ import akka.remote.RemoteClientShutdown import spark.deploy.RegisterJob import akka.remote.RemoteClientDisconnected import akka.actor.Terminated -import akka.dispatch.Await +import scala.concurrent.Await /** * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description, diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 6ecebe626a..e034312c12 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -49,7 +49,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } def startWebUi() { - val webUi = new MasterWebUI(context.system, self) + val webUi = new MasterWebUI(self) try { AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) } catch { diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala index 458ee2d665..a4dadb6ef9 100644 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala @@ -1,39 +1,42 @@ package spark.deploy.master -import akka.actor.{ActorRef, ActorSystem} -import akka.dispatch.Await +import akka.actor.{ActorRef, ActorContext, ActorRefFactory} +import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout -import akka.util.duration._ -import cc.spray.Directives -import cc.spray.directives._ -import cc.spray.typeconversion.TwirlSupport._ -import cc.spray.http.MediaTypes -import cc.spray.typeconversion.SprayJsonSupport._ +import scala.concurrent.duration._ +import spray.routing.Directives +import spray.routing.directives._ +import spray.httpx.TwirlSupport._ +import spray.httpx.SprayJsonSupport._ +import spray.http.MediaTypes._ import spark.deploy._ import spark.deploy.JsonProtocol._ private[spark] -class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives { +class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives { + import context.dispatcher + + val actorSystem = context.system val RESOURCE_DIR = "spark/deploy/master/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - + implicit val timeout = Timeout(1 seconds) - + val handler = { get { (path("") & parameters('format ?)) { case Some(js) if js.equalsIgnoreCase("json") => - val future = master ? RequestMasterState - respondWithMediaType(MediaTypes.`application/json`) { ctx => + val future = (master ? RequestMasterState).mapTo[MasterState] + respondWithMediaType(`application/json`) { ctx => ctx.complete(future.mapTo[MasterState]) } case _ => - completeWith { - val future = master ? RequestMasterState + complete { + val future = (master ? RequestMasterState).mapTo[MasterState] future.map { - masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState]) + masterState => spark.deploy.master.html.index.render(masterState) } } } ~ @@ -50,15 +53,13 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct } } } - respondWithMediaType(MediaTypes.`application/json`) { ctx => + respondWithMediaType(`application/json`) { ctx => ctx.complete(jobInfo.mapTo[JobInfo]) } case (jobId, _) => - completeWith { - val future = master ? RequestMasterState - future.map { state => - val masterState = state.asInstanceOf[MasterState] - + complete { + val future = (master ? RequestMasterState).mapTo[MasterState] + future.map { masterState => masterState.activeJobs.find(_.id == jobId) match { case Some(job) => spark.deploy.master.html.job_details.render(job) case _ => masterState.completedJobs.find(_.id == jobId) match { diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 7c9e588ea2..ec25a19e7b 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -100,7 +100,7 @@ private[spark] class Worker( } def startWebUi() { - val webUi = new WorkerWebUI(context.system, self) + val webUi = new WorkerWebUI(self) try { AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) } catch { diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala index f9489d99fc..7dd1781900 100644 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala @@ -1,46 +1,49 @@ package spark.deploy.worker -import akka.actor.{ActorRef, ActorSystem} -import akka.dispatch.Await +import akka.actor.{ActorRef, ActorContext} +import scala.concurrent.Await import akka.pattern.ask import akka.util.Timeout -import akka.util.duration._ -import cc.spray.Directives -import cc.spray.typeconversion.TwirlSupport._ -import cc.spray.http.MediaTypes -import cc.spray.typeconversion.SprayJsonSupport._ +import scala.concurrent.duration._ +import spray.routing.Directives +import spray.httpx.TwirlSupport._ +import spray.httpx.SprayJsonSupport._ +import spray.http.MediaTypes._ import spark.deploy.{WorkerState, RequestWorkerState} import spark.deploy.JsonProtocol._ private[spark] -class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives { - val RESOURCE_DIR = "spark/deploy/worker/webui" +class WorkerWebUI(worker: ActorRef)(implicit val context: ActorContext) extends Directives { + import context.dispatcher + + val actorSystem = context.system + val RESOURCE_DIR = "spark/deploy/worker/webui" val STATIC_RESOURCE_DIR = "spark/deploy/static" - + implicit val timeout = Timeout(1 seconds) - + val handler = { get { - (path("") & parameters('format ?)) { + (path("") & parameters('format ?)) { case Some(js) if js.equalsIgnoreCase("json") => { - val future = worker ? RequestWorkerState - respondWithMediaType(MediaTypes.`application/json`) { ctx => - ctx.complete(future.mapTo[WorkerState]) + val future = (worker ? RequestWorkerState).mapTo[WorkerState] + respondWithMediaType(`application/json`) { ctx => + ctx.complete(future) } } case _ => - completeWith{ - val future = worker ? RequestWorkerState + complete { + val future = (worker ? RequestWorkerState).mapTo[WorkerState] future.map { workerState => - spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState]) + spark.deploy.worker.html.index(workerState) } } } ~ path("log") { parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) => - respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) { - getFromFileName("work/" + jobId + "/" + executorId + "/" + logType) + respondWithMediaType(`text/plain`) { + getFromFile("work/" + jobId + "/" + executorId + "/" + logType) } } } ~ diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index 36c01ad629..04b303afe0 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -14,9 +14,9 @@ import scala.collection.mutable.SynchronizedQueue import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer -import akka.dispatch.{Await, Promise, ExecutionContext, Future} -import akka.util.Duration -import akka.util.duration._ +import scala.concurrent.{Await, Promise, ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ private[spark] case class ConnectionManagerId(host: String, port: Int) { def toSocketAddress() = new InetSocketAddress(host, port) diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala index 533e4610f3..1b5b0935c2 100644 --- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala +++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala @@ -8,8 +8,8 @@ import scala.io.Source import java.nio.ByteBuffer import java.net.InetAddress -import akka.dispatch.Await -import akka.util.duration._ +import scala.concurrent.Await +import scala.concurrent.duration._ private[spark] object ConnectionManagerTest extends Logging{ def main(args: Array[String]) { diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index eeaae23dc8..03dc5f4b9b 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -3,11 +3,11 @@ package spark.scheduler.cluster import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.ask import spark.{SparkException, Logging, TaskState} -import akka.dispatch.Await +import scala.concurrent.Await import java.util.concurrent.atomic.AtomicInteger import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 7a8ac10cdd..7a1344668f 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -8,9 +8,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConversions._ import akka.actor.{ActorSystem, Cancellable, Props} -import akka.dispatch.{Await, Future} -import akka.util.Duration -import akka.util.duration._ +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index a3d8671834..46e1860d09 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -4,10 +4,9 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Random import akka.actor.{Actor, ActorRef, ActorSystem, Props} -import akka.dispatch.Await +import scala.concurrent.Await import akka.pattern.ask -import akka.util.{Duration, Timeout} -import akka.util.duration._ +import scala.concurrent.duration._ import spark.{Logging, SparkException, Utils} diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index f4d026da33..9bf9161c31 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -7,8 +7,7 @@ import scala.collection.JavaConversions._ import scala.util.Random import akka.actor.{Actor, ActorRef, Cancellable} -import akka.util.{Duration, Timeout} -import akka.util.duration._ +import scala.concurrent.duration._ import spark.{Logging, Utils} @@ -42,6 +41,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { override def preStart() { if (!BlockManager.getDisableHeartBeatsForTesting) { + import context.dispatcher timeoutCheckingTask = context.system.scheduler.schedule( 0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts) } diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index e67cb0336d..6fd9aa70fc 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -1,16 +1,16 @@ package spark.util -import akka.actor.{Props, ActorSystemImpl, ActorSystem} +import akka.actor.{Props, ActorSystem, ExtendedActorSystem} import com.typesafe.config.ConfigFactory -import akka.util.duration._ +import scala.concurrent.duration._ import akka.pattern.ask import akka.remote.RemoteActorRefProvider -import cc.spray.Route -import cc.spray.io.IoWorker -import cc.spray.{SprayCanRootService, HttpService} -import cc.spray.can.server.HttpServer -import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler -import akka.dispatch.Await +import spray.routing.Route +import spray.io.IOExtension +import spray.routing.HttpServiceActor +import spray.can.server.{HttpServer, ServerSettings} +import spray.io.SingletonHandler +import scala.concurrent.Await import spark.SparkException import java.util.concurrent.TimeoutException @@ -23,11 +23,11 @@ private[spark] object AkkaUtils { * ActorSystem itself and its port (which is hard to get from Akka). */ def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { - val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt + val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt - val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt + val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt - val akkaConf = ConfigFactory.parseString(""" + val akkaConf = ConfigFactory.parseString(""" akka.daemonic = on akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.actor.provider = "akka.remote.RemoteActorRefProvider" @@ -44,7 +44,7 @@ private[spark] object AkkaUtils { // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. - val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider + val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get return (actorSystem, boundPort) } @@ -54,14 +54,13 @@ private[spark] object AkkaUtils { * handle requests. Throws a SparkException if this fails. */ def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) { - val ioWorker = new IoWorker(actorSystem).start() - val httpService = actorSystem.actorOf(Props(new HttpService(route))) - val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService))) - val server = actorSystem.actorOf( - Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer") - actorSystem.registerOnTermination { ioWorker.stop() } + val ioWorker = IOExtension(actorSystem).ioBridge() + val httpService = actorSystem.actorOf(Props(HttpServiceActor(route))) + val server = actorSystem.actorOf( + Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = "HttpServer") + actorSystem.registerOnTermination { actorSystem.stop(ioWorker) } val timeout = 3.seconds - val future = server.ask(HttpServer.Bind(ip, port))(timeout) + val future = server.ask(HttpServer.Bind(ip, port))(timeout) try { Await.result(future, timeout) match { case bound: HttpServer.Bound => diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala index 467605981b..d0c2bd47fc 100644 --- a/core/src/test/scala/spark/CacheTrackerSuite.scala +++ b/core/src/test/scala/spark/CacheTrackerSuite.scala @@ -5,20 +5,19 @@ import org.scalatest.FunSuite import scala.collection.mutable.HashMap import akka.actor._ -import akka.dispatch._ -import akka.pattern.ask +import scala.concurrent.{Await, Future} import akka.remote._ -import akka.util.Duration +import scala.concurrent.duration.Duration import akka.util.Timeout -import akka.util.duration._ +import scala.concurrent.duration._ class CacheTrackerSuite extends FunSuite { // Send a message to an actor and wait for a reply, in a blocking manner private def ask(actor: ActorRef, message: Any): Any = { try { val timeout = 10.seconds - val future = actor.ask(message)(timeout) - return Await.result(future, timeout) + val future: Future[Any] = akka.pattern.ask(actor, message)(timeout) + Await.result(future, timeout) } catch { case e: Exception => throw new SparkException("Error communicating with actor", e) |