From 5723d26d7e677b89383de3fcf2c9a821b68a65b7 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 18 Aug 2015 20:15:33 +0800
Subject: [SPARK-8118] [SQL] Redirects Parquet JUL logger via SLF4J

Parquet hard coded a JUL logger which always writes to stdout. This PR
redirects it via SLF4j JUL bridge handler, so that we can control Parquet
logs via `log4j.properties`.

This solution is inspired by https://github.com/Parquet/parquet-mr/issues/390#issuecomment-46064909.

Author: Cheng Lian

Closes #8196 from liancheng/spark-8118/redirect-parquet-jul.
---
 conf/log4j.properties.template                     |  2 +
 .../datasources/parquet/ParquetRelation.scala      | 77 +++++++++++-----------
 .../datasources/parquet/ParquetTableSupport.scala  |  1 -
 .../parquet/ParquetTypesConverter.scala            |  3 -
 sql/hive/src/test/resources/log4j.properties       |  7 +-
 5 files changed, 47 insertions(+), 43 deletions(-)

diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 27006e45e9..74c5cea944 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -10,6 +10,8 @@ log4j.logger.org.spark-project.jetty=WARN
 log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
 
 # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 52fac18ba1..68169d48ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
+import java.util.logging.{Logger => JLogger}
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -31,22 +31,22 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _}
 import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
+import org.apache.parquet.{Log => ApacheParquetLog}
+import org.slf4j.bridge.SLF4JBridgeHandler
 
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
 
@@ -759,38 +759,39 @@ private[sql] object ParquetRelation extends Logging {
     }.toOption
   }
 
-  def enableLogForwarding() {
-    // Note: the org.apache.parquet.Log class has a static initializer that
-    // sets the java.util.logging Logger for "org.apache.parquet". This
-    // checks first to see if there's any handlers already set
-    // and if not it creates them. If this method executes prior
-    // to that class being loaded then:
-    //  1) there's no handlers installed so there's none to
-    //     remove. But when it IS finally loaded the desired affect
-    //     of removing them is circumvented.
-    //  2) The parquet.Log static initializer calls setUseParentHandlers(false)
-    //     undoing the attempt to override the logging here.
-    //
-    // Therefore we need to force the class to be loaded.
-    // This should really be resolved by Parquet.
-    Utils.classForName(classOf[ParquetLog].getName)
-
-    // Note: Logger.getLogger("parquet") has a default logger
-    // that appends to Console which needs to be cleared.
-    val parquetLogger = JLogger.getLogger(classOf[ParquetLog].getPackage.getName)
-    parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
-    parquetLogger.setUseParentHandlers(true)
-
-    // Disables a WARN log message in ParquetOutputCommitter. We first ensure that
-    // ParquetOutputCommitter is loaded and the static LOG field gets initialized.
-    // See https://issues.apache.org/jira/browse/SPARK-5968 for details
-    Utils.classForName(classOf[ParquetOutputCommitter].getName)
-    JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF)
-
-    // Similar as above, disables a unnecessary WARN log message in ParquetRecordReader.
-    // See https://issues.apache.org/jira/browse/PARQUET-220 for details
-    Utils.classForName(classOf[ParquetRecordReader[_]].getName)
-    JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
+  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
+  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
+  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
+  val parquetLogger: JLogger = JLogger.getLogger("parquet")
+
+  // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
+  // we redirect the JUL logger via SLF4J JUL bridge handler.
+  val redirectParquetLogsViaSLF4J: Unit = {
+    def redirect(logger: JLogger): Unit = {
+      logger.getHandlers.foreach(logger.removeHandler)
+      logger.setUseParentHandlers(false)
+      logger.addHandler(new SLF4JBridgeHandler)
+    }
+
+    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
+    // scalastyle:off classforname
+    Class.forName(classOf[ApacheParquetLog].getName)
+    // scalastyle:on classforname
+    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
+
+    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+    // namespace.
+    try {
+      // scalastyle:off classforname
+      Class.forName("parquet.Log")
+      // scalastyle:on classforname
+      redirect(JLogger.getLogger("parquet"))
+    } catch { case _: Throwable =>
+      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly jar
+      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
+      // should be removed after this issue is fixed.
+    }
   }
 
   // The parquet compression short names
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
index 3191cf3d12..ed89aa27aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
@@ -52,7 +52,6 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
     }
 
     log.debug(s"write support initialized for requested schema $attributes")
-    ParquetRelation.enableLogForwarding()
    new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 019db34fc6..42376ef7a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -104,7 +104,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
       extraMetadata,
       "Spark")
 
-    ParquetRelation.enableLogForwarding()
     ParquetFileWriter.writeMetadataFile(
       conf,
       path,
@@ -140,8 +139,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
       (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
-    ParquetRelation.enableLogForwarding()
-
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index 92eaf1f279..fea3404769 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -48,9 +48,14 @@ log4j.logger.hive.log=OFF
 log4j.additivity.parquet.hadoop.ParquetRecordReader=false
 log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
 
+log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
+
 log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
-
-- 
cgit v1.2.3
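
Below is a minimal, standalone sketch of the JUL-to-SLF4J redirection technique the patch relies on, useful for trying the behavior outside Spark. It assumes org.slf4j:jul-to-slf4j and an SLF4J binding configured by log4j.properties are on the classpath; the object name RedirectJulExample is illustrative and not part of the patch, and the hard-coded logger names simply mirror the two Parquet namespaces handled above.

import java.util.logging.{Logger => JLogger}

import org.slf4j.bridge.SLF4JBridgeHandler

object RedirectJulExample {
  // Hold strong references so the JUL LogManager cannot garbage-collect the loggers,
  // mirroring the apacheParquetLogger/parquetLogger vals added in ParquetRelation.
  private val apacheParquetLogger: JLogger = JLogger.getLogger("org.apache.parquet")
  private val parquetLogger: JLogger = JLogger.getLogger("parquet")

  private def redirect(logger: JLogger): Unit = {
    // Remove any console handlers already installed, stop propagation to the root
    // JUL logger, and hand all records to SLF4J so log4j.properties takes effect.
    logger.getHandlers.foreach(logger.removeHandler)
    logger.setUseParentHandlers(false)
    logger.addHandler(new SLF4JBridgeHandler)
  }

  def main(args: Array[String]): Unit = {
    Seq(apacheParquetLogger, parquetLogger).foreach(redirect)
    // This message now flows through SLF4J/log4j instead of being printed to stdout.
    apacheParquetLogger.warning("routed through the SLF4J bridge")
  }
}

With log4j.logger.org.apache.parquet=ERROR in log4j.properties (as added to conf/log4j.properties.template above), the warning in main is suppressed; lowering the threshold to WARN lets it through, which is the control the commit message describes.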