aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala99
-rw-r--r--core/src/test/scala/org/apache/spark/LocalSparkContext.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/SharedSparkContext.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ThreadingSuite.scala45
-rw-r--r--docs/_config.yml2
-rw-r--r--docs/building-with-maven.md6
-rwxr-xr-xec2/spark_ec2.py8
-rw-r--r--examples/pom.xml2
-rw-r--r--pom.xml3
-rw-r--r--project/SparkBuild.scala16
-rwxr-xr-xspark-class12
12 files changed, 138 insertions, 78 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 882bc506b4..ef97fa85fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -258,7 +258,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new ThreadLocal[Properties]
+ private val localProperties = new InheritableThreadLocal[Properties] {
+ override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ }
def initLocalProperties() {
localProperties.set(new Properties())
@@ -275,6 +277,9 @@ class SparkContext(
}
}
+ def getLocalProperty(key: String): String =
+ Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f80823317b..114617c51a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,14 +17,12 @@
package org.apache.spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import scala.xml.XML
/**
* An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+ val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- if (schedulerAllocFile != null) {
- val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
-
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
- poolName, schedulingMode, minShare, weight))
+ var is: Option[InputStream] = None
+ try {
+ is = Option {
+ schedulerAllocFile.map { f =>
+ new FileInputStream(f)
+ }.getOrElse {
+ getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
- } else {
- throw new java.io.FileNotFoundException(
- "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
+
+ is.foreach { i => buildFairSchedulerPool(i) }
+ } finally {
+ is.foreach(_.close())
}
// finally create "default" pool
+ buildDefaultPool()
+ }
+
+ private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
+ private def buildFairSchedulerPool(is: InputStream) {
+ val xml = XML.load(is)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
+
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 6ec124da9c..459e257d79 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
}
def resetSparkContext() = {
- if (sc != null) {
- LocalSparkContext.stop(sc)
- sc = null
- }
+ LocalSparkContext.stop(sc)
+ sc = null
}
}
object LocalSparkContext {
def stop(sc: SparkContext) {
- sc.stop()
+ if (sc != null) {
+ sc.stop()
+ }
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 97cbca09bf..288aa14eeb 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll() {
- if (_sc != null) {
- LocalSparkContext.stop(_sc)
- _sc = null
- }
+ LocalSparkContext.stop(_sc)
+ _sc = null
super.afterAll()
}
}
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 69383ddfb8..75d6493e33 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
}
class ThreadingSuite extends FunSuite with LocalSparkContext {
-
+
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}
+
+ test("set local properties in different thread") {
+ sc = new SparkContext("local", "test")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === null)
+ }
+
+ test("set and get local properties in parent-children thread") {
+ sc = new SparkContext("local", "test")
+ sc.setLocalProperty("test", "parent")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ assert(sc.getLocalProperty("test") === "parent")
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === "parent")
+ assert(sc.getLocalProperty("Foo") === null)
+ }
}
diff --git a/docs/_config.yml b/docs/_config.yml
index e7a96c0c70..ad851673a5 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -6,5 +6,5 @@ markdown: kramdown
SPARK_VERSION: 0.8.0-SNAPSHOT
SPARK_VERSION_SHORT: 0.8.0
SCALA_VERSION: 2.10
-MESOS_VERSION: 0.9.0-incubating
+MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
index 7ecb601ddd..19c01e179f 100644
--- a/docs/building-with-maven.md
+++ b/docs/building-with-maven.md
@@ -37,13 +37,13 @@ For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop versions wit
# Cloudera CDH 4.2.0 with MapReduce v1
$ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
-For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should also enable the "hadoop2-yarn" profile:
+For Apache Hadoop 2.x, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, you should enable the "hadoop2-yarn" profile and set the "yarn.version" property:
# Apache Hadoop 2.0.5-alpha
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -DskipTests clean package
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package
# Cloudera CDH 4.2.0 with MapReduce v2
- $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package
+ $ mvn -Phadoop2-yarn -Dhadoop.version=2.0.0-cdh4.2.0 -Dyarn.version=2.0.0-chd4.2.0 -DskipTests clean package
## Spark Tests in Maven ##
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 419d0fe13f..6b7d202a88 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -364,12 +364,12 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
slave_nodes = []
for res in reservations:
active = [i for i in res.instances if is_active(i)]
- if len(active) > 0:
- group_names = [g.name for g in res.groups]
+ for inst in active:
+ group_names = [g.name for g in inst.groups]
if group_names == [cluster_name + "-master"]:
- master_nodes += res.instances
+ master_nodes.append(inst)
elif group_names == [cluster_name + "-slaves"]:
- slave_nodes += res.instances
+ slave_nodes.append(inst)
if any((master_nodes, slave_nodes)):
print ("Found %d master(s), %d slaves" %
(len(master_nodes), len(slave_nodes)))
diff --git a/examples/pom.xml b/examples/pom.xml
index b44a126546..ca06a9ad8d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -93,7 +93,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
- <version>1.2.5</version>
+ <version>1.2.6</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
diff --git a/pom.xml b/pom.xml
index 4033956968..b9e872255e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,9 +76,10 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.5</java.version>
+
<scala-short.version>2.10</scala-short.version>
<scala.version>2.10.2</scala.version>
- <mesos.version>0.12.1</mesos.version>
+ <mesos.version>0.13.0</mesos.version>
<akka.version>2.2.1</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ffa9c93925..b5e65a1856 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -81,7 +81,7 @@ object SparkBuild extends Build {
organization := "org.apache.spark",
version := "0.8.0-SNAPSHOT",
scalaVersion := "2.10.2",
-// scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
+ scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
@@ -195,7 +195,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
- "commons-daemon" % "commons-daemon" % "1.0.10",
+ "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.22",
@@ -204,7 +204,7 @@ object SparkBuild extends Build {
"net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
- "org.apache.mesos" % "mesos" % "0.12.1",
+ "org.apache.mesos" % "mesos" % "0.13.0",
"net.java.dev.jets3t" % "jets3t" % "0.7.1",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
@@ -215,9 +215,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
- "com.twitter" % "chill-java" % "0.3.1",
- "org.scala-lang" % "jline" % "2.10.2",
- "org.scala-lang" % "scala-reflect" % "2.10.2"
+ "com.twitter" % "chill-java" % "0.3.1"
)
)
@@ -227,7 +225,9 @@ object SparkBuild extends Build {
def replSettings = sharedSettings ++ Seq(
name := "spark-repl",
- libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _)
+ libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v ),
+ libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "jline" % v ),
+ libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
)
@@ -237,7 +237,7 @@ object SparkBuild extends Build {
"com.twitter" %% "algebird-core" % "0.1.11",
"org.apache.hbase" % "hbase" % "0.94.6" excludeAll(excludeNetty, excludeAsm),
"org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm),
- "org.apache.cassandra" % "cassandra-all" % "1.2.5"
+ "org.apache.cassandra" % "cassandra-all" % "1.2.6"
exclude("com.google.guava", "guava")
exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
exclude("com.ning","compress-lzf")
diff --git a/spark-class b/spark-class
index 1b2388e8c4..5305b3d025 100755
--- a/spark-class
+++ b/spark-class
@@ -37,7 +37,7 @@ fi
# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
# values for that; it doesn't need a lot
-if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
+if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
# Do not overwrite SPARK_JAVA_OPTS environment variable in this script
@@ -49,19 +49,19 @@ fi
# Add java opts for master, worker, executor. The opts maybe null
case "$1" in
- 'spark.deploy.master.Master')
+ 'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
;;
- 'spark.deploy.worker.Worker')
+ 'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
;;
- 'spark.executor.StandaloneExecutorBackend')
+ 'org.apache.spark.executor.StandaloneExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
- 'spark.executor.MesosExecutorBackend')
+ 'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
- 'spark.repl.Main')
+ 'org.apache.spark.repl.Main')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
;;
esac