authorMatei Zaharia <matei@databricks.com>2014-01-01 21:25:00 -0500
committerMatei Zaharia <matei@databricks.com>2014-01-01 21:25:00 -0500
commit45ff8f413d9959b7f464176cd20dc56db3f711af (patch)
tree25e5f1c900a0d8f955f93d1601bddec003d072e4 /core
parent0e5b2adb5c4f2ab1bba275bb3cb2dc2643828f4f (diff)
parentc1d928a897f8daed5d7e74f4af476b67046f348d (diff)
Merge remote-tracking branch 'apache/master' into conf2
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
	core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/resources/org/apache/spark/default-log4j.properties	8
-rw-r--r--	core/src/main/scala/org/apache/spark/HttpServer.scala	1
-rw-r--r--	core/src/main/scala/org/apache/spark/Logging.scala	41
-rw-r--r--	core/src/main/scala/org/apache/spark/SparkContext.scala	3
-rw-r--r--	core/src/main/scala/org/apache/spark/executor/Executor.scala	2
-rw-r--r--	core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala	1
-rw-r--r--	core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala	1
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala	2
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala	3
-rw-r--r--	core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala	2
-rw-r--r--	core/src/test/scala/org/apache/spark/CheckpointSuite.scala	2
11 files changed, 45 insertions, 21 deletions
diff --git a/core/src/main/resources/org/apache/spark/default-log4j.properties b/core/src/main/resources/org/apache/spark/default-log4j.properties
new file mode 100644
index 0000000000..d72dbadc39
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/default-log4j.properties
@@ -0,0 +1,8 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
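
For reference, the ConversionPattern above renders each log event as a timestamp, level, abbreviated logger name, and message. With this layout, the logInfo call added to HttpServer below would produce a console line roughly like the following (timestamp hypothetical):

    14/01/01 21:25:00 INFO HttpServer: Starting HTTP Server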
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index cdfc9dd54e..69a738dc44 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
+ logInfo("Starting HTTP Server")
server = new Server()
val connector = new SocketConnector
connector.setMaxIdleTime(60*1000)
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 6a973ea495..d519fc5a29 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -17,8 +17,8 @@
package org.apache.spark
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
+import org.apache.log4j.{LogManager, PropertyConfigurator}
+import org.slf4j.{Logger, LoggerFactory}
/**
* Utility trait for classes that want to log data. Creates an SLF4J logger for the class and allows
@@ -33,6 +33,7 @@ trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
+ initializeIfNecessary()
var className = this.getClass.getName
// Ignore trailing $'s in the class names for Scala objects
if (className.endsWith("$")) {
@@ -89,7 +90,37 @@ trait Logging {
log.isTraceEnabled
}
- // Method for ensuring that logging is initialized, to avoid having multiple
- // threads do it concurrently (as SLF4J initialization is not thread safe).
- protected def initLogging() { log }
+ private def initializeIfNecessary() {
+ if (!Logging.initialized) {
+ Logging.initLock.synchronized {
+ if (!Logging.initialized) {
+ initializeLogging()
+ }
+ }
+ }
+ }
+
+ private def initializeLogging() {
+ // If Log4j doesn't seem initialized, load a default properties file
+ val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ val defaultLogProps = "org/apache/spark/default-log4j.properties"
+ val classLoader = this.getClass.getClassLoader
+ Option(classLoader.getResource(defaultLogProps)) match {
+ case Some(url) => PropertyConfigurator.configure(url)
+ case None => System.err.println(s"Spark was unable to load $defaultLogProps")
+ }
+ log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+ }
+ Logging.initialized = true
+
+ // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
+ // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
+ log
+ }
+}
+
+object Logging {
+ @volatile private var initialized = false
+ val initLock = new Object()
}
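
The initialization path above is the classic double-checked locking idiom: a lock-free read of a @volatile flag, then a second check under the lock before doing the one-time work. A minimal standalone sketch of the same idiom, not part of this commit (object and method names hypothetical):

    object LazyInit {
      @volatile private var initialized = false
      private val initLock = new Object()

      def ensureInitialized(doInit: () => Unit): Unit = {
        if (!initialized) {            // cheap, lock-free fast path
          initLock.synchronized {
            if (!initialized) {        // re-check: another thread may have won the race
              doInit()
              initialized = true       // publish via the volatile write
            }
          }
        }
      }
    }

The @volatile annotation is what makes the first unsynchronized read safe; without it, a thread could observe initialized == true before the effects of doInit() become visible.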
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fbc7a78bf5..46874c41a2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -137,9 +137,6 @@ class SparkContext(
val isLocal = (master == "local" || master.startsWith("local["))
- // Ensure logging is initialized before we spawn any threads
- initLogging()
-
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.create(
conf,
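
The deletion in this file, and the matching ones in the files below, all follow the same pattern: explicit initLogging() calls become unnecessary because the Logging trait now runs initializeIfNecessary() on the first access to log. A hedged sketch of what client code looks like after this change (class name hypothetical):

    class MyService extends Logging {   // any class mixing in the trait
      def start(): Unit = {
        // No initLogging() call needed: this first logInfo touches `log`,
        // which triggers the one-time initialization shown above.
        logInfo("service starting")
      }
    }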
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2400154648..5b70165c35 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -48,8 +48,6 @@ private[spark] class Executor(
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
- initLogging()
-
// No ip or host:port - just hostname
Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
// must not have port specified.
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index caab748d60..6f9f29969e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -26,7 +26,6 @@ import scala.util.matching.Regex
import org.apache.spark.Logging
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
- initLogging()
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index ac29816f19..0e41c73ce7 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -64,7 +64,6 @@ import org.apache.spark.metrics.source.Source
*/
private[spark] class MetricsSystem private (val instance: String,
conf: SparkConf) extends Logging {
- initLogging()
val confFile = conf.getOrElse("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile))
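
The Option(confFile) call above is the standard Scala idiom for converting a possibly-null value into an Option. A small self-contained illustration (values hypothetical):

    val confFile: String = null                       // e.g. spark.metrics.conf not set
    val maybeFile: Option[String] = Option(confFile)  // Option(null) == None
    val path = maybeFile.getOrElse("metrics.properties")
    // path == "metrics.properties" when the property is missing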
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 73a1da2de6..dbbeeb39eb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
private val akkaTimeout = AkkaUtils.askTimeout(conf)
- initLogging()
-
val slaveTimeout = conf.getOrElse("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequency(conf) * 3)).toLong
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 0c66addf9d..21f003609b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
* TODO: Use event model.
*/
private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
- initLogging()
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
@@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
private[spark] object BlockManagerWorker extends Logging {
private var blockManagerWorker: BlockManagerWorker = null
- initLogging()
-
def startBlockManagerWorker(manager: BlockManager) {
blockManagerWorker = new BlockManagerWorker(manager)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
index 6ce9127c74..a06f50a0ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockMessageArray.scala
@@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
def length = blockMessages.length
- initLogging()
-
def set(bufferMessage: BufferMessage) {
val startTime = System.currentTimeMillis
val newBlockMessages = new ArrayBuffer[BlockMessage]()
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f25d921d3f..70bfb81661 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -26,8 +26,6 @@ import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
- initLogging()
-
var checkpointDir: File = _
val partitioner = new HashPartitioner(2)