author     Matei Zaharia <matei@databricks.com>    2013-12-29 15:08:08 -0500
committer  Matei Zaharia <matei@databricks.com>    2013-12-29 15:08:08 -0500
commit     b4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a (patch)
tree       486226041e35962c1543902d8ffc10a81f4223a5 /new-yarn
parent     58c6fa2041b99160f254b17c2b71de9d82c53f8c (diff)
parent     ad3dfd153196497fefe6c1925e0a495a4373f1c5 (diff)
download   spark-b4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a.tar.gz
           spark-b4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a.tar.bz2
           spark-b4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a.zip
Merge remote-tracking branch 'origin/master' into conf2
Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
    core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
    core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
    core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
    core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
    core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
    core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
    new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
    streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
    streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
    streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
    streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
    streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
    streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
    streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
    streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
Diffstat (limited to 'new-yarn')
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            |  4
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala             | 10
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala  |  3
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  3
-rw-r--r--  new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala        |  3
5 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 963b5b88be..1bba6a5ae4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -437,8 +437,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   }
 
   def monitorApplication(appId: ApplicationId): Boolean = {
+    val interval = new SparkConf().getOrElse("spark.yarn.report.interval", "1000").toLong
+
     while (true) {
-      Thread.sleep(1000)
+      Thread.sleep(interval)
       val report = super.getApplicationReport(appId)
 
       logInfo("Application report from ASM: \n" +
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 71d1cbd416..abc3447746 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import org.apache.spark.Logging
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
@@ -233,9 +233,9 @@ private[yarn] class YarnAllocationHandler(
     // Note that the list we create below tries to ensure that not all containers end up within
     // a host if there is a sufficiently large number of hosts/containers.
     val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
-    allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(offRackContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+    allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
 
     // Run each of the allocated containers.
     for (container <- allocatedContainersToProcess) {
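TaskSchedulerImpl.prioritizeContainers is the same helper that previously lived on ClusterScheduler; per the comment in the hunk, it spreads the allocated containers across hosts. A minimal standalone sketch of that round-robin interleaving, with hypothetical names that are not the Spark source:

    import scala.collection.mutable.ArrayBuffer

    // Take one container per host per round, so no single host's containers
    // are processed back-to-back. `byHost` maps host -> containers on that host.
    def interleaveByHost[T](byHost: Map[String, Seq[T]]): ArrayBuffer[T] = {
      val out = new ArrayBuffer[T]
      var active = byHost.values.map(_.iterator).filter(_.hasNext).toList
      while (active.nonEmpty) {
        active.foreach(it => out += it.next())  // one element from each host this round
        active = active.filter(_.hasNext)       // drop hosts that are exhausted
      }
      out
    }

For example, interleaveByHost(Map("h1" -> Seq(1, 2, 3), "h2" -> Seq(4))) yields 1, 4, 2, 3 (host order permitting), rather than exhausting h1 first.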
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 63a0449e5a..522e0a9ad7 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler.cluster
 import org.apache.spark._
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 /**
  *
  * This scheduler launches workers through YARN, by calling into Client to launch a WorkerLauncher as the AM.
  */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
   def this(sc: SparkContext) = this(sc, new Configuration())
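Beyond the new superclass, these YARN schedulers mainly exist to hook rack resolution into TaskSchedulerImpl. A hedged sketch of that override as it would sit inside YarnClientClusterScheduler, assuming YarnAllocationHandler.lookupRack and Utils.parseHostPort behave as in this era of the codebase:

    // Resolve a host to its rack via the YARN allocation handler, so task
    // placement can prefer rack-local containers. Illustrative, not verbatim.
    override def getRackForHost(hostPort: String): Option[String] = {
      val host = Utils.parseHostPort(hostPort)._1
      Option(YarnAllocationHandler.lookupRack(conf, host))
    }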
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6feaaff014..4b69f5078b 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 
 private[spark] class YarnClientSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
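With the constructor parameter widened to TaskSchedulerImpl, the yarn-client pieces compose as below. This is an assumed wiring patterned on how SparkContext creates its scheduler; the names come from this diff, the sequencing is illustrative:

    // sc is an existing SparkContext running in yarn-client mode.
    val scheduler = new YarnClientClusterScheduler(sc)
    val backend   = new YarnClientSchedulerBackend(scheduler, sc)
    scheduler.initialize(backend)  // TaskSchedulerImpl keeps a reference to its backend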
diff --git a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 29b3f22e13..a4638cc863 100644
--- a/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.spark._
 import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
@@ -26,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
  *
  * This is a simple extension to ClusterScheduler, to ensure that appropriate initialization of
  * ApplicationMaster, etc. is done.
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")