author     Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
committer  Sean Owen <sowen@cloudera.com>    2015-08-25 12:33:13 +0100
commit     69c9c177160e32a2fbc9b36ecc52156077fca6fc (patch)
tree       57345aaf19c3149038bfca5c4ddccf33d41bdd5b /yarn
parent     7f1e507bf7e82bff323c5dec3c1ee044687c4173 (diff)
[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters`. Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I can see, yet.

Author: Sean Owen <sowen@cloudera.com>

Closes #8033 from srowen/SPARK-9613.
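For context, here is a minimal standalone sketch (not part of the patch itself) of the two conversion patterns this change applies throughout the yarn module: an explicit `.asScala` when consuming Java collections returned by YARN APIs, and an explicit `.asJava` when handing Scala collections to Java-typed setters. The names `javaTags` and `env` are illustrative only.

```scala
import scala.collection.JavaConverters._

object ConvertersSketch {
  def main(args: Array[String]): Unit = {
    // Java -> Scala: JavaConversions used to wrap this implicitly; with
    // JavaConverters the wrapping is visible at the call site via .asScala.
    // Both directions produce views (wrappers), not copies.
    val javaTags: java.util.List[String] =
      java.util.Arrays.asList("tag1", "tag2", "")
    val nonEmpty = javaTags.asScala.filter(_.nonEmpty)
    println(nonEmpty.size) // 2

    // Scala -> Java: passing a Scala Map to a Java-typed API now needs an
    // explicit .asJava (compare amContainer.setEnvironment(launchEnv.asJava)
    // in the Client.scala hunk below).
    val env = scala.collection.mutable.HashMap("SPARK_YARN_MODE" -> "true")
    val javaEnv: java.util.Map[String, String] = env.asJava
    println(javaEnv.get("SPARK_YARN_MODE"))
  }
}
```

The point of the migration is that every Java/Scala boundary crossing is now spelled out at the call site, rather than inserted silently by an implicit conversion.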
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala               | 13
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala     | 24
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala        | 19
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala         |  8
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala |  6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala          |  8
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala     |  5
7 files changed, 41 insertions(+), 42 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bff585b46c..e9a02baafd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,7 +25,7 @@ import java.security.PrivilegedExceptionAction
import java.util.{Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
import scala.reflect.runtime.universe
import scala.util.{Try, Success, Failure}
@@ -511,7 +511,7 @@ private[spark] class Client(
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
- val t = creds.getAllTokens
+ val t = creds.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.head
val newExpiration = t.renew(hadoopConf)
@@ -650,8 +650,8 @@ private[spark] class Client(
distCacheMgr.setDistArchivesEnv(launchEnv)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
- amContainer.setLocalResources(localResources)
- amContainer.setEnvironment(launchEnv)
+ amContainer.setLocalResources(localResources.asJava)
+ amContainer.setEnvironment(launchEnv.asJava)
val javaOpts = ListBuffer[String]()
@@ -782,7 +782,7 @@ private[spark] class Client(
// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
- amContainer.setCommands(printableCommands)
+ amContainer.setCommands(printableCommands.asJava)
logDebug("===============================================================================")
logDebug("YARN AM launch context:")
@@ -797,7 +797,8 @@ private[spark] class Client(
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
- amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+ amContainer.setApplicationACLs(
+ YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
setupSecurityToken(amContainer)
UserGroupInformation.getCurrentUser().addCredentials(credentials)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 4cc50483a1..9abd09b3cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -20,14 +20,13 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
import java.nio.ByteBuffer
+import java.util.Collections
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.spark.util.Utils
-
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
@@ -40,6 +39,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
class ExecutorRunnable(
container: Container,
@@ -74,9 +74,9 @@ class ExecutorRunnable(
.asInstanceOf[ContainerLaunchContext]
val localResources = prepareLocalResources
- ctx.setLocalResources(localResources)
+ ctx.setLocalResources(localResources.asJava)
- ctx.setEnvironment(env)
+ ctx.setEnvironment(env.asJava)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
@@ -96,8 +96,9 @@ class ExecutorRunnable(
|===============================================================================
""".stripMargin)
- ctx.setCommands(commands)
- ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+ ctx.setCommands(commands.asJava)
+ ctx.setApplicationACLs(
+ YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
// If external shuffle service is enabled, register with the Yarn shuffle service already
// started on the NodeManager and, if authentication is enabled, provide it with our secret
@@ -112,7 +113,7 @@ class ExecutorRunnable(
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
- ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
+ ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
}
// Send the start request to the ContainerManager
@@ -314,7 +315,8 @@ class ExecutorRunnable(
env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
}
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
+ System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+ .foreach { case (k, v) => env(k) = v }
env
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ccf753e69f..5f897cbcb4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,9 +21,7 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
-import org.apache.spark.util.Utils
-
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -39,8 +37,8 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util.Utils
/**
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -164,7 +162,7 @@ private[yarn] class YarnAllocator(
* Number of container requests at the given location that have not yet been fulfilled.
*/
private def getNumPendingAtLocation(location: String): Int =
- amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
+ amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala.map(_.size).sum
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
@@ -231,14 +229,14 @@ private[yarn] class YarnAllocator(
numExecutorsRunning,
allocateResponse.getAvailableResources))
- handleAllocatedContainers(allocatedContainers)
+ handleAllocatedContainers(allocatedContainers.asScala)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
- processCompletedContainers(completedContainers)
+ processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
@@ -271,7 +269,7 @@ private[yarn] class YarnAllocator(
val request = createContainerRequest(resource, locality.nodes, locality.racks)
amClient.addContainerRequest(request)
val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+ val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
logInfo(s"Container request (host: $hostStr, capability: $resource)")
}
} else if (missing < 0) {
@@ -280,7 +278,8 @@ private[yarn] class YarnAllocator(
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
- matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
+ matchingRequests.iterator().next().asScala
+ .take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
@@ -459,7 +458,7 @@ private[yarn] class YarnAllocator(
}
}
- if (allocatedContainerToHostMap.containsKey(containerId)) {
+ if (allocatedContainerToHostMap.contains(containerId)) {
val host = allocatedContainerToHostMap.get(containerId).get
val containerSet = allocatedHostToContainersMap.get(host).get
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 4999f9c062..df042bf291 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -19,17 +19,15 @@ package org.apache.spark.deploy.yarn
import java.util.{List => JList}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.util.Try
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -108,8 +106,8 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
classOf[Configuration])
val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
- val hosts = proxies.map { proxy => proxy.split(":")(0) }
- val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ val hosts = proxies.asScala.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
} catch {
case e: NoSuchMethodException =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 128e996b71..b4f8049bff 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.util.Properties
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
@@ -132,7 +132,7 @@ abstract class BaseYarnClusterSuite
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
- yarnCluster.getConfig().foreach { e =>
+ yarnCluster.getConfig.asScala.foreach { e =>
props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
}
@@ -149,7 +149,7 @@ abstract class BaseYarnClusterSuite
props.store(writer, "Spark properties.")
writer.close()
- val extraJarArgs = if (!extraJars.isEmpty()) Seq("--jars", extraJars.mkString(",")) else Nil
+ val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
val mainArgs =
if (klass.endsWith(".py")) {
Seq(klass)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 0a5402c89e..e7f2501e78 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap => MutableHashMap}
import scala.reflect.ClassTag
import scala.util.Try
@@ -38,7 +38,7 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils
class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
@@ -201,7 +201,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
tags should contain allOf ("tag1", "dup", "tag2", "multi word")
- tags.filter(!_.isEmpty).size should be (4)
+ tags.asScala.filter(_.nonEmpty).size should be (4)
}
appContext.getMaxAppAttempts should be (42)
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 128350b648..5a4ea2ea2f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.net.URL
import scala.collection.mutable
-import scala.collection.JavaConversions._
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
@@ -216,8 +215,8 @@ private object YarnClusterDriver extends Logging with Matchers {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
- assert(driverLogs.containsKey("stderr"))
- assert(driverLogs.containsKey("stdout"))
+ assert(driverLogs.contains("stderr"))
+ assert(driverLogs.contains("stdout"))
val urlStr = driverLogs("stderr")
// Ensure that this is a valid URL, else this will throw an exception
new URL(urlStr)