aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-06-29 08:17:27 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-06-29 08:17:27 -0500
commit272a2f78f3ff801b94a81fa8fcc6633190eaa2f4 (patch)
tree116630944456f0dbc3599d82694239c4e2a67c54 /yarn
parent393db655c3c43155305fbba1b2f8c48a95f18d93 (diff)
downloadspark-272a2f78f3ff801b94a81fa8fcc6633190eaa2f4.tar.gz
spark-272a2f78f3ff801b94a81fa8fcc6633190eaa2f4.tar.bz2
spark-272a2f78f3ff801b94a81fa8fcc6633190eaa2f4.zip
[SPARK-15990][YARN] Add rolling log aggregation support for Spark on yarn
## What changes were proposed in this pull request? Yarn supports rolling log aggregation since 2.6, previously log will only be aggregated to HDFS after application is finished, it is quite painful for long running applications like Spark Streaming, thriftserver. Also out of disk problem will be occurred when log file is too large. So here propose to add support of rolling log aggregation for Spark on yarn. One limitation for this is that log4j should be set to change to file appender, now in Spark itself uses console appender by default, in which file will not be created again once removed after aggregation. But I think lots of production users should have changed their log4j configuration instead of default on, so this is not a big problem. ## How was this patch tested? Manually verified with Hadoop 2.7.1. Author: jerryshao <sshao@hortonworks.com> Closes #13712 from jerryshao/SPARK-15990.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala27
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala16
2 files changed, 43 insertions, 0 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9bb369549d..d63579ff82 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -271,6 +271,33 @@ private[spark] class Client(
appContext.setResource(capability)
}
+ sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
+ try {
+ val logAggregationContext = Records.newRecord(
+ Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+ .asInstanceOf[Object]
+
+ val setRolledLogsIncludePatternMethod =
+ logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
+ setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
+
+ sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+ val setRolledLogsExcludePatternMethod =
+ logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
+ setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
+ }
+
+ val setLogAggregationContextMethod =
+ appContext.getClass.getMethod("setLogAggregationContext",
+ Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+ setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
+ s"does not support it", e)
+ }
+ }
+
appContext
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ad2412e025..49c0177ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -243,6 +243,22 @@ package object config {
.toSequence
.createWithDefault(Nil)
+ /* Rolled log aggregation configuration. */
+
+ private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
+ ConfigBuilder("spark.yarn.rolledLog.includePattern")
+ .doc("Java Regex to filter the log files which match the defined include pattern and those " +
+ "log files will be aggregated in a rolling fashion.")
+ .stringConf
+ .createOptional
+
+ private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
+ ConfigBuilder("spark.yarn.rolledLog.excludePattern")
+ .doc("Java Regex to filter the log files which match the defined exclude pattern and those " +
+ "log files will not be aggregated in a rolling fashion.")
+ .stringConf
+ .createOptional
+
/* Private configs. */
private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")