author     Nathan Howell <nhowell@godaddy.com>    2016-12-01 21:40:49 -0800
committer  Reynold Xin <rxin@databricks.com>      2016-12-01 21:40:49 -0800
commit     c82f16c15e0d4bfc54fb890a667d9164a088b5c6 (patch)
tree       64d4c58b9c780e27135e33bfc9667abbc5d85ea0 /sql/hive/src/test
parent     d3c90b74edecc527ee468bead41d1cca0b667668 (diff)
[SPARK-18658][SQL] Write text records directly to a FileOutputStream
## What changes were proposed in this pull request?

This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering.

The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled), so I wanted to keep the read and write paths symmetric.

## How was this patch tested?

Existing unit tests.

Author: Nathan Howell <nhowell@godaddy.com>

Closes #16089 from NathanHowell/SPARK-18658.
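The `CodecStreams.createOutputStreamWriter` helper used in the diff below is introduced elsewhere in this patch (this page is limited to `sql/hive/src/test`, so its definition is not shown). As a minimal sketch of the idea only, assuming Hadoop's `FileOutputFormat` compression settings are the source of truth, such a helper could look roughly like this; `CodecStreamsSketch` is a hypothetical name, not Spark's actual class:

```scala
import java.io.{OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.ReflectionUtils

object CodecStreamsSketch {
  // Open the target file and, when output compression is configured on the
  // job, wrap the raw stream in the codec's compressing stream.
  def createOutputStream(context: TaskAttemptContext, file: Path): OutputStream = {
    val conf = context.getConfiguration
    val raw: OutputStream = file.getFileSystem(conf).create(file, false)
    if (FileOutputFormat.getCompressOutput(context)) {
      val codecClass = FileOutputFormat.getOutputCompressorClass(context, classOf[GzipCodec])
      val codec: CompressionCodec = ReflectionUtils.newInstance(codecClass, conf)
      codec.createOutputStream(raw)
    } else {
      raw
    }
  }

  // Records are written as text, so expose a character-oriented writer.
  def createOutputStreamWriter(context: TaskAttemptContext, file: Path): OutputStreamWriter =
    new OutputStreamWriter(createOutputStream(context, file), StandardCharsets.UTF_8)
}
```

Writing through such a stream sends each record straight to the filesystem (or the compressor), rather than copying it into intermediate `Text` objects the way the old `TextOutputFormat` path did.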
Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 27
1 file changed, 6 insertions, 21 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index cecfd99098..5fdf615259 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -17,14 +17,9 @@
 package org.apache.spark.sql.sources
 
-import java.text.NumberFormat
-import java.util.Locale
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{NullWritable, Text}
-import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -125,29 +120,19 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
 class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val recordWriter: RecordWriter[NullWritable, Text] =
-    new AppendingTextOutputFormat(path).getRecordWriter(context)
+  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
 
   override def write(row: Row): Unit = {
     val serialized = row.toSeq.map { v =>
       if (v == null) "" else v.toString
     }.mkString(",")
-    recordWriter.write(null, new Text(serialized))
-  }
-
-  override def close(): Unit = {
-    recordWriter.close(context)
+    writer.write(serialized)
+    writer.write('\n')
   }
-}
-
-class AppendingTextOutputFormat(path: String) extends TextOutputFormat[NullWritable, Text] {
-
-  val numberFormat = NumberFormat.getInstance(Locale.US)
-  numberFormat.setMinimumIntegerDigits(5)
-  numberFormat.setGroupingUsed(false)
-
-  override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
-    new Path(path)
+
+  override def close(): Unit = {
+    writer.close()
   }
 }
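Read as a whole, the rewritten writer comes out to roughly the following (assembled from the `+` and context lines of the hunk above; blank lines, indentation, and comments are reconstructed, and the rest of the file is elided):

```scala
class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  // Write through CodecStreams, so bytes go straight to the filesystem,
  // or through a compressor when output compression is configured.
  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))

  override def write(row: Row): Unit = {
    // Serialize the row as one comma-separated text record, mapping nulls to "".
    val serialized = row.toSeq.map { v =>
      if (v == null) "" else v.toString
    }.mkString(",")

    writer.write(serialized)
    writer.write('\n')
  }

  override def close(): Unit = {
    writer.close()
  }
}
```

Note that the record separator is now emitted explicitly via `writer.write('\n')`, and closing the writer closes the underlying (possibly compressing) stream, which is why the separate `AppendingTextOutputFormat` class and its `RecordWriter` plumbing could be deleted.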