author    Nathan Howell <nhowell@godaddy.com>  2016-12-01 21:40:49 -0800
committer Reynold Xin <rxin@databricks.com>    2016-12-01 21:40:49 -0800
commit    c82f16c15e0d4bfc54fb890a667d9164a088b5c6 (patch)
tree      64d4c58b9c780e27135e33bfc9667abbc5d85ea0 /mllib
parent    d3c90b74edecc527ee468bead41d1cca0b667668 (diff)
[SPARK-18658][SQL] Write text records directly to a FileOutputStream
## What changes were proposed in this pull request?

This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering. The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled), so I wanted to keep the read and write paths symmetric.

## How was this patch tested?

Existing unit tests.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #16089 from NathanHowell/SPARK-18658.
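For background, here is a minimal sketch of what a codec-aware stream helper in the spirit of `CodecStreams` could look like. The `createOutputStreamWriter` and `getCompressionExtension` entry points mirror the calls made in the diff below, but the body is illustrative only, an assumed implementation built from standard Hadoop APIs rather than Spark's actual code:

```scala
import java.io.{OutputStream, OutputStreamWriter, Writer}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.ReflectionUtils

// Illustrative stand-in for the CodecStreams helper referenced in the diff (not Spark's implementation).
object IllustrativeCodecStreams {
  // Instantiate the output codec configured for this task, if compression is enabled.
  private def getCompressionCodec(context: TaskAttemptContext): Option[CompressionCodec] = {
    if (FileOutputFormat.getCompressOutput(context)) {
      val codecClass = FileOutputFormat.getOutputCompressorClass(context, classOf[GzipCodec])
      val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, context.getConfiguration)
      Some(codec)
    } else {
      None
    }
  }

  // Open the target file and, when configured, wrap it in the codec's compressing stream.
  def createOutputStream(context: TaskAttemptContext, file: Path): OutputStream = {
    val fs = file.getFileSystem(context.getConfiguration)
    val rawStream: OutputStream = fs.create(file, false)
    getCompressionCodec(context).map(_.createOutputStream(rawStream)).getOrElse(rawStream)
  }

  // Character-oriented wrapper used by text-based writers such as LibSVMOutputWriter.
  def createOutputStreamWriter(context: TaskAttemptContext, file: Path): Writer = {
    new OutputStreamWriter(createOutputStream(context, file), StandardCharsets.UTF_8)
  }

  // File-name suffix contributed by the codec (e.g. ".gz"), or "" when uncompressed.
  def getCompressionExtension(context: TaskAttemptContext): String = {
    getCompressionCodec(context).map(_.getDefaultExtension).getOrElse("")
  }
}
```

The point of the change is visible in `createOutputStream`: each record goes straight to the filesystem stream (or the codec wrapping it), instead of being staged in a `Text` buffer for `TextOutputFormat`.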
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 28
1 file changed, 8 insertions, 20 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index cb3ca1b6c4..b5aa7ce4e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -21,9 +21,7 @@ import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{NullWritable, Text}
-import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.TaskContext
import org.apache.spark.ml.feature.LabeledPoint
@@ -35,7 +33,6 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
@@ -46,30 +43,21 @@ private[libsvm] class LibSVMOutputWriter(
context: TaskAttemptContext)
extends OutputWriter {
- private[this] val buffer = new Text()
-
- private val recordWriter: RecordWriter[NullWritable, Text] = {
- new TextOutputFormat[NullWritable, Text]() {
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- new Path(path)
- }
- }.getRecordWriter(context)
- }
+ private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
override def write(row: Row): Unit = {
val label = row.get(0)
val vector = row.get(1).asInstanceOf[Vector]
- val sb = new StringBuilder(label.toString)
+ writer.write(label.toString)
vector.foreachActive { case (i, v) =>
- sb += ' '
- sb ++= s"${i + 1}:$v"
+ writer.write(s" ${i + 1}:$v")
}
- buffer.set(sb.mkString)
- recordWriter.write(NullWritable.get(), buffer)
+
+ writer.write('\n')
}
override def close(): Unit = {
- recordWriter.close(context)
+ writer.close()
}
}
@@ -136,7 +124,7 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
}
override def getFileExtension(context: TaskAttemptContext): String = {
- ".libsvm" + TextOutputWriter.getCompressionExtension(context)
+ ".libsvm" + CodecStreams.getCompressionExtension(context)
}
}
}
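For reference, the rewritten `write` method above emits one LIBSVM-formatted record per row: the label, then a space-separated `index:value` pair for each active entry (indices shifted to be 1-based), then a newline. For a row with label 1.0 and a sparse vector with active entries (0, 0.5) and (2, 2.0), the line written to the output file would be:

```
1.0 1:0.5 3:2.0
```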