author     zsxwing <zsxwing@gmail.com>          2015-09-29 11:53:28 -0700
committer  Andrew Or <andrew@databricks.com>    2015-09-29 11:53:28 -0700
commit     dba95ea03216e6b8e623db4a36e1018c6ed95538 (patch)
tree       a73afd6c4439b399f8ec13e07b94b183a3e8ec77
parent     9b9fe5f7bf55257269d8febcd64e95677075dfb6 (diff)
[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite
Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.apps and Master.idToApp, which are not thread safe

The changes include:

* Use `eventually` to wait until master and workers are ready, to fix 1 and 2
* Use `eventually` to wait until the application is registered with master, to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info, to fix 4

Author: zsxwing <zsxwing@gmail.com>

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala  | 305
1 file changed, 192 insertions(+), 113 deletions(-)
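
The whole fix follows a single pattern: rather than reading the Master's mutable fields (`master.apps`, `master.idToApp`) from the test thread, the suite asks the Master for an immutable state snapshot over RPC, and wraps assertions that depend on asynchronous registration in ScalaTest's `eventually`. A minimal sketch of that pattern, assuming it lives inside the suite as in the diff below (so `master` and `numWorkers` are the suite's own fixtures):

    import scala.concurrent.duration._

    import org.scalatest.concurrent.Eventually._

    import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
    import org.apache.spark.deploy.master.ApplicationInfo

    // Ask the Master endpoint for a point-in-time snapshot of its state instead
    // of touching Master.apps / Master.idToApp, which are mutated on the
    // Master's own message loop and are not safe to read from the test thread.
    private def getMasterState: MasterStateResponse = {
      master.self.askWithRetry[MasterStateResponse](RequestMasterState)
    }

    // Applications currently registered with the Master, taken from the snapshot.
    private def getApplications(): Seq[ApplicationInfo] = {
      getMasterState.activeApps
    }

    // Registration is asynchronous, so poll until the expected state is
    // reached instead of asserting once immediately after startup.
    eventually(timeout(60.seconds), interval(10.millis)) {
      assert(getMasterState.workers.size === numWorkers)
    }

Each test then re-reads `getApplications()` after every mutation instead of caching `master.apps.head` once, so every assertion sees a fresh, consistent snapshot.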
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0d30..2e2fa22eb4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
package org.apache.spark.deploy
+import scala.concurrent.duration._
+
import org.mockito.Mockito.{mock, when}
import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
}
master = makeMaster()
workers = makeWorkers(10, 2048)
+ // Wait until all workers register with master successfully
+ eventually(timeout(60.seconds), interval(10.millis)) {
+ assert(getMasterState.workers.size === numWorkers)
+ }
}
override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
test("dynamic allocation default behavior") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// kill all executors
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ var apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
// request 1 more
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === 2)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 2)
// request 1 more; this one won't go through
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === 3)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 2 = 1 executor
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === 1000)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with max cores <= cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "8"))
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// kill all executors
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ var apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.executors.values.head.cores === 8)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.executors.values.head.cores === 8)
+ assert(apps.head.getExecutorLimit === 1)
// request 1 more; this one won't go through because we're already at max cores.
// This highlights a limitation of using dynamic allocation with max cores WITHOUT
// setting cores per executor: once an application scales down and then scales back
// up, its executors may not be spread out anymore!
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 2)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 2)
// request 1 more; this one also won't go through for the same reason
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 3)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 1 = 2 executors
// Note: we scheduled these executors together, so their cores should be evenly distributed
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
- assert(master.apps.head.getExecutorLimit === 2)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(apps.head.getExecutorLimit === 2)
// kill all executors again; this time we'll have 2 - 2 = 0 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
- assert(master.apps.head.getExecutorLimit === 1000)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+ assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with max cores > cores per worker") {
sc = new SparkContext(appConf.set("spark.cores.max", "16"))
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// kill all executors
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ var apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.executors.values.head.cores === 10)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.executors.values.head.cores === 10)
+ assert(apps.head.getExecutorLimit === 1)
// request 1 more
// Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
- assert(master.apps.head.getExecutorLimit === 2)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+ assert(apps.head.getExecutorLimit === 2)
// request 1 more; this one won't go through
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === 3)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 3)
// kill all existing executors; we should end up with 3 - 2 = 1 executor
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.executors.values.head.cores === 10)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.executors.values.head.cores === 10)
+ assert(apps.head.getExecutorLimit === 1)
// kill all executors again; this time we'll have 1 - 1 = 0 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
- assert(master.apps.head.getExecutorLimit === 1000)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+ assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor") {
sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 10) // 20 cores total
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 10) // 20 cores total
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// kill all executors
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ var apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
// request 3 more
assert(sc.requestExecutors(3))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 4)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 4)
// request 10 more; only 6 will go through
assert(sc.requestExecutors(10))
- assert(master.apps.head.executors.size === 10)
- assert(master.apps.head.getExecutorLimit === 14)
+ apps = getApplications()
+ assert(apps.head.executors.size === 10)
+ assert(apps.head.getExecutorLimit === 14)
// kill 2 executors; we should get 2 back immediately
assert(killNExecutors(sc, 2))
- assert(master.apps.head.executors.size === 10)
- assert(master.apps.head.getExecutorLimit === 12)
+ apps = getApplications()
+ assert(apps.head.executors.size === 10)
+ assert(apps.head.getExecutorLimit === 12)
// kill 4 executors; we should end up with 12 - 4 = 8 executors
assert(killNExecutors(sc, 4))
- assert(master.apps.head.executors.size === 8)
- assert(master.apps.head.getExecutorLimit === 8)
+ apps = getApplications()
+ assert(apps.head.executors.size === 8)
+ assert(apps.head.getExecutorLimit === 8)
// kill all executors; this time we'll have 8 - 8 = 0 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
- assert(master.apps.head.executors.size === 10)
- assert(master.apps.head.getExecutorLimit === 1000)
+ apps = getApplications()
+ assert(apps.head.executors.size === 10)
+ assert(apps.head.getExecutorLimit === 1000)
}
test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
.set("spark.executor.cores", "2")
.set("spark.cores.max", "8"))
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 4) // 8 cores total
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 4) // 8 cores total
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// kill all executors
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ var apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request 1
assert(sc.requestExecutors(1))
- assert(master.apps.head.executors.size === 1)
- assert(master.apps.head.getExecutorLimit === 1)
+ apps = getApplications()
+ assert(apps.head.executors.size === 1)
+ assert(apps.head.getExecutorLimit === 1)
// request 3 more
assert(sc.requestExecutors(3))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 4)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 4)
// request 10 more; none will go through
assert(sc.requestExecutors(10))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 14)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 14)
// kill all executors; 4 executors will be launched immediately
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 10)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 10)
// ... and again
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 6)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 6)
// ... and again; now we end up with 6 - 4 = 2 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === 2)
+ apps = getApplications()
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === 2)
// ... and again; this time we have 2 - 2 = 0 executors left
assert(killAllExecutors(sc))
- assert(master.apps.head.executors.size === 0)
- assert(master.apps.head.getExecutorLimit === 0)
+ apps = getApplications()
+ assert(apps.head.executors.size === 0)
+ assert(apps.head.getExecutorLimit === 0)
// request many more; this increases the limit well beyond the cluster capacity
assert(sc.requestExecutors(1000))
- assert(master.apps.head.executors.size === 4)
- assert(master.apps.head.getExecutorLimit === 1000)
+ apps = getApplications()
+ assert(apps.head.executors.size === 4)
+ assert(apps.head.getExecutorLimit === 1000)
}
test("kill the same executor twice (SPARK-9795)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
- assert(master.apps.size === 1)
- assert(master.apps.head.id === appId)
- assert(master.apps.head.executors.size === 2)
- assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ val apps = getApplications()
+ assert(apps.size === 1)
+ assert(apps.head.id === appId)
+ assert(apps.head.executors.size === 2)
+ assert(apps.head.getExecutorLimit === Int.MaxValue)
+ }
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
assert(executors.size === 2)
assert(sc.killExecutor(executors.head))
assert(sc.killExecutor(executors.head))
- assert(master.apps.head.executors.size === 1)
+ val apps = getApplications()
+ assert(apps.head.executors.size === 1)
// The limit should not be lowered twice
- assert(master.apps.head.getExecutorLimit === 1)
+ assert(apps.head.getExecutorLimit === 1)
}
// ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
}
}
+ /** Get the Master state */
+ private def getMasterState: MasterStateResponse = {
+ master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+ }
+
+ /** Get the applications that are active from Master */
+ private def getApplications(): Seq[ApplicationInfo] = {
+ getMasterState.activeApps
+ }
+
/** Kill all executors belonging to this application. */
private def killAllExecutors(sc: SparkContext): Boolean = {
killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@ class StandaloneDynamicAllocationSuite
* don't wait for executors to register. Otherwise the tests will take much longer to run.
*/
private def getExecutorIds(sc: SparkContext): Seq[String] = {
- assert(master.idToApp.contains(sc.applicationId))
- master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+ val app = getApplications().find(_.id == sc.applicationId)
+ assert(app.isDefined)
+ // Although executors is transient, the master is in the same process, so the message
+ // won't be serialized and it's safe to access here.
+ app.get.executors.keys.map(_.toString).toSeq
}
/**