diff options
author | Reynold Xin <rxin@databricks.com> | 2016-10-21 17:27:18 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-21 17:27:18 -0700 |
commit | 3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7 (patch) | |
tree | 48eff0c683b5930b7117ade0987da64a00593919 /mllib | |
parent | c9720b2195a465653690b3e221ce789142217b0d (diff) | |
download | spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.gz spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.bz2 spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.zip |
[SPARK-18042][SQL] OutputWriter should expose file path written
## What changes were proposed in this pull request?
This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the necessary work to consolidate structured streaming and batch write paths.
The batch write path has a nice feature that each data source can define the extension of the files, and allow Spark to specify the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that.
## How was this patch tested?
N/A - there is no behavior change and this should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #15580 from rxin/SPARK-18042.
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index fff86686b5..5e9e6ff1a5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextOutputWriter import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -46,12 +47,17 @@ private[libsvm] class LibSVMOutputWriter( context: TaskAttemptContext) extends OutputWriter { + override val path: String = { + val compressionExtension = TextOutputWriter.getCompressionExtension(context) + new Path(stagingDir, fileNamePrefix + ".libsvm" + compressionExtension).toString + } + private[this] val buffer = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { new TextOutputFormat[NullWritable, Text]() { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - new Path(stagingDir, fileNamePrefix + extension) + new Path(path) } }.getRecordWriter(context) } |