author:    Marcelo Vanzin <vanzin@cloudera.com>  2015-01-21 14:38:14 -0800
committer: Josh Rosen <joshrosen@databricks.com>  2015-01-21 14:38:14 -0800
commit:    079b3be81264b446f927739f26ed9f426611d83f (patch)
tree:      ad16ca85e879042aa5b07c24539d328349fddeeb /core
parent:    bb8bd11da51b3b4b59b921d9d2a550c78a865ee5 (diff)
Make sure only owner can read / write to directories created for the job.
Whenever a directory is created by the utility method, immediately restrict its permissions so that only the owner has access to its contents.

Signed-off-by: Josh Rosen <joshrosen@databricks.com>
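For reference, the owner-only restriction introduced here is built from the plain java.io.File permission setters rather than shelling out to chmod: each permission bit is first cleared for all users, then re-granted to the owner alone. A minimal, self-contained sketch of that technique (the helper name mirrors the chmod700 method added in this commit; the demo main is illustrative only):

    import java.io.File
    import java.nio.file.Files

    object Chmod700Sketch {
      // Equivalent of `chmod 700 file`: drop each permission for everyone,
      // then grant it back to the owner only. Returns false if any setter fails.
      def chmod700(file: File): Boolean = {
        file.setReadable(false, false) && file.setReadable(true, true) &&
        file.setWritable(false, false) && file.setWritable(true, true) &&
        file.setExecutable(false, false) && file.setExecutable(true, true)
      }

      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDirectory("chmod700-demo").toFile
        // On filesystems without owner/other distinctions the setters can
        // return false; the commit deletes the directory and retries then.
        println(s"restricted ${dir.getAbsolutePath} to owner only: ${chmod700(dir)}")
      }
    }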
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpFileServer.scala           |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                  |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala  | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                | 77
5 files changed, 66 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 677c5e0f89..3f33332a81 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -36,7 +36,7 @@ private[spark] class HttpFileServer(
var serverUri : String = null
def initialize() {
- baseDir = Utils.createTempDir()
+ baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 48a9d98e2e..e6ebbff087 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -339,7 +339,7 @@ object SparkEnv extends Logging {
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
- Utils.createTempDir().getAbsolutePath
+ Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
} else {
"."
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 31d6958c40..ea98051532 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -151,7 +151,7 @@ private[broadcast] object HttpBroadcast extends Logging {
}
private def createServer(conf: SparkConf) {
- broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
+ broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
server =
new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d79ed76542..ffaac4b176 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -17,9 +17,8 @@
package org.apache.spark.storage
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.{Date, Random, UUID}
+import java.util.UUID
+import java.io.{IOException, File}
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.executor.ExecutorExitCode
@@ -37,7 +36,6 @@ import org.apache.spark.util.Utils
private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
extends Logging {
- private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
private[spark]
val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
@@ -121,33 +119,15 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private def createLocalDirs(conf: SparkConf): Array[File] = {
- val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
- var foundLocalDir = false
- var localDir: File = null
- var localDirId: String = null
- var tries = 0
- val rand = new Random()
- while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
- tries += 1
- try {
- localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- localDir = new File(rootDir, s"spark-local-$localDirId")
- if (!localDir.exists) {
- foundLocalDir = localDir.mkdirs()
- }
- } catch {
- case e: Exception =>
- logWarning(s"Attempt $tries to create local dir $localDir failed", e)
- }
- }
- if (!foundLocalDir) {
- logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir." +
- " Ignoring this directory.")
- None
- } else {
+ try {
+ val localDir = Utils.createDirectory(rootDir, "blockmgr")
logInfo(s"Created local directory at $localDir")
Some(localDir)
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
+ None
}
}
}
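The DiskBlockManager change replaces the hand-rolled retry loop with a call to Utils.createDirectory, which throws an IOException once it exhausts its attempts, so the caller only has to decide what a failed root directory means. A hedged sketch of the resulting pattern (standalone names, not the exact Spark signatures):

    import java.io.{File, IOException}
    import java.util.UUID

    object DirCreationSketch {
      // Bounded retries with unique names; failure surfaces as one IOException.
      def createDirectorySketch(root: String, prefix: String, maxAttempts: Int = 10): File = {
        var attempts = 0
        while (attempts < maxAttempts) {
          attempts += 1
          val dir = new File(root, s"$prefix-${UUID.randomUUID}")
          if (dir.mkdirs()) return dir
        }
        throw new IOException(s"Failed to create a dir under $root after $maxAttempts attempts")
      }

      // Caller pattern matching the new createLocalDirs: skip bad roots, keep going.
      def tryCreateLocalDir(rootDir: String): Option[File] =
        try Some(createDirectorySketch(rootDir, "blockmgr"))
        catch { case _: IOException => None } // Spark logs an error here instead
    }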
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index cdb322de3b..8d230fff76 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,8 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -247,12 +249,27 @@ private[spark] object Utils extends Logging {
}
/**
+ * JDK equivalent of `chmod 700 file`.
+ *
+ * @param file the file whose permissions will be modified
+ * @return true if the permissions were successfully changed, false otherwise.
+ */
+ def chmod700(file: File): Boolean = {
+ file.setReadable(false, false) &&
+ file.setReadable(true, true) &&
+ file.setWritable(false, false) &&
+ file.setWritable(true, true) &&
+ file.setExecutable(false, false) &&
+ file.setExecutable(true, true)
+ }
+
+ /**
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
- def createDirectory(root: String): File = {
+ def createDirectory(root: String, namePrefix: String = "spark"): File = {
var attempts = 0
- val maxAttempts = 10
+ val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
var dir: File = null
while (dir == null) {
attempts += 1
@@ -264,6 +281,11 @@ private[spark] object Utils extends Logging {
dir = new File(root, "spark-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs()) {
dir = null
+ } else {
+ if (!chmod700(dir)) {
+ dir.delete()
+ dir = null
+ }
}
} catch { case e: SecurityException => dir = null; }
}
@@ -275,8 +297,10 @@ private[spark] object Utils extends Logging {
* Create a temporary directory inside the given parent directory. The directory will be
* automatically deleted when the VM shuts down.
*/
- def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
- val dir = createDirectory(root)
+ def createTempDir(
+ root: String = System.getProperty("java.io.tmpdir"),
+ namePrefix: String = "spark"): File = {
+ val dir = createDirectory(root, namePrefix)
registerShutdownDeleteDir(dir)
dir
}
@@ -599,26 +623,35 @@ private[spark] object Utils extends Logging {
* If no directories could be created, this will return an empty list.
*/
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
- val confValue = if (isRunningInYarnContainer(conf)) {
+ if (isRunningInYarnContainer(conf)) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
- // to what Yarn on this system said was available.
- getYarnLocalDirs(conf)
+ // to what Yarn on this system said was available. Note this assumes that Yarn has
+ // created the directories already, and that they are secured so that only the
+ // user has access to them.
+ getYarnLocalDirs(conf).split(",")
} else {
- Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
- conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
- }
- val rootDirs = confValue.split(',')
- logDebug(s"Getting/creating local root dirs at '$confValue'")
-
- rootDirs.flatMap { rootDir =>
- val localDir: File = new File(rootDir)
- val foundLocalDir = localDir.exists || localDir.mkdirs()
- if (!foundLocalDir) {
- logError(s"Failed to create local root dir in $rootDir. Ignoring this directory.")
- None
- } else {
- Some(rootDir)
- }
+ // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
+ // configuration to point to a secure directory. So create a subdirectory with restricted
+ // permissions under each listed directory.
+ Option(conf.getenv("SPARK_LOCAL_DIRS"))
+ .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+ .split(",")
+ .flatMap { root =>
+ try {
+ val rootDir = new File(root)
+ if (rootDir.exists || rootDir.mkdirs()) {
+ Some(createDirectory(root).getAbsolutePath())
+ } else {
+ logError(s"Failed to create dir in $root. Ignoring this directory.")
+ None
+ }
+ } catch {
+ case e: IOException =>
+ logError(s"Failed to create local root dir in $root. Ignoring this directory.")
+ None
+ }
+ }
+ .toArray
}
}
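Taken together, the call sites touched above now get a secured scratch directory in one line: createTempDir picks a unique name under a local root, restricts it to 0700, and registers it for deletion at JVM shutdown. An illustrative usage sketch (assumes a SparkConf is in scope and code living in the org.apache.spark package, since Utils is private[spark]; the method names are the ones changed by this diff):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    def scratchDirs(conf: SparkConf): Seq[java.io.File] = Seq(
      // Each call yields a fresh owner-only (0700) directory that is deleted
      // automatically when the JVM shuts down.
      Utils.createTempDir(Utils.getLocalDir(conf), "httpd"),
      Utils.createTempDir(Utils.getLocalDir(conf), "userFiles"),
      Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
    )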