aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Allman <michael@videoamp.com>2016-11-10 13:41:13 -0800
committerReynold Xin <rxin@databricks.com>2016-11-10 13:41:13 -0800
commitb533fa2b205544b42dcebe0a6fee9d8275f6da7d (patch)
tree4bdbe8b190f81c68a6570b56c357bb3265d5e643 /sql
parent16eaad9daed0b633e6a714b5704509aa7107d6e5 (diff)
downloadspark-b533fa2b205544b42dcebe0a6fee9d8275f6da7d.tar.gz
spark-b533fa2b205544b42dcebe0a6fee9d8275f6da7d.tar.bz2
spark-b533fa2b205544b42dcebe0a6fee9d8275f6da7d.zip
[SPARK-17993][SQL] Fix Parquet log output redirection
(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17993) ## What changes were proposed in this pull request? PR #14690 broke parquet log output redirection for converted partitioned Hive tables. For example, when querying parquet files written by Parquet-mr 1.6.0 Spark prints a torrent of (harmless) warning messages from the Parquet reader: ``` Oct 18, 2016 7:42:18 PM WARNING: org.apache.parquet.CorruptStatistics: Ignoring statistics because created_by could not be parsed (see PARQUET-251): parquet-mr version 1.6.0 org.apache.parquet.VersionParser$VersionParseException: Could not parse created_by: parquet-mr version 1.6.0 using format: (.+) version ((.*) )?\(build ?(.*)\) at org.apache.parquet.VersionParser.parse(VersionParser.java:112) at org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:60) at org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:263) at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:583) at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:513) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:270) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:225) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:162) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:372) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ``` This only happens during execution, not planning, and it doesn't matter what log level the `SparkContext` is set to. That's because Parquet (versions < 1.9) doesn't use slf4j for logging. Note, you can tell that log redirection is not working here because the log message format does not conform to the default Spark log message format. This is a regression I noted as something we needed to fix as a follow up. It appears that the problem arose because we removed the call to `inferSchema` during Hive table conversion. That call is what triggered the output redirection. ## How was this patch tested? I tested this manually in four ways: 1. Executing `spark.sqlContext.range(10).selectExpr("id as a").write.mode("overwrite").parquet("test")`. 2. Executing `spark.read.format("parquet").load(legacyParquetFile).show` for a Parquet file `legacyParquetFile` written using Parquet-mr 1.6.0. 3. Executing `select * from legacy_parquet_table limit 1` for some unpartitioned Parquet-based Hive table written using Parquet-mr 1.6.0. 4. Executing `select * from legacy_partitioned_parquet_table where partcol=x limit 1` for some partitioned Parquet-based Hive table written using Parquet-mr 1.6.0. I ran each test with a new instance of `spark-shell` or `spark-sql`. Incidentally, I found that test case 3 was not a regression—redirection was not occurring in the master codebase prior to #14690. I spent some time working on a unit test, but based on my experience working on this ticket I feel that automated testing here is far from feasible. cc ericl dongjoon-hyun Author: Michael Allman <michael@videoamp.com> Closes #15538 from mallman/spark-17993-fix_parquet_log_redirection.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java72
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala58
-rw-r--r--sql/core/src/test/resources/log4j.properties4
-rw-r--r--sql/hive/src/test/resources/log4j.properties4
4 files changed, 90 insertions, 48 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
new file mode 100644
index 0000000000..7a7f32ee1e
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetLogRedirector.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.Serializable;
+import java.util.logging.Handler;
+import java.util.logging.Logger;
+
+import org.apache.parquet.Log;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+// Redirects the JUL logging for parquet-mr versions <= 1.8 to SLF4J logging using
+// SLF4JBridgeHandler. Parquet-mr versions >= 1.9 use SLF4J directly
+final class ParquetLogRedirector implements Serializable {
+ // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
+ // especially important for Serializable classes where fields are set but constructors are
+ // ignored
+ static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();
+
+ // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+ // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
+ // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8
+ private static final Logger apacheParquetLogger =
+ Logger.getLogger(Log.class.getPackage().getName());
+ private static final Logger parquetLogger = Logger.getLogger("parquet");
+
+ static {
+ // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace.
+ try {
+ Class.forName(Log.class.getName());
+ redirect(Logger.getLogger(Log.class.getPackage().getName()));
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+ // namespace.
+ try {
+ Class.forName("parquet.Log");
+ redirect(Logger.getLogger("parquet"));
+ } catch (Throwable t) {
+ // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
+ // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
+ // should be removed after this issue is fixed.
+ }
+ }
+
+ private ParquetLogRedirector() {
+ }
+
+ private static void redirect(Logger logger) {
+ for (Handler handler : logger.getHandlers()) {
+ logger.removeHandler(handler);
+ }
+ logger.setUseParentHandlers(false);
+ logger.addHandler(new SLF4JBridgeHandler());
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b8ea7f40c4..031a0fe578 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
-import java.util.logging.{Logger => JLogger}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -29,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
-import org.slf4j.bridge.SLF4JBridgeHandler
import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
@@ -56,6 +53,11 @@ class ParquetFileFormat
with DataSourceRegister
with Logging
with Serializable {
+ // Hold a reference to the (serializable) singleton instance of ParquetLogRedirector. This
+ // ensures the ParquetLogRedirector class is initialized whether an instance of ParquetFileFormat
+ // is constructed or deserialized. Do not heed the Scala compiler's warning about an unused field
+ // here.
+ private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
override def shortName(): String = "parquet"
@@ -129,10 +131,14 @@ class ParquetFileFormat
conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
}
- ParquetFileFormat.redirectParquetLogs()
-
new OutputWriterFactory {
- override def newInstance(
+ // This OutputWriterFactory instance is deserialized when writing Parquet files on the
+ // executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
+ // another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class is
+ // initialized.
+ private val parquetLogRedirector = ParquetLogRedirector.INSTANCE
+
+ override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
@@ -673,44 +679,4 @@ object ParquetFileFormat extends Logging {
Failure(cause)
}.toOption
}
-
- // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
- // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
- // references to loggers in both parquet-mr <= 1.6 and >= 1.7
- val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
- val parquetLogger: JLogger = JLogger.getLogger("parquet")
-
- // Parquet initializes its own JUL logger in a static block which always prints to stdout. Here
- // we redirect the JUL logger via SLF4J JUL bridge handler.
- val redirectParquetLogsViaSLF4J: Unit = {
- def redirect(logger: JLogger): Unit = {
- logger.getHandlers.foreach(logger.removeHandler)
- logger.setUseParentHandlers(false)
- logger.addHandler(new SLF4JBridgeHandler)
- }
-
- // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
- // scalastyle:off classforname
- Class.forName(classOf[ApacheParquetLog].getName)
- // scalastyle:on classforname
- redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
-
- // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
- // namespace.
- try {
- // scalastyle:off classforname
- Class.forName("parquet.Log")
- // scalastyle:on classforname
- redirect(JLogger.getLogger("parquet"))
- } catch { case _: Throwable =>
- // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
- // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
- // should be removed after this issue is fixed.
- }
- }
-
- /**
- * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
- */
- def redirectParquetLogs(): Unit = {}
}
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
index 33b9ecf1e2..25b8173821 100644
--- a/sql/core/src/test/resources/log4j.properties
+++ b/sql/core/src/test/resources/log4j.properties
@@ -53,5 +53,5 @@ log4j.additivity.hive.ql.metadata.Hive=false
log4j.logger.hive.ql.metadata.Hive=OFF
# Parquet related logging
-log4j.logger.org.apache.parquet.hadoop=WARN
-log4j.logger.org.apache.spark.sql.parquet=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index fea3404769..072bb25d30 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -59,3 +59,7 @@ log4j.logger.hive.ql.metadata.Hive=OFF
log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
+
+# Parquet related logging
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR