author:    hyukjinkwon <gurwls223@gmail.com>  2016-06-21 21:58:38 -0700
committer: Davies Liu <davies.liu@gmail.com>  2016-06-21 21:58:38 -0700
commit:    7580f3041a1a3757a0b14b9d8afeb720f261fff6 (patch)
tree:      a4da4d014c9a95d4f57e82e9f8081e2a21eedd44 /sql
parent:    d77c4e6e2eef24f4276c38b3add8c29bb885f4db (diff)
download:  spark-7580f3041a1a3757a0b14b9d8afeb720f261fff6.tar.gz
           spark-7580f3041a1a3757a0b14b9d8afeb720f261fff6.tar.bz2
           spark-7580f3041a1a3757a0b14b9d8afeb720f261fff6.zip
[SPARK-16104] [SQL] Do not create CSV writer object for every flush when writing
## What changes were proposed in this pull request?

This PR makes the `CsvWriter` object reusable instead of being created anew each time. The approach is borrowed from the JSON data source. Originally a `CsvWriter` was created for every row; that was improved in https://github.com/apache/spark/pull/13229, but `LineCsvWriter` still creates a new `CsvWriter` object on every `flush()`. There is no need to close and re-create the object for each flush. The original logic is kept as-is, but the `CsvWriter` is reused by resetting the underlying `CharArrayWriter`.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13809 from HyukjinKwon/write-perf.
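For illustration, a minimal, self-contained sketch of the reuse pattern described above (the class name `ReusableCsvLineWriter` is hypothetical; it assumes the univocity-parsers `CsvWriter`/`CsvWriterSettings` API that this file already uses):

```scala
import java.io.CharArrayWriter

import com.univocity.parsers.csv.{CsvWriter, CsvWriterSettings}

// Hypothetical standalone illustration: one long-lived CsvWriter backed by a
// CharArrayWriter that is reset after each flush, instead of closing the
// writer and allocating a new one on every flush().
class ReusableCsvLineWriter(settings: CsvWriterSettings) {
  private val buffer = new CharArrayWriter()
  private val writer = new CsvWriter(buffer, settings)

  // Rows go through the same writer for the lifetime of this object.
  def writeRow(row: Seq[String]): Unit = {
    writer.writeRow(row.toArray: _*)
  }

  // Drain buffered output into the CharArrayWriter, hand the text back,
  // and reset the buffer so the same writer can keep being reused.
  def flush(): String = {
    writer.flush()
    val lines = buffer.toString.stripLineEnd
    buffer.reset()
    lines
  }

  // Because flush() no longer closes the writer, the caller must close it
  // exactly once at the end; this is what the CSVRelation.scala hunk below
  // adds to CsvOutputWriter.close().
  def close(): Unit = writer.close()
}
```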
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala    | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala  |  1
2 files changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
index b06f12369d..2103262580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala
@@ -17,8 +17,7 @@
package org.apache.spark.sql.execution.datasources.csv
-import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader}
-import java.nio.charset.StandardCharsets
+import java.io.{CharArrayWriter, StringReader}
import com.univocity.parsers.csv._
@@ -77,10 +76,8 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
writerSettings.setHeaders(headers: _*)
writerSettings.setQuoteEscapingEnabled(params.escapeQuotes)
- private var buffer = new ByteArrayOutputStream()
- private var writer = new CsvWriter(
- new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
- writerSettings)
+ private val buffer = new CharArrayWriter()
+ private val writer = new CsvWriter(buffer, writerSettings)
def writeRow(row: Seq[String], includeHeader: Boolean): Unit = {
if (includeHeader) {
@@ -90,14 +87,15 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten
}
def flush(): String = {
- writer.close()
+ writer.flush()
val lines = buffer.toString.stripLineEnd
- buffer = new ByteArrayOutputStream()
- writer = new CsvWriter(
- new OutputStreamWriter(buffer, StandardCharsets.UTF_8),
- writerSettings)
+ buffer.reset()
lines
}
+
+ def close(): Unit = {
+ writer.close()
+ }
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 083ac3350e..e8c0134d38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -223,6 +223,7 @@ private[sql] class CsvOutputWriter(
override def close(): Unit = {
flush()
+ csvWriter.close()
recordWriter.close(context)
}
}