diff options
author | Thomas Graves <tgraves@apache.org> | 2014-04-29 09:19:48 -0500 |
---|---|---|
committer | Thomas Graves <tgraves@apache.org> | 2014-04-29 09:19:48 -0500 |
commit | 8db0f7e28f5f0330a3344705ff48d8e7b97c383f (patch) | |
tree | 44e20d86c2bb75908585542ab32baff2ccb17d6b /core | |
parent | 9a1184a8a9fa679b5ba8cf376b2c71ed1fb6e961 (diff) | |
download | spark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.tar.gz spark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.tar.bz2 spark-8db0f7e28f5f0330a3344705ff48d8e7b97c383f.zip |
SPARK-1557 Set permissions on event log files/directories
This adds minimal setting of event log directory/files permissions. To have a secure environment the user must manually create the top level event log directory and set permissions up. We can add logic to do that automatically later if we want.
Author: Thomas Graves <tgraves@apache.org>
Closes #538 from tgravescs/SPARK-1557 and squashes the following commits:
e471d8e [Thomas Graves] rework
d8b6620 [Thomas Graves] update use of octal
3ca9b79 [Thomas Graves] Updated based on comments
5a09709 [Thomas Graves] add in missing import
3150ed6 [Thomas Graves] SPARK-1557 Set permissions on event log files/directories
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/FileLogger.scala | 22 |
2 files changed, 22 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 2fe65cd944..d822a8e551 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.permission.FsPermission import org.json4s.jackson.JsonMethods._ import org.apache.spark.{Logging, SparkConf, SparkContext} @@ -54,7 +55,7 @@ private[spark] class EventLoggingListener( private val logger = new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress, - shouldOverwrite) + shouldOverwrite, Some(LOG_FILE_PERMISSIONS)) /** * Begin logging events. @@ -124,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging { val SPARK_VERSION_PREFIX = "SPARK_VERSION_" val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_" val APPLICATION_COMPLETE = "APPLICATION_COMPLETE" + val LOG_FILE_PERMISSIONS: FsPermission = + FsPermission.createImmutable(Integer.parseInt("770", 8).toShort) + // A cache for compression codecs to avoid creating the same codec many times private val codecMap = new mutable.HashMap[String, CompressionCodec] diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 1ed3b70bb2..0965e0f0f7 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -24,6 +24,7 @@ import java.util.Date import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} +import org.apache.hadoop.fs.permission.FsPermission import org.apache.spark.{Logging, SparkConf} import org.apache.spark.io.CompressionCodec @@ -42,7 +43,8 @@ private[spark] class FileLogger( hadoopConfiguration: Configuration, outputBufferSize: Int = 8 * 1024, // 8 KB compress: Boolean = false, - overwrite: Boolean = true) + overwrite: Boolean = true, + dirPermissions: Option[FsPermission] = None) extends Logging { private val dateFormat = new ThreadLocal[SimpleDateFormat]() { @@ -79,16 +81,25 @@ private[spark] class FileLogger( if (!fileSystem.mkdirs(path)) { throw new IOException("Error in creating log directory: %s".format(logDir)) } + if (dirPermissions.isDefined) { + val fsStatus = fileSystem.getFileStatus(path) + if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) { + fileSystem.setPermission(path, dirPermissions.get) + } + } } /** * Create a new writer for the file identified by the given path. + * If the permissions are not passed in, it will default to use the permissions + * (dirpermissions) used when class was instantiated. */ - private def createWriter(fileName: String): PrintWriter = { + private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = { val logPath = logDir + "/" + fileName val uri = new URI(logPath) val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme val isDefaultLocal = (defaultFs == null || defaultFs == "file") + val path = new Path(logPath) /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844). * Therefore, for local files, use FileOutputStream instead. */ @@ -97,11 +108,11 @@ private[spark] class FileLogger( // Second parameter is whether to append new FileOutputStream(uri.getPath, !overwrite) } else { - val path = new Path(logPath) hadoopDataStream = Some(fileSystem.create(path, overwrite)) hadoopDataStream.get } + perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)} val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream new PrintWriter(cstream) @@ -152,15 +163,16 @@ private[spark] class FileLogger( /** * Start a writer for a new file, closing the existing one if it exists. * @param fileName Name of the new file, defaulting to the file index if not provided. + * @param perms Permissions to put on the new file. */ - def newFile(fileName: String = "") { + def newFile(fileName: String = "", perms: Option[FsPermission] = None) { fileIndex += 1 writer.foreach(_.close()) val name = fileName match { case "" => fileIndex.toString case _ => fileName } - writer = Some(createWriter(name)) + writer = Some(createWriter(name, perms)) } /** |