diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-22 08:01:13 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-22 08:01:13 +0530 |
commit | 7acab3ab45df421601ee9a076a61de00561a0308 (patch) | |
tree | ee2e9264e72bfd10e878c09765b5d580e382aca7 /core | |
parent | ac2e8e8720f10efd640a67ad85270719ab2d43e9 (diff) | |
download | spark-7acab3ab45df421601ee9a076a61de00561a0308.tar.gz spark-7acab3ab45df421601ee9a076a61de00561a0308.tar.bz2 spark-7acab3ab45df421601ee9a076a61de00561a0308.zip |
Fix review comments, add a new api to SparkHadoopUtil to create appropriate Configuration. Modify an example to show how to use SplitInfo
Diffstat (limited to 'core')
6 files changed, 31 insertions, 14 deletions
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala index d4badbc5c4..a0fb4fe25d 100644 --- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,4 +1,6 @@ package spark.deploy +import org.apache.hadoop.conf.Configuration + /** * Contains util methods to interact with Hadoop from spark. @@ -15,4 +17,7 @@ object SparkHadoopUtil { // Add support, if exists - for now, simply run func ! func(args) } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + def newConfiguration(): Configuration = new Configuration() } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala index 66e5ad8491..ab1ab9d8a7 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala @@ -12,7 +12,7 @@ import java.security.PrivilegedExceptionAction */ object SparkHadoopUtil { - val yarnConf = new YarnConfiguration(new Configuration()) + val yarnConf = newConfiguration() def getUserNameFromEnvironment(): String = { // defaulting to env if -D is not present ... @@ -56,4 +56,8 @@ object SparkHadoopUtil { def setYarnMode(env: HashMap[String, String]) { env("SPARK_YARN_MODE") = "true" } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + // Always create a new config, dont reuse yarnConf. + def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) } diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala index d4badbc5c4..a0fb4fe25d 100644 --- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,4 +1,6 @@ package spark.deploy +import org.apache.hadoop.conf.Configuration + /** * Contains util methods to interact with Hadoop from spark. @@ -15,4 +17,7 @@ object SparkHadoopUtil { // Add support, if exists - for now, simply run func ! func(args) } + + // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems + def newConfiguration(): Configuration = new Configuration() } diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index e853bce2c4..5f5ec0b0f4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} import org.apache.mesos.MesosNativeLibrary -import spark.deploy.LocalSparkCluster +import spark.deploy.{SparkHadoopUtil, LocalSparkCluster} import spark.partial.ApproximateEvaluator import spark.partial.PartialResult import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} @@ -102,7 +102,9 @@ class SparkContext( // Add each JAR given through the constructor - if (jars != null) jars.foreach { addJar(_) } + if (jars != null) { + jars.foreach { addJar(_) } + } // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() @@ -114,7 +116,9 @@ class SparkContext( executorEnvs(key) = value } } - if (environment != null) executorEnvs ++= environment + if (environment != null) { + executorEnvs ++= environment + } // Create and start the scheduler private var taskScheduler: TaskScheduler = { @@ -207,7 +211,7 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { - val conf = new Configuration() + val conf = SparkHadoopUtil.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) @@ -711,7 +715,7 @@ class SparkContext( */ def setCheckpointDir(dir: String, useExisting: Boolean = false) { val path = new Path(dir) - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 3e54fa7a7e..9f48cbe490 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -4,7 +4,6 @@ import java.io._ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket} import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.collection.JavaConversions._ @@ -208,7 +207,7 @@ private object Utils extends Logging { case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others val uri = new URI(url) - val conf = new Configuration() + val conf = SparkHadoopUtil.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) @@ -317,7 +316,6 @@ private object Utils extends Logging { * Get the local machine's hostname. */ def localHostName(): String = { - // customHostname.getOrElse(InetAddress.getLocalHost.getHostName) customHostname.getOrElse(localIpAddressHostname) } @@ -337,6 +335,7 @@ private object Utils extends Logging { retval } + /* // Used by DEBUG code : remove when all testing done private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$") def checkHost(host: String, message: String = "") { @@ -358,12 +357,11 @@ private object Utils extends Logging { Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message) } } + */ // Once testing is complete in various modes, replace with this ? - /* def checkHost(host: String, message: String = "") {} def checkHostPort(hostPort: String, message: String = "") {} - */ def getUserNameFromEnvironment(): String = { SparkHadoopUtil.getUserNameFromEnvironment diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 24d527f38f..79d00edee7 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -8,6 +8,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.fs.Path import java.io.{File, IOException, EOFException} import java.text.NumberFormat +import spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -65,7 +66,7 @@ private[spark] object CheckpointRDD extends Logging { def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { val outputDir = new Path(path) - val fs = outputDir.getFileSystem(new Configuration()) + val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration()) val finalOutputName = splitIdToFile(ctx.splitId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -103,7 +104,7 @@ private[spark] object CheckpointRDD extends Logging { } def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) val serializer = SparkEnv.get.serializer.newInstance() @@ -125,7 +126,7 @@ private[spark] object CheckpointRDD extends Logging { val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(new Configuration()) + val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") |