aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorRaymond Liu <raymond.liu@intel.com>2013-10-23 09:42:25 +0800
committerRaymond Liu <raymond.liu@intel.com>2013-11-22 09:23:27 +0800
commitab3cefde5349d0de85b23b49feef493ff0b2d1ed (patch)
tree1d9bc4c9e5f2ec1a9c1d2a4d77bfb7f1f1efd806 /core
parent2fead510f74b962b293de4d724136c24a9825271 (diff)
downloadspark-ab3cefde5349d0de85b23b49feef493ff0b2d1ed.tar.gz
spark-ab3cefde5349d0de85b23b49feef493ff0b2d1ed.tar.bz2
spark-ab3cefde5349d0de85b23b49feef493ff0b2d1ed.zip
Add YarnClientClusterScheduler and Backend.
With this scheduler, the user application is launched locally, While the executor will be launched by YARN on remote nodes. This enables spark-shell to run upon YARN.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala25
1 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 42b2985b50..3a80241daa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -226,6 +226,31 @@ class SparkContext(
scheduler.initialize(backend)
scheduler
+ case "yarn-client" =>
+ val scheduler = try {
+ val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
+ val cons = clazz.getConstructor(classOf[SparkContext])
+ cons.newInstance(this).asInstanceOf[ClusterScheduler]
+
+ } catch {
+ case th: Throwable => {
+ throw new SparkException("YARN mode not available ?", th)
+ }
+ }
+
+ val backend = try {
+ val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
+ val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
+ cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
+ } catch {
+ case th: Throwable => {
+ throw new SparkException("YARN mode not available ?", th)
+ }
+ }
+
+ scheduler.initialize(backend)
+ scheduler
+
case MESOS_REGEX(mesosUrl) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(this)