aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-01 17:03:48 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-01 17:03:48 -0800
commitc1d928a897f8daed5d7e74f4af476b67046f348d (patch)
tree0c9fa7f5f2b4ec581ac608ad3ae96b78470bff05
parent9a0ff721c9e4c8f52aadfdde6ac2764d3cba9797 (diff)
parentf8d245bdfc703eefa4fd34795739a1a851031f5b (diff)
downloadspark-c1d928a897f8daed5d7e74f4af476b67046f348d.tar.gz
spark-c1d928a897f8daed5d7e74f4af476b67046f348d.tar.bz2
spark-c1d928a897f8daed5d7e74f4af476b67046f348d.zip
Merge pull request #312 from pwendell/log4j-fix-2
SPARK-1008: Logging improvments 1. Adds a default log4j file that gets loaded if users haven't specified a log4j file. 2. Isolates use of the tools assembly jar. I found this produced SLF4J warnings after building with SBT (and I've seen similar warnings on the mailing list).
-rw-r--r--core/src/main/resources/org/apache/spark/default-log4j.properties8
-rw-r--r--core/src/main/scala/org/apache/spark/HttpServer.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/Logging.scala41
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala2
-rwxr-xr-xspark-class9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala3
19 files changed, 52 insertions, 37 deletions
diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/default-log4j.properties
new file mode 100644
index 0000000000..d72dbadc39
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/default-log4j.properties
@@ -0,0 +1,8 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cdfc9dd54e..69a738dc44 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
+ logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 6a973ea495..d519fc5a29 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -17,8 +17,8 @@
package org.apache.spark
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.apache.log4j.{LogManager, PropertyConfigurator}
+import org.slf4j.{Logger, LoggerFactory}
/**
* Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
@@ -33,6 +33,7 @@ trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
+ initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
@@ -89,7 +90,37 @@ trait Logging {
log.isTraceEnabled
}
- // Method for ensuring that logging is initialized, to avoid having multiple
- // threads do it concurrently (as SLF4J initialization is not thread safe).
- protected def initLogging() { log }
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ // If Log4j doesn't seem initialized, load a default properties file
+ val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ val defaultLogProps = "org/apache/spark/default-log4j.properties"
+ val classLoader = this.getClass.getClassLoader
+ Option(classLoader.getResource(defaultLogProps)) match {
+ case Some(url) => PropertyConfigurator.configure(url)
+ case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+ }
+ log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+ }
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7514ce58fb..d3903ce9e3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -88,9 +88,6 @@ class SparkContext(
scala.collection.immutable.Map())
extends Logging {
- // Ensure logging is initialized before we spawn any threads
- initLogging()
-
// Set Spark driver host and port system properties
if (System.getProperty("spark.driver.host") == null) {
System.setProperty("spark.driver.host", Utils.localHostName())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a96b..782be9a429 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -48,8 +48,6 @@ private[spark] class Executor(
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
- initLogging()
-
// No ip or host:port - just hostname
Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
// must not have port specified.
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index caab748d60..6f9f29969e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -26,7 +26,6 @@ import scala.util.matching.Regex
import org.apache.spark.Logging
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
- initLogging()
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83be8..8e038ce98c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source
* [options] is the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (val instance: String) extends Logging {
- initLogging()
val confFile = System.getProperty("spark.metrics.conf")
val metricsConfig = new MetricsConfig(Option(confFile))
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 21022e1cfb..e0eb02ce81 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
private val akkaTimeout = AkkaUtils.askTimeout
- initLogging()
-
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 0c66addf9d..21f003609b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
* TODO: Use event model.
*/
private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
- initLogging()
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
@@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
- initLogging()
-
def startBlockManagerWorker(manager: BlockManager) {
blockManagerWorker = new BlockManagerWorker(manager)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 6ce9127c74..a06f50a0ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
def length = blockMessages.length
- initLogging()
-
def set(bufferMessage: BufferMessage) {
val startTime = System.currentTimeMillis
val newBlockMessages = new ArrayBuffer[BlockMessage]()
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f25d921d3f..70bfb81661 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -26,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
- initLogging()
-
var checkpointDir: File = _
val partitioner = new HashPartitioner(2)
diff --git a/spark-class b/spark-class
index 802e4aa104..1858ea6247 100755
--- a/spark-class
+++ b/spark-class
@@ -129,11 +129,16 @@ fi
# Compute classpath using external script
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
-CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
+
+if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
+ CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
+fi
if $cygwin; then
CLASSPATH=`cygpath -wp $CLASSPATH`
- export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+ if [ "$1" == "org.apache.spark.tools.JavaAPICompletenessChecker" ]; then
+ export SPARK_TOOLS_JAR=`cygpath -w $SPARK_TOOLS_JAR`
+ fi
fi
export CLASSPATH
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index a78d3965ee..8ebe09da0d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
@transient protected[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
- initLogging()
-
// =======================================================================
// Methods that should be implemented by subclasses of DStream
// =======================================================================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index daed7ff7c3..a09b891956 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -24,7 +24,6 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
final private[streaming] class DStreamGraph extends Serializable with Logging {
- initLogging()
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8898fdcb7f..659da2665a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -94,8 +94,6 @@ class StreamingContext private (
*/
def this(path: String) = this(null, CheckpointReader.read(path), null)
- initLogging()
-
if (sc_ == null && cp_ == null) {
throw new Exception("Spark Streaming cannot be initialized with " +
"both SparkContext and checkpoint as null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index 5add20871e..8c7f42306d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
*/
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
- initLogging()
-
lazy protected val env = SparkEnv.get
lazy protected val actor = env.actorSystem.actorOf(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 921a33a4cb..2eb3b3989e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -36,8 +36,6 @@ private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
- initLogging()
-
val ssc = jobScheduler.ssc
val graph = ssc.graph
val eventProcessorActor = ssc.env.actorSystem.actorOf(Props(new Actor {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccfbed..488cc2f401 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,6 @@ import org.apache.spark.streaming._
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
- initLogging()
-
val jobSets = new ConcurrentHashMap[Time, JobSet]
val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 4a3993e3e3..1559f7a9f7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
private[streaming]
object MasterFailureTest extends Logging {
- initLogging()
@volatile var killed = false
@volatile var killCount = 0
@@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
*/
private[streaming]
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
- initLogging()
override def run() {
try {
@@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
private[streaming]
class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
extends Thread with Logging {
- initLogging()
override def run() {
val localTestDir = Files.createTempDir()