diff options
author | erenavsarogullari <erenavsarogullari@gmail.com> | 2017-02-06 08:24:17 -0600 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2017-02-06 08:24:17 -0600 |
commit | 7beb227cc8a4674e24cb1aaa278287ecc8194e5d (patch) | |
tree | 69465e065152f63260a660807011a93af6a1dfa1 /core/src/main/scala | |
parent | 7730426cb95eec2652a9ea979ae2c4faf7e585f2 (diff) | |
download | spark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.tar.gz spark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.tar.bz2 spark-7beb227cc8a4674e24cb1aaa278287ecc8194e5d.zip |
[SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via scheduler.allocation.file
## What changes were proposed in this pull request?
If the file referenced by `spark.scheduler.allocation.file` contains invalid `minShare` and/or `weight` values, the following problems occur:
- A `NumberFormatException` is thrown by the `toInt` call.
- `SparkContext` cannot be initialized.
- No meaningful error message is shown to the user.
In a nutshell, this functionality can be made more robust by choosing one of the following approaches:
**1-** Currently, if `schedulingMode` has an invalid value, a warning message is logged and the default value `FIFO` is used. The same pattern can be used for `minShare` (default: 0) and `weight` (default: 1) as well.
**2-** A meaningful error message can be shown to the user for all invalid cases.
This PR offers:
- `schedulingMode` currently handles only empty values. It also needs to handle **whitespace** and **non-uppercase** (fair, FaIr, etc.) input, and to fall back to the default value (`FIFO`) for `SchedulingMode.NONE` or otherwise unsupported values.
- `minShare` and `weight` currently handle only empty values. They also need to handle **non-integer** values by falling back to their default values.
- Some refactoring of `PoolSuite`.
**Code to Reproduce :**
```
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
```
**fairscheduler-invalid-data.xml :**
```
<allocations>
<pool name="production">
<schedulingMode>FIFO</schedulingMode>
<weight>invalid_weight</weight>
<minShare>2</minShare>
</pool>
</allocations>
```
**Stacktrace :**
```
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
```
## How was this patch tested?
Added Unit Test Case.
Author: erenavsarogullari <erenavsarogullari@gmail.com>
Closes #15237 from erenavsarogullari/SPARK-17663.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala | 70 |
1 files changed, 45 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 96325a0329..f8bee3eea5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -20,10 +20,11 @@ package org.apache.spark.scheduler import java.io.{FileInputStream, InputStream} import java.util.{NoSuchElementException, Properties} -import scala.xml.XML +import scala.xml.{Node, XML} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils /** @@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text - var schedulingMode = DEFAULT_SCHEDULING_MODE - var minShare = DEFAULT_MINIMUM_SHARE - var weight = DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try { - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } catch { - case e: NoSuchElementException => - logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " + - s"using the default schedulingMode: $schedulingMode") - } - } - val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text - if (xmlMinShare != "") { - minShare = xmlMinShare.toInt - } + val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE) + val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE) + val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT) - val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } + rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight)) - val pool = 
new Pool(poolName, schedulingMode, minShare, weight) - rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } } + private def getSchedulingModeValue( + poolNode: Node, + poolName: String, + defaultValue: SchedulingMode): SchedulingMode = { + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase + val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " + + s"schedulingMode: $defaultValue for pool: $poolName" + try { + if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) { + SchedulingMode.withName(xmlSchedulingMode) + } else { + logWarning(warningMessage) + defaultValue + } + } catch { + case e: NoSuchElementException => + logWarning(warningMessage) + defaultValue + } + } + + private def getIntValue( + poolNode: Node, + poolName: String, + propertyName: String, defaultValue: Int): Int = { + + val data = (poolNode \ propertyName).text.trim + try { + data.toInt + } catch { + case e: NumberFormatException => + logWarning(s"Error while loading scheduler allocation file. " + + s"$propertyName is blank or invalid: $data, using the default $propertyName: " + + s"$defaultValue for pool: $poolName") + defaultValue + } + } + override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME var parentPool = rootPool.getSchedulableByName(poolName) |