diff options
author | Reynold Xin <reynoldx@gmail.com> | 2013-07-29 17:53:01 -0700 |
---|---|---|
committer | Reynold Xin <reynoldx@gmail.com> | 2013-07-29 17:53:01 -0700 |
commit | 81720e13fc9e1f475dd1333babfa08f3f806a5d0 (patch) | |
tree | 38dafc772ce70dcf1a3f99253ded9769bf1f5b52 | |
parent | 23b5da14ed6413b0dcb4c0bfdb80c98c433f3c9d (diff) | |
download | spark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.tar.gz spark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.tar.bz2 spark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.zip |
Moved all StandaloneClusterMessage's into StandaloneClusterMessages object.
3 files changed, 43 insertions, 44 deletions
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index f4003da732..e47fe50021 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -18,19 +18,16 @@ package spark.executor import java.nio.ByteBuffer -import spark.Logging -import spark.TaskState.TaskState -import spark.util.AkkaUtils + import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import spark.scheduler.cluster._ -import spark.scheduler.cluster.RegisteredExecutor -import spark.scheduler.cluster.LaunchTask -import spark.scheduler.cluster.RegisterExecutorFailed -import spark.scheduler.cluster.RegisterExecutor -import spark.Utils + +import spark.{Logging, Utils} +import spark.TaskState.TaskState import spark.deploy.SparkHadoopUtil +import spark.scheduler.cluster.StandaloneClusterMessages._ +import spark.util.AkkaUtils + private[spark] class StandaloneExecutorBackend( driverUrl: String, diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index ac9e5ef94d..05c29eb72f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -17,46 +17,47 @@ package spark.scheduler.cluster -import spark.TaskState.TaskState import java.nio.ByteBuffer -import spark.util.SerializableBuffer + +import spark.TaskState.TaskState import spark.Utils +import spark.util.SerializableBuffer + private[spark] sealed trait StandaloneClusterMessage extends Serializable -// Driver to executors -private[spark] -case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage +private[spark] object StandaloneClusterMessages { -private[spark] -case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends StandaloneClusterMessage + // Driver to executors + case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage -private[spark] -case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage + case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) + extends StandaloneClusterMessage -// Executors to driver -private[spark] -case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends StandaloneClusterMessage { - Utils.checkHostPort(hostPort, "Expected host port") -} + case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage -private[spark] -case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) - extends StandaloneClusterMessage + // Executors to driver + case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) + extends StandaloneClusterMessage { + Utils.checkHostPort(hostPort, "Expected host port") + } + + case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, + data: SerializableBuffer) extends StandaloneClusterMessage -private[spark] -object StatusUpdate { - /** Alternate factory method that takes a ByteBuffer directly for the data field */ - def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { - StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + object StatusUpdate { + /** Alternate factory method that takes a ByteBuffer directly for the data field */ + def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer) + : StatusUpdate = { + StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + } } -} -// Internal messages in driver -private[spark] case object ReviveOffers extends StandaloneClusterMessage -private[spark] case object StopDriver extends StandaloneClusterMessage + // Internal messages in driver + case object ReviveOffers extends StandaloneClusterMessage -private[spark] case class RemoveExecutor(executorId: String, reason: String) - extends StandaloneClusterMessage + case object StopDriver extends StandaloneClusterMessage + + case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage + +} diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 03a64e0192..075a7cbf7e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,17 +17,18 @@ package spark.scheduler.cluster +import java.util.concurrent.atomic.AtomicInteger + import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ -import akka.util.duration._ +import akka.dispatch.Await import akka.pattern.ask +import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} import akka.util.Duration import spark.{Utils, SparkException, Logging, TaskState} -import akka.dispatch.Await -import java.util.concurrent.atomic.AtomicInteger -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import spark.scheduler.cluster.StandaloneClusterMessages._ /** * A standalone scheduler backend, which waits for standalone executors to connect to it through |