author     Bill Chambers <bill@databricks.com>    2016-05-11 17:42:13 -0700
committer  Andrew Or <andrew@databricks.com>      2016-05-11 17:42:13 -0700
commit     603f4453a16825cc5773cfe24d6ae4cee5ec949a
tree       4213331a044ee4881c130a8bed4d96fe1825662b
parent     f14c4ba001fbdbcc9faa46896f1f9d08a7d06609
[SPARK-15264][SPARK-15274][SQL] CSV Reader Error on Blank Column Names
## What changes were proposed in this pull request?

When a CSV begins with:
- `,,` OR
- `"","",`

meaning that the first column names are either empty or blank strings and `header` is specified to be `true`, then each such column name is replaced with `_c` + the index number of that column. For example, if you were to read in the CSV:
```
"","second column"
"hello", "there"
```
then the column names would become `"_c0", "second column"`. This behavior aligns with what currently happens when `header` is specified to be `false` in recent versions of Spark.

### Current Behavior in Spark <= 1.6

In Spark <= 1.6, a blank column name is read in as a blank string, `""`, so that column cannot be accessed; the CSV itself, however, reads in without issue.

### Current Behavior in Spark 2.0

Spark throws a NullPointerException and will not read in the file.

#### Reproduction in 2.0

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/346304/2828750690305044/484361/latest.html

## How was this patch tested?

A new test was added to `CSVSuite` for this issue. It asserts that both the renamed blank columns and the regular column names can be selected.

Author: Bill Chambers <bill@databricks.com>
Author: Bill Chambers <wchambers@ischool.berkeley.edu>

Closes #13041 from anabranch/master.
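For context, a minimal spark-shell sketch of the behavior described above, assuming this change is applied; the file path is hypothetical and only illustrates the example CSV from the description:

```scala
// Minimal sketch (not part of the patch): reading a CSV whose header row starts
// with a blank name, with header parsing enabled. The path is hypothetical.
val df = spark.read
  .option("header", "true")
  .csv("/tmp/blank-header.csv")   // contents:  "","second column"  /  "hello", "there"

// Blank header cells are renamed by position; non-blank names are kept as-is.
df.columns   // Array("_c0", "second column")
```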
-rw-r--r--  python/pyspark/sql/readwriter.py | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 6
-rw-r--r--  sql/core/src/test/resources/cars-blank-column-name.csv | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 16
4 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 7fd7583972..5cb186016e 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -358,7 +358,7 @@ class DataFrameReader(object):
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
- [('C0', 'string'), ('C1', 'string')]
+ [('_c0', 'string'), ('_c1', 'string')]
"""
if schema is not None:
self.schema(schema)
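The doctest update above reflects the renamed default columns when no header is supplied. A hedged Scala equivalent of that read, assuming the same test-support file, would be:

```scala
// Sketch only: with no header, columns are now named "_c0", "_c1", ...
// instead of "C0", "C1", ... (file path reused from the doctest for illustration).
val ages = spark.read.csv("python/test_support/sql/ages.csv")
ages.columns   // Array("_c0", "_c1")
```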
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 948fac0d58..f47ed76cba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -61,9 +61,11 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine)
val header = if (csvOptions.headerFlag) {
- firstRow
+ firstRow.zipWithIndex.map { case (value, index) =>
+ if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value
+ }
} else {
- firstRow.zipWithIndex.map { case (value, index) => s"C$index" }
+ firstRow.zipWithIndex.map { case (value, index) => s"_c$index" }
}
val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
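A standalone sketch of the header-naming rule introduced in this hunk, pulled out of `DefaultSource` for illustration; the helper name is invented here and `nullValue` stands in for `csvOptions.nullValue`:

```scala
// Illustrative only: mirrors the renaming logic applied to the parsed first row.
def normalizeHeader(
    firstRow: Seq[String],
    headerFlag: Boolean,
    nullValue: String = ""): Seq[String] = {
  if (headerFlag) {
    // Keep real names; fall back to positional names for null/empty/null-token cells.
    firstRow.zipWithIndex.map { case (value, index) =>
      if (value == null || value.isEmpty || value == nullValue) s"_c$index" else value
    }
  } else {
    // No header at all: every column gets a positional name.
    firstRow.zipWithIndex.map { case (_, index) => s"_c$index" }
  }
}

// e.g. normalizeHeader(Seq("", null, "make"), headerFlag = true)
//      => Seq("_c0", "_c1", "make")
```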
diff --git a/sql/core/src/test/resources/cars-blank-column-name.csv b/sql/core/src/test/resources/cars-blank-column-name.csv
new file mode 100644
index 0000000000..0b804b1614
--- /dev/null
+++ b/sql/core/src/test/resources/cars-blank-column-name.csv
@@ -0,0 +1,3 @@
+"",,make,customer,comment
+2012,"Tesla","S","bill","blank"
+2013,"Tesla","S","c","something"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b6cdc8cfab..ae91e0f606 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -38,6 +38,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
private val carsAltFile = "cars-alternative.csv"
private val carsUnbalancedQuotesFile = "cars-unbalanced-quotes.csv"
private val carsNullFile = "cars-null.csv"
+ private val carsBlankColName = "cars-blank-column-name.csv"
private val emptyFile = "empty.csv"
private val commentsFile = "comments.csv"
private val disableCommentsFile = "disable_comments.csv"
@@ -71,14 +72,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
if (withHeader) {
assert(df.schema.fieldNames === Array("year", "make", "model", "comment", "blank"))
} else {
- assert(df.schema.fieldNames === Array("C0", "C1", "C2", "C3", "C4"))
+ assert(df.schema.fieldNames === Array("_c0", "_c1", "_c2", "_c3", "_c4"))
}
}
if (checkValues) {
val yearValues = List("2012", "1997", "2015")
val actualYears = if (!withHeader) "year" :: yearValues else yearValues
- val years = if (withHeader) df.select("year").collect() else df.select("C0").collect()
+ val years = if (withHeader) df.select("year").collect() else df.select("_c0").collect()
years.zipWithIndex.foreach { case (year, index) =>
if (checkTypes) {
@@ -224,6 +225,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(cars.select("year").collect().size === 2)
}
+ test("test for blank column names on read and select columns") {
+ val cars = spark.read
+ .format("csv")
+ .options(Map("header" -> "true", "inferSchema" -> "true"))
+ .load(testFile(carsBlankColName))
+
+ assert(cars.select("customer").collect().size == 2)
+ assert(cars.select("_c0").collect().size == 2)
+ assert(cars.select("_c1").collect().size == 2)
+ }
+
test("test for FAILFAST parsing mode") {
val exception = intercept[SparkException]{
spark.read