diff options
author | Matei Zaharia <matei@databricks.com> | 2013-12-29 15:38:46 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2013-12-29 15:38:46 -0500 |
commit | 0bd1900cbce5946999c38293852d8ccd4f838930 (patch) | |
tree | 97d64682ce016cb3702b3bce456f60de7f4de9c0 /core | |
parent | b4ceed40d6e511a1d475b3f4fbcdd2ad24c02b5a (diff) | |
download | spark-0bd1900cbce5946999c38293852d8ccd4f838930.tar.gz spark-0bd1900cbce5946999c38293852d8ccd4f838930.tar.bz2 spark-0bd1900cbce5946999c38293852d8ccd4f838930.zip |
Fix a few settings that were being read as system properties after merge
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 4 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 18 |
2 files changed, 13 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 56a038dc69..bffd990e16 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, - val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt, + val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging { + def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt) + val conf = sc.conf // How often to check for speculative tasks diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 9b95e418d8..d752e6f111 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -54,12 +54,14 @@ private[spark] class TaskSetManager( clock: Clock = SystemClock) extends Schedulable with Logging { + val conf = sched.sc.conf + // CPUs to request per task - val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt + val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt // Quantile of tasks at which to start speculation - val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble - val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble + val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble + val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble // Serializer for closures and tasks. val env = SparkEnv.get @@ -118,7 +120,7 @@ private[spark] class TaskSetManager( // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = - System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong + conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong // Map of recent exceptions (identified by string representation and top stack frame) to // duplicate count (how many times the same exception has appeared) and time the full exception @@ -682,14 +684,14 @@ private[spark] class TaskSetManager( } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { - val defaultWait = System.getProperty("spark.locality.wait", "3000") + val defaultWait = conf.getOrElse("spark.locality.wait", "3000") level match { case TaskLocality.PROCESS_LOCAL => - System.getProperty("spark.locality.wait.process", defaultWait).toLong + conf.getOrElse("spark.locality.wait.process", defaultWait).toLong case TaskLocality.NODE_LOCAL => - System.getProperty("spark.locality.wait.node", defaultWait).toLong + conf.getOrElse("spark.locality.wait.node", defaultWait).toLong case TaskLocality.RACK_LOCAL => - System.getProperty("spark.locality.wait.rack", defaultWait).toLong + conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong case TaskLocality.ANY => 0L } |