author     WangTaoTheTonic <barneystinson@aliyun.com>  2015-01-13 09:28:21 -0800
committer  Andrew Or <andrew@databricks.com>           2015-01-21 13:05:09 -0800
commit     bb8bd11da51b3b4b59b921d9d2a550c78a865ee5 (patch)
tree       1eb5d505b60c25bdad64bfb580a37d94c8e0d736
parent     37db20c9414d26ebd423e9500825bedc037b20f5 (diff)
[SPARK-5006][Deploy] spark.port.maxRetries doesn't work

https://issues.apache.org/jira/browse/SPARK-5006

I think the issue was introduced in https://github.com/apache/spark/pull/1777. I have not dug into the Mesos backend yet; the same logic may need to be added there as well.

Author: WangTaoTheTonic <barneystinson@aliyun.com>
Author: WangTao <barneystinson@aliyun.com>

Closes #3841 from WangTaoTheTonic/SPARK-5006 and squashes the following commits:

8cdf96d [WangTao] indent thing
2d86d65 [WangTaoTheTonic] fix line length
7cdfd98 [WangTaoTheTonic] fit for new HttpServer constructor
61a370d [WangTaoTheTonic] some minor fixes
bc6e1ec [WangTaoTheTonic] rebase
67bcb46 [WangTaoTheTonic] put conf at 3rd position, modify suite class, add comments
f450cd1 [WangTaoTheTonic] startServiceOnPort will use a SparkConf arg
29b751b [WangTaoTheTonic] rebase as ExecutorRunnableUtil changed to ExecutorRunnable
396c226 [WangTaoTheTonic] make the grammar more like scala
191face [WangTaoTheTonic] invalid value name
62ec336 [WangTaoTheTonic] spark.port.maxRetries doesn't work

Conflicts:
	external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
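In short: the old portMaxRetries read spark.port.maxRetries from sys.props or from SparkEnv.get, and SparkEnv.get is still null while the ports in question are being bound, so the setting fell through to the hard-coded default of 16. The fix threads the caller's SparkConf through startServiceOnPort instead. A minimal sketch of the post-patch call shape, assuming a caller placed under the org.apache.spark package (Utils is private[spark]); the package, service, port, and object names here are illustrative, not from the patch:

package org.apache.spark.example

import java.net.ServerSocket

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object PortRetryExample {
  def main(args: Array[String]): Unit = {
    // The retry budget now comes from the SparkConf passed in by the caller,
    // not from SparkEnv.get, so spark.port.maxRetries takes effect.
    val conf = new SparkConf().set("spark.port.maxRetries", "32")
    val (server, boundPort) = Utils.startServiceOnPort[ServerSocket](
      9000,
      port => { val s = new ServerSocket(port); (s, s.getLocalPort) },
      conf,
      "example server")
    println(s"Bound to port $boundPort")
    server.close()
  }
}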
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpFileServer.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpServer.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 23
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 2
-rw-r--r--  external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala | 5
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala | 2
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 9
14 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index edc3889c9a..677c5e0f89 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils
 
 private[spark] class HttpFileServer(
+    conf: SparkConf,
     securityManager: SecurityManager,
     requestedPort: Int = 0)
   extends Logging {
@@ -41,7 +42,7 @@ private[spark] class HttpFileServer(
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
+    httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
     httpServer.start()
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index 912558d0ca..fa22787ce7 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -42,6 +42,7 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * around a Jetty server.
  */
 private[spark] class HttpServer(
+    conf: SparkConf,
     resourceBase: File,
     securityManager: SecurityManager,
     requestedPort: Int = 0,
@@ -57,7 +58,7 @@ private[spark] class HttpServer(
     } else {
       logInfo("Starting HTTP Server")
       val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
+        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
       server = actualServer
       port = actualPort
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4c6c86c7ba..dae170f942 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -370,7 +370,9 @@ private[spark] object SparkConf {
   }
 
   /**
-   * Return whether the given config is a Spark port config.
+   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
    */
-  def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
+  def isSparkPortConf(name: String): Boolean = {
+    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
+  }
 }
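The new predicate accepts both key families: spark.*.port (service port settings) and spark.port.* (port behavior settings such as spark.port.maxRetries, which the old spark.*.port-only check missed). A tiny self-contained check; the predicate body is copied from the hunk above, the harness around it is illustrative:

object IsSparkPortConfCheck {
  def isSparkPortConf(name: String): Boolean =
    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")

  def main(args: Array[String]): Unit = {
    // spark.executor.port matches spark.*.port; spark.port.maxRetries matches spark.port.*
    Seq("spark.executor.port", "spark.port.maxRetries", "spark.shuffle.compress")
      .foreach(k => println(s"$k -> ${isSparkPortConf(k)}"))
  }
}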
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index c04e23dd31..48a9d98e2e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -316,7 +316,7 @@ object SparkEnv extends Logging {
     val httpFileServer =
       if (isDriver) {
        val fileServerPort = conf.getInt("spark.fileserver.port", 0)
-        val server = new HttpFileServer(securityManager, fileServerPort)
+        val server = new HttpFileServer(conf, securityManager, fileServerPort)
        server.initialize()
        conf.set("spark.fileserver.uri", server.serverUri)
        server
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31f0a462f8..31d6958c40 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging {
   private def createServer(conf: SparkConf) {
     broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
     val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
+    server =
+      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
     server.start()
     serverUri = server.uri
     logInfo("Broadcast server started at " + serverUri)
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index df4b085d22..302b496b8a 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -164,7 +164,7 @@ private[nio] class ConnectionManager(
       serverChannel.socket.bind(new InetSocketAddress(port))
       (serverChannel, serverChannel.socket.getLocalPort)
     }
-    Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
+    Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
 
     serverChannel.register(selector, SelectionKey.OP_ACCEPT)
     val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 2a27d49d2d..88fed833f9 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -201,7 +201,7 @@ private[spark] object JettyUtils extends Logging {
       }
     }
 
-    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, serverName)
+    val (server, boundPort) = Utils.startServiceOnPort[Server](port, connect, conf, serverName)
     ServerInfo(server, boundPort, collection)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 10010bdfa1..3505346ac4 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ private[spark] object AkkaUtils extends Logging {
     val startService: Int => (ActorSystem, Int) = { actualPort =>
       doCreateActorSystem(name, host, actualPort, conf, securityManager)
     }
-    Utils.startServiceOnPort(port, startService, name)
+    Utils.startServiceOnPort(port, startService, conf, name)
   }
 
   private def doCreateActorSystem(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e7160f164a..cdb322de3b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1689,17 +1689,15 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Default maximum number of retries when binding to a port before giving up.
+   * Maximum number of retries when binding to a port before giving up.
    */
-  val portMaxRetries: Int = {
-    if (sys.props.contains("spark.testing")) {
+  def portMaxRetries(conf: SparkConf): Int = {
+    val maxRetries = conf.getOption("spark.port.maxRetries").map(_.toInt)
+    if (conf.contains("spark.testing")) {
       // Set a higher number of retries for tests...
-      sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
+      maxRetries.getOrElse(100)
     } else {
-      Option(SparkEnv.get)
-        .flatMap(_.conf.getOption("spark.port.maxRetries"))
-        .map(_.toInt)
-        .getOrElse(16)
+      maxRetries.getOrElse(16)
     }
   }
 
@@ -1708,17 +1706,18 @@ private[spark] object Utils extends Logging {
    * Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
    *
    * @param startPort The initial port to start the service on.
-   * @param maxRetries Maximum number of retries to attempt.
-   *                   A value of 3 means attempting ports n, n+1, n+2, and n+3, for example.
    * @param startService Function to start service on a given port.
    *                     This is expected to throw java.net.BindException on port collision.
+   * @param conf A SparkConf used to get the maximum number of retries when binding to a port.
+   * @param serviceName Name of the service.
    */
   def startServiceOnPort[T](
       startPort: Int,
       startService: Int => (T, Int),
-      serviceName: String = "",
-      maxRetries: Int = portMaxRetries): (T, Int) = {
+      conf: SparkConf,
+      serviceName: String = ""): (T, Int) = {
     val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
+    val maxRetries = portMaxRetries(conf)
     for (offset <- 0 to maxRetries) {
       // Do not increment port if startPort is 0, which is treated as a special port
       val tryPort = if (startPort == 0) {
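To make the retry semantics concrete, here is a standalone sketch of the loop startServiceOnPort implements (simplified: the real method logs through Logging and rethrows with a service-specific message; the names here are illustrative, not Spark's):

import java.net.{BindException, ServerSocket}

object RetrySketch {
  // Try startPort, startPort + 1, ..., startPort + maxRetries. Port 0 is special:
  // the OS picks an ephemeral port, so there is nothing meaningful to increment.
  def startOnPort[T](startPort: Int, start: Int => (T, Int), maxRetries: Int): (T, Int) = {
    for (offset <- 0 to maxRetries) {
      val tryPort = if (startPort == 0) 0 else startPort + offset
      try {
        return start(tryPort)
      } catch {
        case _: BindException if offset < maxRetries =>
          println(s"Port $tryPort already in use, trying ${tryPort + 1}")
      }
    }
    throw new BindException(s"Could not bind starting from port $startPort")
  }

  def main(args: Array[String]): Unit = {
    val (server, port) =
      startOnPort[ServerSocket](4040, p => { val s = new ServerSocket(p); (s, s.getLocalPort) }, 16)
    println(s"Started on port $port")
    server.close()
  }
}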
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 13943ed544..f333e3891b 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -80,7 +80,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, conf)._2
   }
 
   /** Setup and start the streaming context */
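The two streaming suites use the same trick to find a free port: the "service" handed to startServiceOnPort is a throwaway ServerSocket that is closed immediately, so only the probed port number survives, and the retry loop does the scanning. The change here just supplies the conf argument the new signature requires. The same probe, standalone and without the retry wrapper (port 0 lets the OS choose; names are illustrative):

import java.net.ServerSocket

object FreePortFinder {
  def findFreePort(): Int = {
    val socket = new ServerSocket(0) // 0 = ask the OS for any free ephemeral port
    try socket.getLocalPort finally socket.close()
  }

  def main(args: Array[String]): Unit =
    println(s"Free port: ${findFreePort()}")
}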
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 98fe6cb301..e816255aef 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.mqtt
 import java.net.{URI, ServerSocket}
 
 import org.apache.activemq.broker.{TransportConnector, BrokerService}
-import org.apache.spark.util.Utils
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.concurrent.Eventually
 import scala.concurrent.duration._
@@ -29,6 +28,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
@@ -101,7 +102,7 @@ class MQTTStreamSuite extends FunSuite with Eventually with BeforeAndAfter {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
-    })._2
+    }, new SparkConf())._2
   }
 
   def publishData(data: String): Unit = {
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
index 646c68e60c..b646f0b6f0 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala
@@ -106,7 +106,7 @@ import org.apache.spark.util.Utils
   val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles
   /** Jetty server that will serve our classes to worker nodes */
   val classServerPort = conf.getInt("spark.replClassServer.port", 0)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
   private var currentSettings: Settings = initialSettings
   var printResults = true // whether to print result lines
   var totalSilence = false // whether to print anything
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5e93a71995..69e44d4f91 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -32,7 +32,7 @@ object Main extends Logging {
   val s = new Settings()
   s.processArguments(List("-Yrepl-class-based",
     "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-Yrepl-sync"), true)
-  val classServer = new HttpServer(outputDir, new SecurityManager(conf))
+  val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf))
 
   var sparkContext: SparkContext = _
   var interp = new SparkILoop // this is a public var because tests reset it.
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index 88dad0febd..3f4e9e377e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -75,12 +75,9 @@ trait ExecutorRunnableUtil extends Logging {
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+    sparkConf.getAll
+      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
+      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
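isExecutorStartupConf itself is not shown in this diff; from the surrounding comment and the old inline filter, it presumably selects the settings an executor JVM needs before it can fetch the full configuration from the driver: the spark.auth* and spark.akka* keys the old filter matched, plus the port keys, so that spark.port.maxRetries now reaches executor JVMs as a -D option. A hedged sketch of that assumed shape (not the actual Spark source):

object StartupConfSketch {
  def isSparkPortConf(name: String): Boolean =
    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")

  // Assumed shape of SparkConf.isExecutorStartupConf, inferred from this patch.
  def isExecutorStartupConf(name: String): Boolean =
    name.startsWith("spark.auth") || name.startsWith("spark.akka") || isSparkPortConf(name)

  def main(args: Array[String]): Unit =
    Seq("spark.akka.frameSize", "spark.port.maxRetries", "spark.executor.memory")
      .foreach(k => println(s"$k -> ${isExecutorStartupConf(k)}"))
}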