author    Dongjoon Hyun <dongjoon@apache.org>    2016-06-28 13:01:18 +0800
committer Cheng Lian <lian@databricks.com>       2016-06-28 13:01:18 +0800
commit    a0da854fb3748aca0128377f0955600cb7a2b5bc (patch)
tree      6570664b9cd0c9cf53f6f4949484dabd6164c915
parent    50fdd866b55cb9b51427095e56b2aafea12a7c23 (diff)
[SPARK-16221][SQL] Redirect Parquet JUL logger via SLF4J for WRITE operations
## What changes were proposed in this pull request?

[SPARK-8118](https://github.com/apache/spark/pull/8196) implements redirecting the Parquet JUL logger via SLF4J, but the redirection is currently applied only when READ operations occur. If users perform only WRITE operations, many Parquet log lines still appear. This PR makes the redirection work for WRITE operations, too.

**Before**

```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Jun 26, 2016 9:04:38 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
............ about 70 lines of Parquet logs ............
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
............ about 70 lines of Parquet logs ............
```

**After**

```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
```

This PR also fixes some typos.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13918 from dongjoon-hyun/SPARK-16221.
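The mechanism this change relies on is the standard JUL-to-SLF4J bridge. As a rough sketch (not the code from this patch), the following Scala snippet shows how Parquet's `java.util.logging` output can be routed to SLF4J, assuming the `org.slf4j:jul-to-slf4j` bridge is on the classpath; the object name `ParquetJulRedirectSketch` is hypothetical:

```scala
// Rough sketch: redirect Parquet's java.util.logging (JUL) output to SLF4J.
// Assumes the org.slf4j:jul-to-slf4j bridge is on the classpath.
import java.util.logging.{Logger => JulLogger}

import org.slf4j.bridge.SLF4JBridgeHandler

object ParquetJulRedirectSketch {
  // JUL handlers are process-wide state, so doing this once per JVM is enough.
  lazy val redirectOnce: Unit = {
    val parquetLogger = JulLogger.getLogger("org.apache.parquet")
    // Remove JUL's default handlers so messages are not printed twice.
    parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
    parquetLogger.setUseParentHandlers(false)
    // Hand everything that reaches this logger to SLF4J instead.
    parquetLogger.addHandler(new SLF4JBridgeHandler())
  }

  // Call sites (read and write paths alike) only need to reference this method
  // to trigger the lazy redirection above.
  def redirectParquetLogs(): Unit = redirectOnce
}
```

Once a handler like this is installed, Parquet messages follow whatever SLF4J backend and log levels the application configures, instead of being printed by JUL's default console handler.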
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala  17
1 file changed, 12 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2cce3db9a6..80002d4204 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -129,6 +129,8 @@ private[sql] class ParquetFileFormat
conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
}
+ ParquetFileFormat.redirectParquetLogs()
+
new OutputWriterFactory {
override def newInstance(
path: String,
@@ -468,9 +470,9 @@ private[sql] class ParquetOutputWriterFactory(
override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
// Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
- private val hadoopTaskAttempId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+ private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(
- serializableConf.value, hadoopTaskAttempId)
+ serializableConf.value, hadoopTaskAttemptId)
// Instance of ParquetRecordWriter that does not use OutputCommitter
private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
@@ -505,7 +507,7 @@ private[sql] class ParquetOutputWriterFactory(
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
throw new UnsupportedOperationException(
- "this verison of newInstance not supported for " +
+ "this version of newInstance not supported for " +
"ParquetOutputWriterFactory")
}
}
@@ -665,7 +667,7 @@ private[sql] object ParquetFileFormat extends Logging {
Some(Try(DataType.fromJson(serializedSchema.get))
.recover { case _: Throwable =>
logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
LegacyTypeStringParser.parse(serializedSchema.get)
}
@@ -880,7 +882,7 @@ private[sql] object ParquetFileFormat extends Logging {
Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover {
case _: Throwable =>
logInfo(
- s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+ "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
"falling back to the deprecated DataType.fromCaseClassString parser.")
LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType]
}.recoverWith {
@@ -926,4 +928,9 @@ private[sql] object ParquetFileFormat extends Logging {
// should be removed after this issue is fixed.
}
}
+
+ /**
+ * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`.
+ */
+ def redirectParquetLogs(): Unit = {}
}
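A note on why calling an empty `redirectParquetLogs()` method is sufficient: in Scala, referencing any member of an `object` forces the object's one-time initialization, so the call from `prepareWrite` pulls in the `ParquetFileFormat` companion object's initializer (which, per the comment above, sets up `redirectParquetLogsViaSLF4J`) on the write path as well. A minimal, self-contained sketch of that pattern, with hypothetical names:

```scala
// Sketch (hypothetical names): a no-op method call still forces the enclosing
// object's one-time constructor to run, which is where the side effect lives.
object LogSetup {
  // Evaluated exactly once per JVM, the first time LogSetup is referenced.
  val initializeOnce: Unit = println("redirecting Parquet logs via SLF4J ...")

  // Empty body on purpose: calling it is only a trigger for initialization.
  def touch(): Unit = {}
}

object Demo extends App {
  LogSetup.touch()  // prints "redirecting Parquet logs via SLF4J ..." once
  LogSetup.touch()  // object already initialized, nothing printed
}
```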