aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java72
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala58
-rw-r--r--sql/core/src/test/resources/log4j.properties4
-rw-r--r--sql/hive/src/test/resources/log4j.properties4
4 files changed, 90 insertions, 48 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
new file mode 100644
index 0000000000..7a7f32ee1e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.Serializable;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+
+import org.apache.parquet.Log;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using
+// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly
+final class ParquetLogRedirector implements Serializable {
+ // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
+ // especially important for Serializable classes where fields are set but constructors are
+ // ignored
+ static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();
+
+ // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+ // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
+ // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8
+ private static final Logger apacheParquetLogger =
+ Logger.getLogger(Log.class.getPackage().getName());
+ private static final Logger parquetLogger = Logger.getLogger("parquet");
+
+ static {
+ // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace.
+ try {
+ Class.forName(Log.class.getName());
+ redirect(Logger.getLogger(Log.class.getPackage().getName()));
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+ // namespace.
+ try {
+ Class.forName("parquet.Log");
+ redirect(Logger.getLogger("parquet"));
+ } catch (Throwable t) {
+ // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
+ // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
+ // should be removed after this issue is fixed.
+ }
+ }
+
+ private ParquetLogRedirector() {
+ }
+
+ private static void redirect(Logger logger) {
+ for (Handler handler : logger.getHandlers()) {
+ logger.removeHandler(handler);
+ }
+ logger.setUseParentHandlers(false);
+ logger.addHandler(new SLF4JBridgeHandler());
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b8ea7f40c4..031a0fe578 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
-import java.util.logging.{Logger => JLogger}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
with DataSourceRegister
with Logging
with Serializable {
+ // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+ // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
+ // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
+ // here.
+ private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
override def shortName(): String = "parquet"
@@ -129,10 +131,14 @@ class ParquetFileFormat
conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
}
- ParquetFileFormat.redirectParquetLogs()
-
new OutputWriterFactory {
- override def newInstance(
+ // This OutputWriterFactory instance is deserialized when writing Parquet files on the
+ // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
+ // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
+ // initialized.
+ private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+ override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
Failure(cause)
}.toOption
}
-
- // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
- // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
- // references to loggers in both parquet-mr <= 1.6 and >= 1.7
- val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
- val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
- // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
- // we redirect the JUL logger via SLF4J JUL bridge handler.
- val redirectParquetLogsViaSLF4J: Unit = {
- def redirect(logger: JLogger): Unit = {
- logger.getHandlers.foreach(logger.removeHandler)
- logger.setUseParentHandlers(false)
- logger.addHandler(new SLF4JBridgeHandler)
- }
-
- // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
- // scalastyle:off classforname
- Class.forName(classOf[ApacheParquetLog].getName)
- // scalastyle:on classforname
- redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
- // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
- // namespace.
- try {
- // scalastyle:off classforname
- Class.forName("parquet.Log")
- // scalastyle:on classforname
- redirect(JLogger.getLogger("parquet"))
- } catch { case _: Throwable =>
- // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
- // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
- // should be removed after this issue is fixed.
- }
- }
-
- /**
- * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
- */
- def redirectParquetLogs(): Unit = {}
}
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 33b9ecf1e2..25b8173821 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
# Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index fea3404769..072bb25d30 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR