From eeb5f64c67c98f7252aab0fe2c55b86976463c74 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Tue, 29 Oct 2013 17:12:16 -0500 Subject: Remove SparkHadoopUtil stuff from SparkEnv --- .../main/scala/org/apache/spark/SparkContext.scala | 11 +++++------ core/src/main/scala/org/apache/spark/SparkEnv.scala | 14 -------------- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 21 +++++++++++++++++++++ .../scala/org/apache/spark/rdd/CheckpointRDD.scala | 7 ++++--- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 7 ++++--- .../apache/spark/scheduler/InputFormatInfo.scala | 7 +++---- .../main/scala/org/apache/spark/util/Utils.scala | 3 +-- 7 files changed, 38 insertions(+), 32 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c9bc01cba5..c1d74dc60c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -51,7 +51,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.deploy.LocalSparkCluster +import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ @@ -245,7 +245,7 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { val env = SparkEnv.get - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { @@ -379,7 +379,7 @@ class SparkContext( minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { // Add necessary security credentials to the JobConf before broadcasting it. - SparkEnv.get.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -697,7 +697,7 @@ class SparkContext( val uri = new URI(path) key = uri.getScheme match { case null | "file" => - if (env.hadoop.isYarnMode()) { + if (SparkHadoopUtil.get.isYarnMode()) { // In order for this to work on yarn the user must specify the --addjars option to // the client to upload the file into the distributed cache to make it show up in the // current working directory. @@ -932,9 +932,8 @@ class SparkContext( * prevent accidental overriding of checkpoint files in the existing directory. 
*/ def setCheckpointDir(dir: String, useExisting: Boolean = false) { - val env = SparkEnv.get val path = new Path(dir) - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index aaab717bcf..270d8d4b2d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -25,7 +25,6 @@ import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster} import org.apache.spark.network.ConnectionManager import org.apache.spark.serializer.{Serializer, SerializerManager} @@ -58,19 +57,6 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() - val hadoop = { - val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if(yarnMode) { - try { - Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] - } catch { - case th: Throwable => throw new SparkException("Unable to load YARN support", th) - } - } else { - new SparkHadoopUtil - } - } - def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 83cd3df5fa..8c0c111d61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,6 +22,8 @@ import org.apache.hadoop.mapred.JobConf import com.google.common.collect.MapMaker +import org.apache.spark.SparkException + /** * Contains util methods to interact with Hadoop from Spark. 
@@ -47,3 +49,22 @@ class SparkHadoopUtil { def isYarnMode(): Boolean = { false } } + +object SparkHadoopUtil { + private val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if (yarnMode) { + try { + Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + + def get: SparkHadoopUtil = { + hadoop + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index ccaaecb85b..d3033ea4a6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.rdd import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{NullWritable, BytesWritable} @@ -83,7 +84,7 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) + val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val finalOutputName = splitIdToFile(ctx.partitionId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -122,7 +123,7 @@ private[spark] object CheckpointRDD extends Logging { def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { val env = SparkEnv.get - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() @@ -145,7 +146,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(env.hadoop.newConfiguration()) + val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index fad042c7ae..0e8eaf4be6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} @@ -198,10 +199,10 @@ private[spark] object HadoopRDD { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. 
*/ - def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.get(key) - def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key) + def containsCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.containsKey(key) def putCachedMetadata(key: String, value: Any) = - SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value) + SparkHadoopUtil.get.hadoopJobMetadata.put(key, value) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 370ccd183c..1791ee660d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.deploy.SparkHadoopUtil import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.security.UserGroupInformation @@ -87,9 +88,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val conf = new JobConf(configuration) - env.hadoop.addCredentials(conf) + SparkHadoopUtil.get.addCredentials(conf) FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -108,9 +108,8 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... 
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { - val env = SparkEnv.get val jobConf = new JobConf(configuration) - env.hadoop.addCredentials(jobConf) + SparkHadoopUtil.get.addCredentials(jobConf) FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index a3b3968c5e..fd2811e44c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -280,9 +280,8 @@ private[spark] object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others - val env = SparkEnv.get val uri = new URI(url) - val conf = env.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) -- cgit v1.2.3 From f231aaa24c56402ee364ac27142c3c35567d64f2 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Wed, 30 Oct 2013 11:46:12 -0500 Subject: move the hadoopJobMetadata back into SparkEnv --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 5 +++++ core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 7 ------- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +++--- 3 files changed, 8 insertions(+), 10 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 270d8d4b2d..ff2df8fb6a 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,6 +31,7 @@ import org.apache.spark.serializer.{Serializer, SerializerManager} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.api.python.PythonWorkerFactory +import com.google.common.collect.MapMaker /** * Holds all the runtime environment objects for a running Spark instance (either master or worker), @@ -57,6 +58,10 @@ class SparkEnv ( private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + // A general, soft-reference map for metadata needed during HadoopRDD split computation + // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). + private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() + def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8c0c111d61..6bc846aa92 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -20,19 +20,13 @@ package org.apache.spark.deploy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf -import com.google.common.collect.MapMaker - import org.apache.spark.SparkException - /** * Contains util methods to interact with Hadoop from Spark. */ private[spark] class SparkHadoopUtil { - // A general, soft-reference map for metadata needed during HadoopRDD split computation - // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). - private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() /** * Return an appropriate (subclass) of Configuration. 
Creating config can initializes some Hadoop @@ -47,7 +41,6 @@ class SparkHadoopUtil { def addCredentials(conf: JobConf) {} def isYarnMode(): Boolean = { false } - } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 0e8eaf4be6..32901a508f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -199,10 +199,10 @@ private[spark] object HadoopRDD { * The three methods below are helpers for accessing the local map, a property of the SparkEnv of * the local process. */ - def getCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.get(key) + def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key) - def containsCachedMetadata(key: String) = SparkHadoopUtil.get.hadoopJobMetadata.containsKey(key) + def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key) def putCachedMetadata(key: String, value: Any) = - SparkHadoopUtil.get.hadoopJobMetadata.put(key, value) + SparkEnv.get.hadoopJobMetadata.put(key, value) } -- cgit v1.2.3
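After the first commit, code that needs Hadoop helpers goes through the new SparkHadoopUtil.get singleton instead of SparkEnv.get.hadoop: the companion object decides once, based on SPARK_YARN_MODE, whether to reflectively load YarnSparkHadoopUtil or fall back to the plain SparkHadoopUtil, so callers no longer need a live SparkEnv just to build a Configuration or attach credentials. A minimal caller-side sketch of that pattern follows; the example object and method names are illustrative, while the SparkHadoopUtil calls are the ones shown in the patch above.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.JobConf

    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical caller: mirrors the setCheckpointDir and hadoopFile changes in the first patch.
    object HadoopAccessSketch {
      def checkpointDirExists(dir: String): Boolean = {
        // Returns an appropriate Hadoop Configuration (a subclass when YARN support is loaded).
        val conf = SparkHadoopUtil.get.newConfiguration()
        val path = new Path(dir)
        val fs = path.getFileSystem(conf)
        fs.exists(path)
      }

      def withCredentials(jobConf: JobConf): JobConf = {
        // addCredentials is a no-op in the base class; the YARN implementation adds the user's
        // security credentials, so callers can invoke it unconditionally before using the JobConf.
        SparkHadoopUtil.get.addCredentials(jobConf)
        jobConf
      }
    }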
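The second commit keeps that singleton but moves the soft-reference hadoopJobMetadata cache back into SparkEnv, so per-process metadata such as cached JobConfs and InputFormats lives with the rest of the runtime state and SparkHadoopUtil stays stateless. A standalone sketch of the same caching pattern, assuming Guava's MapMaker as in the patch (the helper names and key format here are hypothetical):

    import com.google.common.collect.MapMaker
    import org.apache.hadoop.mapred.JobConf

    // Hypothetical stand-in for the SparkEnv field: a soft-values map lets the JVM
    // reclaim cached JobConfs/InputFormats under memory pressure.
    object MetadataCacheSketch {
      private val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

      def putJobConf(rddId: Int, conf: JobConf): Unit =
        hadoopJobMetadata.put("rdd_" + rddId + "_job_conf", conf)

      def getJobConf(rddId: Int): Option[JobConf] =
        Option(hadoopJobMetadata.get("rdd_" + rddId + "_job_conf")).map(_.asInstanceOf[JobConf])
    }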