aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala2
4 files changed, 15 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 20a9faa178..22ef701d83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -53,7 +53,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
- new ExternalShuffleBlockHandler(conf)
+ new ExternalShuffleBlockHandler(conf, null)
}
/** Starts the external shuffle service if the user has configured us to. */
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 061857476a..12337a940a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -34,7 +34,7 @@ import org.apache.spark.network.util.TransportConf
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
- extends ExternalShuffleBlockHandler(transportConf) with Logging {
+ extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
// Stores a map of driver socket addresses to app ids
private val connectedApps = new mutable.HashMap[SocketAddress, String]
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index eedb27942e..fefaef0ab8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -93,8 +93,17 @@ private[spark] class BlockManager(
// Port used by the external shuffle service. In Yarn mode, this may be already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
- private val externalShuffleServicePort =
- Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+ private val externalShuffleServicePort = {
+ val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
+ if (tmpPort == 0) {
+ // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
+ // an open port. But we still need to tell our spark apps the right port to use. So
+ // only if the yarn config has the port set to 0, we prefer the value in the spark config
+ conf.get("spark.shuffle.service.port").toInt
+ } else {
+ tmpPort
+ }
+ }
// Check that we're not using external shuffle service with consolidated shuffle files.
if (externalShuffleServiceEnabled
@@ -191,6 +200,7 @@ private[spark] class BlockManager(
executorId, blockTransferService.hostName, blockTransferService.port)
shuffleServerId = if (externalShuffleServiceEnabled) {
+ logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index c38d70252a..e846a72c88 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -36,7 +36,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
- rpcHandler = new ExternalShuffleBlockHandler(transportConf)
+ rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()