aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-21 17:27:18 -0700
committerReynold Xin <rxin@databricks.com>2016-10-21 17:27:18 -0700
commit3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7 (patch)
tree48eff0c683b5930b7117ade0987da64a00593919 /mllib
parentc9720b2195a465653690b3e221ce789142217b0d (diff)
downloadspark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.gz
spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.tar.bz2
spark-3fbf5a58c236fc5d5fee39cb29e7f5c7e01c0ee7.zip
[SPARK-18042][SQL] OutputWriter should expose file path written
## What changes were proposed in this pull request? This patch adds a new "path" method on OutputWriter that returns the path of the file written by the OutputWriter. This is part of the necessary work to consolidate structured streaming and batch write paths. The batch write path has a nice feature that each data source can define the extension of the files, and allow Spark to specify the staging directory and the prefix for the files. However, in the streaming path we need to collect the list of files written, and there is no interface right now to do that. ## How was this patch tested? N/A - there is no behavior change and this should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #15580 from rxin/SPARK-18042.
Diffstat (limited to 'mllib')
-rw-r--r-- mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 8
1 file changed, 7 insertions, 1 deletion
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index fff86686b5..5e9e6ff1a5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -46,12 +47,17 @@ private[libsvm] class LibSVMOutputWriter(
context: TaskAttemptContext)
extends OutputWriter {
+ override val path: String = {
+ val compressionExtension = TextOutputWriter.getCompressionExtension(context)
+ new Path(stagingDir, fileNamePrefix + ".libsvm" + compressionExtension).toString
+ }
+
private[this] val buffer = new Text()
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- new Path(stagingDir, fileNamePrefix + extension)
+ new Path(path)
}
}.getRecordWriter(context)
}