aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java11
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala49
2 files changed, 33 insertions, 27 deletions
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 43c8df721d..ea726e3c82 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -105,7 +105,8 @@ public class YarnShuffleService extends AuxiliaryService {
// An entity that manages the shuffle secret per application
// This is used only if authentication is enabled
- private ShuffleSecretManager secretManager;
+ @VisibleForTesting
+ ShuffleSecretManager secretManager;
// The actual server that serves shuffle files
private TransportServer shuffleServer = null;
@@ -197,7 +198,7 @@ public class YarnShuffleService extends AuxiliaryService {
private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager();
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
-
+
// Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf);
fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
@@ -306,7 +307,7 @@ public class YarnShuffleService extends AuxiliaryService {
}
if (db != null) {
db.close();
- }
+ }
} catch (Exception e) {
logger.error("Exception when stopping service", e);
}
@@ -329,7 +330,7 @@ public class YarnShuffleService extends AuxiliaryService {
/**
* Get the path specific to this auxiliary service to use for recovery.
- */
+ */
protected Path getRecoveryPath(String fileName) {
return _recoveryPath;
}
@@ -345,7 +346,7 @@ public class YarnShuffleService extends AuxiliaryService {
if (recoveryFile.exists()) {
return recoveryFile;
}
- }
+ }
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index c86bf7f70c..a58784f596 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.network.yarn
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission._
import java.util.EnumSet
@@ -40,15 +41,17 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
import org.apache.spark.util.Utils
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
- private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+ private[yarn] var yarnConfig: YarnConfiguration = null
private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
override def beforeEach(): Unit = {
super.beforeEach()
+ yarnConfig = new YarnConfiguration()
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.setInt("spark.shuffle.service.port", 0)
+ yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
val localDir = Utils.createTempDir()
yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
}
@@ -82,12 +85,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
val execStateFile = s1.registeredExecutorFile
@@ -160,12 +161,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val secretsFile = s1.secretsFile
secretsFile should be (null)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
val execStateFile = s1.registeredExecutorFile
@@ -193,8 +192,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s1 = new YarnShuffleService
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val execStateFile = s1.registeredExecutorFile
@@ -227,8 +225,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.initializeApplication(app1Data)
// however, when we initialize a totally new app2, everything is still happy
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s2.initializeApplication(app2Data)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
@@ -278,14 +275,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1)
- val app1Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app1Id, null)
+ val app1Data = makeAppInfo("user", app1Id)
s1.initializeApplication(app1Data)
val app2Id = ApplicationId.newInstance(0, 2)
- val app2Data: ApplicationInitializationContext =
- new ApplicationInitializationContext("user", app2Id, null)
+ val app2Data = makeAppInfo("user", app2Id)
s1.initializeApplication(app2Data)
+ assert(s1.secretManager.getSecretKey(app1Id.toString()) != null)
+ assert(s1.secretManager.getSecretKey(app2Id.toString()) != null)
+
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
val secretsFile = s1.secretsFile
@@ -315,6 +313,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.setRecoveryPath(recoveryPath)
s2.init(yarnConfig)
+ // Ensure that s2 has loaded known apps from the secrets db.
+ assert(s2.secretManager.getSecretKey(app1Id.toString()) != null)
+ assert(s2.secretManager.getSecretKey(app2Id.toString()) != null)
+
val execStateFile2 = s2.registeredExecutorFile
val secretsFile2 = s2.secretsFile
@@ -342,19 +344,17 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
}
test("service throws error if cannot start") {
- // Create a different config with a read-only local dir.
- val roConfig = new YarnConfiguration(yarnConfig)
+ // Set up a read-only local dir.
val roDir = Utils.createTempDir()
Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
- roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
- roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
+ yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
// Try to start the shuffle service, it should fail.
val service = new YarnShuffleService()
try {
val error = intercept[ServiceStateException] {
- service.init(roConfig)
+ service.init(yarnConfig)
}
assert(error.getCause().isInstanceOf[IOException])
} finally {
@@ -364,4 +364,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
}
}
+ private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
+ val secret = ByteBuffer.wrap(new Array[Byte](0))
+ new ApplicationInitializationContext(user, appId, secret)
+ }
+
}