aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-01-07 23:01:30 -0800
committerReynold Xin <rxin@databricks.com>2015-01-07 23:01:30 -0800
commit2b729d22500c682435ef7adde566551b45a3c6e3 (patch)
tree2574980dc07ca1d422900b529aca5f9759be9e23 /core
parentd345ebebd554ac3faa4e870bd7800ed02e89da58 (diff)
downloadspark-2b729d22500c682435ef7adde566551b45a3c6e3.tar.gz
spark-2b729d22500c682435ef7adde566551b45a3c6e3.tar.bz2
spark-2b729d22500c682435ef7adde566551b45a3c6e3.zip
[SPARK-5126][Core] Verify Spark urls before creating Actors so that invalid urls can crash the process.
Because `actorSelection` returns `deadLetters` for an invalid path, the Worker stays silent when it is given an invalid master url. It's better to log an error so that people can find such problems quickly. This PR checks the url before passing it to `actorSelection`, and throws and logs a SparkException for an invalid url. Author: zsxwing <zsxwing@gmail.com> Closes #3927 from zsxwing/SPARK-5126 and squashes the following commits: 9d429ee [zsxwing] Create a utility method in Utils to parse Spark url; verify urls before creating Actors so that invalid urls can crash the process. 8286e51 [zsxwing] Check the url before sending to Akka and log the error if the url is invalid
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala22
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala25
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala29
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala50
6 files changed, 116 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index f2687ce6b4..7c1c831c24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -160,6 +160,8 @@ object Client {
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
+ // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
+ Master.toAkkaUrl(driverArgs.master)
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
actorSystem.awaitTermination()
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 4efebcaa35..39a7b0319b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -47,6 +47,8 @@ private[spark] class AppClient(
conf: SparkConf)
extends Logging {
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
+
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -75,9 +77,9 @@ private[spark] class AppClient(
}
def tryRegisterAllMasters() {
- for (masterUrl <- masterUrls) {
- logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+ for (masterAkkaUrl <- masterAkkaUrls) {
+ logInfo("Connecting to master " + masterAkkaUrl + "...")
+ val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterApplication(appDescription)
}
}
@@ -103,20 +105,14 @@ private[spark] class AppClient(
}
def changeMaster(url: String) {
+ // activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = activeMasterUrl match {
- case Master.sparkUrlRegex(host, port) =>
- Address("akka.tcp", Master.systemName, host, port.toInt)
- case x =>
- throw new SparkException("Invalid spark URL: " + x)
- }
+ masterAddress = Master.toAkkaAddress(activeMasterUrl)
}
private def isPossibleMaster(remoteUrl: Address) = {
- masterUrls.map(s => Master.toAkkaUrl(s))
- .map(u => AddressFromURIString(u).hostPort)
- .contains(remoteUrl.hostPort)
+ masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
}
override def receiveWithLogging = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e8a5cfc746..b1c015246b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -845,7 +845,6 @@ private[spark] class Master(
private[spark] object Master extends Logging {
val systemName = "sparkMaster"
private val actorName = "Master"
- val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
SignalLogger.register(log)
@@ -855,14 +854,24 @@ private[spark] object Master extends Logging {
actorSystem.awaitTermination()
}
- /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ /**
+ * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
+ *
+ * @throws SparkException if the url is invalid
+ */
def toAkkaUrl(sparkUrl: String): String = {
- sparkUrl match {
- case sparkUrlRegex(host, port) =>
- "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
- case _ =>
- throw new SparkException("Invalid master URL: " + sparkUrl)
- }
+ val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+ "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ }
+
+ /**
+ * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
+ *
+ * @throws SparkException if the url is invalid
+ */
+ def toAkkaAddress(sparkUrl: String): Address = {
+ val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
+ Address("akka.tcp", systemName, host, port)
}
def startSystemAndActor(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f0f3da5eec..1359983012 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
/**
- * @param masterUrls Each url should look like spark://host:port.
+ * @param masterAkkaUrls Each url should be a valid akka url.
*/
private[spark] class Worker(
host: String,
@@ -48,7 +48,7 @@ private[spark] class Worker(
webUiPort: Int,
cores: Int,
memory: Int,
- masterUrls: Array[String],
+ masterAkkaUrls: Array[String],
actorSystemName: String,
actorName: String,
workDirPath: String = null,
@@ -171,15 +171,11 @@ private[spark] class Worker(
}
def changeMaster(url: String, uiUrl: String) {
+ // activeMasterUrl is a valid Spark url since we receive it from master.
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
- masterAddress = activeMasterUrl match {
- case Master.sparkUrlRegex(_host, _port) =>
- Address("akka.tcp", Master.systemName, _host, _port.toInt)
- case x =>
- throw new SparkException("Invalid spark URL: " + x)
- }
+ masterAddress = Master.toAkkaAddress(activeMasterUrl)
connected = true
// Cancel any outstanding re-registration attempts because we found a new master
registrationRetryTimer.foreach(_.cancel())
@@ -187,9 +183,9 @@ private[spark] class Worker(
}
private def tryRegisterAllMasters() {
- for (masterUrl <- masterUrls) {
- logInfo("Connecting to master " + masterUrl + "...")
- val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
+ for (masterAkkaUrl <- masterAkkaUrls) {
+ logInfo("Connecting to master " + masterAkkaUrl + "...")
+ val actor = context.actorSelection(masterAkkaUrl)
actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
}
}
@@ -527,8 +523,9 @@ private[spark] object Worker extends Logging {
val securityMgr = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
+ val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
+ masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0d771baaa6..9d6b6161ce 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1842,6 +1842,35 @@ private[spark] object Utils extends Logging {
sparkValue
}
}
+
+ /**
+ * Return a pair of host and port extracted from the `sparkUrl`.
+ *
+ * A spark url (`spark://host:port`) is a special URI whose scheme is `spark` and which contains
+ * only a host and a port.
+ *
+ * @throws SparkException if `sparkUrl` is invalid.
+ */
+ def extractHostPortFromSparkUrl(sparkUrl: String): (String, Int) = {
+ try {
+ val uri = new java.net.URI(sparkUrl)
+ val host = uri.getHost
+ val port = uri.getPort
+ if (uri.getScheme != "spark" ||
+ host == null ||
+ port < 0 ||
+ (uri.getPath != null && !uri.getPath.isEmpty) || // uri.getPath returns "" instead of null
+ uri.getFragment != null ||
+ uri.getQuery != null ||
+ uri.getUserInfo != null) {
+ throw new SparkException("Invalid master URL: " + sparkUrl)
+ }
+ (host, port)
+ } catch {
+ case e: java.net.URISyntaxException =>
+ throw new SparkException("Invalid master URL: " + sparkUrl, e)
+ }
+ }
}
/**
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
new file mode 100644
index 0000000000..3d2335f9b3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import akka.actor.Address
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkException
+
+class MasterSuite extends FunSuite {
+
+ test("toAkkaUrl") {
+ val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234")
+ assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl)
+ }
+
+ test("toAkkaUrl: a typo url") {
+ val e = intercept[SparkException] {
+ Master.toAkkaUrl("spark://1.2. 3.4:1234")
+ }
+ assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
+ }
+
+ test("toAkkaAddress") {
+ val address = Master.toAkkaAddress("spark://1.2.3.4:1234")
+ assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address)
+ }
+
+ test("toAkkaAddress: a typo url") {
+ val e = intercept[SparkException] {
+ Master.toAkkaAddress("spark://1.2. 3.4:1234")
+ }
+ assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage)
+ }
+}