-rw-r--r--  core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala       |  5
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala  |  6
-rw-r--r--  core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala       |  5
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                    | 14
-rw-r--r--  core/src/main/scala/spark/Utils.scala                           |  8
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala               |  7
-rw-r--r--  examples/src/main/scala/spark/examples/SparkHdfsLR.scala        | 10
-rw-r--r--  project/SparkBuild.scala                                        | 10
8 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index d4badbc5c4..a0fb4fe25d 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,4 +1,6 @@
package spark.deploy
+import org.apache.hadoop.conf.Configuration
+
/**
* Contains util methods to interact with Hadoop from spark.
@@ -15,4 +17,7 @@ object SparkHadoopUtil {
// Add support, if exists - for now, simply run func !
func(args)
}
+
+ // Return an appropriate subclass of Configuration. Creating the config can initialize some Hadoop subsystems.
+ def newConfiguration(): Configuration = new Configuration()
}
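
Note: the hadoop1 and hadoop2 shims are identical; which one gets compiled in is decided by the per-version source directory selected in project/SparkBuild.scala (last hunk below). Callers should always go through the shim instead of constructing a Configuration directly. A minimal caller-side sketch, with a hypothetical helper object HdfsInspector:

    import org.apache.hadoop.fs.Path
    import spark.deploy.SparkHadoopUtil

    object HdfsInspector {
      // Go through the shim so the build-selected Hadoop variant decides
      // which Configuration subclass (plain or YARN-aware) is created.
      def exists(pathStr: String): Boolean = {
        val conf = SparkHadoopUtil.newConfiguration()  // not `new Configuration()`
        val path = new Path(pathStr)
        path.getFileSystem(conf).exists(path)
      }
    }
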
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index 66e5ad8491..ab1ab9d8a7 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -12,7 +12,7 @@ import java.security.PrivilegedExceptionAction
*/
object SparkHadoopUtil {
- val yarnConf = new YarnConfiguration(new Configuration())
+ val yarnConf = newConfiguration()
def getUserNameFromEnvironment(): String = {
// defaulting to env if -D is not present ...
@@ -56,4 +56,8 @@ object SparkHadoopUtil {
def setYarnMode(env: HashMap[String, String]) {
env("SPARK_YARN_MODE") = "true"
}
+
+ // Return an appropriate subclass of Configuration. Creating the config can initialize some Hadoop subsystems.
+ // Always create a new config here; don't reuse yarnConf.
+ def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
}
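
Note: the "don't reuse yarnConf" comment matters because Hadoop's Configuration is mutable, so handing out the shared yarnConf would let one caller's conf.set(...) leak into every other user. A small sketch of the isolation a fresh instance buys (the key name is hypothetical, illustration only):

    import spark.deploy.SparkHadoopUtil

    val a = SparkHadoopUtil.newConfiguration()
    val b = SparkHadoopUtil.newConfiguration()
    a.set("spark.example.flag", "isolated")      // hypothetical key
    assert(a ne b)                               // a distinct instance per call
    assert(b.get("spark.example.flag") == null)  // mutations don't leak across callers
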
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index d4badbc5c4..a0fb4fe25d 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,4 +1,6 @@
package spark.deploy
+import org.apache.hadoop.conf.Configuration
+
/**
* Contains util methods to interact with Hadoop from spark.
@@ -15,4 +17,7 @@ object SparkHadoopUtil {
// Add support, if exists - for now, simply run func !
func(args)
}
+
+ // Return an appropriate subclass of Configuration. Creating the config can initialize some Hadoop subsystems.
+ def newConfiguration(): Configuration = new Configuration()
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index e853bce2c4..5f5ec0b0f4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.LocalSparkCluster
+import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
@@ -102,7 +102,9 @@ class SparkContext(
// Add each JAR given through the constructor
- if (jars != null) jars.foreach { addJar(_) }
+ if (jars != null) {
+ jars.foreach { addJar(_) }
+ }
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
@@ -114,7 +116,9 @@ class SparkContext(
executorEnvs(key) = value
}
}
- if (environment != null) executorEnvs ++= environment
+ if (environment != null) {
+ executorEnvs ++= environment
+ }
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
@@ -207,7 +211,7 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@@ -711,7 +715,7 @@ class SparkContext(
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val path = new Path(dir)
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
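
Note: both SparkContext hunks route through the shim: hadoopConfiguration becomes the context-wide config (with S3 credentials folded in), while setCheckpointDir builds a fresh one. A minimal usage sketch for the checkpoint path, assuming a local-mode context and a placeholder directory:

    val sc = new SparkContext("local", "checkpoint-demo")
    sc.setCheckpointDir("/tmp/spark-checkpoints")  // useExisting defaults to false
    val rdd = sc.parallelize(1 to 100, 4)
    rdd.checkpoint()  // materialized on the next action, written via the shim's config
    rdd.count()
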
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 3e54fa7a7e..9f48cbe490 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,7 +4,6 @@ import java.io._
import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConversions._
@@ -208,7 +207,7 @@ private object Utils extends Logging {
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val uri = new URI(url)
- val conf = new Configuration()
+ val conf = SparkHadoopUtil.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@@ -317,7 +316,6 @@ private object Utils extends Logging {
* Get the local machine's hostname.
*/
def localHostName(): String = {
- // customHostname.getOrElse(InetAddress.getLocalHost.getHostName)
customHostname.getOrElse(localIpAddressHostname)
}
@@ -337,6 +335,7 @@ private object Utils extends Logging {
retval
}
+ /*
// Used by DEBUG code : remove when all testing done
private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
def checkHost(host: String, message: String = "") {
@@ -358,12 +357,11 @@ private object Utils extends Logging {
Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
}
}
+ */
// Once testing is complete in various modes, replace with this ?
- /*
def checkHost(host: String, message: String = "") {}
def checkHostPort(hostPort: String, message: String = "") {}
- */
def getUserNameFromEnvironment(): String = {
SparkHadoopUtil.getUserNameFromEnvironment
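
Note: the fetch path above works for any Hadoop-supported scheme (file://, hdfs://, s3://, ...). A standalone sketch of the same copy loop, using Hadoop's own IOUtils rather than Spark's internal copy helper (function name and destination are placeholders):

    import java.io.{File, FileOutputStream}
    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils
    import spark.deploy.SparkHadoopUtil

    def fetchToLocal(url: String, dest: File) {
      val uri = new URI(url)
      val conf = SparkHadoopUtil.newConfiguration()  // shim, not `new Configuration()`
      val in = FileSystem.get(uri, conf).open(new Path(uri))
      val out = new FileOutputStream(dest)
      IOUtils.copyBytes(in, out, 4096, true)         // `true` closes both streams
    }
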
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 24d527f38f..79d00edee7 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
+import spark.deploy.SparkHadoopUtil
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -65,7 +66,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(new Configuration())
+ val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -103,7 +104,7 @@ private[spark] object CheckpointRDD extends Logging {
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
@@ -125,7 +126,7 @@ private[spark] object CheckpointRDD extends Logging {
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(new Configuration())
+ val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
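
Note: writeToFile and readFromFile each build their own config because they run inside tasks on executors, where no SparkContext-wide hadoopConfiguration is available. A hedged round-trip sketch mirroring the test main above (cluster URL and HDFS path are placeholders):

    import org.apache.hadoop.fs.Path

    val sc = new SparkContext("local[2]", "checkpoint-roundtrip")
    val rdd = sc.makeRDD(1 to 1000, 10)
    val path = new Path("hdfs://localhost:9000/tmp/cp-test")
    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
    val restored = new CheckpointRDD[Int](sc, path.toString)
    assert(restored.collect().toList == rdd.collect().toList)
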
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 0f42f405a0..3d080a0257 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -4,6 +4,8 @@ import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
+import spark.deploy.SparkHadoopUtil
+import spark.scheduler.InputFormatInfo
/**
* Logistic regression based classification.
@@ -32,9 +34,13 @@ object SparkHdfsLR {
System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
System.exit(1)
}
+ val inputPath = args(1)
+ val conf = SparkHadoopUtil.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
- val lines = sc.textFile(args(1))
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
+ InputFormatInfo.computePreferredLocations(
+ Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath))))
+ val lines = sc.textFile(inputPath)
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
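
Note: the example now feeds HDFS block locations into the SparkContext constructor so executors can be placed near the input splits. computePreferredLocations takes a Seq, so several inputs can contribute; a sketch with two placeholder paths:

    import spark.deploy.SparkHadoopUtil
    import spark.scheduler.InputFormatInfo

    val conf = SparkHadoopUtil.newConfiguration()
    val prefLocs = InputFormatInfo.computePreferredLocations(Seq(
      new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat],
        "hdfs://nn:8020/data/train"),
      new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat],
        "hdfs://nn:8020/data/test")))
    val sc = new SparkContext("spark://master:7077", "locality-demo",
      System.getenv("SPARK_HOME"), Nil, Map(), prefLocs)
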
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 91e3123bc5..0a5b89d927 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -47,10 +47,8 @@ object SparkBuild extends Build {
scalacOptions := Seq("-unchecked", "-optimize", "-deprecation"),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
- // retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
- // For some reason this fails on some nodes and works on others - not yet debugged why
- // testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
+ testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
// shared between both core and streaming.
resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
@@ -170,7 +168,11 @@ object SparkBuild extends Build {
Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION)
}),
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
- ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") "src/hadoop2-yarn/scala" else "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" )
+ ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
+ "src/hadoop2-yarn/scala"
+ } else {
+ "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
+ } )
}
) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
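
Note: the build hunk spells out the per-Hadoop source-directory selection as a block and re-enables the JUnit XML test listener. The selection logic as a standalone function, for reference (the helper name is hypothetical):

    def hadoopSourceDir(yarn: Boolean, major: String): String =
      if (yarn && major == "2") "src/hadoop2-yarn/scala"
      else "src/hadoop" + major + "/scala"

    assert(hadoopSourceDir(yarn = false, major = "1") == "src/hadoop1/scala")
    assert(hadoopSourceDir(yarn = true,  major = "2") == "src/hadoop2-yarn/scala")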