author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-07-06 15:26:19 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-07-06 15:26:19 -0700
commit     94871e4703d8566359a405da58c60f411b14857a (patch)
tree       072691bb7b2c8f93221d786b5cfaf80de9533fea
parent     3f918b33f8e9cdb3e56f9ee7c88bc760ac9848bb (diff)
parent     923cf929003c67963e273fcdcd5b01baf68df8b5 (diff)
Merge pull request #655 from tgravescs/master
Add support for running Spark on Yarn on a secure Hadoop Cluster
-rw-r--r--  core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala              |  4
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala         | 20
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala  | 17
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala             | 58
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala    |  8
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala     | 39
-rw-r--r--  core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala              |  4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala                       |  1
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                           |  1
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala                          |  2
-rw-r--r--  core/src/main/scala/spark/scheduler/InputFormatInfo.scala              |  4
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala    |  3
12 files changed, 109 insertions, 52 deletions
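
In outline, the change does two things: it merges the submitting user's Hadoop credentials into every JobConf that touches HDFS, and it ships the user's delegation tokens to the ApplicationMaster and worker containers through the YARN container launch context. A minimal client-side sketch of that token flow, assuming a Kerberos login (kinit) has already been performed and using only Hadoop APIs that appear in the diffs below:

    import java.nio.ByteBuffer
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.io.DataOutputBuffer
    import org.apache.hadoop.mapred.Master
    import org.apache.hadoop.security.UserGroupInformation

    val conf = new Configuration()
    // Credentials of the Kerberos-authenticated user submitting the application.
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()

    if (UserGroupInformation.isSecurityEnabled()) {
      // Ask the NameNode for HDFS delegation tokens, renewable by the cluster's master principal.
      val renewer = Master.getMasterPrincipal(conf)
      FileSystem.get(conf).addDelegationTokens(renewer, credentials)
    }

    // Serialize the collected tokens so they can be attached to a container launch context.
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    val tokenBlob: ByteBuffer = ByteBuffer.wrap(dob.getData())

Client.scala below does essentially this before calling setContainerTokens on the AM container, and WorkerRunnable repeats the serialization step for each worker container.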
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d..f1c86de4cc 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
package spark.deploy
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
/**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {}
}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index ab1ab9d8a7..301a57fffa 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,6 +1,7 @@
package spark.deploy
import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
@@ -27,18 +28,7 @@ object SparkHadoopUtil {
}
def runAsUser(func: (Product) => Unit, args: Product, user: String) {
-
- // println("running as user " + jobUserName)
-
- UserGroupInformation.setConfiguration(yarnConf)
- val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(user)
- appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
- def run: AnyRef = {
- func(args)
- // no return value ...
- null
- }
- })
+ func(args)
}
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
@@ -60,4 +50,10 @@ object SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Always create a new config, dont reuse yarnConf.
def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {
+ val jobCreds = conf.getCredentials();
+ jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+ }
}
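
The YARN build's addCredentials copies every token held by the current UserGroupInformation into the JobConf's credential store, so split computation and output commits against a secure HDFS can authenticate. A small, hypothetical usage sketch (the token kinds listed depend entirely on the cluster; on a Kerberos-secured HDFS this typically includes an HDFS delegation token):

    import org.apache.hadoop.mapred.JobConf
    import scala.collection.JavaConversions._
    import spark.deploy.SparkHadoopUtil

    val jobConf = new JobConf()
    SparkHadoopUtil.addCredentials(jobConf)

    // Inspect what was merged in from the current user's UGI.
    for (token <- jobConf.getCredentials().getAllTokens()) {
      println(token.getKind() + " for service " + token.getService())
    }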
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index aa72c1e5fe..f19648ec68 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,23 +30,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
def run() {
- // Initialization
- val jobUserName = Utils.getUserNameFromEnvironment()
- logInfo("running as user " + jobUserName)
-
- // run as user ...
- UserGroupInformation.setConfiguration(yarnConf)
- val appMasterUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(jobUserName)
- appMasterUgi.doAs(new PrivilegedExceptionAction[AnyRef] {
- def run: AnyRef = {
- runImpl()
- return null
- }
- })
- }
-
- private def runImpl() {
-
appAttemptId = getApplicationAttemptId()
resourceManager = registerWithResourceManager()
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index 7a881e26df..514c17f241 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -1,9 +1,13 @@
package spark.deploy.yarn
import java.net.{InetSocketAddress, URI}
+import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -23,6 +27,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var rpc: YarnRPC = YarnRPC.create(conf)
val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials();
def run() {
init(yarnConf)
@@ -40,8 +45,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(amContainer)
- appContext.setUser(args.amUser)
-
+ appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+
submitApp(appContext)
monitorApplication(appId)
@@ -62,14 +67,21 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
- logInfo("Max mem capabililty of resources in this cluster " + maxMem)
+ logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
- // If the cluster does not have enough memory resources, exit.
- val requestedMem = (args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + args.numWorkers * args.workerMemory
- if (requestedMem > maxMem) {
- logError("Cluster cannot satisfy memory resource request of " + requestedMem)
+ // If we have requested more than the cluster's max for a single resource then exit.
+ if (args.workerMemory > maxMem) {
+ logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ System.exit(1)
+ }
+ val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ if (amMem > maxMem) {
+ logError("AM size is to large to run on this cluster " + amMem)
System.exit(1)
}
+
+ // We could add checks to make sure the entire cluster has enough resources but that involves getting
+ // all the node reports and computing it ourselves.
}
def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
@@ -86,6 +98,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Upload Spark and the application JAR to the remote file system
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
+
+ val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+ logError("Can't get Master Kerberos principal for use as renewer")
+ System.exit(1)
+ }
+ }
+
Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -97,6 +118,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
fs.copyFromLocalFile(false, true, src, dst)
val destStatus = fs.getFileStatus(dst)
+ // get tokens for anything we upload to hdfs
+ if (UserGroupInformation.isSecurityEnabled()) {
+ fs.addDelegationTokens(delegTokenRenewer, credentials);
+ }
+
val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
amJarRsrc.setType(LocalResourceType.FILE)
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
@@ -106,6 +132,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
locaResources(destName) = amJarRsrc
}
}
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
return locaResources
}
@@ -114,7 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
val env = new HashMap[String, String]()
- Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
// If log4j present, ensure ours overrides all others
if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -142,6 +168,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
}
+
// Add each SPARK-* key to the environment
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
return env
@@ -195,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- val commands = List[String]("java " +
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
" spark.deploy.yarn.ApplicationMaster" +
@@ -214,7 +247,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Memory for the ApplicationMaster
capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
amContainer.setResource(capability)
-
+
+ // Setup security tokens
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
return amContainer
}
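
The reworked verifyClusterResources earlier in this file no longer sums the whole application's memory demand; it only checks that each individual container request fits the scheduler's maximum single-container size. A standalone rendering of that check, with MEMORY_OVERHEAD assumed to be 384 MB for illustration (the real value is a constant in YarnAllocationHandler):

    // Hypothetical standalone version of Client.verifyClusterResources.
    // maxMem corresponds to GetNewApplicationResponse.getMaximumResourceCapability().getMemory().
    def verifyClusterResources(maxMem: Int, workerMemory: Int, amMemory: Int,
                               memoryOverhead: Int = 384) {
      if (workerMemory > maxMem) {
        sys.error("Worker memory " + workerMemory + " MB exceeds the max container size " + maxMem + " MB")
      }
      val amMem = amMemory + memoryOverhead
      if (amMem > maxMem) {
        sys.error("AM memory " + amMem + " MB exceeds the max container size " + maxMem + " MB")
      }
    }

With the defaults in ClientArguments (amMemory = 512, workerMemory = 1024), a submission is rejected only if a single worker or the AM request itself exceeds maxMem, not if the aggregate demand does.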
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 24110558e7..07e7edea36 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) {
var workerMemory = 1024
var workerCores = 1
var numWorkers = 2
- var amUser = System.getProperty("user.name")
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
// TODO
@@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) {
workerCores = value
args = tail
- case ("--user") :: value :: tail =>
- amUser = value
- args = tail
-
case ("--queue") :: value :: tail =>
amQueue = value
args = tail
@@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" +
- " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n"
+ " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)
}
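
With the --user option removed, the identity the application runs as is taken from the client's Hadoop login rather than a flag. A trivial sketch of how that name is obtained (the same call Client and WorkerRunnable now pass to setUser):

    import org.apache.hadoop.security.UserGroupInformation

    object WhoAmI {
      def main(args: Array[String]) {
        // On a secure cluster this is the Kerberos short name of the logged-in user;
        // without Kerberos it falls back to the OS account name.
        println(UserGroupInformation.getCurrentUser().getShortUserName())
      }
    }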
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index a2bf0af762..cc6f3344a1 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -1,9 +1,12 @@
package spark.deploy.yarn
import java.net.URI
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
@@ -11,7 +14,7 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import scala.collection.JavaConversions._
@@ -76,7 +79,19 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
*/
ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
- val commands = List[String]("java " +
+
+ val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+ val dob = new DataOutputBuffer()
+ credentials.writeTokenStorageToStream(dob)
+ ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
+
+ var javaCommand = "java";
+ val javaHome = System.getenv("JAVA_HOME")
+ if (javaHome != null && !javaHome.isEmpty()) {
+ javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+ }
+
+ val commands = List[String](javaCommand +
" -server " +
// Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
// Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -143,8 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
def prepareEnvironment: HashMap[String, String] = {
val env = new HashMap[String, String]()
- // should we add this ?
- Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
// If log4j present, ensure ours overrides all others
if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
@@ -165,7 +178,23 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
logInfo("Connecting to ContainerManager at " + cmHostPortStr)
- return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
+
+ // use doAs and remoteUser here so we can add the container token and not
+ // pollute the current user's credentials with all of the individual container tokens
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString());
+ val containerToken = container.getContainerToken();
+ if (containerToken != null) {
+ user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
+ }
+
+ val proxy = user
+ .doAs(new PrivilegedExceptionAction[ContainerManager] {
+ def run: ContainerManager = {
+ return rpc.getProxy(classOf[ContainerManager],
+ cmAddress, conf).asInstanceOf[ContainerManager]
+ }
+ });
+ return proxy;
}
}
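
The connectToCM change above wraps the RPC in a throw-away UGI that carries only the container's own token, so the client's credentials are not polluted with one token per container. A condensed, self-contained sketch of that pattern against the alpha-era YARN API this file already imports:

    import java.net.InetSocketAddress
    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.hadoop.yarn.api.ContainerManager
    import org.apache.hadoop.yarn.api.records.Container
    import org.apache.hadoop.yarn.ipc.YarnRPC
    import org.apache.hadoop.yarn.util.ProtoUtils

    def connectToCM(container: Container, cmAddress: InetSocketAddress,
                    rpc: YarnRPC, conf: Configuration): ContainerManager = {
      // A remote UGI named after the container, holding only that container's token.
      val user = UserGroupInformation.createRemoteUser(container.getId().toString())
      val containerToken = container.getContainerToken()
      if (containerToken != null) {
        user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
      }
      user.doAs(new PrivilegedExceptionAction[ContainerManager] {
        def run(): ContainerManager =
          rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager]
      })
    }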
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d..f1c86de4cc 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
package spark.deploy
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
/**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
+
+ // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ def addCredentials(conf: JobConf) {}
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 7630fe7803..8b313c645f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 366afb2a2a..228e831dff 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cbf5512e24..07c103503c 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
+import spark.deploy.SparkHadoopUtil
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -50,6 +51,7 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
+ SparkHadoopUtil.addCredentials(conf);
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 287f731787..17d0ea4f80 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -3,11 +3,13 @@ package spark.scheduler
import spark.Logging
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
+import spark.deploy.SparkHadoopUtil
/**
@@ -70,6 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val conf = new JobConf(configuration)
+ SparkHadoopUtil.addCredentials(conf);
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -89,6 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val jobConf = new JobConf(configuration)
+ SparkHadoopUtil.addCredentials(jobConf);
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
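
HadoopRDD.getPartitions and both prefLocs helpers in InputFormatInfo now follow the same pattern: build a JobConf, fold in the user's tokens, then ask the InputFormat for splits. A minimal sketch of that pattern against the old mapred API (the input path and TextInputFormat are placeholders for whatever the job actually uses):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.hadoop.util.ReflectionUtils
    import spark.deploy.SparkHadoopUtil

    val conf = new JobConf(new Configuration())
    // Without this, split computation against a Kerberos-secured HDFS can fail to
    // authenticate because the JobConf carries no delegation tokens.
    SparkHadoopUtil.addCredentials(conf)
    FileInputFormat.setInputPaths(conf, "hdfs:///tmp/example-input")  // placeholder path
    val inputFormat = ReflectionUtils.newInstance(classOf[TextInputFormat], conf)
    inputFormat.getSplits(conf, 2 /* minSplits */).foreach(println)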
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3ec922957d..8d0a83d439 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -14,6 +14,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
@@ -470,7 +471,7 @@ extends Serializable {
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf
- ) {
+ ) {
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)