aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-21 17:27:18 -0700
committerReynold Xin <rxin@databricks.com>2016-10-21 17:27:18 -0700
commit3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7 (patch)
tree48eff0c683b5930b7117ade0987da64a00593919 /sql/hive/src/main
parentc9720b2195a465653690b3e221ce789142217b0d (diff)
downloadspark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.gz
spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.bz2
spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.zip
[SPARK-18042][SQL] OutputWriter should expose file path written
## What changes were proposed in this pull request? This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the necessary work to consolidate structured streaming and batch write paths. The batch write path has a nice feature that each data source can define the extension of the files, and allow Spark to specify the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that. ## How was this patch tested? N/A - there is no behavior change and this should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #15580 from rxin/SPARK-18042.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala29
1 files changed, 14 insertions, 15 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1ceacb458a..eba7aa386a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -216,9 +216,18 @@ private[orc] class OrcOutputWriter(
context: TaskAttemptContext)
extends OutputWriter {
- private[this] val conf = context.getConfiguration
+ override val path: String = {
+ val compressionExtension: String = {
+ val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
+ OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+ }
+ // It has the `.orc` extension at the end because (de)compression tools
+ // such as gunzip would not be able to decompress this as the compression
+ // is not applied on this whole file but on each "stream" in ORC format.
+ new Path(stagingDir, fileNamePrefix + compressionExtension + ".orc").toString
+ }
- private[this] val serializer = new OrcSerializer(dataSchema, conf)
+ private[this] val serializer = new OrcSerializer(dataSchema, context.getConfiguration)
// `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
// flag to decide whether `OrcRecordWriter.close()` needs to be called.
@@ -226,20 +235,10 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
-
- val compressionExtension = {
- val name = conf.get(OrcRelation.ORC_COMPRESSION)
- OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
- }
- // It has the `.orc` extension at the end because (de)compression tools
- // such as gunzip would not be able to decompress this as the compression
- // is not applied on this whole file but on each "stream" in ORC format.
- val filename = s"$fileNamePrefix$compressionExtension.orc"
-
new OrcOutputFormat().getRecordWriter(
- new Path(stagingDir, filename).getFileSystem(conf),
- conf.asInstanceOf[JobConf],
- new Path(stagingDir, filename).toString,
+ new Path(path).getFileSystem(context.getConfiguration),
+ context.getConfiguration.asInstanceOf[JobConf],
+ path,
Reporter.NULL
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}