aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Zhihui <zhihui.li@intel.com>2014-10-24 13:01:36 -0700
committerAndrew Or <andrew@databricks.com>2014-10-24 13:01:36 -0700
commit7aacb7bfad4ec73fd8f18555c72ef6962c14358f (patch)
tree27d2484547f3ae665baf6d2ce67829c54ff96b74
parent6a40a76848203d7266c134a26191579138c76903 (diff)
downloadspark-7aacb7bfad4ec73fd8f18555c72ef6962c14358f.tar.gz
spark-7aacb7bfad4ec73fd8f18555c72ef6962c14358f.tar.bz2
spark-7aacb7bfad4ec73fd8f18555c72ef6962c14358f.zip
[SPARK-2713] Executors of same application in same host should only download files & jars once
If Spark lunched multiple executors in one host for one application, every executor would download it dependent files and jars (if not using local: url) independently. It maybe result in huge latency. In my case, it result in 20 seconds latency to download dependent jars(size about 17M) when I lunched 32 executors in every host(total 4 hosts). This patch will cache downloaded files and jars for executors to reduce network throughput and download latency. In my case, the latency was reduced from 20 seconds to less than 1 second. Author: Li Zhihui <zhihui.li@intel.com> Author: li-zhihui <zhihui.li@intel.com> Closes #1616 from li-zhihui/cachefiles and squashes the following commits: 36940df [Li Zhihui] Close cache for local mode 935fed6 [Li Zhihui] Clean code. f9330d4 [Li Zhihui] Clean code again 7050d46 [Li Zhihui] Clean code 074a422 [Li Zhihui] Fix: deal with spark.files.overwrite 03ed3a8 [li-zhihui] rename cache file name as XXXXXXXXX_cache 2766055 [li-zhihui] Use url.hashCode + timestamp as cachedFileName 76a7b66 [Li Zhihui] Clean code & use applcation work directory as cache directory 3510eb0 [Li Zhihui] Keep fetchFile private 2ffd742 [Li Zhihui] add comment for FileLock e0ebd48 [Li Zhihui] Try and finally lock.release 7fb7c0b [Li Zhihui] Release lock before copy files 6b997bf [Li Zhihui] Executors of same application in same host should only download files & jars once
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala87
3 files changed, 82 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac7935b8c2..55602a9082 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -837,11 +837,12 @@ class SparkContext(config: SparkConf) extends Logging {
case "local" => "file:" + uri.getPath
case _ => path
}
- addedFiles(key) = System.currentTimeMillis
+ val timestamp = System.currentTimeMillis
+ addedFiles(key) = timestamp
// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
- hadoopConfiguration)
+ hadoopConfiguration, timestamp, useCache = false)
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 616c7e6a46..0b75b9b21f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,14 +322,16 @@ private[spark] class Executor(
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
- hadoopConf)
+ // Fetch file with useCache mode, close cache for local mode.
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
- hadoopConf)
+ // Fetch file with useCache mode, close cache for local mode.
+ Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+ env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0aeff6455b..ccbddd985a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -347,15 +347,84 @@ private[spark] object Utils extends Logging {
}
/**
- * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+ * Download a file to target directory. Supports fetching the file in a variety of ways,
+ * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+ *
+ * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
+ * across executors running the same application. `useCache` is used mainly for
+ * the executors, and not in local mode.
+ *
+ * Throws SparkException if the target file already exists and has different contents than
+ * the requested file.
+ */
+ def fetchFile(
+ url: String,
+ targetDir: File,
+ conf: SparkConf,
+ securityMgr: SecurityManager,
+ hadoopConf: Configuration,
+ timestamp: Long,
+ useCache: Boolean) {
+ val fileName = url.split("/").last
+ val targetFile = new File(targetDir, fileName)
+ if (useCache) {
+ val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+ val lockFileName = s"${url.hashCode}${timestamp}_lock"
+ val localDir = new File(getLocalDir(conf))
+ val lockFile = new File(localDir, lockFileName)
+ val raf = new RandomAccessFile(lockFile, "rw")
+ // Only one executor entry.
+ // The FileLock is only used to control synchronization for executors download file,
+ // it's always safe regardless of lock type (mandatory or advisory).
+ val lock = raf.getChannel().lock()
+ val cachedFile = new File(localDir, cachedFileName)
+ try {
+ if (!cachedFile.exists()) {
+ doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+ }
+ } finally {
+ lock.release()
+ }
+ if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+ if (conf.getBoolean("spark.files.overwrite", false)) {
+ targetFile.delete()
+ logInfo((s"File $targetFile exists and does not match contents of $url, " +
+ s"replacing it with $url"))
+ } else {
+ throw new SparkException(s"File $targetFile exists and does not match contents of $url")
+ }
+ }
+ Files.copy(cachedFile, targetFile)
+ } else {
+ doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+ }
+
+ // Decompress the file if it's a .tar or .tar.gz
+ if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+ logInfo("Untarring " + fileName)
+ Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+ } else if (fileName.endsWith(".tar")) {
+ logInfo("Untarring " + fileName)
+ Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+ }
+ // Make the file executable - That's necessary for scripts
+ FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
+ }
+
+ /**
+ * Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
- def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
- hadoopConf: Configuration) {
- val filename = url.split("/").last
+ private def doFetchFile(
+ url: String,
+ targetDir: File,
+ filename: String,
+ conf: SparkConf,
+ securityMgr: SecurityManager,
+ hadoopConf: Configuration) {
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
@@ -443,16 +512,6 @@ private[spark] object Utils extends Logging {
}
Files.move(tempFile, targetFile)
}
- // Decompress the file if it's a .tar or .tar.gz
- if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
- logInfo("Untarring " + filename)
- Utils.execute(Seq("tar", "-xzf", filename), targetDir)
- } else if (filename.endsWith(".tar")) {
- logInfo("Untarring " + filename)
- Utils.execute(Seq("tar", "-xf", filename), targetDir)
- }
- // Make the file executable - That's necessary for scripts
- FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
}
/**