Diffstat (limited to 'core/src/main/scala/org/apache/spark')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala          | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 28
2 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bae7a3f307..9cc321af4b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
@@ -353,6 +354,28 @@ class SparkHadoopUtil extends Logging {
     }
     buffer.toString
   }
+
+  private[spark] def checkAccessPermission(status: FileStatus, mode: FsAction): Boolean = {
+    val perm = status.getPermission
+    val ugi = UserGroupInformation.getCurrentUser
+
+    if (ugi.getShortUserName == status.getOwner) {
+      if (perm.getUserAction.implies(mode)) {
+        return true
+      }
+    } else if (ugi.getGroupNames.contains(status.getGroup)) {
+      if (perm.getGroupAction.implies(mode)) {
+        return true
+      }
+    } else if (perm.getOtherAction.implies(mode)) {
+      return true
+    }
+
+    logDebug(s"Permission denied: user=${ugi.getShortUserName}, " +
+      s"path=${status.getPath}:${status.getOwner}:${status.getGroup}" +
+      s"${if (status.isDirectory) "d" else "-"}$perm")
+    false
+  }
 }
 
 object SparkHadoopUtil {
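
For readers skimming the diff, a minimal usage sketch of the new helper (the path is hypothetical, and the call site must live inside the org.apache.spark package since the method is private[spark]):

// Minimal sketch, assuming an ordinary Hadoop FileSystem; the path below is
// made up. checkAccessPermission tests the classic owner/group/other
// permission bits of a FileStatus against the current UGI user, and unlike a
// round-trip through FileSystem.access() it never throws.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction

val fs = FileSystem.get(new Configuration())
val status = fs.getFileStatus(new Path("/spark-history/app-0001"))
val readable = SparkHadoopUtil.get.checkAccessPermission(status, FsAction.READ)
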
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 9012736bc2..f4235df245 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -27,7 +27,8 @@ import scala.xml.Node
 
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -318,21 +319,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
-          try {
-            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
-            !entry.isDirectory() &&
-              // FsHistoryProvider generates a hidden file which can't be read. Accidentally
-              // reading a garbage file is safe, but we would log an error which can be scary to
-              // the end-user.
-              !entry.getPath().getName().startsWith(".") &&
-              prevFileSize < entry.getLen()
-          } catch {
-            case e: AccessControlException =>
-              // Do not use "logInfo" since these messages can get pretty noisy if printed on
-              // every poll.
-              logDebug(s"No permission to read $entry, ignoring.")
-              false
-          }
+          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+          !entry.isDirectory() &&
+            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+            // reading a garbage file is safe, but we would log an error which can be scary to
+            // the end-user.
+            !entry.getPath().getName().startsWith(".") &&
+            prevFileSize < entry.getLen() &&
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
         }
         .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>
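
The rewritten filter's predicate chain short-circuits left to right, so the permission check runs last, only for entries that are plain files, not hidden, and have grown since the previous poll. A standalone sketch of the same shape, with names taken from the surrounding code:

// Sketch mirroring the filter above: unreadable logs are skipped up front
// instead of surfacing later as an AccessControlException during replay.
def shouldProcess(entry: FileStatus, prevFileSize: Long): Boolean = {
  !entry.isDirectory() &&
    !entry.getPath().getName().startsWith(".") &&
    prevFileSize < entry.getLen() &&
    SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
}
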
@@ -445,7 +439,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
       val eventsFilter: ReplayEventsFilter = { eventString =>
         eventString.startsWith(APPL_START_EVENT_PREFIX) ||
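
Widening mergeApplicationListing from private to protected lets a subclass intercept replay calls; a plausible use is a test double (it would have to live in the org.apache.spark.deploy.history package, since the class is private[history]). A hypothetical sketch, not part of this commit:

// Hypothetical test double: records every file that reaches replay, so a
// suite can assert that unreadable logs were filtered out beforehand.
class RecordingFsHistoryProvider(conf: SparkConf, clock: Clock)
  extends FsHistoryProvider(conf, clock) {
  val replayed = scala.collection.mutable.ArrayBuffer.empty[String]
  override protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
    replayed += fileStatus.getPath.getName
    super.mergeApplicationListing(fileStatus)
  }
}
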