diff options
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 27 +
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala | 16 +
2 files changed, 43 insertions, 0 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9bb369549d..d63579ff82 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -271,6 +271,33 @@ private[spark] class Client(
         appContext.setResource(capability)
     }
 
+    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
+      try {
+        val logAggregationContext = Records.newRecord(
+          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+          .asInstanceOf[Object]
+
+        val setRolledLogsIncludePatternMethod =
+          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
+        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
+
+        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+          val setRolledLogsExcludePatternMethod =
+            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
+          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
+        }
+
+        val setLogAggregationContextMethod =
+          appContext.getClass.getMethod("setLogAggregationContext",
+            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
+            s"does not support it", e)
+      }
+    }
+
     appContext
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index ad2412e025..49c0177ab2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -243,6 +243,22 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  /* Rolled log aggregation configuration. */
+
+  private[spark] val ROLLED_LOG_INCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.includePattern")
+      .doc("Java Regex to filter the log files which match the defined include pattern and those " +
+        "log files will be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
+  private[spark] val ROLLED_LOG_EXCLUDE_PATTERN =
+    ConfigBuilder("spark.yarn.rolledLog.excludePattern")
+      .doc("Java Regex to filter the log files which match the defined exclude pattern and those " +
+        "log files will not be aggregated in a rolling fashion.")
+      .stringConf
+      .createOptional
+
   /* Private configs. */
 
   private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file")