 core/src/main/scala/org/apache/spark/SparkEnv.scala                                       |  2
 core/src/main/scala/org/apache/spark/deploy/client/Client.scala                           | 10
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala                           | 31
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                           | 12
 core/src/main/scala/org/apache/spark/executor/Executor.scala                              |  2
 core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala             | 30
 core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |  2
 core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala   | 12
 core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala    |  2
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                 | 23
 core/src/test/scala/org/apache/spark/DistributedSuite.scala                               |  2
 core/src/test/scala/org/apache/spark/DriverSuite.scala                                    |  2
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala                          |  2
 examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala          |  2
 examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala         |  8
 project/SparkBuild.scala                                                                  | 18
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                |  5
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala   |  7
 streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala     |  2
 streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala         |  4
 streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala        |  7
 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                      |  2
 yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala              |  2
 23 files changed, 109 insertions(+), 80 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1e63b54b7a..a267407c67 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,7 +169,7 @@ object SparkEnv extends Logging {
val driverHost: String = System.getProperty("spark.driver.host", "localhost")
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+ val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
actorSystem.actorFor(url)
}
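
Note: Akka 2.2 names the transport explicitly in remote actor URLs, so every lookup
URL in this patch changes scheme from akka:// to akka.tcp:// (akka.ssl.tcp when SSL
is enabled). A minimal sketch of the new URL shape; host, port and actor name are
illustrative placeholders, not values from Spark:

    import akka.actor.ActorSystem

    object UrlSketch extends App {
      val system = ActorSystem("spark")
      // Scheme now carries the transport: akka.tcp://<system>@<host>:<port>/user/<actor>
      val url = "akka.tcp://spark@driver-host:7077/user/MapOutputTracker"
      val tracker = system.actorFor(url) // actorFor still works in 2.2, though deprecated
      system.shutdown()
    }
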
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 14a90934f6..164386782c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -26,9 +26,7 @@ import akka.actor._
import akka.actor.Terminated
import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
import org.apache.spark.Logging
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -61,7 +59,7 @@ private[spark] class Client(
master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
master ! RegisterApplication(appDescription)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
@@ -99,12 +97,12 @@ private[spark] class Client(
markDisconnected()
context.stop(self)
- case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ case DisassociatedEvent(_, address, _) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
- case RemoteClientShutdown(transport, address) if address == masterAddress =>
+ case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
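
Note: the old RemoteClient* events are gone in Akka 2.2; remote connection state is
now reported through the RemotingLifecycleEvent hierarchy on the event stream, with
DisassociatedEvent signalling a dropped connection and AssociationErrorEvent a failed
one. A hedged sketch of the subscription pattern used throughout this patch (the
actor and address names are illustrative):

    import akka.actor.{Actor, Address}
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

    class MasterWatcher(masterAddress: Address) extends Actor {
      override def preStart() {
        // One subscription covers every event in the RemotingLifecycleEvent hierarchy.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }
      def receive = {
        case DisassociatedEvent(_, remote, _) if remote == masterAddress =>
          context.stop(self)  // connection to the master dropped
        case AssociationErrorEvent(_, _, remote, _) if remote == masterAddress =>
          context.stop(self)  // connection could not be (re)established
        case _ =>             // ignore other lifecycle events
      }
    }
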
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2efd16bca0..cb0fe6a850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,9 +25,8 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
-import akka.actor.Terminated
import akka.pattern.ask
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.remote._
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -36,6 +35,22 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import akka.util.Timeout
+import org.apache.spark.deploy.DeployMessages._
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -81,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
import context.dispatcher
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -165,13 +180,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
actorToApp.get(actor).foreach(finishApplication)
}
- case RemoteClientDisconnected(transport, address) => {
+ case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
}
- case RemoteClientShutdown(transport, address) => {
+ case AssociationErrorEvent(_, _, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
@@ -376,11 +391,11 @@ private[spark] object Master {
actorSystem.awaitTermination()
}
- /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`. */
def toAkkaUrl(sparkUrl: String): String = {
sparkUrl match {
case sparkUrlRegex(host, port) =>
- "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
case _ =>
throw new SparkException("Invalid master URL: " + sparkUrl)
}
@@ -388,7 +403,7 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
val timeoutDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
implicit val timeout = Timeout(timeoutDuration)
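
Note: the switch from Props(new Master(...)) to Props(classOf[Master], ...) follows an
Akka 2.2 recommendation: the by-name form closes over its enclosing scope (and can
accidentally capture a non-serializable outer `this`), while the classOf form carries
only the class and its constructor arguments. A sketch; MasterLike stands in for any
Actor with a matching constructor:

    import akka.actor.{Actor, ActorSystem, Props}

    class MasterLike(host: String, port: Int, webUiPort: Int) extends Actor {
      def receive = { case msg => println(s"$host:$port got $msg") }
    }

    object PropsSketch extends App {
      val system = ActorSystem("demo")
      // Akka 2.2 style: Props holds the class plus constructor args and stays
      // serializable, capturing nothing from the enclosing scope.
      val ref = system.actorOf(Props(classOf[MasterLike], "localhost", 7077, 8080), name = "Master")
      // Older by-name form (still compiles, but the closure can capture outer state):
      //   system.actorOf(Props(new MasterLike("localhost", 7077, 8080)))
      system.shutdown()
    }
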
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0a9d1040a..1f04c1eea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,9 +25,9 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -113,7 +113,7 @@ private[spark] class Worker(
logInfo("Connecting to master " + masterUrl)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
@@ -165,7 +165,7 @@ private[spark] class Worker(
logInfo("Asked to kill unknown executor " + fullId)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
masterDisconnected()
case RequestWorkerState => {
@@ -207,8 +207,8 @@ private[spark] object Worker {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrl, workDir)), name = "Worker")
+ actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+ masterUrl, workDir), name = "Worker")
(actorSystem, boundPort)
}
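
Note: the disconnect handler above folds three event types into one alternative
pattern. Each alternative needs its own `_:` type test — a bare companion-object name
in pattern position is a stable-identifier pattern that compares the message against
the object itself and never matches an event instance. A self-contained sketch:

    import akka.actor.Terminated
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent}

    def isDisconnect(msg: Any): Boolean = msg match {
      // `_: T` matches any instance of type T; a bare `DisassociatedEvent` here
      // would denote the companion object and never fire for real events.
      case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent => true
      case _ => false
    }
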
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ceae3b8289..99a4a95e82 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -105,7 +105,7 @@ private[spark] class Executor(
SparkEnv.set(env)
env.metricsSystem.registerSource(executorSource)
- private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+ private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 7839023868..46f0ef2cc6 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -19,13 +19,25 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class StandaloneExecutorBackend(
@@ -40,14 +52,14 @@ private[spark] class StandaloneExecutorBackend(
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ // context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -69,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
}
@@ -90,8 +102,8 @@ private[spark] object StandaloneExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
- val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+ actorSystem.actorOf(
+ Props(classOf[StandaloneExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
actorSystem.awaitTermination()
}
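
Note: `driver` changes type here because Akka 2.2's actorSelection returns an
ActorSelection rather than an ActorRef. A selection can be messaged but not
death-watched, which is why context.watch(driver) is commented out above and the
backend relies on remoting lifecycle events instead. A sketch with a placeholder URL:

    import akka.actor.{Actor, ActorSelection}

    class BackendSketch(driverUrl: String) extends Actor {
      // Looking up a remote path in Akka 2.2 yields an ActorSelection, not an ActorRef.
      var driver: ActorSelection = _

      override def preStart() {
        driver = context.actorSelection(driverUrl)
        driver ! "register"      // tell (!) works on a selection
        // context.watch(driver) // would not compile: watch() needs an ActorRef,
        //                       // so remote liveness comes from DisassociatedEvent
      }
      def receive = Actor.emptyBehavior
    }
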
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0c..fa83ae19d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index addfa077c1..49f668eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
@@ -53,7 +53,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
@@ -101,11 +101,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
- case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
+ case DisassociatedEvent(_, remoteAddress, _) =>
+ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
- case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
+ case AssociationErrorEvent(_, _, remoteAddress, _) =>
+ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all executors
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 3dbe61d706..babe875fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -119,7 +119,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index e674d120ea..af1c36b34d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -48,28 +48,27 @@ private[spark] object AkkaUtils {
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+ akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
akka.stdout-loglevel = "ERROR"
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = %ds
- akka.remote.netty.message-frame-size = %d MiB
- akka.remote.netty.execution-pool-size = %d
+ akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+ akka.remote.netty.tcp.hostname = "%s"
+ akka.remote.netty.tcp.port = %d
+ akka.remote.netty.tcp.connection-timeout = %ds
+ akka.remote.netty.tcp.message-frame-size = %d MiB
+ akka.remote.netty.tcp.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d
akka.remote.log-remote-lifecycle-events = %s
- akka.remote.netty.write-timeout = %ds
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents, akkaWriteTimeout))
+ """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
+ lifecycleEvents))
val actorSystem = ActorSystem(name, akkaConf)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- return (actorSystem, boundPort)
+ val boundPort = provider.getDefaultAddress.port.get
+ (actorSystem, boundPort)
}
}
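
Note: the remoting settings move from akka.remote.netty.* to akka.remote.netty.tcp.*,
and the bound port is now read from the provider's default address instead of
reaching into the transport. A minimal sketch of the same setup outside Spark
(hostname and port are placeholders; assumes akka-remote 2.2.x on the classpath):

    import akka.actor.{ActorSystem, ExtendedActorSystem}
    import com.typesafe.config.ConfigFactory

    object RemotingSketch extends App {
      val conf = ConfigFactory.parseString("""
        akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        akka.remote.netty.tcp.hostname = "127.0.0.1"
        akka.remote.netty.tcp.port = 0    # 0 = let the OS pick a free port
        """)
      val system = ActorSystem("demo", conf)
      // Akka 2.2 exposes the bound address via the provider's default address,
      // replacing the old RemoteActorRefProvider.transport.address lookup.
      val boundPort = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
      println(s"bound to port $boundPort")
      system.shutdown()
    }
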
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 7a856d4081..c719a54a61 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -325,7 +325,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
// when running under LocalScheduler:
sc = new SparkContext("local-cluster[1,1,512]", "test")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
val exception = intercept[SparkException] {
rdd.reduce((x, y) => x)
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..6d1695eae7 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(30 seconds) {
+ failAfter(60 seconds) {
Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6013320eaa..18fb1bf590 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -109,7 +109,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor(
- "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+ "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 13aa24fa1a..08e399f9ee 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -165,7 +165,7 @@ object ActorWordCount {
*/
val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
+ Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
//compute wordcount
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index c8743b9e25..e83ce78aa5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -23,6 +23,7 @@ import akka.zeromq._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
+import akka.util.ByteString
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
@@ -40,10 +41,11 @@ object SimpleZeroMQPublisher {
val acs: ActorSystem = ActorSystem()
val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
- val messages: Array[String] = Array("words ", "may ", "count ")
+ implicit def stringToByteString(x: String) = ByteString(x)
+ val messages: List[ByteString] = List("words ", "may ", "count ")
while (true) {
Thread.sleep(1000)
- pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList)
+ pubSocket ! ZMQMessage(ByteString(topic) :: messages)
}
acs.awaitTermination()
}
@@ -78,7 +80,7 @@ object ZeroMQWordCount {
val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
+ def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
//For this stream, a zeroMQ publisher should be running.
val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
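
Note: frames in akka-zeromq 2.2 are akka.util.ByteStrings rather than Frame(Seq[Byte])
wrappers, which is why both the publisher and the byte decoder change in this file.
A small sketch of the new frame handling; topic and payload are illustrative:

    import akka.util.ByteString
    import akka.zeromq.ZMQMessage

    object FrameSketch extends App {
      // A message is now a sequence of ByteString frames; by convention the
      // first frame carries the topic.
      val msg = ZMQMessage(List(ByteString("wordcount"), ByteString("words may count")))
      val words: Seq[String] = msg.frames.tail.map(_.utf8String)
      println(words.mkString(","))
    }
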
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5e7ed81c1e..f18ebf1400 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -81,7 +81,7 @@ object SparkBuild extends Build {
organization := "org.apache.spark",
version := "0.8.0-SNAPSHOT",
scalaVersion := "2.10.2",
- scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
+// scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
@@ -150,7 +150,7 @@ object SparkBuild extends Build {
*/
libraryDependencies ++= Seq(
- "io.netty" % "netty-all" % "4.0.0.Beta2",
+ "io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
@@ -183,9 +183,9 @@ object SparkBuild extends Build {
def coreSettings = sharedSettings ++ Seq(
name := "spark-core",
resolvers ++= Seq(
- "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
- "Spray Repository" at "http://repo.spray.cc/",
- "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
+ // "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ // "Spray Repository" at "http://repo.spray.cc/",
+ "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
),
libraryDependencies ++= Seq(
@@ -200,9 +200,9 @@ object SparkBuild extends Build {
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.22",
- "com.typesafe.akka" %% "akka-remote" % "2.1.4" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-slf4j" % "2.1.4" excludeAll(excludeNetty),
- "net.liftweb" %% "lift-json" % "2.5.1",
+ "com.typesafe.akka" %% "akka-remote" % "2.2.1" excludeAll(excludeNetty),
+ "com.typesafe.akka" %% "akka-slf4j" % "2.2.1" excludeAll(excludeNetty),
+ "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
"org.apache.mesos" % "mesos" % "0.12.1",
@@ -271,7 +271,7 @@ object SparkBuild extends Build {
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-zeromq" % "2.1.4" excludeAll(excludeNetty)
+ "com.typesafe.akka" %% "akka-zeromq" % "2.2.1" excludeAll(excludeNetty)
)
)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9e14c8ace7..c722aa15ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
+import akka.util.ByteString
/**
@@ -231,11 +232,11 @@ class StreamingContext private (
def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 8135d2499e..8242af6d5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
+import akka.util.ByteString
import twitter4j.auth.Authorization
@@ -475,7 +476,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
@@ -502,7 +503,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
@@ -522,7 +523,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a61a1780f1..394a39fbb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -177,7 +177,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
logInfo("Attempting to register with tracker")
val ip = System.getProperty("spark.driver.host", "localhost")
val port = System.getProperty("spark.driver.port", "7077").toInt
- val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+ val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index c220127c00..ee087a1cf0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -51,7 +51,7 @@ object ReceiverSupervisorStrategy {
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
- * case anything :String ⇒ pushBlock(anything)
+ * case anything :String => pushBlock(anything)
* }
* }
* //Can be plugged in actorStream as follows
@@ -121,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassTag](
protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
- private class Supervisor extends Actor {
+ class Supervisor extends Actor {
override val supervisorStrategy = receiverSupervisorStrategy
val worker = context.actorOf(props, name)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
index e009325b67..ce8c56fa8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.receivers
import akka.actor.Actor
+import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
@@ -29,7 +30,7 @@ import scala.reflect.ClassTag
*/
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
@@ -40,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
case Connecting ⇒ logInfo("connecting ...")
case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.firstFrameAsString)
+ logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail.map(_.payload)
+ val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
case Closed ⇒ logInfo("received closed ")
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..783b8dea31 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -48,7 +48,7 @@ import java.util.*;
import akka.actor.Props;
import akka.zeromq.Subscribe;
-
+import akka.util.ByteString;
// The test suite itself is Serializable so that anonymous Function implementations can be
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 6d6ef149cc..d222f412a0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)