 core/src/main/scala/spark/SparkContext.scala                       | 11 +++++++----
 core/src/main/scala/spark/SparkEnv.scala                           |  2 ++
 core/src/main/scala/spark/Utils.scala                              |  7 ++-----
 core/src/main/scala/spark/deploy/SparkHadoopUtil.scala             |  2 +-
 core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala |  6 +++---
 core/src/main/scala/spark/rdd/CheckpointRDD.scala                  | 14 ++++++++------
 core/src/main/scala/spark/rdd/HadoopRDD.scala                      |  6 +++---
 core/src/main/scala/spark/scheduler/InputFormatInfo.scala          |  9 +++++----
 examples/src/main/scala/spark/examples/SparkHdfsLR.scala           |  3 +--
 9 files changed, 32 insertions(+), 28 deletions(-)
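
This commit turns spark.deploy.SparkHadoopUtil from a singleton object into a class hung off SparkEnv, so every call site reaches the Hadoop helpers through the current environment. A minimal sketch of the call-site change, assuming a SparkEnv has already been initialized on the calling thread:

    import org.apache.hadoop.conf.Configuration
    import spark.SparkEnv

    // Old: static call on the singleton object.
    //   val conf = SparkHadoopUtil.newConfiguration()
    // New: go through the instance owned by the current SparkEnv.
    def currentHadoopConf(): Configuration =
      SparkEnv.get.hadoop.newConfiguration()
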
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 80c65dfebd..f020b2554b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
@@ -241,7 +241,8 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
- val conf = SparkHadoopUtil.newConfiguration()
+ val env = SparkEnv.get
+ val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@@ -629,10 +630,11 @@ class SparkContext(
logWarning("null specified as parameter to addJar",
new SparkException("null specified as parameter to addJar"))
} else {
+ val env = SparkEnv.get
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" =>
- if (SparkHadoopUtil.isYarnMode()) {
+ if (env.hadoop.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
return
}
@@ -815,8 +817,9 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
+ val env = SparkEnv.get
val path = new Path(dir)
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
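
A condensed sketch of the resulting hadoopConfiguration initializer, assuming an initialized SparkEnv; the s3n key variants set upstream are elided here:

    val hadoopConfiguration = {
      val env = SparkEnv.get
      val conf = env.hadoop.newConfiguration()
      // Propagate AWS credentials from the environment so s3:// URIs resolve.
      if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
          System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
        conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
        conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
      }
      conf
    }
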
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 0adbf1d96e..73990f0423 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
import spark.metrics.MetricsSystem
+import spark.deploy.SparkHadoopUtil
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@@ -60,6 +61,7 @@ class SparkEnv (
// If executorId is NOT found, return defaultHostPort
var executorIdToHostPort: Option[(String, String) => String]) {
+ val hadoop = new SparkHadoopUtil
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
def stop() {
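
The new field is the single seam for Hadoop integration: each SparkEnv owns one SparkHadoopUtil, and an alternative build could hand back a subclass here instead. A minimal sketch of that substitution point; the subclass name below is hypothetical, not part of this diff:

    import spark.deploy.SparkHadoopUtil

    // Default wiring, as in this patch: one helper instance per environment.
    val hadoop = new SparkHadoopUtil

    // Hypothetical YARN wiring: the same field could carry an override, e.g.
    //   val hadoop: SparkHadoopUtil = new YarnSparkHadoopUtil
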
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 673f9a810d..7ea9b0c28a 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -266,8 +266,9 @@ private object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
+ val env = SparkEnv.get
val uri = new URI(url)
- val conf = SparkHadoopUtil.newConfiguration()
+ val conf = env.hadoop.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@@ -433,10 +434,6 @@ private object Utils extends Logging {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
- def getUserNameFromEnvironment(): String = {
- SparkHadoopUtil.getUserNameFromEnvironment
- }
-
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
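
A hedged sketch of the Hadoop-filesystem download branch in Utils after the patch, simplified with error handling omitted; the explicit copy loop is representative, upstream uses its own stream-copy helper:

    import java.io.{File, FileOutputStream}
    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}
    import spark.SparkEnv

    def fetchViaHadoopFs(url: String, tempFile: File) {
      val env = SparkEnv.get
      val uri = new URI(url)
      val conf = env.hadoop.newConfiguration()
      val fs = FileSystem.get(uri, conf)   // handles file://, hdfs://, s3://, ...
      val in = fs.open(new Path(uri))
      val out = new FileOutputStream(tempFile)
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      in.close()
      out.close()
    }
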
diff --git a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
index 617954cb98..c4ed0bb17e 100644
--- a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.JobConf
/**
* Contains util methods to interact with Hadoop from spark.
*/
-object SparkHadoopUtil {
+class SparkHadoopUtil {
def getUserNameFromEnvironment(): String = {
// defaulting to -D ...
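
Only the object-to-class header appears in this hunk, but the conversion is what makes every method overridable. A sketch of the class's shape with representative default bodies, not the exact upstream code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    class SparkHadoopUtil {
      // Defaulting to the JVM's -Duser.name; a subclass may consult the cluster.
      def getUserNameFromEnvironment(): String = System.getProperty("user.name")

      // Default: run in-place with no user switch; a YARN subclass can re-run
      // the function as the submitting user.
      def runAsUser(func: (Product) => Unit, args: Product) { func(args) }

      // Fresh Hadoop Configuration; a subclass may pre-load cluster settings.
      def newConfiguration(): Configuration = new Configuration()

      // Default: nothing to attach; a subclass can add delegation tokens.
      def addCredentials(conf: JobConf) {}

      def isYarnMode(): Boolean = false
    }
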
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e47fe50021..a9e06f8d54 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -22,9 +22,8 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkEnv}
import spark.TaskState.TaskState
-import spark.deploy.SparkHadoopUtil
import spark.scheduler.cluster.StandaloneClusterMessages._
import spark.util.AkkaUtils
@@ -82,7 +81,8 @@ private[spark] class StandaloneExecutorBackend(
private[spark] object StandaloneExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
- SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
+ val env = SparkEnv.get
+ env.hadoop.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
}
// This will be run 'as' the user
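
The runAsUser handshake packs the entry arguments into a Product and the callee unpacks them again; run0's body is unchanged by this patch, so the unpacking and the runImpl name below are representative only:

    // Invoked by the helper, possibly as a different user under YARN.
    def run0(args: Product) {
      assert(4 == args.productArity)
      runImpl(args.productElement(0).asInstanceOf[String],
              args.productElement(1).asInstanceOf[String],
              args.productElement(2).asInstanceOf[String],
              args.productElement(3).asInstanceOf[Int])
    }
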
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 6794e0e201..1ad5fe6539 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
-import spark.deploy.SparkHadoopUtil
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging {
}
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
@@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging {
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val env = SparkEnv.get
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
// Register an on-task-completion callback to close the input stream.
@@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging {
import spark._
val Array(cluster, hdfsPath) = args
+ val env = SparkEnv.get
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
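
Beyond the helper swap, these hunks cache SparkEnv.get once per method and reuse it for both the Hadoop helper and the serializer. A condensed sketch of readFromFile as it reads after the patch:

    def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
      val env = SparkEnv.get                       // fetched once, used twice
      val fs = path.getFileSystem(env.hadoop.newConfiguration())
      val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
      val fileInputStream = fs.open(path, bufferSize)
      val serializer = env.serializer.newInstance()
      val deserializeStream = serializer.deserializeStream(fileInputStream)

      // Register an on-task-completion callback to close the input stream.
      context.addOnCompleteCallback(() => deserializeStream.close())
      deserializeStream.asIterator.asInstanceOf[Iterator[T]]
    }
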
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index fd00d59c77..6c41b97780 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.deploy.SparkHadoopUtil
-import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -68,7 +67,8 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
- SparkHadoopUtil.addCredentials(conf);
+ val env = SparkEnv.get
+ env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
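
HadoopRDD here and InputFormatInfo below apply the same pattern: give the env-scoped helper a chance to attach credentials to the JobConf before splits are computed, a no-op in the default SparkHadoopUtil. A minimal helper with a hypothetical name, showing the shape:

    import org.apache.hadoop.mapred.JobConf
    import spark.SparkEnv

    def withCredentials(conf: JobConf): JobConf = {
      SparkEnv.get.hadoop.addCredentials(conf)  // no-op off-YARN
      conf
    }
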
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 65f8c3200e..8f1b9b29b5 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -17,7 +17,7 @@
package spark.scheduler
-import spark.Logging
+import spark.{Logging, SparkEnv}
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
-import spark.deploy.SparkHadoopUtil
/**
@@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val conf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(conf);
+ env.hadoop.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val jobConf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(jobConf);
+ env.hadoop.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index ef6e09a8e8..43c9115664 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -21,7 +21,6 @@ import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
-import spark.deploy.SparkHadoopUtil
import spark.scheduler.InputFormatInfo
/**
@@ -52,7 +51,7 @@ object SparkHdfsLR {
System.exit(1)
}
val inputPath = args(1)
- val conf = SparkHadoopUtil.newConfiguration()
+ val conf = SparkEnv.get.hadoop.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations(
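
A hedged usage sketch of the new driver-side call, assuming SparkEnv has been initialized, i.e. a SparkContext already exists on the calling thread:

    import spark.{SparkContext, SparkEnv}

    val sc = new SparkContext("local", "hadoop-conf-demo")
    val hadoopConf = SparkEnv.get.hadoop.newConfiguration()
    println(hadoopConf.get("fs.default.name"))   // e.g. file:/// when unset
    sc.stop()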