From 718e8c20526847657a58ab7ea5e4c86c367ae6d9 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Sun, 6 Oct 2013 00:02:08 -0700
Subject: Change url format to spark://host1:port1,host2:port2

This replaces the format of spark://host1:port1,spark://host2:port2 and is
more consistent with ZooKeeper's zk:// urls.
---
 core/src/main/scala/org/apache/spark/SparkContext.scala        |  4 ++--
 .../scala/org/apache/spark/deploy/FaultToleranceTest.scala     | 10 ++++++----
 .../src/main/scala/org/apache/spark/deploy/client/Client.scala |  2 ++
 .../src/main/scala/org/apache/spark/deploy/worker/Worker.scala |  3 +++
 .../scala/org/apache/spark/deploy/worker/WorkerArguments.scala |  2 +-
 5 files changed, 14 insertions(+), 7 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5318847276..b2643879a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -153,7 +153,7 @@ class SparkContext(
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
     val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
     // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """(spark://.*)""".r
+    val SPARK_REGEX = """spark://(.*)""".r
     //Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """(mesos://.*)""".r
@@ -169,7 +169,7 @@ class SparkContext(

       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new ClusterScheduler(this)
-        val masterUrls = sparkUrl.split(",")
+        val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
         scheduler.initialize(backend)
         scheduler
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index f9e40187c8..8bac62b860 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState
 /**
  * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ * Execute using
+ * ./spark-class org.apache.spark.deploy.FaultToleranceTest
  *
  * In order to mimic a real distributed cluster more closely, Docker is used.
  * Unfortunately, this dependency means that the suite cannot be run automatically without a
@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")

   val containerSparkHome = "/opt/spark"
-  val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome)
+  val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)

   System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }

   def addMasters(num: Int) {
-    (1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) }
+    (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
   }

   def addWorkers(num: Int) {
     val masterUrls = getMasterUrls(masters)
-    (1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) }
+    (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
   }

   /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
   }

   def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
-    masters.map(master => "spark://" + master.ip + ":7077").mkString(",")
+    "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
   }

   def getLeader: TestMasterInfo = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 198d5cee7b..0d4682fcc1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master
 /**
  * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
  * and a listener for cluster events, and calls back the listener when various events occur.
+ *
+ * @param masterUrls Each url should look like spark://host:port.
  */
 private[spark] class Client(
     actorSystem: ActorSystem,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 25ba75619a..216d9d44ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}

+/**
+ * @param masterUrls Each url should look like spark://host:port.
+ */
 private[spark] class Worker(
     host: String,
     port: Int,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 16d8686490..3ed528e6b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
       if (masters != null) { // Two positional arguments were given
         printUsageAndExit(1)
       }
-      masters = value.split(",")
+      masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
       parse(tail)

     case Nil =>
--
cgit v1.2.3
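For illustration only (not part of the patch): a minimal sketch of a driver that targets two standalone masters using the new comma-separated URL format introduced above. The host names, ports, and application name are placeholders.

import org.apache.spark.SparkContext

object MultiMasterExample {
  def main(args: Array[String]) {
    // Before this patch: "spark://host1:7077,spark://host2:7077"
    // After this patch:  one spark:// scheme with comma-separated host:port pairs,
    // which SPARK_REGEX strips and re-expands into one URL per master.
    val sc = new SparkContext("spark://host1:7077,host2:7077", "MultiMasterExample")

    // Trivial job to confirm the context is usable.
    println(sc.parallelize(1 to 100).reduce(_ + _))
    sc.stop()
  }
}

The same comma-separated form is what a standalone worker would now pass as its positional master argument, since WorkerArguments re-expands it the same way.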