aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-10-31 22:23:38 -0700
committerReynold Xin <rxin@databricks.com>2016-10-31 22:23:38 -0700
commitd9d1465009fb40550467089ede315496552374c5 (patch)
tree47b2d98da776c7bb414676ffde825b34ee1a7ff2 /mllib
parent7d6c87155c740cf622c2c600a8ca64154d24c422 (diff)
downloadspark-d9d1465009fb40550467089ede315496552374c5.tar.gz
spark-d9d1465009fb40550467089ede315496552374c5.tar.bz2
spark-d9d1465009fb40550467089ede315496552374c5.zip
[SPARK-18024][SQL] Introduce an internal commit protocol API
## What changes were proposed in this pull request? This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits. ## How was this patch tested? Should be covered by existing write tests. Author: Reynold Xin <rxin@databricks.com> Author: Eric Liang <ekl@databricks.com> Closes #15707 from rxin/SPARK-18024-2.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala17
1 files changed, 7 insertions, 10 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 5e9e6ff1a5..cb3ca1b6c4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -41,17 +41,11 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
private[libsvm] class LibSVMOutputWriter(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
- override val path: String = {
- val compressionExtension = TextOutputWriter.getCompressionExtension(context)
- new Path(stagingDir, fileNamePrefix + ".libsvm" + compressionExtension).toString
- }
-
private[this] val buffer = new Text()
private val recordWriter: RecordWriter[NullWritable, Text] = {
@@ -135,11 +129,14 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
dataSchema: StructType): OutputWriterFactory = {
new OutputWriterFactory {
override def newInstance(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
+ new LibSVMOutputWriter(path, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".libsvm" + TextOutputWriter.getCompressionExtension(context)
}
}
}