author     Cheng Lian <lian@databricks.com>    2015-08-18 20:15:33 +0800
committer  Cheng Lian <lian@databricks.com>    2015-08-18 20:15:33 +0800
commit     5723d26d7e677b89383de3fcf2c9a821b68a65b7 (patch)
tree       6e1820a777b971ce02d5968b9edc379543cf4968
parent     c34e9ff0eac2032283b959fe63b47cc30f28d21c (diff)
[SPARK-8118] [SQL] Redirects Parquet JUL logger via SLF4J
Parquet hard-codes a JUL logger that always writes to stdout. This PR redirects it via the SLF4J JUL bridge handler, so that Parquet logs can be controlled via `log4j.properties`. This solution is inspired by https://github.com/Parquet/parquet-mr/issues/390#issuecomment-46064909.

Author: Cheng Lian <lian@databricks.com>

Closes #8196 from liancheng/spark-8118/redirect-parquet-jul.
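For readers unfamiliar with the jul-to-slf4j bridge, the idea behind the patch can be sketched in isolation. The object and method names below are illustrative only (they are not part of this commit), and the sketch assumes `org.slf4j:jul-to-slf4j` is on the classpath:

    import java.util.logging.{Logger => JLogger}

    import org.slf4j.bridge.SLF4JBridgeHandler

    // Illustrative sketch: detach whatever console handlers Parquet installed
    // on its JUL logger, stop it from delegating to parent handlers, and attach
    // an SLF4JBridgeHandler so log records flow into SLF4J (and thus log4j)
    // instead of being printed to stdout.
    object JulToSlf4jSketch {
      private def redirect(logger: JLogger): Unit = {
        logger.getHandlers.foreach(logger.removeHandler)
        logger.setUseParentHandlers(false)
        logger.addHandler(new SLF4JBridgeHandler)
      }

      def main(args: Array[String]): Unit = {
        redirect(JLogger.getLogger("org.apache.parquet")) // parquet-mr 1.7.0+
        redirect(JLogger.getLogger("parquet"))            // parquet-mr <= 1.6.x bundled with Hive
      }
    }

Note that the actual patch additionally keeps strong references to both loggers (`apacheParquetLogger` and `parquetLogger`), since JUL holds loggers only weakly and the redirection would otherwise be lost once they are garbage collected.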
-rw-r--r--  conf/log4j.properties.template                                                                   |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 77
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala | 1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala | 3
-rw-r--r--  sql/hive/src/test/resources/log4j.properties                                                     |  7
5 files changed, 47 insertions(+), 43 deletions(-)
diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 27006e45e9..74c5cea944 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -10,6 +10,8 @@ log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 52fac18ba1..68169d48ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
+import java.util.logging.{Logger => JLogger}
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -31,22 +31,22 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _}
import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
+import org.apache.parquet.{Log => ApacheParquetLog}
+import org.slf4j.bridge.SLF4JBridgeHandler
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
@@ -759,38 +759,39 @@ private[sql] object ParquetRelation extends Logging {
}.toOption
}
- def enableLogForwarding() {
- // Note: the org.apache.parquet.Log class has a static initializer that
- // sets the java.util.logging Logger for "org.apache.parquet". This
- // checks first to see if there's any handlers already set
- // and if not it creates them. If this method executes prior
- // to that class being loaded then:
- // 1) there's no handlers installed so there's none to
- // remove. But when it IS finally loaded the desired affect
- // of removing them is circumvented.
- // 2) The parquet.Log static initializer calls setUseParentHandlers(false)
- // undoing the attempt to override the logging here.
- //
- // Therefore we need to force the class to be loaded.
- // This should really be resolved by Parquet.
- Utils.classForName(classOf[ParquetLog].getName)
-
- // Note: Logger.getLogger("parquet") has a default logger
- // that appends to Console which needs to be cleared.
- val parquetLogger = JLogger.getLogger(classOf[ParquetLog].getPackage.getName)
- parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
- parquetLogger.setUseParentHandlers(true)
-
- // Disables a WARN log message in ParquetOutputCommitter. We first ensure that
- // ParquetOutputCommitter is loaded and the static LOG field gets initialized.
- // See https://issues.apache.org/jira/browse/SPARK-5968 for details
- Utils.classForName(classOf[ParquetOutputCommitter].getName)
- JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF)
-
- // Similar as above, disables a unnecessary WARN log message in ParquetRecordReader.
- // See https://issues.apache.org/jira/browse/PARQUET-220 for details
- Utils.classForName(classOf[ParquetRecordReader[_]].getName)
- JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
+ // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+ // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
+ // references to loggers in both parquet-mr <= 1.6 and >= 1.7
+ val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
+ val parquetLogger: JLogger = JLogger.getLogger("parquet")
+
+ // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
+ // we redirect the JUL logger via SLF4J JUL bridge handler.
+ val redirectParquetLogsViaSLF4J: Unit = {
+ def redirect(logger: JLogger): Unit = {
+ logger.getHandlers.foreach(logger.removeHandler)
+ logger.setUseParentHandlers(false)
+ logger.addHandler(new SLF4JBridgeHandler)
+ }
+
+ // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
+ // scalastyle:off classforname
+ Class.forName(classOf[ApacheParquetLog].getName)
+ // scalastyle:on classforname
+ redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
+
+ // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+ // namespace.
+ try {
+ // scalastyle:off classforname
+ Class.forName("parquet.Log")
+ // scalastyle:on classforname
+ redirect(JLogger.getLogger("parquet"))
+ } catch { case _: Throwable =>
+ // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly jar
+ // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
+ // should be removed after this issue is fixed.
+ }
}
// The parquet compression short names
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
index 3191cf3d12..ed89aa27aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
@@ -52,7 +52,6 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
}
log.debug(s"write support initialized for requested schema $attributes")
- ParquetRelation.enableLogForwarding()
new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 019db34fc6..42376ef7a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -104,7 +104,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
extraMetadata,
"Spark")
- ParquetRelation.enableLogForwarding()
ParquetFileWriter.writeMetadataFile(
conf,
path,
@@ -140,8 +139,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
(name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
}
- ParquetRelation.enableLogForwarding()
-
// NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
// groups. Since Parquet schema is replicated among all row groups, we only need to touch a
// single row group to read schema related metadata. Notice that we are making assumptions that
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index 92eaf1f279..fea3404769 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -48,9 +48,14 @@ log4j.logger.hive.log=OFF
log4j.additivity.parquet.hadoop.ParquetRecordReader=false
log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
+log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
+
log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
-