author | Dongjoon Hyun <dongjoon@apache.org> | 2016-06-28 13:01:18 +0800
committer | Cheng Lian <lian@databricks.com> | 2016-06-28 13:01:18 +0800
commit | a0da854fb3748aca0128377f0955600cb7a2b5bc (patch)
tree | 6570664b9cd0c9cf53f6f4949484dabd6164c915 /sql/core
parent | 50fdd866b55cb9b51427095e56b2aafea12a7c23 (diff)
[SPARK-16221][SQL] Redirect Parquet JUL logger via SLF4J for WRITE operations
## What changes were proposed in this pull request?
[SPARK-8118](https://github.com/apache/spark/pull/8196) implements redirecting the Parquet JUL logger via SLF4J, but the redirection is currently applied only when READ operations occur. If users perform only WRITE operations, many Parquet log messages are still printed.
This PR makes the redirection work on WRITE operations, too.
**Before**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Jun 26, 2016 9:04:38 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
............ about 70 lines of Parquet logs .............
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
............ about 70 lines of Parquet logs .............
```
**After**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
```
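For context, redirecting a java.util.logging (JUL) logger to SLF4J typically means stripping the default JUL handlers from the relevant logger namespace and installing the SLF4J bridge handler in their place, so the application's logging backend controls what gets printed. The following is a minimal sketch of that general technique using `org.slf4j.bridge.SLF4JBridgeHandler` from the `jul-to-slf4j` artifact; it is an illustration under those assumptions, not the actual `redirectParquetLogsViaSLF4J` implementation in `ParquetFileFormat`.
```scala
import java.util.logging.{Logger => JulLogger}
import org.slf4j.bridge.SLF4JBridgeHandler // from the org.slf4j:jul-to-slf4j artifact

object ParquetLogRedirection {
  /** Routes records from a JUL logger namespace to SLF4J instead of the JUL console handler. */
  def redirect(loggerName: String = "org.apache.parquet"): Unit = {
    val logger = JulLogger.getLogger(loggerName)
    // Drop whatever handlers JUL installed by default (typically a ConsoleHandler).
    logger.getHandlers.foreach(logger.removeHandler)
    // Stop records from bubbling up to the root logger's handlers as well.
    logger.setUseParentHandlers(false)
    // Hand records to SLF4J, so the log4j/logback configuration decides what is printed.
    logger.addHandler(new SLF4JBridgeHandler)
  }
}
```
Calling something like `ParquetLogRedirection.redirect()` once, before Parquet emits its first record, would be enough to silence the raw JUL output shown in the Before example.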
This PR also fixes some typos.
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13918 from dongjoon-hyun/SPARK-16221.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 17
1 file changed, 12 insertions, 5 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2cce3db9a6..80002d4204 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -129,6 +129,8 @@ private[sql] class ParquetFileFormat
       conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
     }
 
+    ParquetFileFormat.redirectParquetLogs()
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
@@ -468,9 +470,9 @@ private[sql] class ParquetOutputWriterFactory(
   override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
 
     // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
-    private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
     private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttempId)
+      serializableConf.value, hadoopTaskAttemptId)
 
     // Instance of ParquetRecordWriter that does not use OutputCommitter
     private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
@@ -505,7 +507,7 @@ private[sql] class ParquetOutputWriterFactory(
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
     throw new UnsupportedOperationException(
-      "this verison of newInstance not supported for " +
+      "this version of newInstance not supported for " +
        "ParquetOutputWriterFactory")
   }
 }
@@ -665,7 +667,7 @@ private[sql] object ParquetFileFormat extends Logging {
         Some(Try(DataType.fromJson(serializedSchema.get))
           .recover { case _: Throwable =>
             logInfo(
-              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+              "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
                 "falling back to the deprecated DataType.fromCaseClassString parser.")
             LegacyTypeStringParser.parse(serializedSchema.get)
           }
@@ -880,7 +882,7 @@ private[sql] object ParquetFileFormat extends Logging {
     Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
       case _: Throwable =>
         logInfo(
-          s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+          "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
            "falling back to the deprecated DataType.fromCaseClassString parser.")
         LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
     }.recoverWith {
@@ -926,4 +928,9 @@ private[sql] object ParquetFileFormat extends Logging {
       // should be removed after this issue is fixed.
     }
   }
+
+  /**
+   * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
+   */
+  def redirectParquetLogs(): Unit = {}
 }
```
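The added `redirectParquetLogs()` body is intentionally empty: per its doc comment, `prepareWrite` calls it only to force initialization of the `ParquetFileFormat` companion object, which is where `redirectParquetLogsViaSLF4J` gets triggered. The sketch below illustrates that Scala idiom with hypothetical names (`LogSetup`, `ensureInitialized`, `WritePath`); it is not the actual Spark code.
```scala
// Hypothetical names; a sketch of the "empty method forces object initialization" idiom.
object LogSetup {
  // Statements in an object body run exactly once, the first time the object is
  // referenced from anywhere, so the redirection happens lazily on first use.
  private val initialized: Boolean = {
    println("redirecting Parquet JUL logging to SLF4J ...") // stand-in for the real redirection
    true
  }

  // Intentionally empty: callers invoke this only to force the initializer above to run.
  def ensureInitialized(): Unit = {}
}

object WritePath {
  def prepareWrite(): Unit = {
    // Guarantees the one-time redirection has run before any Parquet write starts.
    LogSetup.ensureInitialized()
  }
}
```
Because the read path already touches the companion object, only the write path needed an explicit call like this to get the same one-time side effect.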