aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-07-29 17:53:01 -0700
committerReynold Xin <reynoldx@gmail.com>2013-07-29 17:53:01 -0700
commit81720e13fc9e1f475dd1333babfa08f3f806a5d0 (patch)
tree38dafc772ce70dcf1a3f99253ded9769bf1f5b52 /core/src
parent23b5da14ed6413b0dcb4c0bfdb80c98c433f3c9d (diff)
downloadspark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.tar.gz
spark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.tar.bz2
spark-81720e13fc9e1f475dd1333babfa08f3f806a5d0.zip
Moved all StandaloneClusterMessage's into StandaloneClusterMessages object.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala17
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala61
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala9
3 files changed, 43 insertions, 44 deletions
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f4003da732..e47fe50021 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -18,19 +18,16 @@
package spark.executor
import java.nio.ByteBuffer
-import spark.Logging
-import spark.TaskState.TaskState
-import spark.util.AkkaUtils
+
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredExecutor
-import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterExecutorFailed
-import spark.scheduler.cluster.RegisterExecutor
-import spark.Utils
+
+import spark.{Logging, Utils}
+import spark.TaskState.TaskState
import spark.deploy.SparkHadoopUtil
+import spark.scheduler.cluster.StandaloneClusterMessages._
+import spark.util.AkkaUtils
+
private[spark] class StandaloneExecutorBackend(
driverUrl: String,
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index ac9e5ef94d..05c29eb72f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -17,46 +17,47 @@
package spark.scheduler.cluster
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
-import spark.util.SerializableBuffer
+
+import spark.TaskState.TaskState
import spark.Utils
+import spark.util.SerializableBuffer
+
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Driver to executors
-private[spark]
-case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+private[spark] object StandaloneClusterMessages {
-private[spark]
-case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
- extends StandaloneClusterMessage
+ // Driver to executors
+ case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark]
-case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
-// Executors to driver
-private[spark]
-case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
- extends StandaloneClusterMessage {
- Utils.checkHostPort(hostPort, "Expected host port")
-}
+ case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-private[spark]
-case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
- extends StandaloneClusterMessage
+ // Executors to driver
+ case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ extends StandaloneClusterMessage {
+ Utils.checkHostPort(hostPort, "Expected host port")
+ }
+
+ case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
+ data: SerializableBuffer) extends StandaloneClusterMessage
-private[spark]
-object StatusUpdate {
- /** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ object StatusUpdate {
+ /** Alternate factory method that takes a ByteBuffer directly for the data field */
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
+ : StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ }
}
-}
-// Internal messages in driver
-private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopDriver extends StandaloneClusterMessage
+ // Internal messages in driver
+ case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case class RemoveExecutor(executorId: String, reason: String)
- extends StandaloneClusterMessage
+ case object StopDriver extends StandaloneClusterMessage
+
+ case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 03a64e0192..075a7cbf7e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -17,17 +17,18 @@
package spark.scheduler.cluster
+import java.util.concurrent.atomic.AtomicInteger
+
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import akka.dispatch.Await
import akka.pattern.ask
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration
import spark.{Utils, SparkException, Logging, TaskState}
-import akka.dispatch.Await
-import java.util.concurrent.atomic.AtomicInteger
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import spark.scheduler.cluster.StandaloneClusterMessages._
/**
* A standalone scheduler backend, which waits for standalone executors to connect to it through