aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala99
-rw-r--r--core/src/test/scala/org/apache/spark/LocalSparkContext.scala10
-rw-r--r--core/src/test/scala/org/apache/spark/SharedSparkContext.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/ThreadingSuite.scala45
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala1
-rwxr-xr-xspark-class12
7 files changed, 119 insertions, 61 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 72540c712a..6bab1f31d0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -256,7 +256,9 @@ class SparkContext(
private[spark] var checkpointDir: Option[String] = None
// Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new ThreadLocal[Properties]
+ private val localProperties = new InheritableThreadLocal[Properties] {
+ override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ }
def initLocalProperties() {
localProperties.set(new Properties())
@@ -273,6 +275,9 @@ class SparkContext(
}
}
+ def getLocalProperty(key: String): String =
+ Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
+
/** Set a human readable description of the current job. */
def setJobDescription(value: String) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
index f80823317b..114617c51a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,14 +17,12 @@
package org.apache.spark.scheduler.cluster
-import java.io.{File, FileInputStream, FileOutputStream, FileNotFoundException}
-import java.util.Properties
-
-import scala.xml.XML
+import java.io.{FileInputStream, InputStream}
+import java.util.{NoSuchElementException, Properties}
import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import scala.xml.XML
/**
* An interface to build Schedulable tree
@@ -51,7 +49,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = System.getProperty("spark.scheduler.allocation.file")
+ val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -64,48 +63,26 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- if (schedulerAllocFile != null) {
- val file = new File(schedulerAllocFile)
- if (file.exists()) {
- val xml = XML.loadFile(file)
- for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
- val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
- }
- }
-
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
-
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
-
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
- poolName, schedulingMode, minShare, weight))
+ var is: Option[InputStream] = None
+ try {
+ is = Option {
+ schedulerAllocFile.map { f =>
+ new FileInputStream(f)
+ }.getOrElse {
+ getClass.getClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
- } else {
- throw new java.io.FileNotFoundException(
- "Fair scheduler allocation file not found: " + schedulerAllocFile)
}
+
+ is.foreach { i => buildFairSchedulerPool(i) }
+ } finally {
+ is.foreach(_.close())
}
// finally create "default" pool
+ buildDefaultPool()
+ }
+
+ private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
@@ -115,6 +92,42 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
}
}
+ private def buildFairSchedulerPool(is: InputStream) {
+ val xml = XML.load(is)
+ for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+ val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+ var schedulingMode = DEFAULT_SCHEDULING_MODE
+ var minShare = DEFAULT_MINIMUM_SHARE
+ var weight = DEFAULT_WEIGHT
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+ if (xmlSchedulingMode != "") {
+ try {
+ schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning("Error xml schedulingMode, using default schedulingMode")
+ }
+ }
+
+ val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+ if (xmlMinShare != "") {
+ minShare = xmlMinShare.toInt
+ }
+
+ val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+ if (xmlWeight != "") {
+ weight = xmlWeight.toInt
+ }
+
+ val pool = new Pool(poolName, schedulingMode, minShare, weight)
+ rootPool.addSchedulable(pool)
+ logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ poolName, schedulingMode, minShare, weight))
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 6ec124da9c..459e257d79 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -40,17 +40,17 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
}
def resetSparkContext() = {
- if (sc != null) {
- LocalSparkContext.stop(sc)
- sc = null
- }
+ LocalSparkContext.stop(sc)
+ sc = null
}
}
object LocalSparkContext {
def stop(sc: SparkContext) {
- sc.stop()
+ if (sc != null) {
+ sc.stop()
+ }
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
System.clearProperty("spark.hostPort")
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 97cbca09bf..288aa14eeb 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -33,10 +33,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
}
override def afterAll() {
- if (_sc != null) {
- LocalSparkContext.stop(_sc)
- _sc = null
- }
+ LocalSparkContext.stop(_sc)
+ _sc = null
super.afterAll()
}
}
diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
index 69383ddfb8..75d6493e33 100644
--- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
}
class ThreadingSuite extends FunSuite with LocalSparkContext {
-
+
test("accessing SparkContext form a different thread") {
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
fail("One or more threads didn't see runningThreads = 4")
}
}
+
+ test("set local properties in different thread") {
+ sc = new SparkContext("local", "test")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === null)
+ }
+
+ test("set and get local properties in parent-children thread") {
+ sc = new SparkContext("local", "test")
+ sc.setLocalProperty("test", "parent")
+ val sem = new Semaphore(0)
+
+ val threads = (1 to 5).map { i =>
+ new Thread() {
+ override def run() {
+ assert(sc.getLocalProperty("test") === "parent")
+ sc.setLocalProperty("test", i.toString)
+ assert(sc.getLocalProperty("test") === i.toString)
+ sem.release()
+ }
+ }
+ }
+
+ threads.foreach(_.start())
+
+ sem.acquire(5)
+ assert(sc.getLocalProperty("test") === "parent")
+ assert(sc.getLocalProperty("Foo") === null)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index c1df5e151e..016db8d57e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.parallel.mutable
import org.apache.spark._
-import org.apache.spark.rdd.CoalescedRDDPartition
class RDDSuite extends FunSuite with SharedSparkContext {
diff --git a/spark-class b/spark-class
index 037abda3b7..e111ef6da7 100755
--- a/spark-class
+++ b/spark-class
@@ -37,7 +37,7 @@ fi
# If this is a standalone cluster daemon, reset SPARK_JAVA_OPTS and SPARK_MEM to reasonable
# values for that; it doesn't need a lot
-if [ "$1" = "spark.deploy.master.Master" -o "$1" = "spark.deploy.worker.Worker" ]; then
+if [ "$1" = "org.apache.spark.deploy.master.Master" -o "$1" = "org.apache.spark.deploy.worker.Worker" ]; then
SPARK_MEM=${SPARK_DAEMON_MEMORY:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
# Do not overwrite SPARK_JAVA_OPTS environment variable in this script
@@ -49,19 +49,19 @@ fi
# Add java opts for master, worker, executor. The opts maybe null
case "$1" in
- 'spark.deploy.master.Master')
+ 'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_MASTER_OPTS"
;;
- 'spark.deploy.worker.Worker')
+ 'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_WORKER_OPTS"
;;
- 'spark.executor.StandaloneExecutorBackend')
+ 'org.apache.spark.executor.StandaloneExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
- 'spark.executor.MesosExecutorBackend')
+ 'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
;;
- 'spark.repl.Main')
+ 'org.apache.spark.repl.Main')
OUR_JAVA_OPTS="$OUR_JAVA_OPTS $SPARK_REPL_OPTS"
;;
esac