-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                        3
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala                  20
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                        6
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala                  11
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala                        2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala                     15
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala                              1
-rw-r--r--  core/src/test/scala/spark/AccumulatorSuite.scala                            2
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala                            8
-rw-r--r--  project/SparkBuild.scala                                                    8
-rw-r--r--  repl/src/main/scala/spark/repl/SparkILoop.scala                             15
-rw-r--r--  repl/src/main/scala/spark/repl/SparkILoopInit.scala                         2
-rw-r--r--  repl/src/test/scala/spark/repl/ReplSuiteMixin.scala                         1
-rwxr-xr-x  run                                                                         2
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala          4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala  6
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala     19
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala     2
18 files changed, 52 insertions(+), 75 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index e070a15a54..d1428bcfc6 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,7 +3,7 @@ package spark.deploy.master
import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
+import scala.concurrent.duration._
import java.text.SimpleDateFormat
import java.util.Date
@@ -50,6 +50,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
+ import context.dispatcher
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
}
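
[Note] In Akka 2.1, Scheduler.schedule takes an implicit ExecutionContext, which is what the added `import context.dispatcher` supplies. A minimal sketch of the pattern (the actor and message names are illustrative, not Spark's):

    import akka.actor.Actor
    import scala.concurrent.duration._

    class TimeoutChecker extends Actor {
      // The actor's dispatcher doubles as the implicit ExecutionContext
      // that schedule() requires as of Akka 2.1.
      import context.dispatcher

      override def preStart() {
        // Run the check immediately, then every 60 seconds.
        context.system.scheduler.schedule(0 millis, 60000 millis) {
          self ! "CheckTimeouts"
        }
      }

      def receive = {
        case "CheckTimeouts" => // time out dead workers here
      }
    }
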
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 59d59dde78..fe859d48c3 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -27,6 +27,7 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
implicit val timeout = Timeout(10 seconds)
+
val handler = {
get {
(path("") & parameters('format ?)) {
@@ -52,27 +53,11 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
masterState.completedApps.find(_.id == appId).getOrElse(null)
})
}
-<<<<<<< HEAD
respondWithMediaType(`application/json`) { ctx =>
- ctx.complete(jobInfo.mapTo[JobInfo])
- }
- case (jobId, _) =>
- complete {
- val future = (master ? RequestMasterState).mapTo[MasterState]
- future.map { masterState =>
- masterState.activeJobs.find(_.id == jobId) match {
- case Some(job) => spark.deploy.master.html.job_details.render(job)
- case _ => masterState.completedJobs.find(_.id == jobId) match {
- case Some(job) => spark.deploy.master.html.job_details.render(job)
- case _ => null
- }
- }
-=======
- respondWithMediaType(MediaTypes.`application/json`) { ctx =>
ctx.complete(appInfo.mapTo[ApplicationInfo])
}
case (appId, _) =>
- completeWith {
+ complete {
val future = master ? RequestMasterState
future.map { state =>
val masterState = state.asInstanceOf[MasterState]
@@ -80,7 +65,6 @@ class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends
masterState.completedApps.find(_.id == appId).getOrElse(null)
})
spark.deploy.master.html.app_details.render(app)
->>>>>>> 17e076de800ea0d4c55f2bd657348641f6f9c55b
}
}
}
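
[Note] spray dropped the old `completeWith` directive; `complete` now takes the value directly and also accepts a Future. A rough sketch of the new style, assuming an implicit ExecutionContext and a string result (the route and message are illustrative):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.ExecutionContext
    import scala.concurrent.duration._
    import spray.routing.Directives

    class StatusRoutes(master: ActorRef)(implicit ec: ExecutionContext) extends Directives {
      implicit val timeout = Timeout(10 seconds)

      val handler = get {
        path("status") {
          // complete() accepts a Future where completeWith used to be needed.
          complete {
            (master ? "RequestState").mapTo[String]
          }
        }
      }
    }
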
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 700d87b1c1..5bcf00443c 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,7 +2,7 @@ package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.util.duration._
+import scala.concurrent.duration._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
@@ -15,6 +15,7 @@ import spark.deploy.RegisterWorkerFailed
import spark.deploy.master.Master
import java.io.File
+
private[spark] class Worker(
ip: String,
port: Int,
@@ -81,7 +82,7 @@ private[spark] class Worker(
}
def startWebUi() {
- val webUi = new WorkerWebUI(context.system, self, workDir)
+ val webUi = new WorkerWebUI(self, workDir)
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
@@ -95,6 +96,7 @@ private[spark] class Worker(
case RegisteredWorker(url) =>
masterWebUiUrl = url
logInfo("Successfully registered with master")
+ import context.dispatcher
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
master ! Heartbeat(workerId)
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 99c3b506fa..33a2a9516e 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -18,8 +18,11 @@ import java.io.File
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
- val RESOURCE_DIR = "spark/deploy/worker/webui"
+class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext) extends Directives {
+ import context.dispatcher
+
+ val actorSystem = context.system
+ val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(10 seconds)
@@ -42,9 +45,9 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
}
} ~
path("log") {
- parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
+ parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
respondWithMediaType(`text/plain`) {
-          getFromFileName(workDir.getPath() + "/" + jobId + "/" + executorId + "/" + logType)
+ getFromFile(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
}
}
} ~
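
[Note] `getFromFileName` was renamed to `getFromFile` in spray-routing; it streams the file off disk. A sketch of the renamed directive in a standalone server (the port and work directory are illustrative):

    import akka.actor.ActorSystem
    import spray.routing.SimpleRoutingApp

    object LogServer extends App with SimpleRoutingApp {
      implicit val system = ActorSystem("log-server")

      startServer(interface = "0.0.0.0", port = 8081) {
        path("log") {
          parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
            // Serve the raw log file for one executor of one application.
            getFromFile("/tmp/work/" + appId + "/" + executorId + "/" + logType)
          }
        }
      }
    }
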
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8f737c5c6a..d3f6cd78dc 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -873,7 +873,7 @@ object BlockManager extends Logging {
}
def getHeartBeatFrequencyFromSystemProperties: Long =
- System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+ System.getProperty("spark.storage.blockManagerHeartBeatMs", "10000").toLong
def getDisableHeartBeatsForTesting: Boolean =
System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 9e6721ec17..a3397a0fb4 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -2,9 +2,9 @@ package spark.storage
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Timeout
-import akka.util.duration._
-import cc.spray.typeconversion.TwirlSupport._
-import cc.spray.Directives
+import scala.concurrent.duration._
+import spray.httpx.TwirlSupport._
+import spray.routing.Directives
import spark.{Logging, SparkContext}
import spark.util.AkkaUtils
import spark.Utils
@@ -17,7 +17,8 @@ private[spark]
class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
extends Directives with Logging {
- val STATIC_RESOURCE_DIR = "spark/deploy/static"
+ implicit val implicitActorSystem = actorSystem
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
implicit val timeout = Timeout(10 seconds)
@@ -31,7 +32,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
// random port it bound to, so we have to try to find a local one by creating a socket.
Utils.findFreePort()
}
- AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer")
+ AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler)
logInfo("Started BlockManager web UI at http://%s:%d".format(Utils.localHostName(), port))
} catch {
case e: Exception =>
@@ -43,7 +44,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
val handler = {
get {
path("") {
- completeWith {
+ complete {
// Request the current storage status from the Master
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
@@ -58,7 +59,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
} ~
path("rdd") {
parameter("id") { id =>
- completeWith {
+ complete {
val prefix = "rdd_" + id.toString
val storageStatusList = sc.getExecutorStorageStatus
val filteredStorageStatusList = StorageUtils.
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 6f551b2b9c..70338ec4dc 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -32,7 +32,6 @@ private[spark] object AkkaUtils {
val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
- val lifecycleEvents = System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
index f59334a033..fb54ccb51e 100644
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/spark/AccumulatorSuite.scala
@@ -8,7 +8,7 @@ import scala.math.exp
import scala.math.signum
import spark.SparkContext._
-class AccumulatorSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
implicit def setAccum[A] = new AccumulableParam[mutable.Set[A], A] {
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4104b33c8b..46b74fe5ee 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -257,7 +257,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
object DistributedSuite {
// Indicates whether this JVM is marked for failure.
var mark = false
-
+
// Set by test to remember if we are in the driver program so we can assert
// that we are not.
var amMaster = false
@@ -274,9 +274,9 @@ object DistributedSuite {
// Act like an identity function, but if mark was set to true previously, fail,
// crashing the entire JVM.
def failOnMarkedIdentity(item: Boolean): Boolean = {
- if (mark) {
+ if (mark) {
System.exit(42)
- }
+ }
item
- }
+ }
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 2571e54b04..7b61e2ba3e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,10 +178,10 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
- "com.github.sgroschupf" % "zkclient" % "0.1",
- "org.twitter4j" % "twitter4j-stream" % "3.0.3",
- "com.typesafe.akka" % "akka-zeromq" % "2.0.3"
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
+ "com.github.sgroschupf" % "zkclient" % "0.1",
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3",
+ "com.typesafe.akka" % "akka-zeromq" % "2.1-M1" excludeAll(ExclusionRule(name = "akka-actor"), ExclusionRule(organization = "org.scala-lang"))
)
) ++ assemblySettings ++ extraAssemblySettings
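
[Note] akka-zeromq 2.1-M1 would otherwise drag in its own akka-actor and Scala library jars, hence the exclusion rules. The sbt pattern in isolation:

    // Keep akka-zeromq's transitive akka-actor and scala-library jars off
    // the classpath; the build supplies its own versions of both.
    libraryDependencies += "com.typesafe.akka" % "akka-zeromq" % "2.1-M1" excludeAll(
      ExclusionRule(name = "akka-actor"),
      ExclusionRule(organization = "org.scala-lang")
    )
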
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index e83e403760..2b6e7b68bf 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -222,21 +222,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
}
}
- /** Print a welcome message */
- def printWelcome() {
- echo("""Welcome to
- ____ __
- / __/__ ___ _____/ /__
- _\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 0.8.0
- /_/
-""")
- import Properties._
- val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
- versionString, javaVmName, javaVersion)
- echo(welcomeMsg)
- }
-
/** Show the history */
lazy val historyCommand = new LoopCommand("history", "show the history (optional num is commands to show)") {
override def usage = "[num]"
diff --git a/repl/src/main/scala/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
index 6ae535c4e6..8b7da3d3c6 100644
--- a/repl/src/main/scala/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoopInit.scala
@@ -24,7 +24,7 @@ trait SparkILoopInit {
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 0.7.1-SNAPSHOT
+ /___/ .__/\_,_/_/ /_/\_\ version 0.8.0
/_/
""")
import Properties._
diff --git a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
index fd1a1b1e7c..8f439f0681 100644
--- a/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
+++ b/repl/src/test/scala/spark/repl/ReplSuiteMixin.scala
@@ -17,6 +17,7 @@ trait ReplSuiteMixin {
val localIp = "127.0.1.2"
val port = "7089"
val sparkUrl = s"spark://$localIp:$port"
+
def setupStandaloneCluster() {
future { Master.main(Array("-i", localIp, "-p", port, "--webui-port", "0")) }
Thread.sleep(2000)
diff --git a/run b/run
index 4755d562a7..96c7f8a095 100755
--- a/run
+++ b/run
@@ -164,4 +164,4 @@ else
EXTRA_ARGS="$JAVA_OPTS"
fi
-exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
\ No newline at end of file
+exec "$RUNNER" -cp "$CLASSPATH" $EXTRA_ARGS "$@"
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b159d26c02..e5bb654578 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -11,8 +11,8 @@ import scala.collection.mutable.Queue
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
+import scala.concurrent.duration._
+// import akka.dispatch._
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..5347374730 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -7,13 +7,15 @@ import spark.rdd.BlockRDD
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import akka.util.Timeout
+
import spark.streaming.util.{RecurringTimer, SystemClock}
import java.util.concurrent.ArrayBlockingQueue
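
[Note] With Scala 2.10, Await and the duration DSL moved out of Akka into the standard library; blocking on an ask now pairs scala.concurrent.Await with an akka.util.Timeout. A self-contained sketch:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object AskDemo extends App {
      class Echo extends Actor {
        def receive = { case msg => sender ! msg }
      }

      val system = ActorSystem("demo")
      val echo = system.actorOf(Props(new Echo))
      implicit val timeout = Timeout(5 seconds)

      // akka.dispatch.Await is gone; scala.concurrent.Await replaces it.
      val reply = Await.result(echo ? "ping", timeout.duration)
      println(reply)
      system.shutdown()
    }
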
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28..6c9e373de3 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -3,6 +3,8 @@ package spark.streaming.receivers
import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+import scala.concurrent.duration._
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver
@@ -12,9 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger
/** A helper with set of defaults for supervisor strategy **/
object ReceiverSupervisorStrategy {
- import akka.util.duration._
- import akka.actor.SupervisorStrategy._
-
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException ⇒ Restart
@@ -27,10 +26,10 @@ object ReceiverSupervisorStrategy {
* pushBlock API.
*
* @example {{{
- * class MyActor extends Actor with Receiver{
- * def receive {
- * case anything :String ⇒ pushBlock(anything)
- * }
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String ⇒ pushBlock(anything)
+ * }
* }
* //Can be plugged in actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -74,12 +73,12 @@ private[streaming] case class Data[T: ClassManifest](data: T)
* his own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
- *
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
* Here's a way to start more supervisor/workers as its children.
*
* @example {{{
- * context.parent ! Props(new Supervisor)
+ * context.parent ! Props(new Supervisor)
* }}} OR {{{
* context.parent ! Props(new Worker,"Worker")
* }}}
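
[Note] With the SupervisorStrategy._ and duration imports hoisted to the top of the file, the default strategy reads as plain Akka 2.1 code. The same construction in isolation (the Escalate case is an assumed fallback, shown for completeness):

    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy._
    import scala.concurrent.duration._

    // Restart a crashing receiver up to 10 times within the time window;
    // escalate anything else to the parent supervisor.
    val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
      15 millis) {
      case _: RuntimeException => Restart
      case _: Exception        => Escalate
    }
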
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
index 5533c3cf1e..e7608f08ae 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -13,7 +13,7 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
Connect(publisherUrl), subscribe)
def receive: Receive = {
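
[Note] In Akka 2.1 the ZeroMQ glue is an extension; sockets are created through ZeroMQExtension(system) rather than the old implicit newSocket enrichment on ActorSystem. A sketch of wiring a SUB socket to a listener actor (the URL and topic are illustrative):

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.zeromq._

    object SubDemo extends App {
      val system = ActorSystem("zmq-demo")

      val listener = system.actorOf(Props(new Actor {
        def receive = {
          case m: ZMQMessage => println("got " + m.frames.size + " frames")
        }
      }))

      // system.newSocket(...) became ZeroMQExtension(system).newSocket(...).
      ZeroMQExtension(system).newSocket(SocketType.Sub, Listener(listener),
        Connect("tcp://127.0.0.1:5553"), Subscribe("events"))
    }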