diff options
author | Reynold Xin <rxin@databricks.com> | 2016-10-21 17:27:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-21 17:27:18 -0700 |
commit | 3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7 (patch) | |
tree | 48eff0c683b5930b7117ade0987da64a00593919 /sql/hive/src/main | |
parent | c9720b2195a465653690b3e221ce789142217b0d (diff) | |
download | spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.gz spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.bz2 spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.zip |
[SPARK-18042][SQL] OutputWriter should expose file path written
## What changes were proposed in this pull request?
This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the necessary work to consolidate structured streaming and batch write paths.
The batch write path has a nice feature that each data source can define the extension of the files, and allow Spark to specify the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that.
## How was this patch tested?
N/A - there is no behavior change and this should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #15580 from rxin/SPARK-18042.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 29 |
1 files changed, 14 insertions, 15 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index 1ceacb458a..eba7aa386a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -216,9 +216,18 @@ private[orc] class OrcOutputWriter( context: TaskAttemptContext) extends OutputWriter { - private[this] val conf = context.getConfiguration + override val path: String = { + val compressionExtension: String = { + val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION) + OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") + } + // It has the `.orc` extension at the end because (de)compression tools + // such as gunzip would not be able to decompress this as the compression + // is not applied on this whole file but on each "stream" in ORC format. + new Path(stagingDir, fileNamePrefix + compressionExtension + ".orc").toString + } - private[this] val serializer = new OrcSerializer(dataSchema, conf) + private[this] val serializer = new OrcSerializer(dataSchema, context.getConfiguration) // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this // flag to decide whether `OrcRecordWriter.close()` needs to be called. @@ -226,20 +235,10 @@ private[orc] class OrcOutputWriter( private lazy val recordWriter: RecordWriter[NullWritable, Writable] = { recordWriterInstantiated = true - - val compressionExtension = { - val name = conf.get(OrcRelation.ORC_COMPRESSION) - OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") - } - // It has the `.orc` extension at the end because (de)compression tools - // such as gunzip would not be able to decompress this as the compression - // is not applied on this whole file but on each "stream" in ORC format. - val filename = s"$fileNamePrefix$compressionExtension.orc" - new OrcOutputFormat().getRecordWriter( - new Path(stagingDir, filename).getFileSystem(conf), - conf.asInstanceOf[JobConf], - new Path(stagingDir, filename).toString, + new Path(path).getFileSystem(context.getConfiguration), + context.getConfiguration.asInstanceOf[JobConf], + path, Reporter.NULL ).asInstanceOf[RecordWriter[NullWritable, Writable]] } |