aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/test/scala/org
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2016-09-20 14:17:49 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-09-20 14:17:49 -0700
commit7e418e99cff4cf512ab2a9fa74221c4655048c8d (patch)
treef31bc7f44d3e7cefd9b97e0dac05a630ffdbde04 /yarn/src/test/scala/org
parent9ac68dbc5720026ea92acc61d295ca64d0d3d132 (diff)
downloadspark-7e418e99cff4cf512ab2a9fa74221c4655048c8d.tar.gz
spark-7e418e99cff4cf512ab2a9fa74221c4655048c8d.tar.bz2
spark-7e418e99cff4cf512ab2a9fa74221c4655048c8d.zip
[SPARK-17611][YARN][TEST] Make shuffle service test really test auth.
Currently, the code is just swallowing exceptions, and not really checking whether the auth information was being recorded properly. Fix both problems, and also avoid tests inadvertently affecting other tests by modifying the shared config variable (by making it not shared). Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #15161 from vanzin/SPARK-17611.
Diffstat (limited to 'yarn/src/test/scala/org')
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala49
1 file changed, 27 insertions(+), 22 deletions(-)
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index c86bf7f70c..a58784f596 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.network.yarn
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission._
import java.util.EnumSet
@@ -40,15 +41,17 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.util.Utils
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
- private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+ private[yarn] var yarnConfig: YarnConfiguration = null
private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
override def beforeEach(): Unit = {
super.beforeEach()
+ yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)
+ yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
val localDir = Utils.createTempDir()
yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
}
@@ -82,12 +85,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
val execStateFile = s1.registeredExecutorFile
@@ -160,12 +161,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val secretsFile = s1.secretsFile
secretsFile should be (null)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
val execStateFile = s1.registeredExecutorFile
@@ -193,8 +192,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s1 = new YarnShuffleService
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val execStateFile = s1.registeredExecutorFile
@@ -227,8 +225,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.initializeApplication(app1Data)
// however, when we initialize a totally new app2, everything is still happy
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s2.initializeApplication(app2Data)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
@@ -278,14 +275,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
+ assert(s1.secretManager.getSecretKey(app1Id.toString()) != null)
+ assert(s1.secretManager.getSecretKey(app2Id.toString()) != null)
+
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val secretsFile = s1.secretsFile
@@ -315,6 +313,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.setRecoveryPath(recoveryPath)
s2.init(yarnConfig)
+ // Ensure that s2 has loaded known apps from the secrets db.
+ assert(s2.secretManager.getSecretKey(app1Id.toString()) != null)
+ assert(s2.secretManager.getSecretKey(app2Id.toString()) != null)
+
val execStateFile2 = s2.registeredExecutorFile
val secretsFile2 = s2.secretsFile
@@ -342,19 +344,17 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
}
test("service throws error if cannot start") {
- // Create a different config with a read-only local dir.
- val roConfig = new YarnConfiguration(yarnConfig)
+ // Set up a read-only local dir.
val roDir = Utils.createTempDir()
Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
- roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
- roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+ yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
// Try to start the shuffle service, it should fail.
val service = new YarnShuffleService()
try {
val error = intercept[ServiceStateException] {
- service.init(roConfig)
+ service.init(yarnConfig)
}
assert(error.getCause().isInstanceOf[IOException])
} finally {
@@ -364,4 +364,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
}
}
+ private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
+ val secret = ByteBuffer.wrap(new Array[Byte](0))
+ new ApplicationInitializationContext(user, appId, secret)
+ }
+
}