diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-09-06 21:30:42 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-10-24 11:59:51 -0700 |
commit | 05a0df2b9e0e4c3d032404187c0adf6d6d881860 (patch) | |
tree | f0b8d432c03562c875f3a00c0f4e0d63b728327f /core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | |
parent | dadfc63b0314060876ac1787d4de72b37221139c (diff) | |
download | spark-05a0df2b9e0e4c3d032404187c0adf6d6d881860.tar.gz spark-05a0df2b9e0e4c3d032404187c0adf6d6d881860.tar.bz2 spark-05a0df2b9e0e4c3d032404187c0adf6d6d881860.zip |
Makes Spark SIMR ready.
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 69 |
1 file changed, 69 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala new file mode 100644 index 0000000000..ae56244979 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -0,0 +1,69 @@ +package org.apache.spark.scheduler.cluster + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} + +private[spark] class SimrSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + driverFilePath: String) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + val tmpPath = new Path(driverFilePath + "_tmp"); + val filePath = new Path(driverFilePath); + + val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt + + override def start() { + super.start() + + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + logInfo("Writing to HDFS file: " + driverFilePath); + logInfo("Writing AKKA address: " + driverUrl); + + // Create temporary file to prevent race condition where executors get empty driverUrl file + val temp = fs.create(tmpPath, true) + temp.writeUTF(driverUrl) + temp.writeInt(maxCores) + temp.close() + + // "Atomic" rename + fs.rename(tmpPath, filePath); + } + + override def stop() { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.delete(new Path(driverFilePath), false); + super.stopExecutors() + super.stop() + } +} + + |