/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.Log
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.hadoop.util.ContextUtil

/**
 * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
 * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
 * destination folder. This can be useful for data stored in S3, where directory operations are
 * relatively expensive.
 *
 * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
 * property via the Hadoop [[Configuration]]. Note that this property overrides
 * "spark.sql.sources.outputCommitterClass".
 *
 * *NOTE*
 *
 * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there is
 * no safe way to undo a failed append job (which is why both `abortTask()` and `abortJob()` are
 * left empty).
 */
private[datasources] class DirectParquetOutputCommitter(
    outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
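
  // Tasks write directly to the final destination, so the task work path is the output path
  // itself rather than a per-attempt `_temporary` subdirectory.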
  override def getWorkPath: Path = outputPath
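
  // With no temporary directory there is nothing to set up, promote, or roll back at the task
  // or job level, so these callbacks are deliberately no-ops; every task still reports that it
  // needs `commitTask()` to be invoked.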
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}

  override def commitJob(jobContext: JobContext): Unit = {
    val configuration = ContextUtil.getConfiguration(jobContext)
    val fileSystem = outputPath.getFileSystem(configuration)
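
    // If job summaries are enabled (the default), read all Parquet footers under the output
    // path in parallel and merge them into the summary metadata file.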
    if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
      try {
        val outputStatus = fileSystem.getFileStatus(outputPath)
        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
        try {
          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
        } catch { case e: Exception =>
          LOG.warn("could not write summary file for " + outputPath, e)
          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
          if (fileSystem.exists(metadataPath)) {
            fileSystem.delete(metadataPath, true)
          }
        }
      } catch {
        case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
      }
    }
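
    // Mirror Hadoop's FileOutputCommitter: mark a successful job by creating an empty
    // _SUCCESS file in the output directory.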
if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
try {
val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
fileSystem.create(successPath).close()
} catch {
case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
}
}
}
}