diff options
author | Nathan Howell <nhowell@godaddy.com> | 2016-12-01 21:40:49 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-12-01 21:40:49 -0800 |
commit | c82f16c15e0d4bfc54fb890a667d9164a088b5c6 (patch) | |
tree | 64d4c58b9c780e27135e33bfc9667abbc5d85ea0 /mllib | |
parent | d3c90b74edecc527ee468bead41d1cca0b667668 (diff) | |
download | spark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.tar.gz spark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.tar.bz2 spark-c82f16c15e0d4bfc54fb890a667d9164a088b5c6.zip |
[SPARK-18658][SQL] Write text records directly to a FileOutputStream
## What changes were proposed in this pull request?
This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering.
The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric.
## How was this patch tested?
Existing unit tests.
Author: Nathan Howell <nhowell@godaddy.com>
Closes #16089 from NathanHowell/SPARK-18658.
Diffstat (limited to 'mllib')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 28 |
1 file changed, 8 insertions, 20 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index cb3ca1b6c4..b5aa7ce4e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -21,9 +21,7 @@ import java.io.IOException

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{NullWritable, Text}
-import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

 import org.apache.spark.TaskContext
 import org.apache.spark.ml.feature.LabeledPoint
@@ -35,7 +33,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -46,30 +43,21 @@ private[libsvm] class LibSVMOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter {

-  private[this] val buffer = new Text()
-
-  private val recordWriter: RecordWriter[NullWritable, Text] = {
-    new TextOutputFormat[NullWritable, Text]() {
-      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-        new Path(path)
-      }
-    }.getRecordWriter(context)
-  }
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))

   override def write(row: Row): Unit = {
     val label = row.get(0)
     val vector = row.get(1).asInstanceOf[Vector]
-    val sb = new StringBuilder(label.toString)
+    writer.write(label.toString)
     vector.foreachActive { case (i, v) =>
-      sb += ' '
-      sb ++= s"${i + 1}:$v"
+      writer.write(s" ${i + 1}:$v")
     }
-    buffer.set(sb.mkString)
-    recordWriter.write(NullWritable.get(), buffer)
+
+    writer.write('\n')
   }

   override def close(): Unit = {
-    recordWriter.close(context)
+    writer.close()
   }
 }
@@ -136,7 +124,7 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
   }

   override def getFileExtension(context: TaskAttemptContext): String = {
-    ".libsvm" + TextOutputWriter.getCompressionExtension(context)
+    ".libsvm" + CodecStreams.getCompressionExtension(context)
   }
 }