aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorerenavsarogullari <erenavsarogullari@gmail.com>2017-02-06 08:24:17 -0600
committerImran Rashid <irashid@cloudera.com>2017-02-06 08:24:17 -0600
commit7beb227cc8a4674e24cb1aaa278287ecc8194e5d (patch)
tree69465e065152f63260a660807011a93af6a1dfa1 /core/src/main/scala
parent7730426cb95eec2652a9ea979ae2c4faf7e585f2 (diff)
downloadspark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.tar.gz
spark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.tar.bz2
spark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.zip
[SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via scheduler.allocation.file
## What changes were proposed in this pull request? If `spark.scheduler.allocation.file` has invalid `minShare` or/and `weight` values, these cause : - `NumberFormatException` due to `toInt` function - `SparkContext` can not be initialized. - It does not show meaningful error message to user. In a nutshell, this functionality can be more robust by selecting one of the following flows : **1-** Currently, if `schedulingMode` has an invalid value, a warning message is logged and default value is set as `FIFO`. Same pattern can be used for `minShare`(default: 0) and `weight`(default: 1) as well **2-** Meaningful error message can be shown to the user for all invalid cases. PR offers : - `schedulingMode` handles just empty values. It also needs to be supported for **whitespace**, **non-uppercase**(fair, FaIr etc...) or `SchedulingMode.NONE` cases by setting default value(`FIFO`) - `minShare` and `weight` handle just empty values. They also need to be supported for **non-integer** cases by setting default values. - Some refactoring of `PoolSuite`. **Code to Reproduce :** ``` val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local") conf.set("spark.scheduler.mode", "FAIR") conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml") val sc = new SparkContext(conf) ``` **fairscheduler-invalid-data.xml :** ``` <allocations> <pool name="production"> <schedulingMode>FIFO</schedulingMode> <weight>invalid_weight</weight> <minShare>2</minShare> </pool> </allocations> ``` **Stacktrace :** ``` Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272) at scala.collection.immutable.StringOps.toInt(StringOps.scala:29) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102) ``` ## How was this patch tested? Added Unit Test Case. Author: erenavsarogullari <erenavsarogullari@gmail.com> Closes #15237 from erenavsarogullari/SPARK-17663.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala70
1 files changed, 45 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 96325a0329..f8bee3eea5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,10 +20,11 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
-import scala.xml.XML
+import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
/**
@@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: NoSuchElementException =>
- logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
- s"using the default schedulingMode: $schedulingMode")
- }
- }
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
+ val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
+ val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
+ val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
+ rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
+ private def getSchedulingModeValue(
+ poolNode: Node,
+ poolName: String,
+ defaultValue: SchedulingMode): SchedulingMode = {
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
+ val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
+ s"schedulingMode: $defaultValue for pool: $poolName"
+ try {
+ if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
+ SchedulingMode.withName(xmlSchedulingMode)
+ } else {
+ logWarning(warningMessage)
+ defaultValue
+ }
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning(warningMessage)
+ defaultValue
+ }
+ }
+
+ private def getIntValue(
+ poolNode: Node,
+ poolName: String,
+ propertyName: String, defaultValue: Int): Int = {
+
+ val data = (poolNode \ propertyName).text.trim
+ try {
+ data.toInt
+ } catch {
+ case e: NumberFormatException =>
+ logWarning(s"Error while loading scheduler allocation file. " +
+ s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
+ s"$defaultValue for pool: $poolName")
+ defaultValue
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)