From c82f16c15e0d4bfc54fb890a667d9164a088b5c6 Mon Sep 17 00:00:00 2001
From: Nathan Howell
Date: Thu, 1 Dec 2016 21:40:49 -0800
Subject: [SPARK-18658][SQL] Write text records directly to a FileOutputStream

## What changes were proposed in this pull request?

This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering.

The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric.

## How was this patch tested?

Existing unit tests.

Author: Nathan Howell

Closes #16089 from NathanHowell/SPARK-18658.
---
 .../spark/sql/sources/SimpleTextRelation.scala | 27 +++++-----------------
 1 file changed, 6 insertions(+), 21 deletions(-)

(limited to 'sql/hive/src/test')

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index cecfd99098..5fdf615259 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -17,14 +17,9 @@
 
 package org.apache.spark.sql.sources
 
-import java.text.NumberFormat
-import java.util.Locale
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{NullWritable, Text}
-import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -125,29 +120,19 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
 }
 
 class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
-  private val recordWriter: RecordWriter[NullWritable, Text] =
-    new AppendingTextOutputFormat(path).getRecordWriter(context)
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map { v =>
       if (v == null) "" else v.toString
     }.mkString(",")
-    recordWriter.write(null, new Text(serialized))
-  }
 
-  override def close(): Unit = {
-    recordWriter.close(context)
+    writer.write(serialized)
+    writer.write('\n')
   }
-}
 
-class AppendingTextOutputFormat(path: String) extends TextOutputFormat[NullWritable, Text] {
-
-  val numberFormat = NumberFormat.getInstance(Locale.US)
-  numberFormat.setMinimumIntegerDigits(5)
-  numberFormat.setGroupingUsed(false)
-
-  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-    new Path(path)
+  override def close(): Unit = {
+    writer.close()
   }
 }
--
cgit v1.2.3
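
For context, the pattern the new `SimpleTextOutputWriter` relies on is: obtain an `OutputStream` for the target file, wrap it in a compression stream when a codec applies, then write each row as one comma-joined line. The sketch below is only an illustration of that idea, not Spark's `CodecStreams` helper; the `openTextStream` helper, the output path, and the extension-based codec lookup are assumptions made for the example.

```scala
import java.io.{OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

object SimpleTextWriteSketch {
  // Open a character writer over the target file, inserting a compressing
  // stream when the path's extension (e.g. ".gz") maps to a Hadoop codec.
  // (Hypothetical helper; Spark's CodecStreams encapsulates similar logic.)
  def openTextStream(path: Path, conf: Configuration): OutputStreamWriter = {
    val fs = path.getFileSystem(conf)
    val raw: OutputStream = fs.create(path, false)
    val codec = new CompressionCodecFactory(conf).getCodec(path)
    val out = if (codec != null) codec.createOutputStream(raw) else raw
    new OutputStreamWriter(out, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical output path, chosen only for the example.
    val writer = openTextStream(new Path("/tmp/simple-text/part-00000.gz"), new Configuration())
    try {
      // Each record becomes one comma-separated line, mirroring
      // SimpleTextOutputWriter.write in the patch above.
      Seq(Seq(1, "a"), Seq(2, "b")).foreach { row =>
        writer.write(row.mkString(","))
        writer.write('\n')
      }
    } finally {
      writer.close()
    }
  }
}
```

Spark's actual helper presumably derives the codec from the task's output-compression settings in the `TaskAttemptContext` rather than from the file extension; the extension-based lookup here is just the simplest self-contained way to show the stream-wrapping step.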