aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/tests.py
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2017-02-28 13:34:33 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-28 13:34:33 -0800
commit7e5359be5ca038fdb579712b18e7f226d705c276 (patch)
tree6fca55568b53c2ded63bcbf846a8463ffcafc92a /python/pyspark/sql/tests.py
parentce233f18e381fa1ea00be74ca26e97d35baa6c9c (diff)
downloadspark-7e5359be5ca038fdb579712b18e7f226d705c276.tar.gz
spark-7e5359be5ca038fdb579712b18e7f226d705c276.tar.bz2
spark-7e5359be5ca038fdb579712b18e7f226d705c276.zip
[SPARK-19610][SQL] Support parsing multiline CSV files
## What changes were proposed in this pull request? This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file). So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory. ## How was this patch tested? Unit tests in `CSVSuite` and `tests.py` Manual tests with a single 9GB CSV file in local file system, for example, ```scala spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count() ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16976 from HyukjinKwon/SPARK-19610.
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--python/pyspark/sql/tests.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fd083e4868..e943f8da3d 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -437,12 +437,19 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(res.collect(), [Row(id=0, copy=0)])
def test_wholefile_json(self):
- from pyspark.sql.types import StringType
people1 = self.spark.read.json("python/test_support/sql/people.json")
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
wholeFile=True)
self.assertEqual(people1.collect(), people_array.collect())
+ def test_wholefile_csv(self):
+ ages_newlines = self.spark.read.csv(
+ "python/test_support/sql/ages_newlines.csv", wholeFile=True)
+ expected = [Row(_c0=u'Joe', _c1=u'20', _c2=u'Hi,\nI am Jeo'),
+ Row(_c0=u'Tom', _c1=u'30', _c2=u'My name is Tom'),
+ Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
+ self.assertEqual(ages_newlines.collect(), expected)
+
def test_udf_with_input_file_name(self):
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType