aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
authorEren Avsarogullari <erenavsarogullari@gmail.com>2017-02-10 08:32:36 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2017-02-10 08:32:36 -0800
commitdadff5f0789cce7cf3728a8adaab42118e5dc019 (patch)
tree306a697d5c9d12b139666f817efd4a7bfda4d525 /core/src/main/scala/org
parentc5a66356d431dc07dbd44540a495264fb19bd5d9 (diff)
downloadspark-dadff5f0789cce7cf3728a8adaab42118e5dc019.tar.gz
spark-dadff5f0789cce7cf3728a8adaab42118e5dc019.tar.bz2
spark-dadff5f0789cce7cf3728a8adaab42118e5dc019.zip
[SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Logging
Fair Scheduler Logging for the following cases can be useful for the user. 1. If **valid** `spark.scheduler.allocation.file` property is set, user can be informed and aware which scheduler file is processed when `SparkContext` initializes. 2. If **invalid** `spark.scheduler.allocation.file` property is set, currently, the following stacktrace is shown to user. In addition to this, more meaningful message can be shown to user by emphasizing the problem at building level of fair scheduler. Also other potential issues can be covered at this level as **Fair Scheduler can not be built. + exception stacktrace** ``` Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.<init>(FileInputStream.java:138) at java.io.FileInputStream.<init>(FileInputStream.java:93) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76) at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75) ``` 3. If `spark.scheduler.allocation.file` property is not set and **default** fair scheduler file (**fairscheduler.xml**) is found in classpath, it will be loaded but currently, user is not informed for using default file so logging can be useful as **Fair Scheduler file: fairscheduler.xml is found successfully and will be parsed.** 4. If **spark.scheduler.allocation.file** property is not set and **default** fair scheduler file does not exist in classpath, currently, user is not informed so logging can be useful as **No Fair Scheduler file found.** Also this PR is related with https://github.com/apache/spark/pull/15237 to emphasize fileName in warning logs when fair scheduler file has invalid minShare, weight or schedulingMode values. ## How was this patch tested? Added new Unit Tests. Author: erenavsarogullari <erenavsarogullari@gmail.com> Closes #16813 from erenavsarogullari/SPARK-19466.
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala67
1 files changed, 46 insertions, 21 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index f8bee3eea5..e53c4fb5b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
+import scala.util.control.NonFatal
import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
+ val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+ val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val DEFAULT_WEIGHT = 1
override def buildPools() {
- var is: Option[InputStream] = None
+ var fileData: Option[(InputStream, String)] = None
try {
- is = Option {
- schedulerAllocFile.map { f =>
- new FileInputStream(f)
- }.getOrElse {
- Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+ fileData = schedulerAllocFile.map { f =>
+ val fis = new FileInputStream(f)
+ logInfo(s"Creating Fair Scheduler pools from $f")
+ Some((fis, f))
+ }.getOrElse {
+ val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+ if (is != null) {
+ logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
+ Some((is, DEFAULT_SCHEDULER_FILE))
+ } else {
+ logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
+ s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
+ s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+ None
}
}
- is.foreach { i => buildFairSchedulerPool(i) }
+ fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
+ } catch {
+ case NonFatal(t) =>
+ val defaultMessage = "Error while building the fair scheduler pools"
+ val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
+ .getOrElse(defaultMessage)
+ logError(message, t)
+ throw t
} finally {
- is.foreach(_.close())
+ fileData.foreach { case (is, fileName) => is.close() }
}
// finally create "default" pool
@@ -93,24 +111,27 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
- logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
- private def buildFairSchedulerPool(is: InputStream) {
+ private def buildFairSchedulerPool(is: InputStream, fileName: String) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
- val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
- val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
+ val schedulingMode = getSchedulingModeValue(poolNode, poolName,
+ DEFAULT_SCHEDULING_MODE, fileName)
+ val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
+ DEFAULT_MINIMUM_SHARE, fileName)
+ val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
+ DEFAULT_WEIGHT, fileName)
rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
@@ -118,11 +139,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getSchedulingModeValue(
poolNode: Node,
poolName: String,
- defaultValue: SchedulingMode): SchedulingMode = {
+ defaultValue: SchedulingMode,
+ fileName: String): SchedulingMode = {
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
- val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
- s"schedulingMode: $defaultValue for pool: $poolName"
+ val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
+ s"Fair Scheduler configuration file: $fileName, using " +
+ s"the default schedulingMode: $defaultValue for pool: $poolName"
try {
if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
SchedulingMode.withName(xmlSchedulingMode)
@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
private def getIntValue(
poolNode: Node,
poolName: String,
- propertyName: String, defaultValue: Int): Int = {
+ propertyName: String,
+ defaultValue: Int,
+ fileName: String): Int = {
val data = (poolNode \ propertyName).text.trim
try {
data.toInt
} catch {
case e: NumberFormatException =>
- logWarning(s"Error while loading scheduler allocation file. " +
+ logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
s"$defaultValue for pool: $poolName")
defaultValue
@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
- logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+ logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}