diff options
author | Reynold Xin <rxin@databricks.com> | 2016-10-31 22:23:38 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-31 22:23:38 -0700 |
commit | d9d1465009fb40550467089ede315496552374c5 (patch) | |
tree | 47b2d98da776c7bb414676ffde825b34ee1a7ff2 /mllib | |
parent | 7d6c87155c740cf622c2c600a8ca64154d24c422 (diff) | |
download | spark-d9d1465009fb40550467089ede315496552374c5.tar.gz spark-d9d1465009fb40550467089ede315496552374c5.tar.bz2 spark-d9d1465009fb40550467089ede315496552374c5.zip |
[SPARK-18024][SQL] Introduce an internal commit protocol API
## What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.
## How was this patch tested?
Should be covered by existing write tests.
Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>
Closes #15707 from rxin/SPARK-18024-2.
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 5e9e6ff1a5..cb3ca1b6c4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -41,17 +41,11 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration private[libsvm] class LibSVMOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val compressionExtension = TextOutputWriter.getCompressionExtension(context) - new Path(stagingDir, fileNamePrefix + ".libsvm" + compressionExtension).toString - } - private[this] val buffer = new Text() private val recordWriter: RecordWriter[NullWritable, Text] = { @@ -135,11 +129,14 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour dataSchema: StructType): OutputWriterFactory = { new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context) + new LibSVMOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + ".libsvm" + TextOutputWriter.getCompressionExtension(context) } } } |