diff options
author | Imran Rashid <irashid@cloudera.com> | 2015-08-21 08:41:36 -0500 |
---|---|---|
committer | Tom Graves <tgraves@yahoo-inc.com> | 2015-08-21 08:41:36 -0500 |
commit | 708036c1de52d674ceff30ac465e1dcedeb8dde8 (patch) | |
tree | c9ecf08801d862c96c166d7db5915a66f4c89b56 /core/src | |
parent | bb220f6570aa0b95598b30524224a3e82c1effbc (diff) | |
download | spark-708036c1de52d674ceff30ac465e1dcedeb8dde8.tar.gz spark-708036c1de52d674ceff30ac465e1dcedeb8dde8.tar.bz2 spark-708036c1de52d674ceff30ac465e1dcedeb8dde8.zip |
[SPARK-9439] [YARN] External shuffle service robust to NM restarts using leveldb
https://issues.apache.org/jira/browse/SPARK-9439
In general, Yarn apps should be robust to NodeManager restarts. However, if you run spark with the external shuffle service on, after a NM restart all shuffles fail, b/c the shuffle service has lost some state with info on each executor. (Note the shuffle data is perfectly fine on disk across a NM restart, the problem is we've lost the small bit of state that lets us *find* those files.)
The solution proposed here is that the external shuffle service can write out its state to leveldb (backed by a local file) every time an executor is added. When running with yarn, that file is in the NM's local dir. Whenever the service is started, it looks for that file, and if it exists, it reads the file and re-registers all executors there.
Nothing is changed in non-yarn modes with this patch. The service is not given a place to save the state to, so it operates the same as before. This should make it easy to update other cluster managers as well, by just supplying the right file & the equivalent of yarn's `initializeApplication` -- I'm not familiar enough with those modes to know how to do that.
Author: Imran Rashid <irashid@cloudera.com>
Closes #7943 from squito/leveldb_external_shuffle_service_NM_restart and squashes the following commits:
0d285d3 [Imran Rashid] review feedback
70951d6 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
5c71c8c [Imran Rashid] save executor to db before registering; style
2499c8c [Imran Rashid] explicit dependency on jackson-annotations
795d28f [Imran Rashid] review feedback
81f80e2 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
594d520 [Imran Rashid] use json to serialize application executor info
1a7980b [Imran Rashid] version
8267d2a [Imran Rashid] style
e9f99e8 [Imran Rashid] cleanup the handling of bad dbs a little
9378ba3 [Imran Rashid] fail gracefully on corrupt leveldb files
acedb62 [Imran Rashid] switch to writing out one record per executor
79922b7 [Imran Rashid] rely on yarn to call stopApplication; assorted cleanup
12b6a35 [Imran Rashid] save registered executors when apps are removed; add tests
c878fbe [Imran Rashid] better explanation of shuffle service port handling
694934c [Imran Rashid] only open leveldb connection once per service
d596410 [Imran Rashid] store executor data in leveldb
59800b7 [Imran Rashid] Files.move in case renaming is unsupported
32fe5ae [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
d7450f0 [Imran Rashid] style
f729e2b [Imran Rashid] debugging
4492835 [Imran Rashid] lol, dont use a PrintWriter b/c of scalastyle checks
0a39b98 [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
55f49fc [Imran Rashid] make sure the service doesnt die if the registered executor file is corrupt; add tests
245db19 [Imran Rashid] style
62586a6 [Imran Rashid] just serialize the whole executors map
bdbbf0d [Imran Rashid] comments, remove some unnecessary changes
857331a [Imran Rashid] better tests & comments
bb9d1e6 [Imran Rashid] formatting
bdc4b32 [Imran Rashid] rename
86e0cb9 [Imran Rashid] for tests, shuffle service finds an open port
23994ff [Imran Rashid] style
7504de8 [Imran Rashid] style
a36729c [Imran Rashid] cleanup
efb6195 [Imran Rashid] proper unit test, and no longer leak if apps stop during NM restart
dd93dc0 [Imran Rashid] test for shuffle service w/ NM restarts
d596969 [Imran Rashid] cleanup imports
0e9d69b [Imran Rashid] better names
9eae119 [Imran Rashid] cleanup lots of duplication
1136f44 [Imran Rashid] test needs to have an actual shuffle
0b588bd [Imran Rashid] more fixes ...
ad122ef [Imran Rashid] more fixes
5e5a7c3 [Imran Rashid] fix build
c69f46b [Imran Rashid] maybe working version, needs tests & cleanup ...
bb3ba49 [Imran Rashid] minor cleanup
36127d3 [Imran Rashid] wip
b9d2ced [Imran Rashid] incomplete setup for external shuffle service tests
Diffstat (limited to 'core/src')
4 files changed, 15 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 20a9faa178..22ef701d83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -53,7 +53,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana /** Create a new shuffle block handler. Factored out for subclasses to override. */ protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = { - new ExternalShuffleBlockHandler(conf) + new ExternalShuffleBlockHandler(conf, null) } /** Starts the external shuffle service if the user has configured us to. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 061857476a..12337a940a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -34,7 +34,7 @@ import org.apache.spark.network.util.TransportConf * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. */ private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf) - extends ExternalShuffleBlockHandler(transportConf) with Logging { + extends ExternalShuffleBlockHandler(transportConf, null) with Logging { // Stores a map of driver socket addresses to app ids private val connectedApps = new mutable.HashMap[SocketAddress, String] diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index eedb27942e..fefaef0ab8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -93,8 +93,17 @@ private[spark] class BlockManager( // Port used by the external shuffle service. In Yarn mode, this may be already be // set through the Hadoop configuration as the server is launched in the Yarn NM. - private val externalShuffleServicePort = - Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt + private val externalShuffleServicePort = { + val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt + if (tmpPort == 0) { + // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds + // an open port. But we still need to tell our spark apps the right port to use. So + // only if the yarn config has the port set to 0, we prefer the value in the spark config + conf.get("spark.shuffle.service.port").toInt + } else { + tmpPort + } + } // Check that we're not using external shuffle service with consolidated shuffle files. if (externalShuffleServiceEnabled @@ -191,6 +200,7 @@ private[spark] class BlockManager( executorId, blockTransferService.hostName, blockTransferService.port) shuffleServerId = if (externalShuffleServiceEnabled) { + logInfo(s"external shuffle service port = $externalShuffleServicePort") BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala index c38d70252a..e846a72c88 100644 --- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala @@ -36,7 +36,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll { override def beforeAll() { val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2) - rpcHandler = new ExternalShuffleBlockHandler(transportConf) + rpcHandler = new ExternalShuffleBlockHandler(transportConf, null) val transportContext = new TransportContext(transportConf, rpcHandler) server = transportContext.createServer() |