aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCodingCat <zhunansjtu@gmail.com>2015-07-31 20:27:00 +0100
committerSean Owen <sowen@cloudera.com>2015-07-31 20:27:00 +0100
commitc0686668ae6a92b6bb4801a55c3b78aedbee816a (patch)
treef0a88c5c1a71e13b62f50268863483302410873e
parenta8340fa7df17e3f0a3658f8b8045ab840845a72a (diff)
downloadspark-c0686668ae6a92b6bb4801a55c3b78aedbee816a.tar.gz
spark-c0686668ae6a92b6bb4801a55c3b78aedbee816a.tar.bz2
spark-c0686668ae6a92b6bb4801a55c3b78aedbee816a.zip
[SPARK-9202] capping maximum number of executor&driver information kept in Worker
https://issues.apache.org/jira/browse/SPARK-9202 Author: CodingCat <zhunansjtu@gmail.com> Closes #7714 from CodingCat/SPARK-9202 and squashes the following commits: 23977fb [CodingCat] add comments about why we don't synchronize finishedExecutors & finishedDrivers dc9772d [CodingCat] addressing the comments e125241 [CodingCat] stylistic fix 80bfe52 [CodingCat] fix JsonProtocolSuite d7d9485 [CodingCat] styistic fix and respect insert ordering 031755f [CodingCat] add license info & stylistic fix c3b5361 [CodingCat] test cases and docs c557b3a [CodingCat] applications are fine 9cac751 [CodingCat] application is fine... ad87ed7 [CodingCat] trimFinishedExecutorsAndDrivers
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala124
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala89
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala59
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala133
-rw-r--r--docs/configuration.md14
6 files changed, 329 insertions, 94 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 82e9578bbc..0276c24f85 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import java.util.concurrent._
import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
import scala.util.Random
import scala.util.control.NonFatal
@@ -115,13 +115,18 @@ private[worker] class Worker(
}
var workDir: File = null
- val finishedExecutors = new HashMap[String, ExecutorRunner]
+ val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
- val finishedDrivers = new HashMap[String, DriverRunner]
+ val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
+ val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
+ WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
+ val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
+ WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
+
// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
@@ -461,25 +466,7 @@ private[worker] class Worker(
}
case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- sendToMaster(executorStateChanged)
- val fullId = appId + "/" + execId
- if (ExecutorState.isFinished(state)) {
- executors.get(fullId) match {
- case Some(executor) =>
- logInfo("Executor " + fullId + " finished with state " + state +
- message.map(" message " + _).getOrElse("") +
- exitStatus.map(" exitStatus " + _).getOrElse(""))
- executors -= fullId
- finishedExecutors(fullId) = executor
- coresUsed -= executor.cores
- memoryUsed -= executor.memory
- case None =>
- logInfo("Unknown Executor " + fullId + " finished with state " + state +
- message.map(" message " + _).getOrElse("") +
- exitStatus.map(" exitStatus " + _).getOrElse(""))
- }
- maybeCleanupApplication(appId)
- }
+ handleExecutorStateChanged(executorStateChanged)
case KillExecutor(masterUrl, appId, execId) =>
if (masterUrl != activeMasterUrl) {
@@ -523,24 +510,8 @@ private[worker] class Worker(
}
}
- case driverStageChanged @ DriverStateChanged(driverId, state, exception) => {
- state match {
- case DriverState.ERROR =>
- logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
- case DriverState.FAILED =>
- logWarning(s"Driver $driverId exited with failure")
- case DriverState.FINISHED =>
- logInfo(s"Driver $driverId exited successfully")
- case DriverState.KILLED =>
- logInfo(s"Driver $driverId was killed by user")
- case _ =>
- logDebug(s"Driver $driverId changed state to $state")
- }
- sendToMaster(driverStageChanged)
- val driver = drivers.remove(driverId).get
- finishedDrivers(driverId) = driver
- memoryUsed -= driver.driverDesc.mem
- coresUsed -= driver.driverDesc.cores
+ case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
+ handleDriverStateChanged(driverStateChanged)
}
case ReregisterWithMaster =>
@@ -614,6 +585,78 @@ private[worker] class Worker(
webUi.stop()
metricsSystem.stop()
}
+
+ private def trimFinishedExecutorsIfNecessary(): Unit = {
+ // do not need to protect with locks since both WorkerPage and Restful server get data through
+ // thread-safe RpcEndPoint
+ if (finishedExecutors.size > retainedExecutors) {
+ finishedExecutors.take(math.max(finishedExecutors.size / 10, 1)).foreach {
+ case (executorId, _) => finishedExecutors.remove(executorId)
+ }
+ }
+ }
+
+ private def trimFinishedDriversIfNecessary(): Unit = {
+ // do not need to protect with locks since both WorkerPage and Restful server get data through
+ // thread-safe RpcEndPoint
+ if (finishedDrivers.size > retainedDrivers) {
+ finishedDrivers.take(math.max(finishedDrivers.size / 10, 1)).foreach {
+ case (driverId, _) => finishedDrivers.remove(driverId)
+ }
+ }
+ }
+
+ private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
+ val driverId = driverStateChanged.driverId
+ val exception = driverStateChanged.exception
+ val state = driverStateChanged.state
+ state match {
+ case DriverState.ERROR =>
+ logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+ case DriverState.FAILED =>
+ logWarning(s"Driver $driverId exited with failure")
+ case DriverState.FINISHED =>
+ logInfo(s"Driver $driverId exited successfully")
+ case DriverState.KILLED =>
+ logInfo(s"Driver $driverId was killed by user")
+ case _ =>
+ logDebug(s"Driver $driverId changed state to $state")
+ }
+ sendToMaster(driverStateChanged)
+ val driver = drivers.remove(driverId).get
+ finishedDrivers(driverId) = driver
+ trimFinishedDriversIfNecessary()
+ memoryUsed -= driver.driverDesc.mem
+ coresUsed -= driver.driverDesc.cores
+ }
+
+ private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
+ Unit = {
+ sendToMaster(executorStateChanged)
+ val state = executorStateChanged.state
+ if (ExecutorState.isFinished(state)) {
+ val appId = executorStateChanged.appId
+ val fullId = appId + "/" + executorStateChanged.execId
+ val message = executorStateChanged.message
+ val exitStatus = executorStateChanged.exitStatus
+ executors.get(fullId) match {
+ case Some(executor) =>
+ logInfo("Executor " + fullId + " finished with state " + state +
+ message.map(" message " + _).getOrElse("") +
+ exitStatus.map(" exitStatus " + _).getOrElse(""))
+ executors -= fullId
+ finishedExecutors(fullId) = executor
+ trimFinishedExecutorsIfNecessary()
+ coresUsed -= executor.cores
+ memoryUsed -= executor.memory
+ case None =>
+ logInfo("Unknown Executor " + fullId + " finished with state " + state +
+ message.map(" message " + _).getOrElse("") +
+ exitStatus.map(" exitStatus " + _).getOrElse(""))
+ }
+ maybeCleanupApplication(appId)
+ }
+ }
}
private[deploy] object Worker extends Logging {
@@ -669,5 +712,4 @@ private[deploy] object Worker extends Logging {
cmd
}
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 334a5b1014..709a272335 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -53,6 +53,8 @@ class WorkerWebUI(
}
}
-private[ui] object WorkerWebUI {
+private[worker] object WorkerWebUI {
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
+ val DEFAULT_RETAINED_DRIVERS = 1000
+ val DEFAULT_RETAINED_EXECUTORS = 1000
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
new file mode 100644
index 0000000000..967aa0976f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.File
+import java.util.Date
+
+import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
+import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
+import org.apache.spark.{SecurityManager, SparkConf}
+
+private[deploy] object DeployTestUtils {
+ def createAppDesc(): ApplicationDescription = {
+ val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
+ new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
+ }
+
+ def createAppInfo() : ApplicationInfo = {
+ val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
+ "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
+ appInfo.endTime = JsonConstants.currTimeInMillis
+ appInfo
+ }
+
+ def createDriverCommand(): Command = new Command(
+ "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
+ Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
+ )
+
+ def createDriverDesc(): DriverDescription =
+ new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
+
+ def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
+ createDriverDesc(), new Date())
+
+ def createWorkerInfo(): WorkerInfo = {
+ val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
+ workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
+ workerInfo
+ }
+
+ def createExecutorRunner(execId: Int): ExecutorRunner = {
+ new ExecutorRunner(
+ "appId",
+ execId,
+ createAppDesc(),
+ 4,
+ 1234,
+ null,
+ "workerId",
+ "host",
+ 123,
+ "publicAddress",
+ new File("sparkHome"),
+ new File("workDir"),
+ "akka://worker",
+ new SparkConf,
+ Seq("localDir"),
+ ExecutorState.RUNNING)
+ }
+
+ def createDriverRunner(driverId: String): DriverRunner = {
+ val conf = new SparkConf()
+ new DriverRunner(
+ conf,
+ driverId,
+ new File("workDir"),
+ new File("sparkHome"),
+ createDriverDesc(),
+ null,
+ "akka://worker",
+ new SecurityManager(conf))
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 08529e0ef2..0a9f128a3a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy
-import java.io.File
import java.util.Date
import com.fasterxml.jackson.core.JsonParseException
@@ -25,12 +24,14 @@ import org.json4s._
import org.json4s.jackson.JsonMethods
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, RecoveryState, WorkerInfo}
-import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
-import org.apache.spark.{JsonTestUtils, SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState}
+import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.{JsonTestUtils, SparkFunSuite}
class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
+ import org.apache.spark.deploy.DeployTestUtils._
+
test("writeApplicationInfo") {
val output = JsonProtocol.writeApplicationInfo(createAppInfo())
assertValidJson(output)
@@ -50,7 +51,7 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
}
test("writeExecutorRunner") {
- val output = JsonProtocol.writeExecutorRunner(createExecutorRunner())
+ val output = JsonProtocol.writeExecutorRunner(createExecutorRunner(123))
assertValidJson(output)
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr))
}
@@ -77,9 +78,10 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
test("writeWorkerState") {
val executors = List[ExecutorRunner]()
- val finishedExecutors = List[ExecutorRunner](createExecutorRunner(), createExecutorRunner())
- val drivers = List(createDriverRunner())
- val finishedDrivers = List(createDriverRunner(), createDriverRunner())
+ val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123),
+ createExecutorRunner(123))
+ val drivers = List(createDriverRunner("driverId"))
+ val finishedDrivers = List(createDriverRunner("driverId"), createDriverRunner("driverId"))
val stateResponse = new WorkerStateResponse("host", 8080, "workerId", executors,
finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl")
val output = JsonProtocol.writeWorkerState(stateResponse)
@@ -87,47 +89,6 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr))
}
- def createAppDesc(): ApplicationDescription = {
- val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq(), Seq())
- new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl")
- }
-
- def createAppInfo() : ApplicationInfo = {
- val appInfo = new ApplicationInfo(JsonConstants.appInfoStartTime,
- "id", createAppDesc(), JsonConstants.submitDate, null, Int.MaxValue)
- appInfo.endTime = JsonConstants.currTimeInMillis
- appInfo
- }
-
- def createDriverCommand(): Command = new Command(
- "org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
- Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
- )
-
- def createDriverDesc(): DriverDescription =
- new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())
-
- def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
- createDriverDesc(), new Date())
-
- def createWorkerInfo(): WorkerInfo = {
- val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress")
- workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
- workerInfo
- }
-
- def createExecutorRunner(): ExecutorRunner = {
- new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
- "publicAddress", new File("sparkHome"), new File("workDir"), "akka://worker",
- new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
- }
-
- def createDriverRunner(): DriverRunner = {
- val conf = new SparkConf()
- new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
- createDriverDesc(), null, "akka://worker", new SecurityManager(conf))
- }
-
def assertValidJson(json: JValue) {
try {
JsonMethods.parse(JsonMethods.compact(json))
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 0f4d3b28d0..faed4bdc68 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,13 +17,18 @@
package org.apache.spark.deploy.worker
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-
import org.scalatest.Matchers
+import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
+import org.apache.spark.deploy.master.DriverState
+import org.apache.spark.deploy.{Command, ExecutorState}
+import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+
class WorkerSuite extends SparkFunSuite with Matchers {
+ import org.apache.spark.deploy.DeployTestUtils._
+
def cmd(javaOpts: String*): Command = {
Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*))
}
@@ -56,4 +61,126 @@ class WorkerSuite extends SparkFunSuite with Matchers {
"-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z")
}
+
+ test("test clearing of finishedExecutors (small number of executors)") {
+ val conf = new SparkConf()
+ conf.set("spark.worker.ui.retainedExecutors", 2.toString)
+ val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+ "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ // initialize workers
+ for (i <- 0 until 5) {
+ worker.executors += s"app1/$i" -> createExecutorRunner(i)
+ }
+ // initialize ExecutorStateChanged Message
+ worker.handleExecutorStateChanged(
+ ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
+ assert(worker.finishedExecutors.size === 1)
+ assert(worker.executors.size === 4)
+ for (i <- 1 until 5) {
+ worker.handleExecutorStateChanged(
+ ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
+ assert(worker.finishedExecutors.size === 2)
+ if (i > 1) {
+ assert(!worker.finishedExecutors.contains(s"app1/${i - 2}"))
+ }
+ assert(worker.executors.size === 4 - i)
+ }
+ }
+
+ test("test clearing of finishedExecutors (more executors)") {
+ val conf = new SparkConf()
+ conf.set("spark.worker.ui.retainedExecutors", 30.toString)
+ val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+ "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ // initialize workers
+ for (i <- 0 until 50) {
+ worker.executors += s"app1/$i" -> createExecutorRunner(i)
+ }
+ // initialize ExecutorStateChanged Message
+ worker.handleExecutorStateChanged(
+ ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
+ assert(worker.finishedExecutors.size === 1)
+ assert(worker.executors.size === 49)
+ for (i <- 1 until 50) {
+ val expectedValue = {
+ if (worker.finishedExecutors.size < 30) {
+ worker.finishedExecutors.size + 1
+ } else {
+ 28
+ }
+ }
+ worker.handleExecutorStateChanged(
+ ExecutorStateChanged("app1", i, ExecutorState.EXITED, None, None))
+ if (expectedValue == 28) {
+ for (j <- i - 30 until i - 27) {
+ assert(!worker.finishedExecutors.contains(s"app1/$j"))
+ }
+ }
+ assert(worker.executors.size === 49 - i)
+ assert(worker.finishedExecutors.size === expectedValue)
+ }
+ }
+
+ test("test clearing of finishedDrivers (small number of drivers)") {
+ val conf = new SparkConf()
+ conf.set("spark.worker.ui.retainedDrivers", 2.toString)
+ val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+ "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ // initialize workers
+ for (i <- 0 until 5) {
+ val driverId = s"driverId-$i"
+ worker.drivers += driverId -> createDriverRunner(driverId)
+ }
+ // initialize DriverStateChanged Message
+ worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
+ assert(worker.drivers.size === 4)
+ assert(worker.finishedDrivers.size === 1)
+ for (i <- 1 until 5) {
+ val driverId = s"driverId-$i"
+ worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
+ if (i > 1) {
+ assert(!worker.finishedDrivers.contains(s"driverId-${i - 2}"))
+ }
+ assert(worker.drivers.size === 4 - i)
+ assert(worker.finishedDrivers.size === 2)
+ }
+ }
+
+ test("test clearing of finishedDrivers (more drivers)") {
+ val conf = new SparkConf()
+ conf.set("spark.worker.ui.retainedDrivers", 30.toString)
+ val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
+ val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+ "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ // initialize workers
+ for (i <- 0 until 50) {
+ val driverId = s"driverId-$i"
+ worker.drivers += driverId -> createDriverRunner(driverId)
+ }
+ // initialize DriverStateChanged Message
+ worker.handleDriverStateChanged(DriverStateChanged("driverId-0", DriverState.FINISHED, None))
+ assert(worker.finishedDrivers.size === 1)
+ assert(worker.drivers.size === 49)
+ for (i <- 1 until 50) {
+ val expectedValue = {
+ if (worker.finishedDrivers.size < 30) {
+ worker.finishedDrivers.size + 1
+ } else {
+ 28
+ }
+ }
+ val driverId = s"driverId-$i"
+ worker.handleDriverStateChanged(DriverStateChanged(driverId, DriverState.FINISHED, None))
+ if (expectedValue == 28) {
+ for (j <- i - 30 until i - 27) {
+ assert(!worker.finishedDrivers.contains(s"driverId-$j"))
+ }
+ }
+ assert(worker.drivers.size === 49 - i)
+ assert(worker.finishedDrivers.size === expectedValue)
+ }
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index fd236137cb..24b606356a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -557,6 +557,20 @@ Apart from these, the following properties are also available, and may be useful
collecting.
</td>
</tr>
+<tr>
+ <td><code>spark.worker.ui.retainedExecutors</code></td>
+ <td>1000</td>
+ <td>
+ How many finished executors the Spark UI and status APIs remember before garbage collecting.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.worker.ui.retainedDrivers</code></td>
+ <td>1000</td>
+ <td>
+ How many finished drivers the Spark UI and status APIs remember before garbage collecting.
+ </td>
+</tr>
</table>
#### Compression and Serialization