From 079b3be81264b446f927739f26ed9f426611d83f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 21 Jan 2015 14:38:14 -0800 Subject: Make sure only owner can read / write to directories created for the job. Whenever a directory is created by the utility method, immediately restrict its permissions so that only the owner has access to its contents. Signed-off-by: Josh Rosen --- .../scala/org/apache/spark/HttpFileServer.scala | 2 +- .../src/main/scala/org/apache/spark/SparkEnv.scala | 2 +- .../org/apache/spark/broadcast/HttpBroadcast.scala | 2 +- .../apache/spark/storage/DiskBlockManager.scala | 36 +++------- .../main/scala/org/apache/spark/util/Utils.scala | 77 +++++++++++++++------- 5 files changed, 66 insertions(+), 53 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index 677c5e0f89..3f33332a81 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -36,7 +36,7 @@ private[spark] class HttpFileServer( var serverUri : String = null def initialize() { - baseDir = Utils.createTempDir() + baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd") fileDir = new File(baseDir, "files") jarDir = new File(baseDir, "jars") fileDir.mkdir() diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 48a9d98e2e..e6ebbff087 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -339,7 +339,7 @@ object SparkEnv extends Logging { // this is a temporary directory; in distributed mode, this is the executor's current working // directory. val sparkFilesDir: String = if (isDriver) { - Utils.createTempDir().getAbsolutePath + Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath } else { "." } diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 31d6958c40..ea98051532 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -151,7 +151,7 @@ private[broadcast] object HttpBroadcast extends Logging { } private def createServer(conf: SparkConf) { - broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) + broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast") val broadcastPort = conf.getInt("spark.broadcast.port", 0) server = new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index d79ed76542..ffaac4b176 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -17,9 +17,8 @@ package org.apache.spark.storage -import java.io.File -import java.text.SimpleDateFormat -import java.util.{Date, Random, UUID} +import java.util.UUID +import java.io.{IOException, File} import org.apache.spark.{SparkConf, Logging} import org.apache.spark.executor.ExecutorExitCode @@ -37,7 +36,6 @@ import org.apache.spark.util.Utils private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf) extends Logging { - private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 private[spark] val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64) @@ -121,33 +119,15 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon } private def createLocalDirs(conf: SparkConf): Array[File] = { - val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir => - var foundLocalDir = false - var localDir: File = null - var localDirId: String = null - var tries = 0 - val rand = new Random() - while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) { - tries += 1 - try { - localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - localDir = new File(rootDir, s"spark-local-$localDirId") - if (!localDir.exists) { - foundLocalDir = localDir.mkdirs() - } - } catch { - case e: Exception => - logWarning(s"Attempt $tries to create local dir $localDir failed", e) - } - } - if (!foundLocalDir) { - logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." + - " Ignoring this directory.") - None - } else { + try { + val localDir = Utils.createDirectory(rootDir, "blockmgr") logInfo(s"Created local directory at $localDir") Some(localDir) + } catch { + case e: IOException => + logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) + None } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index cdb322de3b..8d230fff76 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -60,6 +60,8 @@ private[spark] object CallSite { private[spark] object Utils extends Logging { val random = new Random() + private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 + /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -246,13 +248,28 @@ private[spark] object Utils extends Logging { retval } + /** + * JDK equivalent of `chmod 700 file`. + * + * @param file the file whose permissions will be modified + * @return true if the permissions were successfully changed, false otherwise. + */ + def chmod700(file: File): Boolean = { + file.setReadable(false, false) && + file.setReadable(true, true) && + file.setWritable(false, false) && + file.setWritable(true, true) && + file.setExecutable(false, false) && + file.setExecutable(true, true) + } + /** * Create a directory inside the given parent directory. The directory is guaranteed to be * newly created, and is not marked for automatic deletion. */ - def createDirectory(root: String): File = { + def createDirectory(root: String, namePrefix: String = "spark"): File = { var attempts = 0 - val maxAttempts = 10 + val maxAttempts = MAX_DIR_CREATION_ATTEMPTS var dir: File = null while (dir == null) { attempts += 1 @@ -264,6 +281,11 @@ private[spark] object Utils extends Logging { dir = new File(root, "spark-" + UUID.randomUUID.toString) if (dir.exists() || !dir.mkdirs()) { dir = null + } else { + if (!chmod700(dir)) { + dir.delete() + dir = null + } } } catch { case e: SecurityException => dir = null; } } @@ -275,8 +297,10 @@ private[spark] object Utils extends Logging { * Create a temporary directory inside the given parent directory. The directory will be * automatically deleted when the VM shuts down. */ - def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = { - val dir = createDirectory(root) + def createTempDir( + root: String = System.getProperty("java.io.tmpdir"), + namePrefix: String = "spark"): File = { + val dir = createDirectory(root, namePrefix) registerShutdownDeleteDir(dir) dir } @@ -599,26 +623,35 @@ private[spark] object Utils extends Logging { * If no directories could be created, this will return an empty list. */ private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = { - val confValue = if (isRunningInYarnContainer(conf)) { + if (isRunningInYarnContainer(conf)) { // If we are in yarn mode, systems can have different disk layouts so we must set it - // to what Yarn on this system said was available. - getYarnLocalDirs(conf) + // to what Yarn on this system said was available. Note this assumes that Yarn has + // created the directories already, and that they are secured so that only the + // user has access to them. + getYarnLocalDirs(conf).split(",") } else { - Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse( - conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) - } - val rootDirs = confValue.split(',') - logDebug(s"Getting/creating local root dirs at '$confValue'") - - rootDirs.flatMap { rootDir => - val localDir: File = new File(rootDir) - val foundLocalDir = localDir.exists || localDir.mkdirs() - if (!foundLocalDir) { - logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.") - None - } else { - Some(rootDir) - } + // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user + // configuration to point to a secure directory. So create a subdirectory with restricted + // permissions under each listed directory. + Option(conf.getenv("SPARK_LOCAL_DIRS")) + .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir"))) + .split(",") + .flatMap { root => + try { + val rootDir = new File(root) + if (rootDir.exists || rootDir.mkdirs()) { + Some(createDirectory(root).getAbsolutePath()) + } else { + logError(s"Failed to create dir in $root. Ignoring this directory.") + None + } + } catch { + case e: IOException => + logError(s"Failed to create local root dir in $root. Ignoring this directory.") + None + } + } + .toArray } } -- cgit v1.2.3