author     hyukjinkwon <gurwls223@gmail.com>    2016-02-29 09:44:29 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-02-29 09:44:29 -0800
commit     02aa499dfb71bc9571bebb79e6383842e4f48143 (patch)
tree       ae7b44767db0fb5b7c19a03c0de05aa4909c4b8e
parent     916fc34f98dd731f607d9b3ed657bad6cc30df2c (diff)
[SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single function call
https://issues.apache.org/jira/browse/SPARK-13507
https://issues.apache.org/jira/browse/SPARK-13509

## What changes were proposed in this pull request?

This PR adds support for writing CSV data directly to a given path with a single function call. Several unit tests were added for each piece of functionality.

## How was this patch tested?

This was tested with unit tests and with `dev/run_tests` for coding style.

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Hyukjin Kwon <gurwls223@gmail.com>

Closes #11389 from HyukjinKwon/SPARK-13507-13509.
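In short, the patch lets CSV be written and read back without going through the generic `format(...)` calls. A minimal sketch of the new call path, assuming a Spark 2.0-era `sqlContext` is already running (the toy DataFrame and output directory are illustrative):

```python
import os
import tempfile

# A toy DataFrame; any existing DataFrame works the same way.
df = sqlContext.createDataFrame([('Joe', 20), ('Tom', 30)], ['name', 'age'])
out = os.path.join(tempfile.mkdtemp(), 'data')

# Before this patch, writing CSV required the generic save path:
#   df.write.format('csv').save(out)
# After it, a single call is enough:
df.write.csv(out)

# The reader counterpart added here loads it back in one call.
df2 = sqlContext.read.csv(out)
```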
-rw-r--r--  python/pyspark/sql/readwriter.py                                                             50
-rw-r--r--  python/test_support/sql/ages.csv                                                              4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                           23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala     5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala   5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala         3
6 files changed, 80 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index b1453c637f..7f5368d8bd 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -233,6 +233,23 @@ class DataFrameReader(object):
paths = [paths]
return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+ @since(2.0)
+ def csv(self, paths):
+ """Loads a CSV file and returns the result as a [[DataFrame]].
+
+ This function goes through the input once to determine the input schema. To avoid going
+ through the entire data once, specify the schema explicitly using [[schema]].
+
+ :param paths: string, or list of strings, for input path(s).
+
+ >>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
+ >>> df.dtypes
+ [('C0', 'string'), ('C1', 'string')]
+ """
+ if isinstance(paths, basestring):
+ paths = [paths]
+ return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
+
@since(1.5)
def orc(self, path):
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
@@ -448,6 +465,11 @@ class DataFrameWriter(object):
* ``ignore``: Silently ignore this operation if data already exists.
* ``error`` (default case): Throw an exception if data already exists.
+ You can set the following JSON-specific option(s) for writing JSON files:
+ * ``compression`` (default ``None``): compression codec to use when saving to file.
+ This can be one of the known case-insensitive shortened names
+ (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)._jwrite.json(path)
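The `compression` option documented above is supplied through the existing `option()` builder before the terminal write call, for example (output path illustrative, `df` as before):

```python
import os
import tempfile

out = os.path.join(tempfile.mkdtemp(), 'data')

# 'gzip' is one of the accepted case-insensitive shortened codec names;
# the same option also applies to the text and csv writers below.
df.write.option('compression', 'gzip').json(out)
```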
@@ -476,11 +498,39 @@ class DataFrameWriter(object):
def text(self, path):
"""Saves the content of the DataFrame in a text file at the specified path.
+ :param path: the path in any Hadoop supported file system
+
The DataFrame must have only one column that is of string type.
Each row becomes a new line in the output file.
+
+ You can set the following option(s) for writing text files:
+ * ``compression`` (default ``None``): compression codec to use when saving to file.
+ This can be one of the known case-insensitive shortened names
+ (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
"""
self._jwrite.text(path)
+ @since(2.0)
+ def csv(self, path, mode=None):
+ """Saves the content of the [[DataFrame]] in CSV format at the specified path.
+
+ :param path: the path in any Hadoop supported file system
+ :param mode: specifies the behavior of the save operation when data already exists.
+
+ * ``append``: Append contents of this :class:`DataFrame` to existing data.
+ * ``overwrite``: Overwrite existing data.
+ * ``ignore``: Silently ignore this operation if data already exists.
+ * ``error`` (default case): Throw an exception if data already exists.
+
+ You can set the following CSV-specific option(s) for writing CSV files:
+ * ``compression`` (default ``None``): compression codec to use when saving to file.
+ This can be one of the known case-insensitive shortened names
+ (``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
+
+ >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
+ """
+ self.mode(mode)._jwrite.csv(path)
+
@since(1.5)
def orc(self, path, mode=None, partitionBy=None):
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.
diff --git a/python/test_support/sql/ages.csv b/python/test_support/sql/ages.csv
new file mode 100644
index 0000000000..18991feda7
--- /dev/null
+++ b/python/test_support/sql/ages.csv
@@ -0,0 +1,4 @@
+Joe,20
+Tom,30
+Hyukjin,25
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d6bdd3d825..093504c765 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -453,6 +453,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* format("json").save(path)
* }}}
*
+ * You can set the following JSON-specific option(s) for writing JSON files:
+ * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+ * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+ *
* @since 1.4.0
*/
def json(path: String): Unit = format("json").save(path)
@@ -492,10 +496,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* df.write().text("/path/to/output")
* }}}
*
+ * You can set the following option(s) for writing text files:
+ * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+ * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+ *
* @since 1.6.0
*/
def text(path: String): Unit = format("text").save(path)
+ /**
+ * Saves the content of the [[DataFrame]] in CSV format at the specified path.
+ * This is equivalent to:
+ * {{{
+ * format("csv").save(path)
+ * }}}
+ *
+ * You can set the following CSV-specific option(s) for writing CSV files:
+ * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
+ * one of the known case-insensitive shortened names (`bzip2`, `gzip`, `lz4`, and `snappy`).</li>
+ *
+ * @since 2.0.0
+ */
+ def csv(path: String): Unit = format("csv").save(path)
+
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 31a95ed461..e59dbd6b3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -48,10 +48,7 @@ private[sql] class JSONOptions(
parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
- val compressionCodec = {
- val name = parameters.get("compression").orElse(parameters.get("codec"))
- name.map(CompressionCodecs.getCodecClassName)
- }
+ val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index 60155b3234..8f3f6335e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -115,10 +115,7 @@ private[sql] class TextRelation(
/** Write path. */
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = job.getConfiguration
- val compressionCodec = {
- val name = parameters.get("compression").orElse(parameters.get("codec"))
- name.map(CompressionCodecs.getCodecClassName)
- }
+ val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
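Note that both refactors above drop the old fallback to a `codec` key: only the `compression` option selects a codec for the JSON and text sources now. A sketch of the difference (paths illustrative, `df` as before):

```python
# Consulted after this patch: the write comes out gzip-compressed.
df.write.option('compression', 'gzip').json('/tmp/json-compressed')

# No longer consulted as a fallback by these sources: the write
# below comes out uncompressed.
df.write.option('codec', 'gzip').json('/tmp/json-plain')
```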
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 5d57d77ab0..3ecbb14f2e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -268,9 +268,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.load(testFile(carsFile))
cars.coalesce(1).write
- .format("csv")
.option("header", "true")
- .save(csvDir)
+ .csv(csvDir)
val carsCopy = sqlContext.read
.format("csv")