aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMichel Lemay <mlemay@gmail.com>2015-08-12 16:17:58 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-08-12 16:41:35 -0700
commitab7e721cfec63155641e81e72b4ad43cf6a7d4c7 (patch)
tree2721b5b0b563fdbb3f6f6a2d6bca6ff838620e8d /streaming
parent738f353988dbf02704bd63f5e35d94402c59ed79 (diff)
downloadspark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.tar.gz
spark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.tar.bz2
spark-ab7e721cfec63155641e81e72b4ad43cf6a7d4c7.zip
[SPARK-9826] [CORE] Fix cannot use custom classes in log4j.properties
Refactor Utils class and create ShutdownHookManager. NOTE: Wasn't able to run /dev/run-tests on windows machine. Manual tests were conducted locally using custom log4j.properties file with Redis appender and logstash formatter (bundled in the fat-jar submitted to spark) ex: log4j.rootCategory=WARN,console,redis log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO log4j.logger.org.apache.spark.graphx.Pregel=INFO log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender log4j.appender.redis.endpoints=hostname:port log4j.appender.redis.key=mykey log4j.appender.redis.alwaysBatch=false log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1 Author: michellemay <mlemay@gmail.com> Closes #8109 from michellemay/SPARK-9826.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala8
1 file changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 177e710ace..b496d1f341 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -604,7 +604,7 @@ class StreamingContext private[streaming] (
}
StreamingContext.setActiveContext(this)
}
- shutdownHookRef = Utils.addShutdownHook(
+ shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
// Registering Streaming Metrics at the start of the StreamingContext
assert(env.metricsSystem != null)
@@ -691,7 +691,7 @@ class StreamingContext private[streaming] (
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
- Utils.removeShutdownHook(shutdownHookRef)
+ ShutdownHookManager.removeShutdownHook(shutdownHookRef)
}
logInfo("StreamingContext stopped successfully")
}
@@ -725,7 +725,7 @@ object StreamingContext extends Logging {
*/
private val ACTIVATION_LOCK = new Object()
- private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+ private val SHUTDOWN_HOOK_PRIORITY = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
private val activeContext = new AtomicReference[StreamingContext](null)