aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorThomas Graves <tgraves@apache.org>2014-04-29 09:19:48 -0500
committerThomas Graves <tgraves@apache.org>2014-04-29 09:19:48 -0500
commit8db0f7e28f5f0330a3344705ff48d8e7b97c383f (patch)
tree44e20d86c2bb75908585542ab32baff2ccb17d6b /core
parent9a1184a8a9fa679b5ba8cf376b2c71ed1fb6e961 (diff)
downloadspark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.tar.gz
spark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.tar.bz2
spark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.zip
SPARK-1557 Set permissions on event log files/directories
This adds minimal setting of event log directory/files permissions. To have a secure environment the user must manually create the top level event log directory and set permissions up. We can add logic to do that automatically later if we want. Author: Thomas Graves <tgraves@apache.org> Closes #538 from tgravescs/SPARK-1557 and squashes the following commits: e471d8e [Thomas Graves] rework d8b6620 [Thomas Graves] update use of octal 3ca9b79 [Thomas Graves] Updated based on comments 5a09709 [Thomas Graves] add in missing import 3150ed6 [Thomas Graves] SPARK-1557 Set permissions on event log files/directories
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/util/FileLogger.scala22
2 files changed, 22 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 2fe65cd944..d822a8e551 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -54,7 +55,7 @@ private[spark] class EventLoggingListener(
private val logger =
new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
- shouldOverwrite)
+ shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
/**
* Begin logging events.
@@ -124,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+ val LOG_FILE_PERMISSIONS: FsPermission =
+ FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
+
// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new mutable.HashMap[String, CompressionCodec]
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 1ed3b70bb2..0965e0f0f7 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -24,6 +24,7 @@ import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.io.CompressionCodec
@@ -42,7 +43,8 @@ private[spark] class FileLogger(
hadoopConfiguration: Configuration,
outputBufferSize: Int = 8 * 1024, // 8 KB
compress: Boolean = false,
- overwrite: Boolean = true)
+ overwrite: Boolean = true,
+ dirPermissions: Option[FsPermission] = None)
extends Logging {
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -79,16 +81,25 @@ private[spark] class FileLogger(
if (!fileSystem.mkdirs(path)) {
throw new IOException("Error in creating log directory: %s".format(logDir))
}
+ if (dirPermissions.isDefined) {
+ val fsStatus = fileSystem.getFileStatus(path)
+ if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+ fileSystem.setPermission(path, dirPermissions.get)
+ }
+ }
}
/**
* Create a new writer for the file identified by the given path.
+ * If the permissions are not passed in, it will default to using the permissions
+ * (dirPermissions) used when the class was instantiated.
*/
- private def createWriter(fileName: String): PrintWriter = {
+ private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
val isDefaultLocal = (defaultFs == null || defaultFs == "file")
+ val path = new Path(logPath)
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
* Therefore, for local files, use FileOutputStream instead. */
@@ -97,11 +108,11 @@ private[spark] class FileLogger(
// Second parameter is whether to append
new FileOutputStream(uri.getPath, !overwrite)
} else {
- val path = new Path(logPath)
hadoopDataStream = Some(fileSystem.create(path, overwrite))
hadoopDataStream.get
}
+ perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
@@ -152,15 +163,16 @@ private[spark] class FileLogger(
/**
* Start a writer for a new file, closing the existing one if it exists.
* @param fileName Name of the new file, defaulting to the file index if not provided.
+ * @param perms Permissions to put on the new file.
*/
- def newFile(fileName: String = "") {
+ def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
fileIndex += 1
writer.foreach(_.close())
val name = fileName match {
case "" => fileIndex.toString
case _ => fileName
}
- writer = Some(createWriter(name))
+ writer = Some(createWriter(name, perms))
}
/**