aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-05-31 00:47:56 -0700
committerReynold Xin <rxin@databricks.com>2015-05-31 00:47:56 -0700
commit564bc11e9827915c8652bc06f4bd591809dea4b1 (patch)
tree1a3fe6ff9f78bef3d991897effe554098018a1b4 /yarn
parent74fdc97c7206c6d715f128ef7c46055e0bb90760 (diff)
downloadspark-564bc11e9827915c8652bc06f4bd591809dea4b1.tar.gz
spark-564bc11e9827915c8652bc06f4bd591809dea4b1.tar.bz2
spark-564bc11e9827915c8652bc06f4bd591809dea4b1.zip
[SPARK-3850] Trim trailing spaces for examples/streaming/yarn.
Author: Reynold Xin <rxin@databricks.com> Closes #6530 from rxin/trim-whitespace-1 and squashes the following commits: 7b7b3a0 [Reynold Xin] Reset again. dc14597 [Reynold Xin] Reset scalastyle. cd556c4 [Reynold Xin] YARN, Kinesis, Flume. 4223fe1 [Reynold Xin] [SPARK-3850] Trim trailing spaces for examples/streaming.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala26
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala4
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala24
3 files changed, 27 insertions, 27 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 4ca6c903fc..3d3a966960 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -43,22 +43,22 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Add a resource to the list of distributed cache resources. This list can
* be sent to the ApplicationMaster and possibly the executors so that it can
* be downloaded into the Hadoop distributed cache for use by this application.
- * Adds the LocalResource to the localResources HashMap passed in and saves
+ * Adds the LocalResource to the localResources HashMap passed in and saves
* the stats of the resources to they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration
* @param destPath path to the resource
* @param localResources localResource hashMap to insert the resource into
- * @param resourceType LocalResourceType
+ * @param resourceType LocalResourceType
* @param link link presented in the distributed cache to the destination
- * @param statCache cache to store the file/directory stats
+ * @param statCache cache to store the file/directory stats
* @param appMasterOnly Whether to only add the resource to the app master
*/
def addResource(
fs: FileSystem,
conf: Configuration,
- destPath: Path,
+ destPath: Path,
localResources: HashMap[String, LocalResource],
resourceType: LocalResourceType,
link: String,
@@ -74,15 +74,15 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
amJarRsrc.setSize(destStatus.getLen())
if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
localResources(link) = amJarRsrc
-
+
if (!appMasterOnly) {
val uri = destPath.toUri()
val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
if (resourceType == LocalResourceType.FILE) {
- distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
+ distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
} else {
- distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
+ distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(),
destStatus.getModificationTime().toString(), visibility.name())
}
}
@@ -96,11 +96,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
+ env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
+ env("SPARK_YARN_CACHE_FILES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
+ env("SPARK_YARN_CACHE_FILES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
}
@@ -113,11 +113,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
val (sizes, timeStamps, visibilities) = tupleValues.unzip3
if (keys.size > 0) {
env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
+ env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") =
timeStamps.reduceLeft[String] { (acc, n) => acc + "," + n }
env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") =
sizes.reduceLeft[String] { (acc, n) => acc + "," + n }
- env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
+ env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") =
visibilities.reduceLeft[String] { (acc, n) => acc + "," + n }
}
}
@@ -197,7 +197,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {
val stat = statCache.get(uri) match {
case Some(existstat) => existstat
- case None =>
+ case None =>
val newStat = fs.getFileStatus(new Path(uri))
statCache.put(uri, newStat)
newStat
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5e6531895c..68d01c17ef 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -144,9 +144,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
object YarnSparkHadoopUtil {
- // Additional memory overhead
+ // Additional memory overhead
// 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
- // the common cases. Memory overhead tends to grow with container size.
+ // the common cases. Memory overhead tends to grow with container size.
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index 43a7334db8..804dfecde7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -41,12 +41,12 @@ import org.apache.spark.SparkFunSuite
class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar {
class MockClientDistributedCacheManager extends ClientDistributedCacheManager {
- override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
+ override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
LocalResourceVisibility = {
LocalResourceVisibility.PRIVATE
}
}
-
+
test("test getFileStatus empty") {
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
@@ -61,7 +61,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val distMgr = new ClientDistributedCacheManager()
val fs = mock[FileSystem]
val uri = new URI("/tmp/testing")
- val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
+ val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus())
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus)
@@ -78,7 +78,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -101,11 +101,11 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None)
// add another one and verify both there and order correct
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing2"))
val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
+ distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
statCache, false)
val resource2 = localResources("link2")
assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -117,7 +117,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val env2 = new HashMap[String, String]()
distMgr.setDistFilesEnv(env2)
val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
- val files = env2("SPARK_YARN_CACHE_FILES").split(',')
+ val files = env2("SPARK_YARN_CACHE_FILES").split(',')
val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',')
assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link")
@@ -141,7 +141,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())
intercept[Exception] {
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
statCache, false)
}
assert(localResources.get("link") === None)
@@ -155,11 +155,11 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, true)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
@@ -189,11 +189,11 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing")
val localResources = HashMap[String, LocalResource]()
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
+ val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner",
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)
- distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
+ distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)