author     Aaron Davidson <aaron@databricks.com>   2013-10-06 00:02:08 -0700
committer  Aaron Davidson <aaron@databricks.com>   2013-10-06 00:02:08 -0700
commit     718e8c20526847657a58ab7ea5e4c86c367ae6d9 (patch)
tree       7124fc627754b4d4ba55c735788d44428ef04384 /core
parent     e1190229e13453cdb1e7c28fdf300d1f8dd717c2 (diff)
Change URL format to spark://host1:port1,host2:port2
This replaces the previous format of spark://host1:port1,spark://host2:port2 and is more consistent with ZooKeeper's zk:// URLs.
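For illustration only (not part of this commit), a minimal sketch of pointing an application at a multi-master standalone cluster under the new format; the host names, ports, and app name are hypothetical:

    import org.apache.spark.SparkContext

    // Hypothetical hosts and ports, for illustration only.
    // Old format: "spark://host1:7077,spark://host2:7077"
    // New format: one spark:// scheme with comma-separated host:port pairs.
    val sc = new SparkContext("spark://host1:7077,host2:7077", "FaultTolerantApp")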
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/org/apache/spark/SparkContext.scala                    4
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala      10
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/client/Client.scala            2
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala            3
-rw-r--r--   core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala   2
5 files changed, 14 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5318847276..b2643879a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -153,7 +153,7 @@ class SparkContext(
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
- val SPARK_REGEX = """(spark://.*)""".r
+ val SPARK_REGEX = """spark://(.*)""".r
//Regular expression for connection to Mesos cluster
val MESOS_REGEX = """(mesos://.*)""".r
@@ -169,7 +169,7 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
- val masterUrls = sparkUrl.split(",")
+ val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
scheduler
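With the new SPARK_REGEX, the captured group is everything after the spark:// prefix, and SparkContext re-adds the prefix to each comma-separated host:port when building the master URL list. A minimal standalone sketch of that parsing, using hypothetical hosts and ports:

    // Sketch of the new parsing logic; hosts and ports are made up.
    val SPARK_REGEX = """spark://(.*)""".r
    "spark://host1:7077,host2:7077" match {
      case SPARK_REGEX(sparkUrl) =>
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // masterUrls: Array("spark://host1:7077", "spark://host2:7077")
    }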
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index f9e40187c8..8bac62b860 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -36,6 +36,8 @@ import org.apache.spark.deploy.master.RecoveryState
/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ * Execute using
+ * ./spark-class org.apache.spark.deploy.FaultToleranceTest
*
* In order to mimic a real distributed cluster more closely, Docker is used.
* Unfortunately, this dependency means that the suite cannot be run automatically without a
@@ -56,7 +58,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
val containerSparkHome = "/opt/spark"
- val dockerMountString = "%s:%s".format(sparkHome, containerSparkHome)
+ val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
@@ -172,12 +174,12 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
def addMasters(num: Int) {
- (1 to num).foreach { _ => masters += SparkDocker.startMaster(sparkHome) }
+ (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
}
def addWorkers(num: Int) {
val masterUrls = getMasterUrls(masters)
- (1 to num).foreach { _ => workers += SparkDocker.startWorker(sparkHome, masterUrls) }
+ (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}
/** Creates a SparkContext, which constructs a Client to interact with our cluster. */
@@ -190,7 +192,7 @@ private[spark] object FaultToleranceTest extends App with Logging {
}
def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
- masters.map(master => "spark://" + master.ip + ":7077").mkString(",")
+ "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
}
def getLeader: TestMasterInfo = {
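getMasterUrls now joins all master host:port pairs under a single spark:// prefix rather than emitting one spark:// URL per master. A minimal sketch with hypothetical container IPs:

    // Hypothetical Docker container IPs, for illustration only.
    val masterIps = Seq("172.17.0.2", "172.17.0.3")
    val masterUrls = "spark://" + masterIps.map(_ + ":7077").mkString(",")
    // masterUrls == "spark://172.17.0.2:7077,172.17.0.3:7077"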
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 198d5cee7b..0d4682fcc1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -38,6 +38,8 @@ import org.apache.spark.deploy.master.Master
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
+ *
+ * @param masterUrls Each url should look like spark://host:port.
*/
private[spark] class Client(
actorSystem: ActorSystem,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 25ba75619a..216d9d44ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -35,6 +35,9 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
+/**
+ * @param masterUrls Each url should look like spark://host:port.
+ */
private[spark] class Worker(
host: String,
port: Int,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 16d8686490..3ed528e6b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -89,7 +89,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
if (masters != null) { // Two positional arguments were given
printUsageAndExit(1)
}
- masters = value.split(",")
+ masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
parse(tail)
case Nil =>
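For the worker's command-line argument, the new handling strips an optional spark:// prefix, splits on commas, and re-prefixes each entry. A minimal sketch with a hypothetical value; since stripPrefix is a no-op when the prefix is absent, a bare host1:port1,host2:port2 list would be accepted as well:

    // Hypothetical command-line value, for illustration only.
    val value = "spark://host1:7077,host2:7077"
    val masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
    // masters: Array("spark://host1:7077", "spark://host2:7077")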