diff options
author | liguoqiang <liguoqiang@rd.tuan800.com> | 2013-12-26 11:05:07 +0800 |
---|---|---|
committer | liguoqiang <liguoqiang@rd.tuan800.com> | 2013-12-26 11:05:07 +0800 |
commit | 14fcef72db765d0313d4ce3c986c08069a1a01ae (patch) | |
tree | 1314d64584a2752754cac2801605407b30f35967 | |
parent | 56094bcd8d3ba3442b88af01393d06fd7cd79bde (diff) | |
download | spark-14fcef72db765d0313d4ce3c986c08069a1a01ae.tar.gz spark-14fcef72db765d0313d4ce3c986c08069a1a01ae.tar.bz2 spark-14fcef72db765d0313d4ce3c986c08069a1a01ae.zip |
Renamed ClusterScheduler to TaskSchedulerImpl for yarn and new-yarn
8 files changed, 22 insertions, 15 deletions
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index c27257cda4..96a24cd2b1 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.Logging import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration @@ -233,9 +234,9 @@ private[yarn] class YarnAllocationHandler( // Note that the list we create below tries to ensure that not all containers end up within // a host if there is a sufficiently large number of hosts/containers. val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(dataLocalContainers) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(rackLocalContainers) - allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(offRackContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) + allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // Run each of the allocated containers. for (container <- allocatedContainersToProcess) { diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 63a0449e5a..40307ab972 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -21,12 +21,13 @@ import org.apache.spark._ import org.apache.hadoop.conf.Configuration import org.apache.spark.deploy.yarn.YarnAllocationHandler import org.apache.spark.util.Utils +import org.apache.spark.scheduler.TaskSchedulerImpl /** * * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { def this(sc: SparkContext) = this(sc, new Configuration()) diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index b206780c78..350fc760a4 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 29b3f22e13..b318270f75 100644 --- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -21,12 +21,13 @@ import org.apache.spark._ import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler} import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration +import org.apache.spark.scheduler.TaskSchedulerImpl /** * * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done */ -private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 9ab2073529..eeee78f8ad 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.Logging import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.hadoop.conf.Configuration @@ -214,9 +215,9 @@ private[yarn] class YarnAllocationHandler( // host if there are sufficiently large number of hosts/containers. val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(rackLocalContainers) - allocatedContainers ++= ClusterScheduler.prioritizeContainers(offRackContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) + allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // Run each of the allocated containers for (container <- allocatedContainers) { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 63a0449e5a..522e0a9ad7 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -20,13 +20,14 @@ package org.apache.spark.scheduler.cluster import org.apache.spark._ import org.apache.hadoop.conf.Configuration import org.apache.spark.deploy.yarn.YarnAllocationHandler +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils /** * * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM. */ -private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) { +private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) { def this(sc: SparkContext) = this(sc, new Configuration()) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index b206780c78..350fc760a4 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments} +import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( - scheduler: ClusterScheduler, + scheduler: TaskSchedulerImpl, sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 4e988b8017..2d9fbcb400 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark._ import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler} -import org.apache.spark.scheduler.ClusterScheduler +import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.util.Utils /** @@ -30,7 +30,7 @@ import org.apache.spark.util.Utils * ApplicationMaster, etc. is done */ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) - extends ClusterScheduler(sc) { + extends TaskSchedulerImpl(sc) { logInfo("Created YarnClusterScheduler") |