From 7aacb7bfad4ec73fd8f18555c72ef6962c14358f Mon Sep 17 00:00:00 2001
From: Li Zhihui
Date: Fri, 24 Oct 2014 13:01:36 -0700
Subject: [SPARK-2713] Executors of same application in same host should only
 download files & jars once

If Spark launches multiple executors on one host for one application, every
executor downloads its dependent files and jars independently (unless they use
a local: URL). This can result in significant latency. In my case, it took 20
seconds to download the dependent jars (about 17M) when I launched 32
executors on every host (4 hosts in total).

This patch caches downloaded files and jars so that executors of the same
application on the same host download them only once, reducing network traffic
and download latency. In my case, the latency was reduced from 20 seconds to
less than 1 second.

Author: Li Zhihui
Author: li-zhihui

Closes #1616 from li-zhihui/cachefiles and squashes the following commits:

36940df [Li Zhihui] Close cache for local mode
935fed6 [Li Zhihui] Clean code.
f9330d4 [Li Zhihui] Clean code again
7050d46 [Li Zhihui] Clean code
074a422 [Li Zhihui] Fix: deal with spark.files.overwrite
03ed3a8 [li-zhihui] rename cache file name as XXXXXXXXX_cache
2766055 [li-zhihui] Use url.hashCode + timestamp as cachedFileName
76a7b66 [Li Zhihui] Clean code & use application work directory as cache directory
3510eb0 [Li Zhihui] Keep fetchFile private
2ffd742 [Li Zhihui] add comment for FileLock
e0ebd48 [Li Zhihui] Try and finally lock.release
7fb7c0b [Li Zhihui] Release lock before copy files
6b997bf [Li Zhihui] Executors of same application in same host should only download files & jars once
---
 .../main/scala/org/apache/spark/SparkContext.scala |  5 +-
 .../scala/org/apache/spark/executor/Executor.scala | 10 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 87 ++++++++++++++++++----
 3 files changed, 82 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ac7935b8c2..55602a9082 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -837,11 +837,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
       case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+      hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 616c7e6a46..0b75b9b21f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,14 +322,16 @@ private[spark] class Executor(
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-          hadoopConf)
+        // Fetch the file in useCache mode; the cache is disabled in local mode.
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
       for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
         logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
-          hadoopConf)
+        // Fetch the file in useCache mode; the cache is disabled in local mode.
+        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentJars(name) = timestamp
         // Add it to our class loader
         val localName = name.split("/").last
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0aeff6455b..ccbddd985a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -347,15 +347,84 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+   * Download a file to a target directory. Supports fetching the file in a variety of ways,
+   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared
+   * across executors running the same application. `useCache` is used mainly by
+   * executors, and is disabled in local mode.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor at a time enters the download section below.
+      // The FileLock is only used to synchronize executors downloading the same file;
+      // it is safe regardless of the lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo(s"File $targetFile exists and does not match contents of $url, " +
+            s"replacing it with $url")
+        } else {
+          throw new SparkException(s"File $targetFile exists and does not match contents of $url")
+        }
+      }
+      Files.copy(cachedFile, targetFile)
+    } else {
+      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+    }
+
+    // Decompress the file if it's a .tar, .tar.gz or .tgz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+    }
+    // Make the file executable - that's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
+  }
+
+  /**
+   * Download a file to a target directory. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
-    hadoopConf: Configuration) {
-    val filename = url.split("/").last
+  private def doFetchFile(
+      url: String,
+      targetDir: File,
+      filename: String,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration) {
     val tempDir = getLocalDir(conf)
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
@@ -443,16 +512,6 @@ private[spark] object Utils extends Logging {
       }
       Files.move(tempFile, targetFile)
     }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
   }
 
   /**
--
cgit v1.2.3
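
For readers skimming the diff, the heart of the change is a "first caller
downloads, later callers reuse" pattern built on java.nio file locks, which
work across the separate executor JVMs on one host. The standalone Scala
sketch below distills that pattern under stated assumptions: fetchOncePerHost
and downloadToCache are hypothetical names invented here, downloadToCache is a
simplified stand-in for Utils.doFetchFile, and the sketch omits the
spark.files.overwrite handling and the copy into the executor's work
directory that the real fetchFile performs.

    import java.io.{File, RandomAccessFile}
    import java.net.URL
    import java.nio.file.{Files, StandardCopyOption}

    object CachedFetchSketch {
      // Hypothetical stand-in for Utils.doFetchFile: stream the URL's
      // contents into the shared cache file.
      private def downloadToCache(url: String, cachedFile: File): Unit = {
        val in = new URL(url).openStream()
        try {
          Files.copy(in, cachedFile.toPath, StandardCopyOption.REPLACE_EXISTING)
        } finally {
          in.close()
        }
      }

      // Fetch `url` at most once per host: the first process to acquire the
      // lock file downloads; later callers block on lock(), then find the
      // cached copy already in place and skip the download.
      def fetchOncePerHost(url: String, timestamp: Long, cacheDir: File): File = {
        val stem = s"${url.hashCode}$timestamp" // same naming scheme as the patch
        val cachedFile = new File(cacheDir, s"${stem}_cache")
        val lockFile = new File(cacheDir, s"${stem}_lock")
        val raf = new RandomAccessFile(lockFile, "rw")
        // Exclusive OS-level lock on the lock file, serializing downloaders
        // across executor JVMs on this host.
        val lock = raf.getChannel().lock()
        try {
          if (!cachedFile.exists()) {
            downloadToCache(url, cachedFile)
          }
        } finally {
          lock.release() // release before callers copy the file out of the cache
          raf.close()
        }
        cachedFile
      }
    }

Note that a FileLock is held per process: two threads in the same JVM calling
lock() on the same file would hit an OverlappingFileLockException, which is
fine here because each executor runs in its own JVM. Releasing the lock before
copying the cached file into each executor's own directory (commit 7fb7c0b,
"Release lock before copy files") keeps the critical section down to the
single download.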