 core/src/main/scala/spark/CacheTracker.scala                                 |  6
 core/src/main/scala/spark/MapOutputTracker.scala                             |  6
 core/src/main/scala/spark/api/java/function/Function.java                    |  8
 core/src/main/scala/spark/api/java/function/Function2.java                   |  9
 core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java         | 12
 core/src/main/scala/spark/api/java/function/PairFunction.java                | 12
 core/src/main/scala/spark/deploy/JsonProtocol.scala                          |  2
 core/src/main/scala/spark/deploy/client/Client.scala                         |  4
 core/src/main/scala/spark/deploy/master/Master.scala                         |  2
 core/src/main/scala/spark/deploy/master/MasterWebUI.scala                    | 45
 core/src/main/scala/spark/deploy/worker/Worker.scala                         |  2
 core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala                    | 43
 core/src/main/scala/spark/network/ConnectionManager.scala                    |  6
 core/src/main/scala/spark/network/ConnectionManagerTest.scala                |  4
 core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala |  4
 core/src/main/scala/spark/storage/BlockManager.scala                         |  6
 core/src/main/scala/spark/storage/BlockManagerMaster.scala                   |  5
 core/src/main/scala/spark/storage/BlockManagerMasterActor.scala              |  4
 core/src/main/scala/spark/util/AkkaUtils.scala                               | 37
 core/src/test/scala/spark/CacheTrackerSuite.scala                            | 11
 project/SparkBuild.scala                                                     | 70
 project/build.properties                                                     |  2
 project/plugins.sbt                                                          |  6
 23 files changed, 155 insertions(+), 151 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 04c26b2e40..a73438208a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -5,12 +5,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.actor._
-import akka.dispatch._
+import scala.concurrent.Await
import akka.pattern.ask
import akka.remote._
-import akka.util.Duration
+import scala.concurrent.duration.Duration
import akka.util.Timeout
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.storage.BlockManager
import spark.storage.StorageLevel
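
Note: this import swap repeats throughout the patch. With Akka 2.1 on Scala 2.10, the futures API (Await, Future, Duration) moved from akka.dispatch/akka.util into the standard library. A minimal sketch of the resulting ask-and-wait idiom; askTracker is a hypothetical helper for illustration, not code from this patch:

    // Akka 2.1 / Scala 2.10: futures come from scala.concurrent,
    // replacing akka.dispatch.Await and akka.util.duration._
    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object TrackerOps {
      def askTracker(tracker: ActorRef, message: Any): Any = {
        implicit val timeout = Timeout(10.seconds)
        val future = tracker ? message            // scala.concurrent.Future[Any]
        Await.result(future, timeout.duration)    // Duration from scala.concurrent.duration
      }
    }
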
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 70eb9f702e..08d2956782 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -8,12 +8,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import akka.actor._
-import akka.dispatch._
+import scala.concurrent.Await
import akka.pattern.ask
import akka.remote._
-import akka.util.Duration
+import scala.concurrent.duration.Duration
import akka.util.Timeout
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.scheduler.MapStatus
import spark.storage.BlockManagerId
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
index dae8295f21..9f6ab9a592 100644
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ b/core/src/main/scala/spark/api/java/function/Function.java
@@ -1,7 +1,7 @@
package spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -15,8 +15,8 @@ import java.io.Serializable;
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
public abstract R call(T t) throws Exception;
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
}
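
Note: Scala 2.10 deprecates ClassManifest in favor of scala.reflect.ClassTag, so the Java-facing factory changes from ClassManifest$.MODULE$.fromClass(cls) to ClassTag$.MODULE$.apply(cls). A small sketch of the Scala-side equivalents, assuming nothing beyond the standard library:

    // ClassTag$.MODULE$.apply(cls), as called from the Java wrappers above,
    // is spelled ClassTag(cls) in Scala; tags can also be summoned implicitly.
    import scala.reflect.{ClassTag, classTag}

    object TagSketch {
      def tagOf[T: ClassTag]: ClassTag[T] = classTag[T]          // summon implicitly
      val anyTag: ClassTag[AnyRef] = ClassTag(classOf[AnyRef])   // build from a Class
    }
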
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
index 69bf12c8c9..b32c178aa3 100644
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/spark/api/java/function/Function2.java
@@ -1,7 +1,7 @@
package spark.api.java.function;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction2;
import java.io.Serializable;
@@ -14,8 +14,9 @@ public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
public abstract R call(T1 t1, T2 t2) throws Exception;
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<R> returnType() {
+ return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
+
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
index b3cc4df6aa..296a3b5044 100644
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
@@ -1,8 +1,8 @@
package spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -19,11 +19,11 @@ public abstract class PairFlatMapFunction<T, K, V>
public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
index 9fc6df4b88..e3f94788e2 100644
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFunction.java
@@ -1,8 +1,8 @@
package spark.api.java.function;
import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import java.io.Serializable;
@@ -18,11 +18,11 @@ public abstract class PairFunction<T, K, V>
public abstract Tuple2<K, V> call(T t) throws Exception;
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<K> keyType() {
+ return (ClassTag<K>) ClassTag$.MODULE$.apply(Object.class);
}
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+ public ClassTag<V> valueType() {
+ return (ClassTag<V>) ClassTag$.MODULE$.apply(Object.class);
}
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 732fa08064..12069947fc 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -2,7 +2,7 @@ package spark.deploy
import master.{JobInfo, WorkerInfo}
import worker.ExecutorRunner
-import cc.spray.json._
+import spray.json._
/**
* spray-json helper class containing implicit conversion to json for marshalling responses
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 90fe9508cd..0aee759796 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -3,7 +3,7 @@ package spark.deploy.client
import spark.deploy._
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.AskTimeoutException
import spark.{SparkException, Logging}
import akka.remote.RemoteClientLifeCycleEvent
@@ -11,7 +11,7 @@ import akka.remote.RemoteClientShutdown
import spark.deploy.RegisterJob
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
-import akka.dispatch.Await
+import scala.concurrent.Await
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6ecebe626a..e034312c12 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -49,7 +49,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def startWebUi() {
- val webUi = new MasterWebUI(context.system, self)
+ val webUi = new MasterWebUI(self)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 458ee2d665..a4dadb6ef9 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -1,39 +1,42 @@
package spark.deploy.master
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
+import akka.actor.{ActorRef, ActorContext, ActorRefFactory}
+import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.directives._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
+import scala.concurrent.duration._
+import spray.routing.Directives
+import spray.routing.directives._
+import spray.httpx.TwirlSupport._
+import spray.httpx.SprayJsonSupport._
+import spray.http.MediaTypes._
import spark.deploy._
import spark.deploy.JsonProtocol._
private[spark]
-class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives {
+ import context.dispatcher
+
+ val actorSystem = context.system
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
+
implicit val timeout = Timeout(1 seconds)
-
+
val handler = {
get {
(path("") & parameters('format ?)) {
case Some(js) if js.equalsIgnoreCase("json") =>
- val future = master ? RequestMasterState
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ val future = (master ? RequestMasterState).mapTo[MasterState]
+ respondWithMediaType(`application/json`) { ctx =>
ctx.complete(future.mapTo[MasterState])
}
case _ =>
- completeWith {
- val future = master ? RequestMasterState
+ complete {
+ val future = (master ? RequestMasterState).mapTo[MasterState]
future.map {
- masterState => spark.deploy.master.html.index.render(masterState.asInstanceOf[MasterState])
+ masterState => spark.deploy.master.html.index.render(masterState)
}
}
} ~
@@ -50,15 +53,13 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
}
}
}
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
+ respondWithMediaType(`application/json`) { ctx =>
ctx.complete(jobInfo.mapTo[JobInfo])
}
case (jobId, _) =>
- completeWith {
- val future = master ? RequestMasterState
- future.map { state =>
- val masterState = state.asInstanceOf[MasterState]
-
+ complete {
+ val future = (master ? RequestMasterState).mapTo[MasterState]
+ future.map { masterState =>
masterState.activeJobs.find(_.id == jobId) match {
case Some(job) => spark.deploy.master.html.job_details.render(job)
case _ => masterState.completedJobs.find(_.id == jobId) match {
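
Note: two API changes drive this rewrite beyond the cc.spray to spray.routing package move: spray 1.1 renames completeWith to complete, and complete accepts a Future directly, so the actor reply is narrowed once with mapTo instead of being cast with asInstanceOf inside the template call. A condensed sketch of the new route shape, with hypothetical stand-ins for the deploy messages:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import spray.routing.Directives._

    object StatusRoute {
      case object RequestState                // hypothetical request message
      case class State(description: String)   // hypothetical reply type

      def statusRoute(master: ActorRef) = {
        implicit val timeout = Timeout(1.second)
        path("status") {
          complete {
            // complete takes the Future once its value type is marshallable
            (master ? RequestState).mapTo[State].map(_.description)
          }
        }
      }
    }
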
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 7c9e588ea2..ec25a19e7b 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -100,7 +100,7 @@ private[spark] class Worker(
}
def startWebUi() {
- val webUi = new WorkerWebUI(context.system, self)
+ val webUi = new WorkerWebUI(self)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index f9489d99fc..7dd1781900 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -1,46 +1,49 @@
package spark.deploy.worker
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Await
+import akka.actor.{ActorRef, ActorContext}
+import scala.concurrent.Await
import akka.pattern.ask
import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.Directives
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
+import scala.concurrent.duration._
+import spray.routing.Directives
+import spray.httpx.TwirlSupport._
+import spray.httpx.SprayJsonSupport._
+import spray.http.MediaTypes._
import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._
private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
- val RESOURCE_DIR = "spark/deploy/worker/webui"
+class WorkerWebUI(worker: ActorRef)(implicit val context: ActorContext) extends Directives {
+ import context.dispatcher
+
+ val actorSystem = context.system
+ val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
-
+
implicit val timeout = Timeout(1 seconds)
-
+
val handler = {
get {
- (path("") & parameters('format ?)) {
+ (path("") & parameters('format ?)) {
case Some(js) if js.equalsIgnoreCase("json") => {
- val future = worker ? RequestWorkerState
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
- ctx.complete(future.mapTo[WorkerState])
+ val future = (worker ? RequestWorkerState).mapTo[WorkerState]
+ respondWithMediaType(`application/json`) { ctx =>
+ ctx.complete(future)
}
}
case _ =>
- completeWith{
- val future = worker ? RequestWorkerState
+ complete {
+ val future = (worker ? RequestWorkerState).mapTo[WorkerState]
future.map { workerState =>
- spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
+ spark.deploy.worker.html.index(workerState)
}
}
} ~
path("log") {
parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
- respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
- getFromFileName("work/" + jobId + "/" + executorId + "/" + logType)
+ respondWithMediaType(`text/plain`) {
+ getFromFile("work/" + jobId + "/" + executorId + "/" + logType)
}
}
} ~
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
index 36c01ad629..04b303afe0 100644
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/spark/network/ConnectionManager.scala
@@ -14,9 +14,9 @@ import scala.collection.mutable.SynchronizedQueue
import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
-import akka.dispatch.{Await, Promise, ExecutionContext, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Promise, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
private[spark] case class ConnectionManagerId(host: String, port: Int) {
def toSocketAddress() = new InetSocketAddress(host, port)
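
Note: ConnectionManager's use of Future, Promise and ExecutionContext maps directly onto the new standard-library types; any JDK executor can back an ExecutionContext. A minimal self-contained sketch (the pool size of 4 is an arbitrary illustration):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future, Promise}
    import scala.concurrent.duration._

    object FutureSketch {
      // Wrap a plain JDK thread pool as a scala.concurrent.ExecutionContext.
      implicit val exec: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

      def main(args: Array[String]) {
        val promise = Promise[String]()
        Future { promise.success("message handled") }   // runs on the pool above
        println(Await.result(promise.future, 5.seconds))
      }
    }
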
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
index 533e4610f3..1b5b0935c2 100644
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
@@ -8,8 +8,8 @@ import scala.io.Source
import java.nio.ByteBuffer
import java.net.InetAddress
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import scala.concurrent.duration._
private[spark] object ConnectionManagerTest extends Logging{
def main(args: Array[String]) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index eeaae23dc8..03dc5f4b9b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -3,11 +3,11 @@ package spark.scheduler.cluster
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.ask
import spark.{SparkException, Logging, TaskState}
-import akka.dispatch.Await
+import scala.concurrent.Await
import java.util.concurrent.atomic.AtomicInteger
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 7a8ac10cdd..7a1344668f 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -8,9 +8,9 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index a3d8671834..46e1860d09 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -4,10 +4,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.dispatch.Await
+import scala.concurrent.Await
import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.{Logging, SparkException, Utils}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index f4d026da33..9bf9161c31 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -7,8 +7,7 @@ import scala.collection.JavaConversions._
import scala.util.Random
import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.{Logging, Utils}
@@ -42,6 +41,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
+ import context.dispatcher
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
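
Note: the added "import context.dispatcher" is not cosmetic. In Akka 2.1, context.system.scheduler.schedule takes an implicit ExecutionContext, and the actor's own dispatcher is the conventional one to supply. A minimal sketch of the pattern (the "expire" message is illustrative only):

    import akka.actor.Actor
    import scala.concurrent.duration._

    class HeartbeatCheck extends Actor {
      import context.dispatcher   // supplies the implicit ExecutionContext

      private val task = context.system.scheduler.schedule(
        0.seconds, 60.seconds, self, "expire")

      def receive = {
        case "expire" => // expire dead hosts here
      }

      override def postStop() { task.cancel() }
    }
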
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index e67cb0336d..6fd9aa70fc 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,16 +1,16 @@
package spark.util
-import akka.actor.{Props, ActorSystemImpl, ActorSystem}
+import akka.actor.{Props, ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
-import akka.util.duration._
+import scala.concurrent.duration._
import akka.pattern.ask
import akka.remote.RemoteActorRefProvider
-import cc.spray.Route
-import cc.spray.io.IoWorker
-import cc.spray.{SprayCanRootService, HttpService}
-import cc.spray.can.server.HttpServer
-import cc.spray.io.pipelines.MessageHandlerDispatch.SingletonHandler
-import akka.dispatch.Await
+import spray.routing.Route
+import spray.io.IOExtension
+import spray.routing.HttpServiceActor
+import spray.can.server.{HttpServer, ServerSettings}
+import spray.io.SingletonHandler
+import scala.concurrent.Await
import spark.SparkException
import java.util.concurrent.TimeoutException
@@ -23,11 +23,11 @@ private[spark] object AkkaUtils {
* ActorSystem itself and its port (which is hard to get from Akka).
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+ val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
- val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+ val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
- val akkaConf = ConfigFactory.parseString("""
+ val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -44,7 +44,7 @@ private[spark] object AkkaUtils {
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
- val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+ val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
return (actorSystem, boundPort)
}
@@ -54,14 +54,13 @@ private[spark] object AkkaUtils {
* handle requests. Throws a SparkException if this fails.
*/
def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route) {
- val ioWorker = new IoWorker(actorSystem).start()
- val httpService = actorSystem.actorOf(Props(new HttpService(route)))
- val rootService = actorSystem.actorOf(Props(new SprayCanRootService(httpService)))
- val server = actorSystem.actorOf(
- Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
- actorSystem.registerOnTermination { ioWorker.stop() }
+ val ioWorker = IOExtension(actorSystem).ioBridge()
+ val httpService = actorSystem.actorOf(Props(HttpServiceActor(route)))
+ val server = actorSystem.actorOf(
+ Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = "HttpServer")
+ actorSystem.registerOnTermination { actorSystem.stop(ioWorker) }
val timeout = 3.seconds
- val future = server.ask(HttpServer.Bind(ip, port))(timeout)
+ val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {
case bound: HttpServer.Bound =>
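
Note: the server bootstrap shrinks because spray 1.1-M7's IOExtension hands back the IO bridge as a plain actor (stopped via actorSystem.stop rather than a dedicated ioWorker.stop()), and HttpServiceActor wraps a Route directly, removing the SprayCanRootService layer. Condensed, the blocking bind check used above looks like this sketch:

    // Ask the spray-can server actor to bind, then block until it reports Bound.
    import akka.actor.ActorRef
    import akka.pattern.ask
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import spray.can.server.HttpServer

    object BindCheck {
      def awaitBound(server: ActorRef, ip: String, port: Int) {
        val timeout = 3.seconds
        Await.result(server.ask(HttpServer.Bind(ip, port))(timeout), timeout) match {
          case bound: HttpServer.Bound => // listening; web UI is up
          case other => throw new RuntimeException("Failed to bind web UI: " + other)
        }
      }
    }
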
diff --git a/core/src/test/scala/spark/CacheTrackerSuite.scala b/core/src/test/scala/spark/CacheTrackerSuite.scala
index 467605981b..d0c2bd47fc 100644
--- a/core/src/test/scala/spark/CacheTrackerSuite.scala
+++ b/core/src/test/scala/spark/CacheTrackerSuite.scala
@@ -5,20 +5,19 @@ import org.scalatest.FunSuite
import scala.collection.mutable.HashMap
import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
+import scala.concurrent.{Await, Future}
import akka.remote._
-import akka.util.Duration
+import scala.concurrent.duration.Duration
import akka.util.Timeout
-import akka.util.duration._
+import scala.concurrent.duration._
class CacheTrackerSuite extends FunSuite {
// Send a message to an actor and wait for a reply, in a blocking manner
private def ask(actor: ActorRef, message: Any): Any = {
try {
val timeout = 10.seconds
- val future = actor.ask(message)(timeout)
- return Await.result(future, timeout)
+ val future: Future[Any] = akka.pattern.ask(actor, message)(timeout)
+ Await.result(future, timeout)
} catch {
case e: Exception =>
throw new SparkException("Error communicating with actor", e)
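
Note: with the implicit import removed, the test calls akka.pattern.ask as a plain function; the explicit Future[Any] annotation documents that the reply is untyped until narrowed. Both spellings are equivalent, as this sketch shows:

    import akka.actor.ActorRef
    import akka.util.Timeout
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object AskStyles {
      def plainFunction(actor: ActorRef, message: Any): Any = {
        val timeout = Timeout(10.seconds)
        // No implicit-conversion import needed for this form.
        val future: Future[Any] = akka.pattern.ask(actor, message)(timeout)
        Await.result(future, timeout.duration)
      }

      def operator(actor: ActorRef, message: Any): Any = {
        import akka.pattern.ask               // enables the ? operator
        implicit val timeout = Timeout(10.seconds)
        Await.result(actor ? message, timeout.duration)
      }
    }
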
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 219674028e..d0b3c350f1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -17,11 +17,11 @@ object SparkBuild extends Build {
//val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
//val HADOOP_MAJOR_VERSION = "2"
- lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel)
+ lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, /*repl,*/ examples, bagel)
lazy val core = Project("core", file("core"), settings = coreSettings)
- lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
+// lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core)
@@ -32,10 +32,10 @@ object SparkBuild extends Build {
lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
def sharedSettings = Defaults.defaultSettings ++ Seq(
- organization := "org.spark-project",
- version := "0.7.0-SNAPSHOT",
- scalaVersion := "2.9.2",
- scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
+ organization := "org.spark-project",
+ version := "0.7.0-SNAPSHOT",
+ scalaVersion := "2.10.0",
+ scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
@@ -87,11 +87,11 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
- "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
- "org.scalatest" %% "scalatest" % "1.8" % "test",
- "org.scalacheck" %% "scalacheck" % "1.9" % "test",
- "com.novocode" % "junit-interface" % "0.8" % "test"
- ),
+ "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
+ "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+ "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
+ "com.novocode" % "junit-interface" % "0.8" % "test"
+ ),
parallelExecution := false,
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
@@ -112,31 +112,33 @@ object SparkBuild extends Build {
name := "spark-core",
resolvers ++= Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
- "Spray Repository" at "http://repo.spray.cc/",
+ "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ "Spray Repository" at "http://repo.spray.cc/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
- "com.google.guava" % "guava" % "11.0.1",
- "log4j" % "log4j" % "1.2.16",
- "org.slf4j" % "slf4j-api" % slf4jVersion,
- "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
- "com.ning" % "compress-lzf" % "0.8.4",
- "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
- "asm" % "asm-all" % "3.3.1",
- "com.google.protobuf" % "protobuf-java" % "2.4.1",
- "de.javakaffee" % "kryo-serializers" % "0.20",
- "com.typesafe.akka" % "akka-actor" % "2.0.3",
- "com.typesafe.akka" % "akka-remote" % "2.0.3",
- "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
- "it.unimi.dsi" % "fastutil" % "6.4.4",
- "colt" % "colt" % "1.2.0",
- "cc.spray" % "spray-can" % "1.0-M2.1",
- "cc.spray" % "spray-server" % "1.0-M2.1",
- "cc.spray" %% "spray-json" % "1.1.1",
- "org.apache.mesos" % "mesos" % "0.9.0-incubating"
- ) ++ (if (HADOOP_MAJOR_VERSION == "2") Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
+ "com.google.guava" % "guava" % "11.0.1",
+ "log4j" % "log4j" % "1.2.16",
+ "org.slf4j" % "slf4j-api" % slf4jVersion,
+ "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
+ "com.ning" % "compress-lzf" % "0.8.4",
+ "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
+ "asm" % "asm-all" % "3.3.1",
+ "com.google.protobuf" % "protobuf-java" % "2.4.1",
+ "de.javakaffee" % "kryo-serializers" % "0.20",
+ "com.typesafe.akka" %% "akka-remote" % "2.1.0",
+ "com.typesafe.akka" %% "akka-slf4j" % "2.1.0",
+ "it.unimi.dsi" % "fastutil" % "6.4.4",
+ "io.spray" % "spray-can" % "1.1-M7",
+ "io.spray" % "spray-io" % "1.1-M7",
+ "io.spray" % "spray-routing" % "1.1-M7",
+ "io.spray" %% "spray-json" % "1.2.3",
+ "colt" % "colt" % "1.2.0",
+ "org.apache.mesos" % "mesos" % "0.9.0-incubating",
+ "org.scala-lang" % "scala-actors" % "2.10.0"
+ ) ++ (if (HADOOP_MAJOR_VERSION == "2")
+ Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
@@ -144,10 +146,10 @@ object SparkBuild extends Build {
publish := {}
)
- def replSettings = sharedSettings ++ Seq(
+/* def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
- )
+ )*/
def examplesSettings = sharedSettings ++ Seq(
name := "spark-examples"
diff --git a/project/build.properties b/project/build.properties
index d4287112c6..4474a03e1a 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.11.3
+sbt.version=0.12.1
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4d0e696a11..0e4eb7085c 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,11 +6,11 @@ resolvers += "Spray Repository" at "http://repo.spray.cc/"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")
-addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.1")
-addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
+addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.2.0")
-addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")
+addSbtPlugin("io.spray" %% "sbt-twirl" % "0.6.1")
// For Sonatype publishing
//resolvers += Resolver.url("sbt-plugin-releases", new URL("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases/"))(Resolver.ivyStylePatterns)