aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2013-12-29 15:38:46 -0500
committerMatei Zaharia <matei@databricks.com>2013-12-29 15:38:46 -0500
commit0bd1900cbce5946999c38293852d8ccd4f838930 (patch)
tree97d64682ce016cb3702b3bce456f60de7f4de9c0 /core
parentb4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a (diff)
downloadspark-0bd1900cbce5946999c38293852d8ccd4f838930.tar.gz
spark-0bd1900cbce5946999c38293852d8ccd4f838930.tar.bz2
spark-0bd1900cbce5946999c38293852d8ccd4f838930.zip
Fix a few settings that were being read as system properties after merge
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala18
2 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56a038dc69..bffd990e16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
- val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+ val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
+ def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+
val conf = sc.conf
// How often to check for speculative tasks
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b95e418d8..d752e6f111 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
clock: Clock = SystemClock)
extends Schedulable with Logging
{
+ val conf = sched.sc.conf
+
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -118,7 +120,7 @@ private[spark] class TaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -682,14 +684,14 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}