diff options
4 files changed, 5 insertions, 92 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index fa8c0f524b..c001df31aa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,7 +56,7 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SimrSchedulerBackend, +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend} import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend @@ -2453,12 +2453,6 @@ object SparkContext extends Logging { scheduler.initialize(backend) (backend, scheduler) - case SIMR_REGEX(simrUrl) => - val scheduler = new TaskSchedulerImpl(sc) - val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) - scheduler.initialize(backend) - (backend, scheduler) - case zkUrl if zkUrl.startsWith("zk://") => logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " + "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.") @@ -2484,8 +2478,6 @@ private object SparkMasterRegex { val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url val MESOS_REGEX = """mesos://(.*)""".r - // Regular expression for connection to Simr cluster - val SIMR_REGEX = """simr://(.*)""".r } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala deleted file mode 100644 index a298cf5ef9..0000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster - -import org.apache.hadoop.fs.{FileSystem, Path} - -import org.apache.spark.{Logging, SparkContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rpc.RpcEndpointAddress -import org.apache.spark.scheduler.TaskSchedulerImpl - -private[spark] class SimrSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - driverFilePath: String) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with Logging { - - val tmpPath = new Path(driverFilePath + "_tmp") - val filePath = new Path(driverFilePath) - - val maxCores = conf.getInt("spark.simr.executor.cores", 1) - - override def start() { - super.start() - - val driverUrl = RpcEndpointAddress( - sc.conf.get("spark.driver.host"), - sc.conf.get("spark.driver.port").toInt, - CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - - val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) - val fs = FileSystem.get(conf) - val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - - logInfo("Writing to HDFS file: " + driverFilePath) - logInfo("Writing Driver address: " + driverUrl) - logInfo("Writing Spark UI Address: " + appUIAddress) - - // Create temporary file to prevent race condition where executors get empty driverUrl file - val temp = fs.create(tmpPath, true) - temp.writeUTF(driverUrl) - temp.writeInt(maxCores) - temp.writeUTF(appUIAddress) - temp.close() - - // "Atomic" rename - fs.rename(tmpPath, filePath) - } - - override def stop() { - val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) - val fs = FileSystem.get(conf) - if (!fs.delete(new Path(driverFilePath), false)) { - logWarning(s"error deleting ${driverFilePath}") - } - super.stop() - } - -} diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 52919c1ec0..b96c937f02 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark import org.scalatest.PrivateMethodTester import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend} +import org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.util.Utils @@ -115,13 +115,6 @@ class SparkContextSchedulerCreationSuite } } - test("simr") { - createTaskScheduler("simr://uri").backend match { - case s: SimrSchedulerBackend => // OK - case _ => fail() - } - } - test("local-cluster") { createTaskScheduler("local-cluster[3, 14, 1024]").backend match { case s: SparkDeploySchedulerBackend => // OK diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8f31a81ed0..97a1e8b433 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -261,6 +261,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.Graph.mapReduceTriplets$default$3"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.impl.GraphImpl.mapReduceTriplets") + ) ++Seq( + // SPARK-13426 Remove the support of SIMR + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkMasterRegex.SIMR_REGEX") ) case v if v.startsWith("1.6") => Seq( |